Triggering a flow for each row in a table

A common use case is to trigger a flow for every row in a table. Although this could easily be achieved by combining a <scheduler> with a <db:select> operation, the <db:listener> component simplifies the task by automatically doing the following:

  • Generating the query

  • Handling watermark (optional)

  • Handling idempotency across concurrent requests
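
For comparison, the manual approach that combines a <scheduler> with a <db:select> could be sketched roughly as follows. This is only an illustration, not what the connector does internally: the flow name pollEventsManually is hypothetical, and dbConfig and doProcess are assumed to be defined elsewhere in the application. Note that this version has no watermark and nothing that stops overlapping polls from picking up the same row twice.

```xml
<!-- Hypothetical manual equivalent: poll on a schedule, select, then iterate -->
<flow name="pollEventsManually">
  <!-- Fire once every minute -->
  <scheduler>
    <scheduling-strategy>
      <fixed-frequency frequency="60000"/>
    </scheduling-strategy>
  </scheduler>

  <!-- The query has to be written by hand -->
  <db:select config-ref="dbConfig">
    <db:sql>SELECT * FROM EVENTS</db:sql>
  </db:select>

  <!-- Process the rows one at a time; inside foreach, payload is a single row -->
  <foreach>
    <flow-ref name="doProcess"/>
  </foreach>
</flow>
```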

The <db:listener> element appears as On Table Row in Studio and Flow Designer.

         
      
<flow name="onEachRow">
  <db:listener table="EVENTS" config-ref="dbConfig"> <!-- (1) -->
    <scheduling-strategy>
      <fixed-frequency frequency="60000"/>
    </scheduling-strategy>
  </db:listener>

  <flow-ref name="doProcess"/> <!-- (2) -->

  <db:delete config-ref="dbConfig"> <!-- (3) -->
    <db:sql>DELETE FROM EVENTS WHERE ID = :id</db:sql>
    <db:input-parameters>#[{'id': payload.id}]</db:input-parameters>
  </db:delete>
</flow>
(1) The <db:listener> is configured to check for rows in the EVENTS table once every minute. It automatically runs the SELECT * FROM EVENTS query.
(2) The flow is executed once for each row. In this example, the row is processed through a flow-ref.
(3) Finally, the processed row is deleted to make sure it is not picked up again in the next poll.

Watermark

Deleting the row does not always make sense. In many cases, you only need to make sure that the row is not picked up again in the next poll. A classic option for that is watermarking:


         
      
<flow name="onEachRow">
  <db:listener table="EVENTS" watermarkColumn="TIMESTAMP" config-ref="dbConfig">
    <scheduling-strategy>
      <fixed-frequency frequency="60000"/>
    </scheduling-strategy>
  </db:listener>

  <flow-ref name="doProcess"/>
</flow>

This example is essentially the same as the previous one, except that it uses the TIMESTAMP column as a watermark. On each poll, the component goes through all the retrieved rows and stores the maximum value found in that column. In the next poll, the query looks like this: SELECT * FROM EVENTS WHERE TIMESTAMP > :watermark

Idempotency

If the poll interval is short, the number of rows is large, or processing a single row takes too long, you run the risk of a new poll being executed before the watermark has been updated or the row has been deleted.

To help with this, the component allows you to specify an ID column. That column is treated as a unique identifier for the row, and the listener makes sure that a row is not picked up again if it has already been found and its processing has not finished yet. For example:


         
      
<flow name="onEachRow">
  <db:listener table="EVENTS" watermarkColumn="TIMESTAMP" idColumn="ID" config-ref="dbConfig">
    <scheduling-strategy>
      <fixed-frequency frequency="60000"/>
    </scheduling-strategy>
  </db:listener>

  <flow-ref name="doProcess"/>
</flow>
Notice that watermarkColumn and idColumn are both optional and independent of each other. You can use them together, separately, or not at all, depending on your use case.

DataSense

DataSense is available for the listener's output. Studio and Flow Designer automatically suggest the table structure for you, making it easier to write expressions and mappings.