CSP (stands for Communicating Sequential Process) provides I/O communication between channels and was inspired by Go
High performance and throughput speed
Optimized for working with medium-sized objects (like ByteBufs)
CSP has reach DSL, which provides a simple programming model
Has an asynchronous back pressure management
Channel Supplier and Channel Consumer
CSP communication is conducted with ChannelSupplier and ChannelConsumer, which provide and accept some data
respectively. Each consecutive request to these channels should be called only after the previous request finishes, and
Promises are utilized to manage it.
ChannelSupplier has a get() method which returns a Promise of provided value. Until this Promise is
completed either with a result or with an exception, the method shouldn’t be called again. Also note, that if get() returns
Promise of null, this represents end of stream and no additional data should be requested from this supplier.
ChannelConsumer has an accept(@Nullable T value) method which returns a Promise of null as a marker of
completion of the accepting. Until this Promise is completed, accept() method mustn’t be called again. By analogy
with the ChannelSupplier, if null value is accepted, it represents end of stream.
Here is a example of communication between Consumer and Supplier:
Another important concept of CSP is ChannelQueue interface and its implementations: ChannelBuffer and
ChannelZeroBuffer. They provide communication between Consumers and Suppliers and allow to create chains of these
pipes if needed.
Basically, these buffers pass objects which were consumed by Consumer to Supplier as soon as the queue
gets a free space. This process is controlled by Promises. You can manually set the size for ChannelBuffer.
ChannelZeroBuffer doesn’t store any values but simply passes them one by one from Consumer to Supplier.
Here is a simple example of working with buffers of items:
Comparison to Datastream
CSP has a lot in common with Datastream module.
Although they were both designed for I/O processing, there are several important distinctions:
Extremely low: stream can be started with 1 virtual call, short-circuit evaluation optimizes performance
No short-circuit evaluation, overhead is higher
Fast, but slower than Datastream
Simple and convenient
To provide maximum efficiency, our framework widely utilizes combinations of CSP and Datastream. For this purpose,
ChannelSupplier, ChannelConsumer, StreamSupplier and StreamConsumer have transformWith() methods and special
Transformer interfaces. Using them, you can seamlessly transform channels into other channels or datastreams and vice
versa, creating chains of such transformations.
You can add CSP module to your project by inserting dependency in pom.xml:
To run the examples, you need to clone DataKernel from GitHub:
$ git clone https://github.com/softindex/datakernel And import it as a Maven project. Before running the examples, build the project.
Basic Channel Example
Channel Example shows interaction between suppliers and consumers using streamTo and some helper methods:
Thus, if you run this example, you’ll receive the following output:
As it was mentioned before, there are two ChannelQueue implementations: ChannelBuffer and ChannelZeroBuffer,
both of them manage communication between Providers and Suppliers.
You can manually set the size of ChannelBuffer, whereas ChannelZeroBuffer size is always 0.
To give you a better understanding of how all these Buffers work, let’s have a simple example. Assume there is a Granny
who wants to give her Grandson 25 Apples. That’s quite a lot, so she first puts the Apples on a big Plate,
which can place up to 10 apples simultaneously. When the Plate is full, Grandson should first take at least one apple,
and only after that Granny can put a new Apple to the Plate:
On the next day Granny wants to give Apples to her Grandson again, but this time there are only 10
Apples. So there is no real need in the plate: Granny can simply pass the Apples to her Grandson
one by one:
You can create chains of transformations of data that is provided by ChannelSupplier. Use transformWith method
and predefined CSP chunkers, compressors, decompressors etc. In this example we will transform suppliers ByteBufs,
chunk, compress and decompress them:
This example demonstrates how to work with files with asynchronous approach using Promises and CSP built-in
consumers and suppliers. This example writes two lines to the file with ChannelFileWriter, and then reads and prints
them out utilizing ChannelFileReader:
If you run the example, you’ll see the content of the created file: