
Building a Batch Enabled Connector

Mule can process messages in batches. This functionality, provided by the <batch:commit> block, is particularly useful when you need to collect a subset of records for a bulk upsert to an external source or service. However, a connector can be used inside a batch block only if it supports bulk operations.

Batch benefits include:

  • Parallel processing of records

  • Handling of large quantities of incoming data from an API into a legacy system

  • Synchronizing data sets between business applications

Some existing connectors, such as Salesforce and NetSuite, have supported batch operations for years. However, with the introduction of the new batch module, developers no longer need to parse the connector's response to determine which records failed. Instead, <batch:commit> handles this automatically.

This document explains how to enable your own connector to take advantage of this feature, which is introduced in the Batch Processing documentation.

Prerequisites

This document assumes you are familiar with Batch Processing, that you have created a connector project, and have defined the operations to use.

How It Works

Implementing this feature in a new connector is straightforward: modify the connector slightly so that every bulk operation returns a BulkOperationResult. BulkOperationResult gives the Batch module a uniform way to interpret the result of any bulk operation.


         
      
/**
 * This class is used to provide item level information about a bulk operation. This
 * master entity represents the bulk operation as a whole, while the detail entity
 * {@link BulkItem} represents the operation status for each individual data piece.
 * The {@link #items} list defines a contract in which the ordering of those items
 * needs to match the ordering of the original objects. For example, if the bulk
 * operation consisted of 10 person objects in which number X corresponded to the
 * person 'John Doe', then the Xth item in the {@link #items} list must refer to
 * the result of processing the same 'John Doe'
 */
public final class BulkOperationResult<T> implements Serializable
{
    /**
     * The operation ID
     */
    public Serializable getId();

    /**
     * Whether or not the operation was successful. Should be <code>true</code> if
     * and only if all the child {@link BulkItem} entities were also successful
     */
    public boolean isSuccessful();

    /**
     * An ordered list of {@link BulkItem}, one per each item in the original
     * operation, no matter if the record was successful or not
     */
    public List<BulkItem<T>> getItems();

    /**
     * A custom property stored under the given key
     *
     * @param key the key of the custom property
     * @return a {@link Serializable} value
     */
    public Serializable getCustomProperty(String key);
}

BulkOperationResult and BulkItem form a master-detail relationship in which:

  • BulkOperationResult represents the operation as a whole, playing the role of the master.

  • BulkItem represents the result for each individual record, playing the role of the detail.

  • Both classes are immutable.

  • There’s an ordering relationship between the master and the detail. The first item in the BulkItem list has to correspond to the first record in the original bulk. The second has to correspond to the second one, and so forth.
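
The ordering rule is easiest to see in a small, self-contained sketch. The classes below (OrderedBulkSketch, Item, Result) and the upsert() call are hypothetical stand-ins written for this illustration; they are not Mule's BulkOperationResult and BulkItem. The sketch assumes a connector that processes records one by one and shows that a failed record still produces an item in its own slot, so item i always describes record i.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT Mule's
// BulkOperationResult and BulkItem classes.
final class OrderedBulkSketch {

    /** Per-record outcome (the "detail"). */
    static final class Item {
        final String payload;
        final boolean successful;
        final String message;

        Item(String payload, boolean successful, String message) {
            this.payload = payload;
            this.successful = successful;
            this.message = message;
        }
    }

    /** Whole-operation outcome (the "master"). */
    static final class Result {
        final List<Item> items;

        Result(List<Item> items) {
            // Defensive copy keeps the result immutable.
            this.items = Collections.unmodifiableList(new ArrayList<Item>(items));
        }

        /** True if and only if every item succeeded. */
        boolean isSuccessful() {
            for (Item item : items) {
                if (!item.successful) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Hypothetical remote call: rejects blank names. */
    static void upsert(String name) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("name is blank");
        }
    }

    /** Emits exactly one Item per input record, in input order. */
    static Result upsertAll(List<String> names) {
        List<Item> items = new ArrayList<Item>();
        for (String name : names) {
            try {
                upsert(name);
                items.add(new Item(name, true, "ok"));
            } catch (IllegalArgumentException e) {
                // A failed record still fills its slot, so item i maps to record i.
                items.add(new Item(name, false, e.getMessage()));
            }
        }
        return new Result(items);
    }

    public static void main(String[] args) {
        Result result = upsertAll(Arrays.asList("John Doe", "", "Jane Roe"));
        for (int i = 0; i < result.items.size(); i++) {
            Item item = result.items.get(i);
            System.out.println(i + ": successful=" + item.successful + ", message=" + item.message);
        }
        System.out.println("overall successful=" + result.isSuccessful());
    }
}
```

Skipping failed records instead of recording them would shift every later item by one position and break the index-based mapping between records and items.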


         
      
/**
 * This class represents an individual data piece in the context of a bulk operation
 */
public final class BulkItem<T> implements Serializable
{
    /**
     * The item ID
     */
    public Serializable getId();

    /**
     * Whether or not it was successful. Notice that this should be <code>false</code>
     * if {@link #exception} is not <code>null</code>, however there might not be an
     * exception but the item could still not be successful for other reasons.
     */
    public boolean isSuccessful();

    /**
     * Message to add context on this item. Could be an error description, a warning
     * or simply some info related to the operation
     */
    public String getMessage();

    /**
     * An optional status code
     */
    public String getStatusCode();

    /**
     * An exception if the item failed
     */
    public Exception getException();

    /**
     * The actual data this entity represents
     */
    public T getPayload();

    /**
     * A custom property stored under the given key
     *
     * @param key the key of the custom property
     * @return a {@link Serializable} value
     */
    public Serializable getCustomProperty(String key);
}
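
As a rough illustration of how per-item status might be consumed, the sketch below builds a failure report from a list of items. ItemView is a hypothetical interface that mirrors four of the accessors listed above (isSuccessful, getMessage, getException, getPayload); the item() helper and failureReport() are likewise invented for this example. It follows the note in the listing that an item can be unsuccessful without carrying an exception, falling back to the message in that case.

```java
import java.util.ArrayList;
import java.util.List;

final class FailureReportSketch {

    /** Hypothetical read-only view mirroring four of the BulkItem accessors. */
    interface ItemView<T> {
        boolean isSuccessful();
        String getMessage();
        Exception getException();
        T getPayload();
    }

    /** Hypothetical helper that builds an ItemView from fixed values. */
    static <T> ItemView<T> item(final T payload, final boolean successful,
                                final String message, final Exception exception) {
        return new ItemView<T>() {
            public boolean isSuccessful() { return successful; }
            public String getMessage() { return message; }
            public Exception getException() { return exception; }
            public T getPayload() { return payload; }
        };
    }

    /** Returns one line per failed item, keyed by its position in the bulk. */
    static <T> List<String> failureReport(List<ItemView<T>> items) {
        List<String> report = new ArrayList<String>();
        for (int i = 0; i < items.size(); i++) {
            ItemView<T> item = items.get(i);
            if (item.isSuccessful()) {
                continue;
            }
            // An unsuccessful item may have no exception; fall back to its message.
            String reason = item.getException() != null
                    ? item.getException().getMessage()
                    : item.getMessage();
            report.add("record " + i + " (" + item.getPayload() + "): " + reason);
        }
        return report;
    }

    public static void main(String[] args) {
        List<ItemView<String>> items = new ArrayList<ItemView<String>>();
        items.add(item("a", true, "ok", null));
        items.add(item("b", false, "duplicate", null));
        items.add(item("c", false, "failed", new RuntimeException("timeout")));
        for (String line : failureReport(items)) {
            System.out.println(line);
        }
    }
}
```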

Backwards Compatibility

Existing connectors need a few extra steps to remain backward compatible with the Mule applications that already use them. Typically, these applications handle the output of bulk operations themselves, so the return type of an existing operation cannot simply change. Instead, these connectors need to incorporate a transformer.

Each connector needs to translate its own bulk operation representation to a BulkOperationResult object. This translation differs from one connector to the other, given the different API implementations.


         
      
public List<BatchResult> batchContacts(String batchId, List<NestedProcessor> operations) throws Exception;

In the above snippet, notice that the operation returns a list of BatchResult objects. The transformer therefore has to convert a list of BatchResult objects into a BulkOperationResult.


         
      
@Start
public void init() {
    this.muleContext.getRegistry().registerTransformer(new BatchResultToBulkOperationTransformer());
}

         
      
public class BatchResultToBulkOperationTransformer extends AbstractDiscoverableTransformer {

    public BatchResultToBulkOperationTransformer() {
        this.registerSourceType(DataTypeFactory.create(List.class, BatchResult.class, null));
        this.setReturnDataType(DataTypeFactory.create(BulkOperationResult.class));
    }

    @Override
    protected Object doTransform(Object src, String enc) throws TransformerException {
        List<BatchResult> results = (List<BatchResult>) src;

        BulkOperationResultBuilder<BaseEntry<?>> builder = BulkOperationResult.<BaseEntry<?>>builder();

        if (results != null) {
            for (BatchResult result : results) {
                BatchStatus status = result.getStatus();
                int code = status.getCode();

                builder.addItem(BulkItem.<BaseEntry<?>>builder()
                        .setRecordId(result.getId())
                        .setPayload(result.getEntry())
                        .setMessage(status.getContent())
                        .setStatusCode(String.format("%d - %s", code, status.getReason()))
                        .setSuccessful(code == 200 || code == 201 || code == 204)
                    );
            }
        }

        return builder.build();
    }
}

There are some important things to notice about the above transformer:

  • It needs to extend the AbstractDiscoverableTransformer class. This is so that the batch module can dynamically find it at runtime.

  • It defines the source and target data types in its constructor.

  • The doTransform() method performs the transformation.

  • The BulkOperationResult and BulkItem classes provide convenient Builder objects that decouple their inner representations from your connector’s code.
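
The mapping logic inside doTransform() can also be exercised without a Mule runtime. The sketch below is a simplified analogue, not the transformer itself: RawResult stands in for BatchResult and MappedItem for BulkItem, and both are hypothetical types invented for this example. It reuses the same success rule (HTTP 200, 201, or 204) and the same "code - reason" status format as the transformer above, and it returns an empty list for a null input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

final class StatusMappingSketch {

    /** Hypothetical connector-specific result; stands in for BatchResult. */
    static final class RawResult {
        final String id;
        final int code;
        final String reason;

        RawResult(String id, int code, String reason) {
            this.id = id;
            this.code = code;
            this.reason = reason;
        }
    }

    /** Hypothetical neutral item; stands in for BulkItem. */
    static final class MappedItem {
        final String id;
        final String statusCode;
        final boolean successful;

        MappedItem(String id, String statusCode, boolean successful) {
            this.id = id;
            this.statusCode = statusCode;
            this.successful = successful;
        }
    }

    /** Same success rule as the transformer: 200, 201, or 204. */
    static boolean isSuccess(int code) {
        return code == 200 || code == 201 || code == 204;
    }

    /** Maps each raw result to one item, preserving order; null yields an empty list. */
    static List<MappedItem> map(List<RawResult> results) {
        List<MappedItem> items = new ArrayList<MappedItem>();
        if (results == null) {
            return items;
        }
        for (RawResult result : results) {
            String status = String.format(Locale.ROOT, "%d - %s", result.code, result.reason);
            items.add(new MappedItem(result.id, status, isSuccess(result.code)));
        }
        return items;
    }

    public static void main(String[] args) {
        List<MappedItem> items = map(Arrays.asList(
                new RawResult("r1", 201, "Created"),
                new RawResult("r2", 404, "Not Found")));
        for (MappedItem item : items) {
            System.out.println(item.id + ": " + item.statusCode + ", successful=" + item.successful);
        }
    }
}
```

Keeping the success rule in a small method of its own makes it straightforward to adjust if the target API reports success with other status codes.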

See Also