Skip to main content

Python Bricks

PurposeDive into this guide for a deeper understanding of Python Bricks and their setup process. We will walk you through different elements you can employ in Python and provide step-by-step guidance on their configuration.
Last UpdatedMay 18, 2024

About Python Bricks

Python scripting Bricks give you access to Python’s extensive list of supported libraries and allow you to process your messages with only a few lines of code.

Configuring a Python Brick

Configuration parameters for Python Bricks include the following:

ParameterDescription
ScriptYou can add and edit your Python script in this field
Script (URL)Alternatively you can put a link to a gist of your script in this field
RequirementsA space separated list of packages

Writing a script Brick using process()

Creating a Python Brick is most straightforward when defining a function named process(). This function defines the logic of how you want to process incoming messages.

An example of a transform Brick using process():

def process(msg):

msg.payload = {**msg.payload, "new_field": "new_value"}

return msg

In this example, a field is added to the payload before it is produced.

Controlling Workflow behavior with process()

The result of process(msg) dictates the handling of the message by the underlying Workflow client.

The table below shows the rules mapping outputs to Workflow client behaviour.

process outputWhat happens with the message
Process returns a messageThe input message will get acked and the returned message is produced to TOPIC_OUT
Process yields several messagesEach yielded message will get produced to TOPIC_OUT. The input message is acked.
Process returns NoneThe input message will get acked, but nothing is produced.
Process raises an exceptionThe input message will not get acknowledged and nothing is produced.

Producing multiple messages using process()

The next examples show how to produce multiple messages from a single input message using the yield statement:

def process(msg):

for i in range(10):

yield Message({'xyz' : i})

Filtering messages using process()

The following example demonstrates filtering out messages using the process() function:

def process(msg):

if msg.payload.get("filter_field") == "filter_value":

return None

else:

return msg

Advanced Python Brick configuration

Brick variables

Every Brick has access to the following variables:

VariableDescription
RAVEN_URLThe workflow URL
NAMEID of the Brick itself
SUBSCRIPTIONSubscription ID
TOPIC_INTopic ID of the input topic
TOPIC_OUTTopic ID of the output topic

These variables can be accessed directly without importing them.

The Message object

A message has the following attributes:

AttributeDescription
payloadBy default consumers use a JSON deserializer, but it's possible to use different deserializers for other payload types
message_idThe uuid of the message
ack_idThe ack id of the message
timestampThe timestamp of the message

The Consumer object

Consumers can be initialized like this:

from workflow import Consumer

consumer = Consumer.connect(host=RAVEN_URL,

subscription=SUBSCRIPTION,

topic=TOPIC_IN,

name=NAME)

The parameters host, subscription, topic, and name can kept blank. They will default to $RAVEN_URL, $SUBSCRIPTION, $TOPIC_IN, and $NAME respectively.

It is also possible to define what deserializer to use:

from workflow import Consumer, RawDeserializer

consumer = Consumer.connect(host=RAVEN_URL,

subscription=SUBSCRIPTION,

topic=TOPIC_IN,

name=NAME,

deserializer=RawDeserializer)

The Producer object

Producers can be initialized like this:

from workflow import Producer

producer = Producer.connect(host=RAVEN_URL,

topic=TOPIC_OUT,

name=NAME)

You can also keep the parameters blank. It will default to using $RAVEN_URL, $TOPIC_OUT, and $NAME.

It is also possible to define what serializer to use. By default the consumer will use JsonDeserializer.

from workflow import Producer, RawSerializer, JsonDeserializer

#serializer = lambda val: val

#serializer = json.loads

#serializer = JsonDeserializer

serializer = RawSerializer

producer = Producer.connect(host=RAVEN_URL,

topic=TOPIC_OUT,

name=NAME,

serializer=serializer)

Explicit consume / produce loop

The consumer object can be used as a generator. Looping over this generator consumes messages whenever new messages are available. The example below shows a simple consume/produce loop that simply passes the message to TOPIC_OUT.

from workflow import Consumer, Producer

consumer = Consumer.connect(host=RAVEN_URL,

subscription=SUBSCRIPTION,

topic=TOPIC_IN,

name=NAME)

producer = Producer.connect(host=RAVEN_URL,

topic=TOPIC_OUT,

name=NAME)

try:

for msg in consumer:

msg.payload = msg.payload

producer.produce(msg)

consumer.ack(msg)

except Exception as exc:

import traceback

traceback.print_exc()

consumer.close()

producer.close()

Potential pitfalls when using yield in process()

When the process() outputs its messages using yield, it is not possible to also use return. Having defined a yield means the output of process() will be treated as a generator, so also returning a message will either break the Brick or lead to incorrect output messages.

If process() raises an Exception after having yielded messages, the input message will not be backed. This will result in duplicate message being produced to the output topic.

Persisting data

Python script Bricks can use a key value store. The example below shows how to insert data into the store and how to retrieve it:

store.insert("key", "value")

print(store.get("key"))

this will print "value"

Using the Reader:

In some scenarios, it can be useful to read batches of data instead of going through the messages one by one. You can use Workflow.Reader for this.

from workflow import Reader

with Reader.connect(timeout=1, topic=topic) as reader:

reader.batch_get(size, offset)

for msg in reader:

print(msg.payload)

The reader will go through the messages in a first-in-first-out order by default. You can change this to LIFO by setting the size argument of reader.batch_get to a negative number.

Write access

Write access for Python Bricks is restricted to the /tmp directory.