Datastream Module

The Datastream module is useful for intra- and inter-server communication and for asynchronous data processing. It is an important building block for other DataKernel modules.

Datastream offers:

  • A modern implementation of async reactive streams (unlike streams in Java 8 and traditional thread-based blocking streams)
  • Asynchronous processing with extremely efficient back-pressure control, to handle the natural imbalance in the speed of data sources
  • Composable stream operations (mappers, reducers, filters, sorters, mergers/splitters, compression, serialization)
  • Stream-based network and file I/O on top of the Eventloop module
  • Compatibility with the CSP module
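
To make the back-pressure idea concrete, here is a minimal plain-Java sketch that does not use DataKernel at all. The names BackPressureSketch, Source and drainInBatches are hypothetical and exist only for this illustration: the source pushes items only while an acceptor is installed, and the receiving side removes the acceptor when it cannot take more data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

final class BackPressureSketch {
	/** Hypothetical push-based source that emits only while an acceptor is installed. */
	static final class Source<T> {
		private final Deque<T> pending = new ArrayDeque<>();
		private Consumer<T> acceptor; // null means "suspended"

		Source(List<T> items) {
			pending.addAll(items);
		}

		/** Installs the acceptor and pushes items until drained or suspended. */
		void resume(Consumer<T> newAcceptor) {
			acceptor = newAcceptor;
			while (acceptor != null && !pending.isEmpty()) {
				acceptor.accept(pending.poll());
			}
		}

		/** Called by the receiving side when it cannot take more data right now. */
		void suspend() {
			acceptor = null;
		}

		boolean isDrained() {
			return pending.isEmpty();
		}
	}

	/** Drains the source in batches of at most batchSize items and returns the batches. */
	static List<List<Integer>> drainInBatches(List<Integer> items, int batchSize) {
		Source<Integer> source = new Source<>(items);
		List<List<Integer>> batches = new ArrayList<>();
		while (!source.isDrained()) {
			List<Integer> batch = new ArrayList<>();
			source.resume(item -> {
				batch.add(item);
				if (batch.size() == batchSize) {
					source.suspend(); // batch is full: apply back-pressure
				}
			});
			batches.add(batch);
		}
		return batches;
	}

	public static void main(String[] args) {
		// prints [[0, 1], [2, 3], [4]]
		System.out.println(drainInBatches(Arrays.asList(0, 1, 2, 3, 4), 2));
	}
}
```

The source never buffers on behalf of a slow receiver and never blocks a thread. It simply stops pushing until it is resumed, which is the general shape of back-pressure in push-based streams.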

Datastream has a lot in common with the CSP module. Although both were designed for I/O processing, there are several important distinctions:

                  | Datastream | CSP
Overhead          | Extremely low: stream can be started with 1 virtual call, short-circuit evaluation optimizes performance | No short-circuit evaluation, overhead is higher
Throughput speed  | Extremely fast | Fast, but slower than Datastream
Optimized for     | Small pieces of data | Medium-sized objects, ByteBufs
Programming model | More complicated | Simple and convenient

To provide maximum efficiency, our framework widely utilizes combinations of CSP and Datastream. For this purpose, ChannelSupplier, ChannelConsumer, StreamSupplier and StreamConsumer have transformWith() methods and special Transformer interfaces. Using them, you can seamlessly transform channels into other channels or datastreams and vice versa, creating chains of such transformations.
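
The chaining style described above can be sketched in plain Java. This is only an illustration of the pattern, not the DataKernel API: Source, Transformer, mapping and toList are hypothetical names, and the sketch is pull-based for brevity, whereas the real stream types are push-based.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

final class TransformWithSketch {
	/** Hypothetical transformer: turns one pipeline stage into something else. */
	interface Transformer<I, O> {
		O transform(I input);
	}

	/** Hypothetical pull-based source with a transformWith() hook for chaining. */
	interface Source<T> extends Iterator<T> {
		default <R> R transformWith(Transformer<Source<T>, R> fn) {
			return fn.transform(this);
		}

		static <T> Source<T> of(List<T> items) {
			Iterator<T> it = items.iterator();
			return new Source<T>() {
				@Override public boolean hasNext() { return it.hasNext(); }
				@Override public T next() { return it.next(); }
			};
		}
	}

	/** Transformer that maps every item of the upstream source. */
	static <T, R> Transformer<Source<T>, Source<R>> mapping(Function<T, R> fn) {
		return upstream -> new Source<R>() {
			@Override public boolean hasNext() { return upstream.hasNext(); }
			@Override public R next() { return fn.apply(upstream.next()); }
		};
	}

	/** Transformer that leaves the "source" world and collects into a list. */
	static <T> Transformer<Source<T>, List<T>> toList() {
		return upstream -> {
			List<T> result = new ArrayList<>();
			while (upstream.hasNext()) {
				result.add(upstream.next());
			}
			return result;
		};
	}

	static List<String> demo() {
		return Source.of(Arrays.asList(1, 2, 3))
				.transformWith(mapping(x -> x * 10))
				.transformWith(mapping(x -> "item-" + x))
				.transformWith(toList());
	}

	public static void main(String[] args) {
		System.out.println(demo()); // prints [item-10, item-20, item-30]
	}
}
```

Because transformWith() returns whatever the transformer produces, one method is enough to chain stages of the same kind and to cross over into a different kind of object, which is the property the paragraph above relies on.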


We measured Datastream performance in a scenario where a StreamSupplier streams 100M Integer objects to a StreamConsumer, and got the following result:

Time: 2771ms; Average time: 277.1ms; Best time: 275ms; Worst time: 281ms; Operations per second: 360880548
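
The operations-per-second figure can be reproduced from the timings. The sketch below assumes that the benchmark ran 10 rounds of 100M items each. The round count is not stated in the text; it is inferred from the ratio of total time to average time (2771 / 277.1). The class and method names are hypothetical.

```java
final class ThroughputCheck {
	/** Items per second, given the items per round, the number of rounds and the total time. */
	static long opsPerSecond(long itemsPerRound, int rounds, long totalMillis) {
		return itemsPerRound * rounds * 1000L / totalMillis;
	}

	public static void main(String[] args) {
		// 100M items per round is from the text; 10 rounds is an assumption (2771 ms / 277.1 ms)
		System.out.println(opsPerSecond(100_000_000L, 10, 2771L)); // prints 360880548
	}
}
```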

We also measured the performance of a TCP server that uses both Datastream and CSP. It averaged 47495905 requests per second.


You can add the Datastream module to your project by adding its dependency to pom.xml:



  • Simple Supplier - shows how to create a simple custom Supplier and stream some data to Consumer.
  • Simple Consumer - shows how to create a simple custom Consumer.
  • Custom Transformer - shows how to create a custom StreamTransformer, which takes strings and transforms them to their length if it is less than MAX_LENGTH.
  • Built-in Stream Nodes Example - demonstrates some of the built-in Datastream possibilities, such as filtering, sharding, and mapping.
Note: To run the examples, you need to clone DataKernel from GitHub:
$ git clone
Then import it as a Maven project and check out branch v3.1. Before running the examples, build the project.
These examples are located at datakernel -> examples -> core -> datastream.

Simple Supplier

When you run SupplierExample, you’ll see the following output:

Consumer received: [0, 1, 2, 3, 4]

This output represents the data that our custom StreamSupplier provided to StreamConsumer. Let’s have a look at the implementation:

public final class SupplierExample {
	public static void main(String[] args) {

		//create an eventloop for stream operations
		Eventloop eventloop = Eventloop.create().withCurrentThread();
		//create a supplier of some numbers
		StreamSupplier<Integer> supplier = StreamSupplier.of(0, 1, 2, 3, 4);
		//create a consumer for our supplier
		StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();

		//stream the supplier's numbers to the consumer
		supplier.streamTo(consumer);

		//when the stream completes, print the streamed data
		consumer.getResult().whenResult(result -> System.out.println("Consumer received: " + result));

		//start the eventloop
		eventloop.run();
	}
}

Simple Consumer

When you run ConsumerExample, you’ll see the following output:

received: 1
received: 2
received: 3
End of stream received

ConsumerExample extends AbstractStreamConsumer and just prints out received data. The stream process is managed with overridden methods onStarted(), onEndOfStream() and onError():

public final class ConsumerExample<T> extends AbstractStreamConsumer<T> {
	@Override
	protected void onStarted() {
		//ask the upstream supplier to start sending items to this acceptor
		getSupplier().resume(x -> System.out.println("received: " + x));
	}

	@Override
	protected Promise<Void> onEndOfStream() {
		System.out.println("End of stream received");
		return Promise.complete();
	}

	@Override
	protected void onError(Throwable t) {
		System.out.println("Error handling logic must be here. No confirmation to upstream is needed");
	}
}

Custom Transformer

TransformerExample shows how to create a custom StreamTransformer which takes strings from input stream and transforms them to their length if it is less than defined MAX_LENGTH. First, we define AbstractStreamConsumer and AbstractStreamSupplier:

private final AbstractStreamConsumer<String> inputConsumer = new AbstractStreamConsumer<String>() {

	@Override
	protected Promise<Void> onEndOfStream() {
		//propagate end of stream to the downstream side
		outputSupplier.sendEndOfStream();
		return Promise.complete();
	}

	@Override
	protected void onError(Throwable t) {
		System.out.println("Error handling logic must be here. No confirmation to upstream is needed");
	}
};

private final AbstractStreamSupplier<Integer> outputSupplier = new AbstractStreamSupplier<Integer>() {

	@Override
	protected void onSuspended() {
		//downstream cannot accept more data, so pause the upstream supplier
		inputConsumer.getSupplier().suspend();
	}

	@Override
	protected void produce(AsyncProduceController async) {
		inputConsumer.getSupplier()
				.resume(item -> {
					int len = item.length();
					if (len < MAX_LENGTH) {
						//forward only the lengths below the limit
						send(len);
					}
				});
	}

	@Override
	protected void onError(Throwable t) {
		System.out.println("Error handling logic must be here. No confirmation to upstream is needed");
	}
};

Now we define the main method, which creates a supplier of test data, an instance of TransformerExample and StreamConsumerToList. Next, we define the sequence of transformation and output:

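public static void main(String[] args) {
	Eventloop eventloop = Eventloop.create().withCurrentThread().withFatalErrorHandler(rethrowOnAnyError());

	StreamSupplier<String> source = StreamSupplier.of("testdata", "testdata1", "testdata1000");
	TransformerExample transformer = new TransformerExample();
	StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();

	//pass the source through the transformer and stream the result to the consumer
	source.transformWith(transformer).streamTo(consumer);

	//when the stream completes, print the collected lengths
	consumer.getResult().whenResult(result -> System.out.println(result));

	eventloop.run();
}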

So, if you run the example, you’ll receive the following output:

[8, 9]
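
To see why only two values survive, the same rule can be applied with java.util.stream, independently of DataKernel. In this sketch the value MAX_LENGTH = 10 is an assumption, since the excerpt does not show it. It is consistent with the output, because "testdata1000" is 12 characters long. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class LengthRuleSketch {
	// Assumed value: not shown in the excerpt, chosen to be consistent with the output above
	static final int MAX_LENGTH = 10;

	/** Maps each string to its length and keeps only the lengths below MAX_LENGTH. */
	static List<Integer> lengthsBelowMax(List<String> items) {
		return items.stream()
				.map(String::length)
				.filter(len -> len < MAX_LENGTH)
				.collect(Collectors.toList());
	}

	public static void main(String[] args) {
		// prints [8, 9]
		System.out.println(lengthsBelowMax(Arrays.asList("testdata", "testdata1", "testdata1000")));
	}
}
```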

Built-in Stream Nodes

BuiltinStreamNodesExample demonstrates some simple examples of utilizing built-in datastream nodes. If you run the example, you’ll receive the following output:

[1 times ten = 10, 2 times ten = 20, 3 times ten = 30, 4 times ten = 40, 5 times ten = 50, 6 times ten = 60, 7 times ten = 70, 8 times ten = 80, 9 times ten = 90, 10 times ten = 100]
third: [2, 5, 8]
second: [1, 4, 7, 10]
first: [3, 6, 9]
[1, 3, 5, 7, 9]

The first line is a result of StreamMapper:

private static void mapper() {
	//creating a supplier of 10 numbers
	StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

	//creating a mapper for the numbers
	StreamMapper<Integer, String> simpleMap = StreamMapper.create(x -> x + " times ten = " + x * 10);

	//creating a consumer which converts received values to list
	StreamConsumerToList<String> consumer = StreamConsumerToList.create();

	//applying the mapper to supplier and streaming the result to consumer
	supplier.transformWith(simpleMap).streamTo(consumer);

	//when consumer completes receiving values, the result is printed out
	consumer.getResult().whenResult(result -> System.out.println(result));
}

The next three lines of the output are results of utilizing StreamSharder:

private static void sharder() {
	StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

	//creating a sharder of three parts for three consumers
	StreamSharder<Integer> sharder = StreamSharder.create(new HashSharder<>(3));

	StreamConsumerToList<Integer> first = StreamConsumerToList.create();
	StreamConsumerToList<Integer> second = StreamConsumerToList.create();
	StreamConsumerToList<Integer> third = StreamConsumerToList.create();

	//attaching one sharder output to each consumer
	sharder.newOutput().streamTo(first);
	sharder.newOutput().streamTo(second);
	sharder.newOutput().streamTo(third);

	//streaming the numbers into the sharder
	supplier.streamTo(sharder.getInput());

	first.getResult().whenResult(x -> System.out.println("first: " + x));
	second.getResult().whenResult(x -> System.out.println("second: " + x));
	third.getResult().whenResult(x -> System.out.println("third: " + x));
}
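
The distribution in the output (first: [3, 6, 9], second: [1, 4, 7, 10], third: [2, 5, 8]) is what you would get if each item went to the shard numbered "hash code modulo 3", with first, second and third attached to shards 0, 1 and 2. The plain-Java sketch below reproduces that split under this assumption. It is not the HashSharder implementation, and the names ShardingSketch, shardOf and shard are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ShardingSketch {
	/** Assumed rule: non-negative hash code modulo the number of shards. */
	static int shardOf(Object item, int shards) {
		return (item.hashCode() & Integer.MAX_VALUE) % shards;
	}

	/** Splits the items into the given number of shards, preserving their order. */
	static List<List<Integer>> shard(List<Integer> items, int shards) {
		List<List<Integer>> result = new ArrayList<>();
		for (int i = 0; i < shards; i++) {
			result.add(new ArrayList<>());
		}
		for (Integer item : items) {
			result.get(shardOf(item, shards)).add(item);
		}
		return result;
	}

	public static void main(String[] args) {
		// prints [[3, 6, 9], [1, 4, 7, 10], [2, 5, 8]]
		System.out.println(shard(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3));
	}
}
```

An Integer's hash code is its own value, which is why the multiples of three end up together in shard 0.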

The last line of the output is a result of utilizing StreamFilter, which filters numbers and leaves only odd ones:

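private static void filter() {
	StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

	//creating a filter that keeps only odd numbers
	StreamFilter<Integer> filter = StreamFilter.create(input -> input % 2 == 1);

	StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();

	//applying the filter to supplier and streaming the result to consumer
	supplier.transformWith(filter).streamTo(consumer);

	//when consumer completes receiving values, the result is printed out
	consumer.getResult().whenResult(result -> System.out.println(result));
}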