Call a Python function on JSON from RabbitMQ. Supports context managers for resource management and coroutines for getting fancy.
The simplest way to use rmq_py_caller is the CLI. To start, install the package:
pip install git+https://github.com/wbadart/rmq_py_caller.gitYou can now call the package directly from the command line with python -m rmq_py_caller, just make sure to set the following environment variables:
| Environment Variable | Description |
|---|---|
PY_TARGET |
The name of the function or coroutine to call* |
PY_SETUP |
(Optional) Initialization code, such as importing the function |
ARG_ADAPTER |
(Default: [.]) A jq program mapping input data to *args list |
(* PY_TARGET can also identify a context manager, provided its __enter__
method returns the function of interest. This is useful if the function
requires some setup before running — such as loading a data file —
or teardown afterwards. See
examples/basic_contextmanager/ for details.)
For example:
PY_TARGET='len' python -m rmq_py_caller < data.ndjsonwill compute the length of each array or object listed in the
newline-delimited JSON file data.ndjson. ARG_ADAPTER should be an
array which arranges the arguments to PY_TARGET (the default value, [.],
passes the whole input object as the first argument to PY_TARGET). To
illustrate, here's a slightly more involved example where PY_TARGET takes two
arguments:
PY_SETUP='from operator import add' \
PY_TARGET='add' \
ARG_ADAPTER='[.a, .b]' \
python -m rmq_py_caller < data.ndjsonThis setup will, for each object in data.ndjson, compute the sum of the
object's a and b attributes (corresponding to the call add(obj["a"], obj["b"]), aka obj["a"] + obj["b"]).
Processing JSON data from stdin is useful for debugging, but you're probably here because you want to process JSON data from RabbitMQ. If this is the case, the provided Docker image is the way to go.
The container uses the environment variables listed above, and also expects:
| Environment Variable | Description |
|---|---|
INPUT_QUEUE |
Queue from which to grab input data |
OUTPUT_EXCHANGE |
Exchange to which results are published |
OUTPUT_ROUTING_KEY |
(Optional) Routing key with which to publish results |
RABTAP_AMQPURI |
Location of broker (see the rabtap URI spec) |
INPUT_ADAPTER |
(Optional) Adjusts how RabbitMQ metadata is passed to Python |
OUTPUT_ADAPTER |
(Default: .result) jq post-processing program |
PUBLISH_NULL |
(Default: 0) If 1, publish null results too |
To continue the len example from above:
docker run --rm -it \
-e PY_TARGET='len' \
-e INPUT_QUEUE='data_in' \
-e OUTPUT_EXCHANGE='data_out' \
-e RABTAP_AMQPURI='amqp://guest:[email protected]:5672/' \
wbadart/rmq_py_callerThis says: for each array or object posted to the data_in queue, compute its
length and publish the result to the data_out exchange. Note that inside a
container, host.docker.internal resolves to the IP address of the Docker
host; you can use it to access services you'd get via localhost outside the
container.
By default, rmq_py_caller publishes return value of PY_TARGET to the given
OUTPUT_EXCHANGE. Use the OUTPUT_ADAPTER variable to customize the published
result. OUTPUT_ADAPTER is a jq program which takes a JSON object with two
keys, result (return value of PY_TARGET) and orig (the input data), and
returns the desired result for publishing. For example, you can enrich the
input object with the result with a merge operation (*):
docker run --rm -it \
-e OUTPUT_ADAPTER='.orig * {enrichments: {num_keys: .result}}' \
-e PY_TARGET='len' \
-e INPUT_QUEUE='data_in' \
-e OUTPUT_EXCHANGE='data_out' \
-e RABTAP_AMQPURI='amqp://guest:[email protected]:5672/' \
wbadart/rmq_py_callerNote in this case that inputs should be objects so that orig can be merged
with the object {enrichments: {num_keys: ...}}.
Finally, here's a sample docker-compose.yml file you could use to run the
above as a Docker service:
version: "3.4"
services:
my_len_enrichment:
image: wbadart/rmq_py_caller
environment:
OUTPUT_ADAPTER: >-
.orig * {
enrichments: {
num_keys: .result
}
}
PY_TARGET: "len"
INPUT_QUEUE: "data_in"
OUTPUT_EXCHANGE: "data_out"
RABTAP_AMQPURI: "amqp://guest:[email protected]:5672/"I use a YAML multi-line string here to show how you might handle more
involved OUTPUT_ADAPTERs or ARG_ADAPTERs.
Please check out examples/ for further instruction and
inspiration.