Within a Benthos configuration, in between the input and output sections, is a pipeline section. This section describes an array of processors that are to be applied to all messages, and are not bound to any particular input or output.
If you have processors that are heavy on CPU and aren't specific to a certain input or output, they are best placed in the pipeline section. This is advantageous because the pipeline section allows you to set an explicit number of parallel processing threads, which should ideally match the number of available logical CPU cores.
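For instance, a minimal pipeline section with four threads might look like the following (the jmespath query here is only a placeholder):

```yaml
pipeline:
  threads: 4            # ideally matches the number of available logical CPU cores
  processors:
    - type: jmespath    # any CPU-heavy processor is a good candidate for this section
      jmespath:
        query: "length(reservations)"
```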
If a buffer is chosen, these processors are applied to messages read from it. It is therefore possible to use buffers as a way of distributing messages from a single input across multiple parallel processing threads.
The following are some examples of how to get good performance out of your processing pipelines.
Sometimes a source of data can only have a single consuming client. In these circumstances it is still possible to have the single stream of data processed across parallel processing threads by using a buffer.
For example, say we have an input stream `foo` with only a single connected client. Our goal is to read the stream as fast as possible, perform mutations on the JSON payload using the jmespath processor, and write the resulting stream to `bar`.
The messages from `foo` are At-Most-Once, and so we are not concerned with delivery guarantees and want to focus on performance instead. We have four logical CPU cores on our server and wish to dedicate them all to processing the data. We believe that the `bar` output will be fast enough to keep up with the stream with a single connection.
We set our number of processing threads to four in order to match the CPU cores available. We also choose a memory buffer, since it is the fastest buffer option, with a size limit of 5MB, which we have determined to be more than enough to fit four messages of the stream at any given time.
```yaml
input:
  type: foo
buffer:
  type: memory
  memory:
    limit: 5000000
pipeline:
  threads: 4
  processors:
    - type: jmespath
      jmespath:
        query: "reservations[].instances[].[tags[?Key=='Name'].Values[] | [0], type, state.name]"
output:
  type: bar
```
With this config the pipeline within our Benthos instance would look something like the following:
```
foo -> memory buffer ---> processor ---> bar
          ( 5MB )    \--> processor -/
                     \--> processor -/
                     \--> processor -/
```
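As a rough illustration of what the jmespath query above does, a hypothetical payload shaped like this:

```json
{
  "reservations": [
    {
      "instances": [
        {
          "tags": [ { "Key": "Name", "Values": [ "server-1" ] } ],
          "type": "t2.micro",
          "state": { "name": "running" }
        }
      ]
    }
  ]
}
```

would be mapped by the processor to something like:

```json
[ [ "server-1", "t2.micro", "running" ] ]
```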
Sometimes our source of data can have many connected clients and will distribute a stream of messages amongst them. In these circumstances it is possible to fully utilise a set of parallel processing threads without a buffer, provided that the number of consumers is greater than the number of threads. Ideally the number of consumers would be significantly higher than the number of threads in order to compensate for IO blocking.
For example, imagine we have a similar requirement to the first example but are consuming from an input `baz`, which is At-Least-Once and supports multiple connected clients. We wish to take advantage of the delivery guarantees of the source and therefore want acknowledgements to flow directly from our output sink all the way up the pipeline to the input source.

For this purpose we are able to utilise our processing threads without the need for a buffer. We choose four processing threads like before, and eight parallel consumers of the input `baz`.
```yaml
input:
  type: broker
  broker:
    copies: 8
    inputs:
      - type: baz
buffer:
  type: none
pipeline:
  threads: 4
  processors:
    - type: jmespath
      jmespath:
        query: "reservations[].instances[].[tags[?Key=='Name'].Values[] | [0], type, state.name]"
output:
  type: bar
```
With this config the pipeline within our Benthos instance would look something like the following:
```
baz -\
baz -\
baz ---> processor ---> bar
baz ---> processor -/
baz ---> processor -/
baz ---> processor -/
baz -/
baz -/
```