
Database Connector Examples - Mule 4

Anypoint Connector for Database (Database Connector) examples help you configure database connections, query a database, execute stored procedures, execute DDL statements, execute scripts, execute database transactions, use bulk operations, and define custom data types:

  • Configure a Database Connection
    Connect to both popular and generic databases. Additionally, configure the connector to connect to a global data source, set a JDBC driver, configure connection pooling, and connect to an Oracle database with TNS.

  • Query a Database
    Configure the Select operation to query data from a database.

  • Insert, Update, and Delete Data
    Configure the Insert, Update, and Delete operations to manage data.

  • Execute Stored Procedures
    Configure the Stored Procedure operation to combine input, output, and input-output parameters. Additionally, configure dates on stored procedures.

  • Call an Oracle Stored Procedure That Uses UDTs
    Configure the Stored procedure operation to call an Oracle database stored procedure that uses UDTs for its input and output parameters. Additionally, use createStruct and createArray DataWeave functions to map application data to your custom types.

  • Execute DDL Statements
    Configure the Execute DDL operation to execute DDL statements that create, modify, or destroy tables or other data structures.

  • Execute Bulk Operations
    Configure the Bulk insert, Bulk update, and Bulk delete operations to run a single query with multiple sets of parameter values.

  • Execute Script
    Configure the Execute script operation to execute a script as a single statement.

  • Define Custom Data Types
    Define custom data types to use when connected to a particular connection provider.

  • Execute Database Transactions
    Execute database operations in the context of a transaction.

Query a Database

When you configure a database operation using Database Connector, there are several ways to add variable values to the SQL statement you execute in the database.

Use Input Parameters to Protect Database Queries

Use the Select operation to retrieve information from the RDBMS. To use this operation, supply a SQL query and use DataWeave to supply the input parameters:

<flow name="selectParameterizedQuery">
  <db:select config-ref="dbConfig">
    <db:sql>SELECT * FROM PLANET WHERE name = :name</db:sql>
    <db:input-parameters>
      #[{'name' : payload}]
    </db:input-parameters>
  </db:select>
</flow>

In this example, you supply input parameters as key-value pairs, which you create by embedding a DataWeave script. The keys are used with the colon character (:) to reference a parameter value by name. This is the recommended approach for using parameters in your query.

Using input parameters to configure the WHERE clause in a SELECT statement makes the query immune to SQL injection attacks and enables optimizations that are not possible otherwise, which improves the app’s overall performance.

Since version 1.4.0 (escaping colons): If you need to use the colon character (:) in your SQL query, you can escape it by putting a backslash before it. This is useful when using PostgreSQL type casting, which requires two colons before the type you are casting to, for example:

<db:sql>SELECT price\:\:float8 FROM PRODUCT</db:sql>

Since version 1.8.0 (SQL casting for PostgreSQL and Snowflake): Database Connector accepts the PostgreSQL and Snowflake SQL casting syntax through the double colon (::) expression, without the need to escape each colon (this feature doesn’t affect the colon-escaping behavior). For example:

<db:sql>SELECT MAX(modified_date)::DATE FROM sales</db:sql>

For security reasons, do not directly write <db:sql>SELECT * FROM PLANET WHERE name = #[payload] </db:sql>.

DataSense is available for the operation’s input and output. The connector analyzes the query and automatically calculates the structure of the query’s output by analyzing the projection section of the SQL statement. At the same time, by comparing the conditions in the WHERE clause to the table structure, it generates DataSense input to help you build the DataWeave script that generates the input parameters.

Dynamic Queries

Sometimes, you need to parameterize not only the WHERE clause but also parts of the query itself. Examples of use cases for this are queries that must access online versus historic tables depending on a condition, or complex queries for which the projected table columns need to vary.

In the following example, you can see how a full expression produces the query by building a string in which the table depends on a variable $(vars.table). An important thing to notice is that although some of the query text is dynamic ("SELECT * FROM $(vars.table)"), the WHERE clause still follows the best practice of defining the WHERE condition using input parameters, in this case, WHERE name = :name:

<set-variable variableName="table" value="PLANET"/>
<db:select config-ref="dbConfig">
    <db:sql>#["SELECT * FROM $(vars.table) WHERE name = :name"]</db:sql>
    <db:input-parameters>
        #[{'name' : payload}]
    </db:input-parameters>
</db:select>

Dynamic queries are necessary in the above example and cannot be expressed with input parameters alone; input parameters can be applied only to parameters in a WHERE clause. To modify any other part of the query, use DataWeave’s interpolation operator.

In Mule 3, the concept of select was split into parameterized and dynamic queries, and you couldn’t use both at the same time. You had to choose between having a dynamic query and having the advantages of using parameters (SQL injection protection, PreparedStatement optimization, and so on). Furthermore, the syntax for each was different, so you had to learn two ways of doing the same thing. With Database Connector in Mule 4, you can use both methods at the same time by using expressions in the query.

Stream Large Results

Use streaming with queries that return many records, such as in integration use cases. In Mule 4, streaming is transparent and always enabled.

For example, if you submit a query that returns 10K rows, attempting to fetch all those rows at once results in both performance degradation, due to the large amount of data moving across the network, and the risk of running out of memory, since all the information must be loaded into RAM.

With streaming, the connector fetches and processes only part of the query at one time, which reduces load on the network and memory. This means that the connector does not fetch the 10K rows at once; instead, it fetches a smaller chunk, and once that chunk is consumed, it fetches the rest.

You can also use the new repeatable streams mechanism, which means you can have DataWeave and other components process the same stream many times, even in parallel. For more information on repeatable streams, see Streaming in Mule 4.0.
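
For example, the following sketch configures a repeatable file-store stream on the Select operation. This is a minimal illustration: the inMemoryObjects value is an assumption that you should tune for your use case, and the file-store strategy requires Mule Enterprise Edition.

<db:select config-ref="dbConfig">
    <!-- Keep up to 500 objects in memory; overflow to disk beyond that -->
    <repeatable-file-store-iterable inMemoryObjects="500"/>
    <db:sql>SELECT * FROM PLANET</db:sql>
</db:select>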

Limit Results

Mule allows the connector to handle streaming gracefully. However, that does not mean that it’s a good idea to move large chunks of data from the database to Mule. Even with streaming, a typical SQL query can return many rows, each one containing a lot of information.

The select operation provides parameters (fetchSize and maxRows) to help with this:

<db:select fetchSize="200" maxRows="1000" config-ref="dbConfig">
  <db:sql>select * from some_table</db:sql>
</db:select>

This syntax instructs the connector to fetch no more than 1000 rows in total (the maxRows value) and no more than 200 rows at a time (the fetchSize value), which significantly reduces network and memory load. JDBC driver providers enforce the fetchSize value differently, and it often defaults to 10.

The combination limits the total amount of information that is retrieved (the maxRows value) and guarantees that the data is returned from the database over the network in smaller chunks (the fetchSize value).

Query Timeout

Sometimes database queries take a long time to execute. The following factors often cause delays in query execution:

  • An inefficient query, such as one that iterates over many rows because of improper indexing

  • A busy RDBMS or network

  • Lock contention

Generally, it’s recommended to set a timeout on the query. To manage timeouts, configure queryTimeout and queryTimeoutUnit. The following example shows how to set a timeout for the Select operation, but all operations support setting a timeout:

<db:select queryTimeout="10" queryTimeoutUnit="SECONDS" config-ref="dbConfig">
   <db:sql>select * from some_table</db:sql>
</db:select>

Insert, Update, and Delete Data

Database Connector supports insert, update, and delete operations. Like the select operation, these operations support dynamic queries and parameterization through embedded DataWeave transformations, and they also support the fetchSize, maxRows, and timeout parameters. The following sections show examples of input parameterization; for more information about dynamic queries and other parameters, see the Select operation examples.

Insert

Suppose there is a database schema named Products that has a table named electronic. The electronic table contains columns named id, name, description, price, and discount.

The following SQL statement creates the table:

CREATE TABLE electronic(
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100),
    description VARCHAR(255),
    price SMALLINT,
    discount TINYINT
)

Given that the table is already created and there is a connection configuration named dbConfig, the following operation inserts a new record in the electronic table:

<db:insert config-ref="dbConfig">
  <db:sql>
    INSERT INTO electronic(name, description, price, discount)
    VALUES ('Coffee Machine', 'Model: XYZ99. Uses small size capsules.', 120, 5)
  </db:sql>
</db:insert>

The id parameter is not passed in the query because it is an autoincrement key that is generated automatically by the database.

To benefit from the advantages of input parameters, structure the equivalent query as follows:

<db:insert config-ref="dbConfig">
    <db:sql>
        INSERT INTO electronic(name, description, price, discount)
        VALUES (:name, :description, :price, :discount)
    </db:sql>
    <db:input-parameters>#[{
        name: 'Coffee Machine',
        description: 'Model:XYZ99. Uses small size capsules.',
        price: 120,
        discount: 5}]
    </db:input-parameters>
</db:insert>

The insert operation outputs a Statement Result object that contains two fields: an affectedRows integer that indicates how many rows were affected by the query, and a generatedKeys map that contains the autogenerated keys. In this example, the output is as follows:

{
  "affectedRows": 1,
  "generatedKeys": {}
}

The generatedKeys map is empty, although a row is successfully added to the table. The map is empty because returning generated keys is disabled by default to avoid overhead. To return the generated keys, add the autoGenerateKeys parameter and set it to true:

<db:insert config-ref="dbConfig" autoGenerateKeys="true">
   <db:sql>
       INSERT INTO electronic(name, description, price, discount)
       VALUES (:name, :description, :price, :discount)
   </db:sql>
   <db:input-parameters>#[{
       name: 'Coffee Machine',
       description: 'Model:XYZ99. Uses small size capsules.',
       price: 120,
       discount: 5}]
   </db:input-parameters>
</db:insert>
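
With autoGenerateKeys set to true, the generatedKeys map is populated. The exact key name depends on the JDBC driver; for example, a MySQL driver might return output similar to the following (an illustrative sketch, not a guaranteed format):

{
  "affectedRows": 1,
  "generatedKeys": {
    "GENERATED_KEY": 1
  }
}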

If the statement generates multiple keys per affected row, you can specify which columns should be returned by setting the parameter autoGeneratedKeysColumnNames to a list that contains the desired column names. For example, to ensure that only the id column is returned, set the parameter to a DataWeave list with a single string element:

<db:insert config-ref="dbConfig" autoGenerateKeys="true" autoGeneratedKeysColumnNames="#[['id']]">
    ...
</db:insert>

Update

Given the table that is defined in the insert example, the operation to update the electronic table to set the discount to 10 percent for all items that have a price value above 100 is:

<db:update config-ref="dbConfig">
    <db:sql><![CDATA[#["UPDATE electronic SET discount = :discount WHERE price > :price"]]]></db:sql>
    <db:input-parameters>#[{
        discount: 10,
        price: 100
    }]</db:input-parameters>
</db:update>

The <![CDATA[…​]]> wrapper allows you to use special characters, such as > or ", in the query. Otherwise, you must use XML-escaped versions of those characters, such as &gt; and &quot;.

When you use the Anypoint Studio visual user interface for connectors instead of the XML code view, you can type special characters directly into the SQL Query Text box, and Anypoint Studio automatically changes the character to its escaped version in the XML view.

Delete

Given the table defined in the example for insert, the following operation deletes the record with id: 1 from the table:

<db:delete config-ref="dbConfig">
    <db:sql>DELETE FROM electronic WHERE id = :id</db:sql>
    <db:input-parameters>#[{
        id: 1
    }]</db:input-parameters>
</db:delete>

Execute Stored Procedures

Invoke stored procedures that combine input, output, and input-output parameters.

For example:

<!-- Invoke a procedure with input parameters -->
<db:stored-procedure config-ref="dbConfig">
    <db:sql>{ call updatePlanetDescription('Venus', :description) }</db:sql>
    <db:input-parameters>
        #[{'description' : payload}]
    </db:input-parameters>
</db:stored-procedure>

<!-- Invoke a procedure with input-output parameters -->
<db:stored-procedure config-ref="dbConfig">
  <db:sql>{ call doubleMyInt(:myInt) }</db:sql>
  <db:in-out-parameters>
      <db:in-out-parameter key="myInt" value="3"/>
  </db:in-out-parameters>
</db:stored-procedure>

<!-- Invoke a procedure with both input AND output parameters -->
<db:stored-procedure config-ref="dbConfig">
    <db:sql>{ call multiplyInts(:int1, :int2, :result1, :int3, :result2) }</db:sql>
    <db:input-parameters>
        #[{
            'int1' : 3,
            'int2' : 4,
            'int3' : 5
        }]
    </db:input-parameters>
    <db:output-parameters>
        <db:output-parameter key="result1" type="INTEGER"/>
        <db:output-parameter key="result2" type="INTEGER"/>
        <db:output-parameter key="myInt" type="INTEGER"/>
    </db:output-parameters>
</db:stored-procedure>

Many combinations are possible.

After execution, the resulting values of the output and input-output parameters are available in the operation’s result. Leverage DataSense to help you get them.
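
For example, after the multiplyInts call above, a minimal sketch for reading one of the output parameters (assuming the operation result is left in the payload) is:

<!-- Log an output parameter value from the stored procedure result -->
<logger level="INFO" message="#[payload.result1]"/>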

For Database Connector version 1.4.0 or later, make sure that there is no whitespace between the name of the stored procedure and the opening parenthesis.

Database Connector supports callable statements with the format { call procedureName(:param1, :param2, …​, :paramN) }, where parameters match positionally; that is, :paramN matches the Nth parameter in the stored procedure declaration.

Suppose you are using an Oracle database that you initialized with the following table and stored procedure:

    CREATE TABLE SYSTEM.employees(
        employee_id INTEGER GENERATED BY DEFAULT AS IDENTITY,
        employee_name VARCHAR2(100),
        employee_age INTEGER,
        employee_birthday TIMESTAMP,
        PRIMARY KEY(employee_id)
    );


    CREATE PROCEDURE createEmployee(e_name VARCHAR2, e_age NUMBER, e_birth_date DATE) AS
    BEGIN
        INSERT INTO SYSTEM.employees(employee_name, employee_age, employee_birthday) VALUES(e_name, e_age, e_birth_date);
    END;

The connector does not support named parameters in callable statements, so attempting to match parameters by name ( { call createEmployee(employee_age => :age, e_birth_date => :date, e_name => :name) } ) does not work. Parameters must be provided in the appropriate order: { call createEmployee(e_name => :name, employee_age => :age, e_birth_date => :date) }.

Use Dates on Stored Procedures

Database Connector does not support the use of engine-specific embedded functions. For example, if you want to change a date to a specific format before calling a procedure in an Oracle database, the following approach does not work: { call createEmployee(e_name => :name, employee_age => :age, e_birth_date => TO_DATE(:date, 'YYYY-MM-DD HH:mm:ss')) }.
Instead, use DataWeave for all data transformations before invoking the callable statement. For the previous example, perform the following transformation:

<db:stored-procedure doc:name="Create Employee" config-ref="Database_Config">
    <db:sql>{ call createEmployee(:name, :age, :date) }</db:sql>
    <db:input-parameters>
        <![CDATA[#[%dw 2.0
        output application/json
        fun format(d: DateTime) = d as String { format: "yyyy-MM-dd HH:mm:ss" }
        ---
        { 'date': format(|2019-10-31T13:00:00.000Z|), 'name': 'rick', 'age': 60 }
        ]]]>
    </db:input-parameters>
</db:stored-procedure>

Execute DDL Statements

Data Definition Language (DDL) statements are special types of SQL statements that do not directly access or modify data. Instead, they create, modify, or destroy data structures, such as tables, views, and stored procedures.

Create Table Example

Suppose you have a database schema named Products, and you also have a connection named dbConfig that is configured to access Products. The following operation, named Execute DDL, creates a new table named electronic that has columns named id, name, description, price, and discount:

<db:execute-ddl config-ref="dbConfig">
    <db:sql>
        CREATE TABLE electronic(
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(100),
            description VARCHAR(255),
            price SMALLINT,
            discount TINYINT
        )
    </db:sql>
</db:execute-ddl>

The Execute DDL operation creates a table and returns 0 unless there is an error.

Execute Bulk Operations

The insert, update, and delete operations are for cases in which each input parameter takes only one value. Bulk operations, by contrast, allow you to run a single query using a set of parameter values.

Performing a bulk operation avoids unnecessary steps, so that:

  • The query is parsed only once.

  • Only one database connection is required since a single statement is executed.

  • Network overhead is minimized.

  • The RDBMS can execute the bulk operation atomically.

For these use cases, the connector offers three operations: <db:bulk-insert>, <db:bulk-update> and <db:bulk-delete>.

These operations are similar to their single counterparts, except that instead of receiving input parameters as key-value pairs, the operations expect them as a list of key-value pairs.

For example:

<db:bulk-insert config-ref="dbConfig" >
  <db:bulk-input-parameters>
    #[[{'id': 2, 'name': 'George', 'lastName': 'Costanza'}, {'id': 3, 'name': 'Cosmo', 'lastName': 'Kramer'}]]
  </db:bulk-input-parameters>
  <db:sql>
    insert into customers (id, name, lastName) values (:id, :name, :lastName)
  </db:sql>
</db:bulk-insert>

Even without bulk operations, a single statement can affect many rows. In a delete operation with one criterion (POSITION = X), every row that matches is deleted. The same concept applies to update: UPDATE PRODUCTS set PRICE = PRICE * 0.9 where PRICE > :price may apply a 10% discount to many products, but the price input parameter still accepts only one value.

If you want to apply different discount rates to products that have different prices, you must execute many operations.

The following example assumes that the payload is a list of objects, each with the structure { price : number, discountRate: number }:

<foreach>
  <db:update config-ref="dbConfig">
    <db:input-parameters>
     #[
      {
        'discountRate' : payload.discountRate,
        'price' : payload.price
      }
    ]
    </db:input-parameters>
    <db:sql>
      UPDATE PRODUCTS set PRICE = PRICE * :discountRate where PRICE > :price
    </db:sql>
  </db:update>
</foreach>

The previous operation accomplishes the task but is inefficient: a complete operation is executed for each element in the list (compare the bulk update sketch after this list), which means that each time:

  • The query is parsed.

  • Parameters are resolved.

  • A connection to the database is acquired (either by getting one from the pool or establishing a new one).

  • All the network overhead is paid.

  • The RDBMS processes the query and applies changes.

  • The connection is released.
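
A bulk update pays these costs only once. The following is a minimal sketch of the equivalent bulk operation; the DataWeave map expression that shapes the payload into a list of parameter maps is an assumption about your data structure:

<db:bulk-update config-ref="dbConfig">
  <db:bulk-input-parameters>
    <!-- Shape each payload object into one set of input parameters -->
    #[payload map ((row) -> {'discountRate': row.discountRate, 'price': row.price})]
  </db:bulk-input-parameters>
  <db:sql>
    UPDATE PRODUCTS set PRICE = PRICE * :discountRate where PRICE > :price
  </db:sql>
</db:bulk-update>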

While some statements in the bulk operation can execute successfully, others may result in an error. When this occurs, it is up to the JDBC driver to either:

  1. Stop execution immediately and ignore all remaining operations, or

  2. Continue to execute the remaining statements.

In both cases, whenever an error occurs, a single exception is thrown describing what went wrong (for example, that a bulk insert failed to insert 1 out of 100 rows), and you can examine your application logs to identify the statement that caused the failure.

Database Connector Data Types Reference

The following examples include information about database input parameters and defining custom data types you can use when connected to a particular connection provider.

Force Parameter Types

Under uncommon circumstances, you might need to force the underlying JDBC driver to coerce an input parameter to a specific type. For example, if your JDBC driver cannot determine the type of an input parameter, or a parameter is of a custom type, specify the type of each input parameter:

<db:bulk-insert config-ref="dbConfig" queryTimeout="0" queryTimeoutUnit="SECONDS">
    <db:sql>INSERT INTO PLANET(POSITION, NAME) VALUES (:position, :name)</db:sql>
    <db:parameter-types>
        <db:parameter-type key="name" type="VARCHAR" />
        <db:parameter-type key="position" type="INTEGER" />
    </db:parameter-types>
</db:bulk-insert>

Define Custom Data Types

Each connection provider, such as Derby or Oracle, has both exclusive and common parameters. A child element of the connection provider element defines custom data types that you can use when connected to that particular provider. For example:

<db:config name="dbConfig">
   <db:derby-connection url="jdbc:derby:muleEmbeddedDB;create=true">
       <db:column-types>
           <!-- Derby uses JAVA_OBJECT for UDT-->
           <db:column-type typeName="CONTACT_DETAILS" id="2000"/>
       </db:column-types>
   </db:derby-connection>
</db:config>

Database Transactions

You can execute database operations in the context of a transaction. Each operation has a transactionalAction value that specifies how the operation joins the active transaction, if one exists. For example, the select operation has the following possible actions (see the sketch after this list):

  • ALWAYS_JOIN
    Expects a transaction to be in progress when a message is received. If there is no transaction, an error is raised.

  • JOIN_IF_POSSIBLE
    Joins the current transaction if one is available. Otherwise, no transaction occurs.

  • NOT_SUPPORTED
    Executes outside any existing transaction.
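
For example, the following sketch forces a select operation to run only inside an existing transaction; the table and parameter names here are illustrative:

<db:select config-ref="dbConfig" transactionalAction="ALWAYS_JOIN">
    <db:sql>SELECT BALANCE FROM ACCOUNT WHERE ID = :id</db:sql>
    <db:input-parameters>#[{'id' : payload.id}]</db:input-parameters>
</db:select>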

Group Operations

Sometimes you need to execute several queries atomically in the context of the same transaction. For example, during a bank account transfer, you need to subtract money from one account and add it to another, and if either operation fails, both must be rolled back:

<db:update config-ref="db">
 <db:sql>UPDATE ACCOUNT set BALANCE = BALANCE - :money where ID = :source</db:sql>
 <db:input-parameters>#[{'money' : payload.money, 'source': payload.source}]</db:input-parameters>
</db:update>

<db:update config-ref="db">
 <db:sql>UPDATE ACCOUNT set BALANCE = BALANCE + :money where ID = :target</db:sql>
 <db:input-parameters>#[{'money' : payload.money, 'target': payload.target}]</db:input-parameters>
</db:update>

If these queries are executed in the context of an already existing transaction, the queries belong to the same transaction. If there’s no active transaction, you can start one by using the <try> scope:

<try transactionalAction="ALWAYS_BEGIN">
 <db:update config-ref="db">
   <db:sql>UPDATE ACCOUNT set BALANCE = BALANCE - :money where ID = :source</db:sql>
   <db:input-parameters>#[{'money' : payload.money, 'source': payload.source}]</db:input-parameters>
 </db:update>

 <db:update config-ref="db">
   <db:sql>UPDATE ACCOUNT set BALANCE = BALANCE + :money where ID = :target</db:sql>
   <db:input-parameters>#[{'money' : payload.money, 'target': payload.target}]</db:input-parameters>
 </db:update>
</try>

Execute Script Reference

This operation executes a script of arbitrary length as a single statement. Execute script differs from other operations in the following ways:

  • The script can contain multiple statements.

  • Statements can be of different types.

  • No input or output parameters are accepted.

The execute-script operation runs any script that does not involve a SQL projection. You can use <db:execute-script> in the following ways:

  • Embed execute-script in an operation.

  • Reference execute-script from a file.

You cannot use both ways of executing a script simultaneously.

Executing a script returns an array of integers, one element per executed statement. Each number represents the number of rows affected by the corresponding statement.

Embed in an Operation

<db:execute-script config-ref="dbConfig">
   <db:sql>
       update PLANET set NAME='Mercury' where POSITION=0;
       update PLANET set NAME='Mercury' where POSITION=4
   </db:sql>
</db:execute-script>
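
For example, assuming each update statement in the preceding script matches exactly one row, the operation returns:

[1, 1]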

Reference From a File

<flow name="executeScriptFromFile">
   <db:execute-script config-ref="dbConfig" file="integration/executescript/bulk-script.sql" />
</flow>

The execute script operation is frequently used to create schemas and tables, insert data, and perform data rotation. Data rotation is typically performed by nightly jobs that move and archive data into historic tables and purge online tables.

Execute script and bulk operations are intended for different uses. For example, although you can build a script that inserts many rows into the database, you cannot conveniently provide dynamic parameters to the script, and you must ensure that the script is protected against SQL injection attacks.

Executing a select statement in a script returns no data, so use the Select operation instead to query the database.
