Azure Event Hubs Connector 1.2 の例

Azure Event Hubs Connector の Mule フローの例を示します。

始める前に

  • Java 8、11、または 17

  • Anypoint Studio 7.5 以降

  • Mule Runtime Engine (Mule) 4.3.0 以降

  • DataWeave

接続の設定ファイルを作成する

接続のプロパティが含まれる設定ファイルを作成します。

  1. /src/main/resources/​ フォルダー内に ​mule-app.properties​ という名前のファイルを作成します。

  2. mule-app.properties​ ファイルに、次のような接続の一連のプロパティを作成します。括弧で囲まれたテキスト (括弧を含む) を各自の設定の適切な値に置き換えます。

    config.namespace = <namespace>
    	config.eventHuName = <eventHuName>
    	ad-config.tenantId = <tenantId>
    	ad-config.clientId = <clientId>
    	ad-config.ClientSecret = <ClientSecret>
    
    	sas-config.sharedAccessKeyName = <sharedAccessKeyName>
    	sas-config.sharedAccessKey = <sharedAccessKey>
    
    	bs-config.containerName = <containerName>
    	bs-config.accountName = <accountName>
    	bs-config.accountKey = <accountKey>

プロパティファイルの作成についての詳細は、​「プロパティプレースホルダーの設定」​を参照してください。

1 つのイベントの送信と EventHub Listener を使用したリスン

この Mule フローでは、1 つのイベントをイベントハブに送信します。結果はコンソールに表示されます。

この例では、次の操作を使用します。

  • HTTP Listener
    HTTP 要求からデータを受け取ります。

  • Send a single event
    1 つのイベントを送信します。

  • Logger
    パブリッシュされたイベントを表示します。

  • Eventhub listener
    イベントハブの名前空間のイベントを読み取ります。

  • Logger
    リスンされたイベントを記録します。

「Send a Single Event (1 つのイベントを送信)」フローの Studio フロー

この例の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
	xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:file="http://www.mulesoft.org/schema/mule/file"
	xmlns:sftp="http://www.mulesoft.org/schema/mule/sftp"
	xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:email="http://www.mulesoft.org/schema/mule/email" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka" xmlns:azure-event-hub-management-connector="http://www.mulesoft.org/schema/mule/azure-event-hub-management-connector" xmlns:azure-eventhubs="http://www.mulesoft.org/schema/mule/azure-eventhubs" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/azure-eventhubs http://www.mulesoft.org/schema/mule/azure-eventhubs/current/mule-azure-eventhubs.xsd
http://www.mulesoft.org/schema/mule/azure-event-hub-management-connector http://www.mulesoft.org/schema/mule/azure-event-hub-management-connector/current/mule-azure-event-hub-management-connector.xsd
http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/email http://www.mulesoft.org/schema/mule/email/current/mule-email.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/sftp http://www.mulesoft.org/schema/mule/sftp/current/mule-sftp.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd">


	<configuration-properties doc:name="Configuration properties" file="mule-app.properties" />

	<http:listener-config name="HTTP_config" doc:name="HTTP Listener config"  >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>

	<azure-eventhubs:azure-eventhubs-config name="ActiveDirectory" doc:name="Azure Event Hubs Connector Azure Event Hubs" >
		<azure-eventhubs:active-directory-authentication-connection namespace="${config.namespace}" eventHubName="${config.eventHubName}">

	<azure-eventhubs:token-credential-properties tenantId="${ad-config.tenantId}" clientId="${ad-config.clientId}" clientSecret="${ad-config.clientSecret}" />
			<azure-eventhubs:checkpoint-store-type>
				<azure-eventhubs:azure-blob-storage-sas-authentication containerName="${bs-config.containerName}" accountName="${bs-config.accountName}" accountKey="${bs-config.accountKey}" />
			</azure-eventhubs:checkpoint-store-type>
		</azure-eventhubs:active-directory-authentication-connection>
	</azure-eventhubs:azure-eventhubs-config>

		<flow name="publish-single-event"  >
		<http:listener doc:name="events"  config-ref="HTTP_Listener_config" path="/events" allowedMethods="GET"/>
		<azure-eventhubs:publish doc:name="Send a single event"  config-ref="ActiveDirectory">
		    <azure-eventhubs:event ><![CDATA[#[{
            "body":"event to send",
            "metadata": {
                "param1": "param 1",
            "param2": "param 2",
            }
            }]]]>
                </azure-eventhubs:event>
		</azure-eventhubs:publish>
		<logger level="INFO" doc:name="Logger" message="messagePublished"/>
	</flow>


	<flow name="EventHubListner" >
		<azure-eventhubs:eventhub-listener doc:name="Eventhub listener"  config-ref="ActiveDirectory" checkpointFrequency="1" consumerGroup="consumer-group-1 "/>
		<logger level="INFO" doc:name="Logger" message="Message Received on Event Hub!" />
	</flow>
</mule>

この例の実行手順

  1. コネクタが設定されていることを確認します。

  2. プロジェクトを保存します。

  3. Web ブラウザーから​「http://localhost:8081/events」​と入力してアプリケーションをテストします。

イベントのバッチのパブリッシュと Partition Listener を使用したリスン

この Mule フローでは、複数のイベントを同時にイベントハブに送信します。結果はコンソールに表示されます。

この例では、次の操作を使用します。

  • HTTP Listener
    HTTP 要求からデータを受け取ります。

  • Publish in bulk
    イベントのバッチを送信します。

  • Logger
    パブリッシュされたイベントを表示します。

  • Partition listener
    特定のイベントハブパーティションの名前空間のイベントを読み取ります。

  • Logger
    リスンされたイベントを 2 回記録します。

「Publish a batch of Events (イベントのバッチをパブリッシュ)」フローの Studio フロー

この例の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:tls="http://www.mulesoft.org/schema/mule/tls"
	xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:file="http://www.mulesoft.org/schema/mule/file"
	xmlns:sftp="http://www.mulesoft.org/schema/mule/sftp"
	xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:email="http://www.mulesoft.org/schema/mule/email" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka" xmlns:azure-event-hub-management-connector="http://www.mulesoft.org/schema/mule/azure-event-hub-management-connector" xmlns:azure-eventhubs="http://www.mulesoft.org/schema/mule/azure-eventhubs" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/tls http://www.mulesoft.org/schema/mule/tls/current/mule-tls.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/azure-eventhubs http://www.mulesoft.org/schema/mule/azure-eventhubs/current/mule-azure-eventhubs.xsd
http://www.mulesoft.org/schema/mule/azure-event-hub-management-connector http://www.mulesoft.org/schema/mule/azure-event-hub-management-connector/current/mule-azure-event-hub-management-connector.xsd
http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/email http://www.mulesoft.org/schema/mule/email/current/mule-email.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/sftp http://www.mulesoft.org/schema/mule/sftp/current/mule-sftp.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd">


	<http:listener-config name="HTTP_config_" doc:name="HTTP Listener config"  >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>

	<azure-eventhubs:azure-eventhubs-config name="SAS" doc:name="Azure Event Hubs Connector Azure Event Hubs" >
		<azure-eventhubs:sas-authentication-connection namespace="${config.namespace}" sharedAccessKeyName="${sas-config.sharedAccessKey}" sharedAccessKey="${sas-config.sharedAccessKeyName}" eventHubName="${config.eventHuName}" >
		</azure-eventhubs:sas-authentication-connection>
	</azure-eventhubs:azure-eventhubs-config>

	<configuration-properties doc:name="Configuration properties" file="mule-app.properties" />

		<flow name="Partition-Listener" >
		<azure-eventhubs:partition-listener doc:name="Partition listener"  config-ref="SAS" partitionId="1">
			<azure-eventhubs:event-position-type >
				<azure-eventhubs:latest />
			</azure-eventhubs:event-position-type>
		</azure-eventhubs:partition-listener>
		<logger level="INFO" doc:name="Logger" message='Message received in partition'/>
	</flow>
	<flow name="Publish-in-Bulk" >
		<http:listener doc:name="events"  config-ref="HTTP_Listener_config" path="/bulk" allowedMethods="GET" />
		<azure-eventhubs:bulk-publish doc:name="Publish in bulk"  config-ref="SAS" maxBatchSizeInBytes="2" partitionId="1">
			<azure-eventhubs:events ><![CDATA[#[[{
	"body": "body event1"
},
{
	"body": "body event2"
}]]]]></azure-eventhubs:events>
		</azure-eventhubs:bulk-publish>
		<logger level="INFO" doc:name="Logger" message='A Batch  of messages published' />
	</flow>

</mule>

この例の実行手順

  1. コネクタが設定されていることを確認します。

  2. プロジェクトを保存します。

  3. Web ブラウザーから​「http://localhost:8081/bulk」​と入力してアプリケーションをテストします。

コンテンツタイプを使用した 1 つのイベントの送信

この Mule フローでは、コンテンツタイプが JSON の 1 つのイベントをイベントハブに送信します。

Studio の [Content Type (コンテンツタイプ)] 設定を使用した 1 つのイベントの送信

この例の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<flow name="Send-single-event-with-content-type" >
		<http:listener doc:name="events" config-ref="HTTP_Listener_config" path="/content"  />
		<azure-eventhubs:publish doc:name="Send a single event" config-ref="ActiveDirectory" partitionId="1" contentType="application/json">
			<azure-eventhubs:event ><![CDATA[#[{
 "body": write(payload, "application/json"),
}]]]></azure-eventhubs:event>
		</azure-eventhubs:publish>
		<logger level="INFO" doc:name="Logger" message="Event Published" />
	</flow>