Python Bricks
Purpose | This guide explains what Python Bricks are and how to set them up. It walks through the elements available in a Python Brick and gives step-by-step guidance on configuring them. |
---|---|
Last Updated | May 18, 2024 |
About Python Bricks
Python scripting Bricks give you access to Python’s extensive list of supported libraries and allow you to process your messages with only a few lines of code.
Configuring a Python Brick
Configuration parameters for Python Bricks include the following:
Parameter | Description |
---|---|
Script | You can add and edit your Python script in this field |
Script (URL) | Alternatively, you can put a link to a gist of your script in this field |
Requirements | A space-separated list of packages |
Writing a script Brick using process()
Creating a Python Brick is most straightforward when you define a function named process(). This function defines the logic for processing incoming messages.
An example of a transform Brick using process():
def process(msg):
    msg.payload = {**msg.payload, "new_field": "new_value"}
    return msg
In this example, a field is added to the payload before it is produced.
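The dictionary-unpacking expression used above follows ordinary Python merge rules, which matter when the payload already contains the key being added. A minimal sketch using plain dictionaries (no Workflow classes involved):

```python
payload = {"id": 1, "new_field": "old_value"}

# Keys listed after the unpacked dict win, so an existing "new_field" is overwritten.
overwritten = {**payload, "new_field": "new_value"}

# Putting the default first keeps whatever value the payload already had.
preserved = {"new_field": "new_value", **payload}

print(overwritten)  # {'id': 1, 'new_field': 'new_value'}
print(preserved)    # {'new_field': 'old_value', 'id': 1}
print(payload)      # {'id': 1, 'new_field': 'old_value'}  (original dict untouched)
```

Which ordering is appropriate depends on whether the Brick should override or respect values set upstream.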
Controlling Workflow behavior with process()
The result of process(msg) dictates how the underlying Workflow client handles the message.
The table below shows the rules that map outputs to Workflow client behavior.
process() output | What happens with the message |
---|---|
Returns a message | The input message is acked and the returned message is produced to TOPIC_OUT. |
Yields several messages | Each yielded message is produced to TOPIC_OUT. The input message is acked. |
Returns None | The input message is acked, but nothing is produced. |
Raises an exception | The input message is not acked and nothing is produced. |
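The rules in the table can be pictured as a small dispatcher. The sketch below is a hypothetical illustration, not the actual Workflow client: the function name handle and the produce and ack callbacks are assumptions, and plain dictionaries stand in for messages.

```python
import inspect


def handle(process, msg, produce, ack):
    """Hypothetical dispatcher mirroring the table; not the real Workflow client."""
    try:
        result = process(msg)
        if inspect.isgenerator(result):
            # process() used yield: produce every yielded message.
            for out in result:
                produce(out)
        elif result is not None:
            # process() returned a single message.
            produce(result)
        # result is None: nothing is produced.
    except Exception:
        # process() raised: the input message is not acked.
        return False
    ack(msg)
    return True


def passthrough(msg):
    return msg


def fan_out(msg):
    for i in range(3):
        yield {"part": i}


def drop(msg):
    return None


def fail(msg):
    raise ValueError("bad input")


produced, acked = [], []
for fn in (passthrough, fan_out, drop, fail):
    handle(fn, {"n": 1}, produced.append, acked.append)

print(len(produced), len(acked))  # 4 3
```

Four messages are produced (one from passthrough, three from fan_out) and three inputs are acked; the failing call produces nothing and acks nothing.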
Producing multiple messages using process()
The next example shows how to produce multiple messages from a single input message using the yield statement:
def process(msg):
    for i in range(10):
        yield Message({'xyz': i})
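A common variation is to fan out one message per element of a list in the input payload. The sketch below can be run locally; the Message class defined here is a hypothetical stand-in that only carries a payload, and the "items" key is an assumed payload field.

```python
from dataclasses import dataclass


@dataclass
class Message:
    """Hypothetical stand-in for the Workflow Message class, for local testing only."""
    payload: dict


def process(msg):
    # Emit one output message per entry in the (assumed) "items" list.
    for item in msg.payload.get("items", []):
        yield Message({"item": item})


# Calling process() only creates a generator; iterating it runs the body.
outputs = list(process(Message({"items": ["a", "b", "c"]})))
print([m.payload for m in outputs])  # [{'item': 'a'}, {'item': 'b'}, {'item': 'c'}]
```

If the list is missing or empty, the generator yields nothing, which should correspond to the "nothing is produced" case.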
Filtering messages using process()
The following example demonstrates filtering out messages using the process() function:
def process(msg):
    if msg.payload.get("filter_field") == "filter_value":
        return None
    else:
        return msg
Advanced Python Brick configuration
Brick variables
Every Brick has access to the following variables:
Variable | Description |
---|---|
RAVEN_URL | The workflow URL |
NAME | ID of the Brick itself |
SUBSCRIPTION | Subscription ID |
TOPIC_IN | Topic ID of the input topic |
TOPIC_OUT | Topic ID of the output topic |
These variables can be accessed directly without importing them.
The Message object
A message has the following attributes:
Attribute | Description |
---|---|
payload | The content of the message. By default, consumers use a JSON deserializer, but it's possible to use different deserializers for other payload types |
message_id | The UUID of the message |
ack_id | The ack id of the message |
timestamp | The timestamp of the message |
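These attributes can be read inside process() like any other Python attribute. The sketch below copies message_id and timestamp into the payload so that downstream Bricks can trace a record back to its source. The stand-in message is built with SimpleNamespace, and the attribute types used here (a UUID object and a float of epoch seconds) are assumptions rather than documented behavior.

```python
import time
import uuid
from types import SimpleNamespace


def process(msg):
    # Keep the original payload and record where it came from.
    msg.payload = {
        **msg.payload,
        "source_message_id": str(msg.message_id),
        "source_timestamp": msg.timestamp,
    }
    return msg


# Hypothetical stand-in message; attribute types are assumptions.
fake = SimpleNamespace(
    payload={"value": 42},
    message_id=uuid.uuid4(),
    ack_id="ack-1",
    timestamp=time.time(),
)
out = process(fake)
print(sorted(out.payload))  # ['source_message_id', 'source_timestamp', 'value']
```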
The Consumer object
Consumers can be initialized like this:
from workflow import Consumer
consumer = Consumer.connect(host=RAVEN_URL,
                            subscription=SUBSCRIPTION,
                            topic=TOPIC_IN,
                            name=NAME)
The parameters host, subscription, topic, and name can be kept blank. They default to $RAVEN_URL, $SUBSCRIPTION, $TOPIC_IN, and $NAME respectively.
It is also possible to define what deserializer to use:
from workflow import Consumer, RawDeserializer
consumer = Consumer.connect(host=RAVEN_URL,
                            subscription=SUBSCRIPTION,
                            topic=TOPIC_IN,
                            name=NAME,
                            deserializer=RawDeserializer)
The Producer object
Producers can be initialized like this:
from workflow import Producer
producer = Producer.connect(host=RAVEN_URL,
                            topic=TOPIC_OUT,
                            name=NAME)
You can also keep these parameters blank. They default to $RAVEN_URL, $TOPIC_OUT, and $NAME.
It is also possible to define which serializer to use. By default, the consumer uses JsonDeserializer.
from workflow import Producer, RawSerializer, JsonDeserializer
# serializer = lambda val: val
# serializer = json.loads
# serializer = JsonDeserializer
serializer = RawSerializer
producer = Producer.connect(host=RAVEN_URL,
                            topic=TOPIC_OUT,
                            name=NAME,
                            serializer=serializer)
Explicit consume / produce loop
The consumer object can be used as a generator. Looping over this generator
consumes messages whenever new messages are available. The example
below shows a minimal consume/produce loop that passes each message
unchanged to TOPIC_OUT.
import traceback
from workflow import Consumer, Producer
consumer = Consumer.connect(host=RAVEN_URL,
                            subscription=SUBSCRIPTION,
                            topic=TOPIC_IN,
                            name=NAME)
producer = Producer.connect(host=RAVEN_URL,
                            topic=TOPIC_OUT,
                            name=NAME)
try:
    for msg in consumer:
        # Placeholder: transform msg.payload here before producing.
        msg.payload = msg.payload
        producer.produce(msg)
        # Ack only after the message has been produced.
        consumer.ack(msg)
except Exception:
    traceback.print_exc()
finally:
    # Close both connections whether or not the loop failed.
    consumer.close()
    producer.close()
Potential pitfalls when using yield in process()
When process() outputs its messages using yield, it is not possible to also use return to output a message. Any yield in the function body means the output of process() is treated as a generator, so also returning a message will either break the Brick or lead to incorrect output messages.
If process() raises an exception after having yielded messages, the input message will not be acked. The messages yielded before the exception have already been produced, so when the unacked input message is processed again, duplicate messages end up on the output topic.
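Both pitfalls can be reproduced in plain Python. The sketch below uses dictionaries in place of messages and hypothetical function names. It first shows that a value returned from a generator never reaches a normal loop, then compares a generator that can fail midway with one that builds all of its outputs before yielding any.

```python
def mixed(msg):
    yield {"n": 1}
    return {"n": 2}  # becomes StopIteration.value; a plain loop never sees it


print(list(mixed(None)))  # [{'n': 1}]


def risky(items):
    for item in items:
        # May raise after earlier messages were already yielded.
        yield {"value": 10 / item}


def safer(items):
    # Build every output first so a failure happens before anything is yielded.
    outputs = [{"value": 10 / item} for item in items]
    yield from outputs


def collect(gen):
    """Consume a generator, keeping whatever was yielded before a failure."""
    seen = []
    try:
        for out in gen:
            seen.append(out)
    except ZeroDivisionError:
        pass
    return seen


print(len(collect(risky([1, 2, 0]))))  # 2
print(len(collect(safer([1, 2, 0]))))  # 0
```

Collecting outputs first trades memory for an all-or-nothing result, which is one way to avoid partially produced output when the input message is retried.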
Persisting data
Python script Bricks can use a key value store. The example below shows how to insert data into the store and how to retrieve it:
store.insert("key", "value")
print(store.get("key"))
This prints "value".
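One possible use of the store is to skip input messages that have already been handled. The sketch below runs locally against a hypothetical in-memory stand-in for store. It assumes that get() returns None for a missing key and that a redelivered message keeps the same message_id; neither is confirmed by this guide.

```python
from types import SimpleNamespace


class InMemoryStore:
    """Hypothetical stand-in for the Brick's store; only insert() and get() are modelled."""

    def __init__(self):
        self._data = {}

    def insert(self, key, value):
        self._data[key] = value

    def get(self, key):
        # Assumption: a missing key yields None. The real store may behave differently.
        return self._data.get(key)


store = InMemoryStore()


def process(msg):
    key = f"seen:{msg.message_id}"
    if store.get(key) is not None:
        # Already handled: returning None acks the input without producing.
        return None
    store.insert(key, "1")
    return msg


first = process(SimpleNamespace(message_id="m-1", payload={}))
second = process(SimpleNamespace(message_id="m-1", payload={}))
print(first is not None, second is None)  # True True
```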
Using the Reader
In some scenarios, it can be useful to read batches of data instead of going through the messages one by one. You can use workflow.Reader for this.
from workflow import Reader
# topic, size, and offset are placeholders that must be defined beforehand.
with Reader.connect(timeout=1, topic=topic) as reader:
    reader.batch_get(size, offset)
    for msg in reader:
        print(msg.payload)
The reader goes through the messages in first-in, first-out (FIFO) order by default. You can change this to LIFO by setting the size argument of reader.batch_get to a negative number.
Write access
Write access for Python Bricks is restricted to the /tmp directory.
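Scratch files therefore need to be created under /tmp. A minimal sketch using only the Python standard library; the helper name write_scratch is hypothetical:

```python
import json
import os
import tempfile


def write_scratch(payload):
    # Create a uniquely named file under /tmp, the only writable location.
    with tempfile.NamedTemporaryFile(
        "w", dir="/tmp", suffix=".json", delete=False
    ) as fh:
        json.dump(payload, fh)
        return fh.name


path = write_scratch({"id": 1})
with open(path) as fh:
    loaded = json.load(fh)
print(loaded)  # {'id': 1}

# Remove the file once it is no longer needed.
os.remove(path)
```

Using tempfile avoids name collisions if a script writes more than one scratch file.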