About Python bricks
Python scripting bricks give you access to Python’s extensive list of supported libraries and allow you to process your messages with only a few lines of code.
Configuring a Python brick#
Python bricks have the following configuration parameters:
| Parameter | Description |
|---|---|
Script | You can add and edit your Python script in this field |
Script (URL) | Alternatively you can put a link to a gist of your script in this field |
Requirements | A space separated list of packages |
Python scripting bricks can be used for all brick types (source, transform, and action).
| Name | Brick Type |
|---|---|
python-source-ng | Source |
python-transform-ng | Transform |
python-action-ng | Action |
Writing a script brick using process()#
The most basic way to create a Python brick is by defining a function called process.
This function defines the logic of how you want to process incoming messages.
An example of a transform brick using process:
In this example, a field is added to the payload before it is produced.
Controlling Workflow behaviour with process()#
The output of process(msg) determines how the underlying Workflow client will handle the message. The table below shows the rules mapping outputs to Workflow client behaviour.
process output | What happens with the message |
|---|---|
| Process returns a message | The input message will get acked and the returned message is produced to TOPIC_OUT |
| Process yields several messages | Each yielded message will get produced to TOPIC_OUT. The input message is acked. |
Process returns None | The input message will get acked, but nothing is produced. |
| Process raises an exception | The input message will not get acknowledged and nothing is produced. |
Producing multiple messages using process()#
The following examples shows how to produce multiple messages from a single input message using yield:
Filtering messages using process()#
The next example shows how to filter out messages using process:
Advanced#
Brick variables#
Every brick has access to the following variables:
| Variable | Description |
|---|---|
| RAVEN_URL | The workflow URL |
| NAME | ID of the brick itself |
| SUBSCRIPTION | Subscription ID |
| TOPIC_IN | Topic ID of the input topic |
| TOPIC_OUT | Topic ID of the output topic |
These can be accessed directly without importing them.
The Message object#
A message has the following attributes:
| Attribute | Description |
|---|---|
| payload | The message payload. By default consumers use a JSON deserializer, but it's possible to use different deserializers for other payload types |
| message_id | The uuid of the message |
| ack_id | The ack id of the message |
| timestamp | The timestamp of the message |
The Consumer object#
Consumers can be initialized like this:
The parameters host, subscription, topic, and name can kept blank. They will default to $RAVEN_URL, $SUBSCRIPTION, $TOPIC_IN, and $NAME respectively.
It is also possible to define what deserializer to use:
The Producer object#
Producers can be initialized like this:
You can also keep the params blank. It will default to using $RAVEN_URL, $TOPIC_OUT, and $NAME.
It is also possible to define what serializer to use. By default the consumer will use JsonDeserializer.
Explicit consume / produce loop#
The consumer object can be used as a generator. Looping over this generator consumes messages whenever new messages are available. The example below shows a simple consume/produce loop that simply passes the message to TOPIC_OUT.
Potential pitfalls when using yield in process()#
When the process outputs its messages using yield, it is not possible to also use return. Having defined a yield means the output of process will be treated as a generator, so also returning a message will either break the brick or lead to incorrect output messages.
If process raises an Exception after having yielded messages, the input message will not be acked. This will result in duplicate message being produced to the output topic.
Persisting data#
Python script bricks have access to a key value store. The example below shows how to insert data into the store and how to retrieve it:
Using the reader#
In some scenarios it can be useful to read batches of data instead of going through the messages one by one. You can use Workflow.Reader for this. The code below shows how to use the reader.
The reader will go through the messages in a first-in-first-out order by default. You can change this to LIFO by setting the size argument of reader.batch_get to a negative number.
Write access#
Python bricks are only allowed to write to /tmp.