バッチ対応コネクタの作成

Mule の機能の 1 つは、Batch Processingでメッセージを処理することです。<batch:commit> ブロックによって提供されるこの機能は、外部ソースまたはサービスへの一括更新/挿入のためにレコードのサブセットを収集する必要がある場合、特に役立ちます。ただし、問題のコネクタが一括操作をサポートしていない限り、バッチブロック内で使用することはできません。

バッチには次の利点があります。

  • レコードの並列処理

  • API から従来のシステムへの大量の受信データを処理する

  • ビジネスアプリケーション間でデータセットを同期する

Salesforce ConnectorNetSuite などのいくつかの既存のコネクタは、長年にわたってバッチ操作をサポートしています。ただし、新しいバッチモジュールの導入により、開発者はコネクタの応答を解析して失敗したレコードを判断する必要がなくなりました。代わりに、<batch:commit> によって自動的に処理されます。

このドキュメントでは、Batch Processingドキュメントで紹介されているこの機能を利用するために、独自のコネクタを使用する方法について説明します。

前提条件

このドキュメントは、読者がBatch Processingに精通し、コネクタプロジェクトを作成済みで、使用する操作を定義済みであることを前提としています。

しくみ

新しいコネクタでのこの機能の実装は非常に簡単です。すべての一括操作が BulkOperationResult を返すようにコネクタを若干変更します。BulkOperationResult では、バッチモジュールがあらゆる種類の一括操作の結果を理解できます。

/**
 * This class is used to provide item level information about a bulk operation. This
 * master entity represents the bulk operation as a whole, while the detail entity
 * {@link BulkItem} represents the operation status for each individual data piece.
 * The {@link #items} list defines a contract in which the ordering of those items
 * needs to match the ordering of the original objects. For example, if the bulk
 * operation consisted of 10 person objects in which number X corresponded to the
 * person 'John Doe', then the Xth item in the {@link #items} list must reference to
 * the result of processing the same 'John Doe'
 */
public final class BulkOperationResult<T> implements Serializable
{
    /**
     * The operation ID
     */
    public Serializable getId();

    /**
     * Whether or not the operation was successful. Should be <code>true</code> if
     * and only if all the child {@link BulkItem} entities were also successful
     */
    public boolean isSuccessful();

    /**
     * An ordered list of {@link BulkItem}, one per each item in the original
     * operation, no matter if the record was successful or not
     */
    public List<BulkItem<T>> getItems();

    /**
     * A custom property stored under the given key
     *
     * @param key the key of the custom property
     * @return a {@link Serializable} value
     */
    public Serializable getCustomProperty(String key);
}

基本的に、上記のクラスは次の主従関係にあります。

  • BulkOperationResult は操作全体を表し、主の役割を果たす。

  • BulkItem は各レコードの結果を表し、従の役割を果たす。

  • 両方のクラスは不変。

  • 主と従の間には順序関係がある。BulkItem リスト内の最初の項目は、元の一括の最初のレコードに対応している必要があります。同様に、2 番目は 2 番目というように対応している必要があります。

/**
 * This class represents an individual data piece in the context of a bulk operation
 */
public final class BulkItem<T> implements Serializable
{
    /**
     * The item ID
     */
    public Serializable getId();

    /**
     * Wether or not it was successful. Notice that this should be <code>false</code>
     * if {@link #exception} is not <code>null</code>, however there might not be an
     * exception but the item could still not be successful for other reasons.
     */
    public boolean isSuccessful();

    /**
     * Message to add context on this item. Could be an error description, a warning
     * or simply some info related to the operation
     */
    public String getMessage();

    /**
     * An optional status code
     */
    public String getStatusCode();

    /**
     * An exception if the item was failed
     */
    public Exception getException();

    /**
     * The actual data this entity represents
     */
    public T getPayload();

    /**
     * A custom property stored under the given key
     *
     * @param key the key of the custom property
     * @return a {@link Serializable} value
     */
    public Serializable getCustomProperty(String key);
}

後方互換性

既存のコネクタを検討している場合、それらが使用されている Mule アプリケーションとの後方互換性を保つため、いくつかの追加手順が必要です。通常、これらの Mule アプリケーションは一括操作の出力を独自に処理します。そのため、これらのコネクタはトランスフォーマを組み込む必要があります。

各コネクタは、独自の一括操作表現を BulkOperationResult オブジェクトに変換する必要があります。API の実装が異なるため、この変換はコネクタごとに異なります。

public List<BatchResult> batchContacts(String batchId, List<NestedProcessor> operations) throws Exception;

上記のスニペットでは、操作が BatchResult オブジェクトのリストを返しています。そのため、トランスフォーマは BatchResults オブジェクトを BulkOperationResult に変換する必要があります。

@Start
public void init() {
 this.muleContext.getRegistry().registerTransformer(new BatchResultToBulkOperationTransformer());
}
public class BatchResultToBulkOperationTransformer extends AbstractDiscoverableTransformer {

    public BatchResultToBulkOperationTransformer() {
        this.registerSourceType(DataTypeFactory.create(List.class, BatchResult.class, null));
        this.setReturnDataType(DataTypeFactory.create(BulkOperationResult.class));
    }

    @Override
    protected Object doTransform(Object src, String enc) throws TransformerException {
        List<BatchResult> results = (List<BatchResult>) src;

        BulkOperationResultBuilder<BaseEntry<?>> builder = BulkOperationResult.<BaseEntry<?>>builder();

        if (results != null) {
            for (BatchResult result : results) {
                BatchStatus status = result.getStatus();
                int code = status.getCode();

                builder.addItem(BulkItem.<BaseEntry<?>>builder()
                        .setRecordId(result.getId())
                        .setPayload(result.getEntry())
                        .setMessage(status.getContent())
                        .setStatusCode(String.format("%d - %s", code, status.getReason()))
                        .setSuccessful(code == 200 || code == 201 || code == 204)
                    );
            }
        }

        return builder.build();
    }

上記のトランスフォーマに関して注意すべき重要な点がいくつかあります。

  • AbstractDiscoverableTransformer クラスを拡張する必要がある。これは、バッチモジュールが実行時にそれを動的に見つけることができるようにするためです。

  • そのコンストラクタで変換元と変換先のデータ型を定義する。

  • doTransform() メソッドは変換処理を実行する。

  • BulkOperationResult および BulkItem クラスは、コネクタのコードから内部表現を分離するための便利な Builder オブジェクトを提供する。

関連情報

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub