About Python bricks

Python scripting bricks give you access to Python’s extensive list of supported libraries and allow you to process your messages with only a few lines of code.

Configuring a Python brick#

Python bricks have the following configuration parameters:

ParameterDescription
ScriptYou can add and edit your Python script in this field
Script (URL)Alternatively you can put a link to a gist of your script in this field
RequirementsA space separated list of packages

Python scripting bricks can be used for all brick types (source, transform, and action).

NameBrick Type
python-source-ngSource
python-transform-ngTransform
python-action-ngAction

Writing a script brick using process()#

The most basic way to create a Python brick is by defining a function called process. This function defines the logic of how you want to process incoming messages.

An example of a transform brick using process:

def process(msg):
msg.payload = {**msg.payload, "new_field": "new_value"}
return msg

In this example, a field is added to the payload before it is produced.

Controlling Workflow behaviour with process()#

The output of process(msg) determines how the underlying Workflow client will handle the message. The table below shows the rules mapping outputs to Workflow client behaviour.

process outputWhat happens with the message
Process returns a messageThe input message will get acked and the returned message is produced to TOPIC_OUT
Process yields several messagesEach yielded message will get produced to TOPIC_OUT. The input message is acked.
Process returns NoneThe input message will get acked, but nothing is produced.
Process raises an exceptionThe input message will not get acknowledged and nothing is produced.

Producing multiple messages using process()#

The following examples shows how to produce multiple messages from a single input message using yield:

def process(msg):
for i in range(10):
yield Message({'xyz' : i})

Filtering messages using process()#

The next example shows how to filter out messages using process:

def process(msg):
if msg.payload.get("filter_field") == "filter_value":
return None
else:
return msg

Advanced#

Brick variables#

Every brick has access to the following variables:

VariableDescription
RAVEN_URLThe workflow URL
NAMEID of the brick itself
SUBSCRIPTIONSubscription ID
TOPIC_INTopic ID of the input topic
TOPIC_OUTTopic ID of the output topic

These can be accessed directly without importing them.

The Message object#

A message has the following attributes:

AttributeDescription
payloadThe message payload. By default consumers use a JSON deserializer, but it's possible to use different deserializers for other payload types
message_idThe uuid of the message
ack_idThe ack id of the message
timestampThe timestamp of the message

The Consumer object#

Consumers can be initialized like this:

from workflow import Consumer
consumer = Consumer.connect(host=RAVEN_URL,
subscription=SUBSCRIPTION,
topic=TOPIC_IN,
name=NAME)

The parameters host, subscription, topic, and name can kept blank. They will default to $RAVEN_URL, $SUBSCRIPTION, $TOPIC_IN, and $NAME respectively.

It is also possible to define what deserializer to use:

from workflow import Consumer, RawDeserializer
consumer = Consumer.connect(host=RAVEN_URL,
subscription=SUBSCRIPTION,
topic=TOPIC_IN,
name=NAME,
deserializer=RawDeserializer)

The Producer object#

Producers can be initialized like this:

from workflow import Producer
producer = Producer.connect(host=RAVEN_URL,
topic=TOPIC_OUT,
name=NAME)

You can also keep the params blank. It will default to using $RAVEN_URL, $TOPIC_OUT, and $NAME.

It is also possible to define what serializer to use. By default the consumer will use JsonDeserializer.

from workflow import Producer, RawSerializer, JsonDeserializer
#serializer = lambda val: val
#serializer = json.loads
#serializer = JsonDeserializer
serializer = RawSerializer
producer = Producer.connect(host=RAVEN_URL,
topic=TOPIC_OUT,
name=NAME,
serializer=serializer)

Explicit consume / produce loop#

The consumer object can be used as a generator. Looping over this generator consumes messages whenever new messages are available. The example below shows a simple consume/produce loop that simply passes the message to TOPIC_OUT.

from workflow import Consumer, Producer
consumer = Consumer.connect(host=RAVEN_URL,
subscription=SUBSCRIPTION,
topic=TOPIC_IN,
name=NAME)
producer = Producer.connect(host=RAVEN_URL,
topic=TOPIC_OUT,
name=NAME)
try:
for msg in consumer:
msg.payload = msg.payload
producer.produce(msg)
consumer.ack(msg)
except Exception as exc:
import traceback
traceback.print_exc()
consumer.close()
producer.close()

Potential pitfalls when using yield in process()#

When the process outputs its messages using yield, it is not possible to also use return. Having defined a yield means the output of process will be treated as a generator, so also returning a message will either break the brick or lead to incorrect output messages.

If process raises an Exception after having yielded messages, the input message will not be acked. This will result in duplicate message being produced to the output topic.

Persisting data#

Python script bricks have access to a key value store. The example below shows how to insert data into the store and how to retrieve it:

store.insert("key", "value")
print(store.get("key"))
# this will print "value"

Using the reader#

In some scenarios it can be useful to read batches of data instead of going through the messages one by one. You can use Workflow.Reader for this. The code below shows how to use the reader.

from workflow import Reader
with Reader.connect(timeout=1, topic=topic) as reader:
reader.batch_get(size, offset)
for msg in reader:
print(msg.payload)

The reader will go through the messages in a first-in-first-out order by default. You can change this to LIFO by setting the size argument of reader.batch_get to a negative number.

Write access#

Python bricks are only allowed to write to /tmp.