Configuring HTTP Polling Source
Anypoint Connector for HTTP Connector (HTTP Connector) provides a Polling Source event source that executes a scheduled HTTP request to a given HTTP server. This source does not wait for an HTTP request from a user, as does the HTTP Listener source.
In the HTTP Polling Source event source, you can configure:
-
A scheduled frequency that can be a fixed frequency or a cron expression.
-
The event source to split the HTTP response, so every item in the response is executed in a different execution of the flow.
-
How to retrieve watermarking.
Relationship with HTTP Request Operation
Because the HTTP Polling Source event source performs an HTTP request (such as the HTTP Request operation), configure the source by using HTTP request global configuration, including configuring the URL, port, base path, protocol, and any custom request configuration that your implementation requires
The source executes an HTTP request and, with the resulting HTTP response, creates one or more events that trigger its flow.
The following configuration example shows a polling source that polls every 10 hours.
In the Configuration XML view, the <http:request-config>
, <http:polling-source>
, and <scheduling-strategy>
configurations look like this:
<http:request-config name="requestConfig">
<http:request-connection host="localhost" port="8081"/>
</http:request-config>
<flow name="example-polling-source">
<http:polling-source config-ref="requestConfig" path="/test">
<scheduling-strategy >
<fixed-frequency frequency="10" timeUnit="HOURS"/>
</scheduling-strategy>
</http:polling-source>
<logger message="#[payload]"/>
</flow>
Configuring Polling Frequency
Configure the schedule strategy as either a fixed frequency or cron expression in the Scheduling strategy (<scheduling-strategy>
) field.
In the Configuration XML editor, the <scheduling-strategy>
and <cron>
configurations look like this:
<flow name="example-polling-source">
<http:polling-source config-ref="requestConfig" path="/test">
<scheduling-strategy>
<cron expression="0/1 * * * * ?"/>
</scheduling-strategy>
</http:polling-source>
<logger message="#[payload]"/>
</flow>
Configuring Parameters
In the same way that you can for the HTTP Request operation, you can also configure the body, headers, URI parameters, and query parameters for the HTTP Polling Source event source.
The following example shows how to configure the Body (polling-request-body
), Headers (polling-request-headers
), URI parameters (polling-request-uri-param
), and Query parameters (polling-request-query-param
) fields with values to which you can add expressions. These expressions can depend on the watermark.
In the Configuration XML editor, the polling-request-body
, polling-request-headers
, polling-request-uri-param
, and polling-request-query-param
configurations look like this:
<flow name="basic-polling-full-params">
<http:polling-source config-ref="requestConfig" path="/{testPath}">
<scheduling-strategy >
<fixed-frequency frequency="2" timeUnit="SECONDS"/>
</scheduling-strategy>
<http:polling-request-body ><![CDATA[{'someBodyValue': 10}]]></http:polling-request-body>
<http:polling-request-headers >
<http:polling-request-header key="content" value="application/json" />
<http:polling-request-header key="testHeader" value="Messi" />
</http:polling-request-headers>
<http:polling-request-uri-params >
<http:polling-request-uri-param key="testPath" value="test" />
</http:polling-request-uri-params>
<http:polling-request-query-params >
<http:polling-request-query-param key="testQueryParam" value="parameter value" />
</http:polling-request-query-params>
</http:polling-source>
<logger message="#[payload]"/>
</flow>
Configuring Split Expressions
After the HTTP Polling Source event source executes the HTTP request and retrieves an HTTP response, you can perform a transformation and split the HTTP response in such a way that every item in that response is executed in a different execution of the flow.
For example, imagine that you poll a server that gets its data from a database, and the server returns 50 results. You want to send each result separately to the flow without splitting the response between the flow and the use of a For each component. To split the response, configure the Split expression (splitExpression
) field.
In the splitting expression, refer to the response as payload
, as you would do after an HTTP Request operation.
The following example shows a server that returns a JSON payload in which the items are in an array. The set-payload
component configuration looks like this:
<flow name="jsonListenerFlow">
<http:listener config-ref="listenerConfig" path="/json"/>
<set-payload value="#[output application/json --- [{'name': 'Mondi'}, {'name': 'Viena'}]]"/>
</flow>
In this case, the corresponding expression to split the array is #[payload]
. In the splitting expression, the referred payload
is the entire response, already treated as a JSON file. Meanwhile, the payload
referred to in the logger
processor refers to the corresponding payload for its execution: on one execution {'name': 'Mondi'}
, and on the other execution {'name': 'Viena'}
). The splitExpression
configuration looks like this:
<flow name="jsonPollingFlow">
<http:polling-source config-ref="requestConfig" path="json" splitExpression="#[payload]">
<scheduling-strategy>
<fixed-frequency frequency="10" timeUnit="SECONDS"/>
</scheduling-strategy>
</http:polling-source>
<logger message="#[payload]"/>
</flow>
The following example shows two concurrent executions of the flow after a single polling: one execution with <name>Mondi</name>
and another execution with <name>Viena</name>
. After 10 hours, the polling is executed again. In this example, the server returns the same result:
<flow name="xmlListenerFlow">
<http:listener config-ref="listenerConfig" path="/xml"/>
<set-payload value="#[output application/xml --- {'something': {'element': [{name: 'Mondi'}, {name: 'Viena'}]}}]"/>
</flow>
<flow name="xmlPollingFlow">
<http:polling-source config-ref="requestConfig" path="xml"
splitExpression="#[payload.something.*element]">
<scheduling-strategy>
<fixed-frequency frequency="10" timeUnit="HOURS"/>
</scheduling-strategy>
</http:polling-source>
<logger message="#[payload]"/>
</flow>
Configuring Watermarking Expressions
In the previous simple examples, the server always returns the same HTTP response. In more complex scenarios the server needs to know that it has to send the next response. You can send headers, URI parameters, body, or query parameters, but if these parameters always contain the same values in the HTTP request, the server does not know what would be the next response.
For polling scenarios like this, you implement watermarking, in which the server returns a watermark value either for the entire payload or for every item in that payload separately. For example, the watermark value can be a timestamp that refers to the entire collection, or every item could have its own timestamp. In either case, you provide a watermark expression by using the watermark
placeholder.
The expression extracts the watermark from the response and subsequently uses this watermark to send the requests to the server. You can use expressions for the body, headers, URI parameters, and query parameters values in the watermark
placeholder.
Note that in the first execution, the watermark
placeholder value is null
, which you might want to consider in the server or the expression where the placeholder is used.
To refer to the entire payload in the watermark expression, use the payload
placeholder. To refer to an item, use the item
placeholder, which applies the watermarking expression each item individually.
The following XML configuration example shows an HTTP Listener flow. In the first polling iteration, when there is no watermark, a payload is returned with a watermark value set in the wm
property. In the second polling iteration, a watermark value is expected, so the payload is different. In this case, coming from the request’s payload:
<flow name="watermarkInPayloadListenerFlow">
<http:listener config-ref="watermarkListenerConfig" path="/watermark-payload"/>
<choice>
<when expression="#[payload.watermark == null]">
<set-payload value="#[output application/json --- {'items': [{'name': 'Eze'}, {'name': 'Fabi'}, {'name': 'Sofi'}], 'wm': 0}]"/>
</when>
<when expression="#[payload.watermark == '0']">
<set-payload value="#[output application/json --- {'items': [{'name': 'Euge'}, {'name': 'Juli'}], 'wm': 1}]"/>
</when>
<when expression="#[payload.watermark == '1']">
<set-payload value="#[output application/json --- {'items': [{'name': 'Pablo'}, {'name': 'MartÃn'}], 'wm': 2}]"/>
</when>
<otherwise>
<set-payload value="#[output application/json --- {'items': [], 'wm': 3}]"/>
</otherwise>
</choice>
</flow>
The following XML configuration example shows how to extract the watermark value from the entire payload and then use the value in the body of the request. The configuration uses the HTTP Polling Source event source instead of the HTTP Listener source:
<flow name="watermarkInPayloadPollingFlow">
<http:polling-source config-ref="watermarkRequestConfig" path="watermark-payload"
splitExpression="#[payload.items]" watermarkExpression="#[payload.wm]">
<scheduling-strategy>
<fixed-frequency frequency="5" timeUnit="MINUTES"/>
</scheduling-strategy>
<http:polling-request-body><![CDATA[#[output application/json --- {'watermark': watermark}]]]></http:polling-request-body>
<http:polling-request-headers >
<http:polling-request-header key="Content-Type" value="application/json" />
</http:polling-request-headers>
</http:polling-source>
<logger message="#[payload]"/>
</flow>
-
The
watermarkExpression
has thewatermark
placeholder that retrieves the watermark from the propertywm
of the response. -
The first polling iteration includes three executions of the flow, one execution with
{name: 'Eze'}
, another execution with{name: 'Fabi'}
, and the last execution with{name: 'Sofi'}
. -
In the second polling iteration, which occurs five minutes later, there are two executions, one execution with
{name: 'Euge'}
and another execution with{name: 'Juli'}
). -
In the second polling iteration, which occurs five minutes later, there are two executions, one execution with
{name: 'Euge'}
and another execution with{name: 'Juli'}
). -
Afterward the third iteration, polling continues, but because the results are empty, there are no flow executions.
The following XML configuration example shows how to extract the watermark value from each item and then use that value in the query parameters. The behavior depends completely on how the HTTP server uses the watermarking value:
<flow name="watermarkIntoQueryParamsListenerFlow">
<http:listener config-ref="watermarkListenerConfig" path="/watermark-into-query"/>
<choice>
<when expression="#[attributes.queryParams.watermark == '0']">
<set-payload value="#[output application/json --- {'items': [{'name': 'Rodro', 'wm': 1}, {'name': 'Steve', 'wm': 2}, {'name': 'Juan', 'wm': 3}]}]"/>
</when>
<when expression="#[attributes.queryParams.watermark == '2']">
<set-payload value="#[output application/json --- {'items': [{'name': 'Axel', 'wm': 4}, {'name': 'Mariano', 'wm': 5}]}]"/>
</when>
<when expression="#[attributes.queryParams.watermark == '5']">
<set-payload value="#[output application/json --- {'items': [{'name': 'Ivan', 'wm': 6}, {'name': 'Hyeran', 'wm': 7}]}]"/>
</when>
<otherwise>
<set-payload value="#[output application/json --- {'items': []}]"/>
</otherwise>
</choice>
</flow>
<flow name="watermarkIntoQueryParamsPollingFlow">
<http:polling-source config-ref="watermarkRequestConfig" path="watermark-into-query"
splitExpression="#[payload.items]" watermarkExpression="#[item.wm]">
<scheduling-strategy>
<fixed-frequency frequency="1" timeUnit="SECONDS"/>
</scheduling-strategy>
<http:polling-request-query-params >
<http:polling-request-query-param key="watermark" value="#[watermark default 0]" />
</http:polling-request-query-params>
</http:polling-source>
<logger message="#[payload]"/>
</flow>
Configuring Idempotency Expressions
To avoid the concurrent execution of two flows with the same payload identification, configure idempotency for the HTTP Polling Source event source. This configuration ensures that payloads with the same ID are not processed concurrently.
To configure idempotency for the event source, add an ID expression to retrieve the ID from the item, similar to the watermarking expression configuration. Use payload
and item
placeholders in the idempotency expression idExpression
.
In the following XML configuration example, the first three items execute the flow concurrently and the fourth item starts its process only after the first item finishes processing.
<flow name="identityWithoutWatermarkListenerFlow">
<http:listener config-ref="watermarkListenerConfig" path="/identity-no-watermark"/>
<set-payload value="#[output application/json --- {'items': [{'name': 'Rodro', 'value': 5}, {'name': 'Eze', 'value': 8}, {'name': 'MG', 'value': 7}, {'name': 'Rodro', 'value': 14}]}]"/>
</flow>
<flow name="identityWithoutWatermarkPollingFlow">
<http:polling-source config-ref="watermarkRequestConfig" path="identity-no-watermark"
splitExpression="#[payload.items]" idExpression="#[item.name]">
<scheduling-strategy>
<fixed-frequency frequency="1" timeUnit="HOURS"/>
</scheduling-strategy>
</http:polling-source>
<logger message="#[payload]"/>
</flow>
Configuring Response Validators
Validate the responses received by configuring and using a response validator.
The following XML configuration example shows a situation in which the server always returns a status code of 301, when the response validator expects a default status code between 200 and 299 and, as a result, the HTTP response always fails. This indicates that the flow does not execute, and the payload is considered valid only when the response validator defines 301 as the status code (the splitting, watermarking, and idempotency configurations are applied then):
<flow name="responseErrorListenerFlow">
<http:listener config-ref="responseListenerConfig" path="/response-error">
<http:response statusCode="301"/>
</http:listener>
<set-payload value="#[output application/json --- [{'name': 'ex1'}, {'name': 'ex2'}, {'name': 'ex3'}]]"/>
</flow>
<flow name="responseErrorPollingFlow">
<http:polling-source config-ref="responseRequestConfig" path="response-error"
splitExpression="#[payload]">
<scheduling-strategy>
<fixed-frequency frequency="10" timeUnit="SECONDS"/>
</scheduling-strategy>
<http:response-validator>
<http:success-status-code-validator values="200..299" />
</http:response-validator>
</http:polling-source>
<logger message="#[payload]"/>
</flow>