Datastream Module

The Datastream module is useful for intra- and inter-server communication and for asynchronous data processing. It is an important building block for other DataKernel modules.

Datastream features:

  • A modern implementation of async reactive streams (unlike streams in Java 8 and traditional thread-based blocking streams)
  • Asynchronous operation with extremely efficient congestion control, which handles the natural imbalance in speed between data sources
  • Composable stream operations (mappers, reducers, filters, sorters, mergers/splitters, compression, serialization)
  • Stream-based network and file I/O on top of the Eventloop module
  • Compatibility with the CSP module

Datastream has a lot in common with the CSP module. Although both were designed for I/O processing, there are several important distinctions:

                   | Datastream                                                                                                  | CSP
Overhead           | Extremely low: stream can be started with 1 virtual call, short-circuit evaluation optimizes performance   | No short-circuit evaluation, overhead is higher
Throughput speed   | Extremely fast                                                                                              | Fast, but slower than Datastream
Optimized for      | Small pieces of data                                                                                        | Medium-sized objects, ByteBufs
Programming model  | More complicated                                                                                            | Simple and convenient

For maximum efficiency, the framework widely combines CSP and Datastream. For this purpose, ChannelSupplier, ChannelConsumer, StreamSupplier and StreamConsumer have transformWith() methods and special Transformer interfaces. With them, you can seamlessly transform channels into other channels or into datastreams and vice versa, and build chains of such transformations.
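
The chaining idea behind transformWith() can be illustrated with a minimal plain-Java sketch. The types below (Transformer, ListSource, mapping) are hypothetical stand-ins written for this illustration; they are not DataKernel classes and do not reproduce the real signatures. The sketch only assumes that a source passes itself to a transformer and returns whatever the transformer builds:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of the transformWith() pattern; these types are NOT DataKernel classes.
final class TransformWithSketch {

	/** A transformer turns one kind of source into another kind of source. */
	interface Transformer<I, O> {
		O transform(I input);
	}

	/** A toy source backed by a list. */
	static final class ListSource<T> {
		final List<T> items;

		ListSource(List<T> items) {
			this.items = items;
		}

		// The source hands itself to the transformer and returns the result,
		// which is what allows calls to be chained.
		<R> R transformWith(Transformer<ListSource<T>, R> transformer) {
			return transformer.transform(this);
		}
	}

	/** Builds a transformer that applies fn to every item of a source. */
	static <T, R> Transformer<ListSource<T>, ListSource<R>> mapping(Function<T, R> fn) {
		return source -> {
			List<R> out = new ArrayList<>();
			for (T item : source.items) {
				out.add(fn.apply(item));
			}
			return new ListSource<>(out);
		};
	}

	static List<String> demo() {
		return new ListSource<>(Arrays.asList("a", "bb", "ccc"))
				.transformWith(mapping(String::length))
				.transformWith(mapping(len -> "len=" + len))
				.items;
	}

	public static void main(String[] args) {
		System.out.println(demo()); // [len=1, len=2, len=3]
	}
}
```

Because transformWith() returns the transformer's result type, each step in the chain can change the element type or even the kind of source.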

You can add the Datastream module to your project by adding the following dependency to your pom.xml:

<dependency>
    <groupId>io.datakernel</groupId>
    <artifactId>datakernel-datastream</artifactId>
    <version>3.0.0-SNAPSHOT</version>
</dependency>


Examples

  • Simple Supplier - shows how to create a simple custom Supplier and stream some data to Consumer.
  • Simple Consumer - shows how to create a simple custom Consumer.
  • Custom Transformer - shows how to create a custom StreamTransformer, which takes strings and transforms them to their length if it is less than MAX_LENGTH.
  • Built-in Stream Nodes Example - demonstrates some of the built-in Datastream capabilities, such as filtering, sharding and mapping.

Note: To run the examples, clone DataKernel from GitHub:
$ git clone https://github.com/softindex/datakernel
Then import it as a Maven project and build it before running the examples.

Simple Supplier

When you run SupplierExample, you’ll see the following output:

Consumer received: [0, 1, 2, 3, 4]

This output shows the data that our StreamSupplier provided to the StreamConsumer. Let’s have a look at the implementation:

public final class SupplierExample {
	public static void main(String[] args) {

		//create an eventloop for streams operations
		Eventloop eventloop = Eventloop.create().withCurrentThread();
		//create a supplier of some numbers
		StreamSupplier<Integer> supplier = StreamSupplier.of(0, 1, 2, 3, 4);
		//creating a consumer for our supplier
		StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();

		//streaming supplier's numbers to consumer
		supplier.streamTo(consumer);

		//when stream completes, streamed data is printed out
		consumer.getResult().whenResult(result -> System.out.println("Consumer received: " + result));

		//start eventloop
		eventloop.run();
	}
}

Simple Consumer

When you run ConsumerExample, you’ll see the following output:

received: 1
received: 2
received: 3
End of stream received

ConsumerExample extends AbstractStreamConsumer and simply prints out the data it receives. The stream lifecycle is handled by the overridden methods onStarted(), onEndOfStream() and onError():

public final class ConsumerExample<T> extends AbstractStreamConsumer<T> {
	@Override
	protected void onStarted() {
		getSupplier().resume(x -> System.out.println("received: " + x));
	}

	@Override
	protected Promise<Void> onEndOfStream() {
		System.out.println("End of stream received");
		return Promise.complete();
	}

	@Override
	protected void onError(Throwable t) {
		System.out.println("Error handling logic must be here. No confirmation to upstream is needed");
	}
}
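
The resume() call in onStarted() is the consumer's half of the flow control: the supplier pushes items only while the consumer is in the resumed state. The following single-threaded plain-Java sketch models that idea. ToySupplier and its methods are hypothetical and written for this illustration only; they are not DataKernel classes, and the real implementation is asynchronous and more involved:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded model of suspend/resume flow control.
// None of these types are DataKernel classes.
final class BackpressureSketch {

	static final class ToySupplier<T> {
		private final Deque<T> pending;
		private Consumer<T> acceptor; // null means "suspended"

		ToySupplier(List<T> items) {
			this.pending = new ArrayDeque<>(items);
		}

		// The consumer calls resume() when it is ready to receive items.
		void resume(Consumer<T> acceptor) {
			this.acceptor = acceptor;
			// Push items only while the consumer stays in the resumed state.
			while (this.acceptor != null && !pending.isEmpty()) {
				this.acceptor.accept(pending.poll());
			}
		}

		// The consumer calls suspend() when it cannot keep up.
		void suspend() {
			this.acceptor = null;
		}

		int remaining() {
			return pending.size();
		}
	}

	static List<String> demo() {
		List<String> log = new ArrayList<>();
		ToySupplier<Integer> supplier = new ToySupplier<>(Arrays.asList(1, 2, 3, 4, 5));

		// First phase: accept two items, then ask the supplier to stop.
		int[] budget = {2};
		supplier.resume(item -> {
			log.add("received: " + item);
			if (--budget[0] == 0) {
				supplier.suspend();
			}
		});
		log.add("suspended with " + supplier.remaining() + " items left");

		// Second phase: the consumer is ready again and drains the rest.
		supplier.resume(item -> log.add("received: " + item));
		return log;
	}

	public static void main(String[] args) {
		demo().forEach(System.out::println);
	}
}
```

In this model the slow side controls the pace: nothing is buffered on the consumer side, and the remaining items stay with the supplier until resume() is called again.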

Custom Transformer

TransformerExample shows how to create a custom StreamTransformer that takes strings from the input stream and emits the length of each string whose length is less than the defined MAX_LENGTH. First, we define an AbstractStreamConsumer for the input and an AbstractStreamSupplier for the output:

private final AbstractStreamConsumer<String> inputConsumer = new AbstractStreamConsumer<String>() {

	@Override
	protected Promise<Void> onEndOfStream() {
		outputSupplier.sendEndOfStream();
		return Promise.complete();
	}

	@Override
	protected void onError(Throwable t) {
		System.out.println("Error handling logic must be here. No confirmation to upstream is needed");
	}
};

private final AbstractStreamSupplier<Integer> outputSupplier = new AbstractStreamSupplier<Integer>() {

	@Override
	protected void onSuspended() {
		inputConsumer.getSupplier().suspend();
	}

	@Override
	protected void produce(AsyncProduceController async) {
		inputConsumer.getSupplier()
				.resume(item -> {
					int len = item.length();
					if (len < MAX_LENGTH) {
						send(len);
					}
				});
	}

	@Override
	protected void onError(Throwable t) {
		System.out.println("Error handling logic must be here. No confirmation to upstream is needed");
	}
};

Now we define the main method, which creates a supplier of test data, an instance of TransformerExample and a StreamConsumerToList. Next, we define the sequence of transformation and output:

public static void main(String[] args) {
	Eventloop eventloop = Eventloop.create().withCurrentThread().withFatalErrorHandler(rethrowOnAnyError());

	StreamSupplier<String> source = StreamSupplier.of("testdata", "testdata1", "testdata1000");
	TransformerExample transformer = new TransformerExample();
	StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();

	source.transformWith(transformer).streamTo(consumer);
	consumer.getResult().whenResult(System.out::println);

	eventloop.run();
}

If you run the example, you’ll see the following output:

[8, 9]
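
The per-item logic of the transformer can be checked in isolation with a small plain-Java sketch. The value of MAX_LENGTH is not stated in this text; the sketch assumes 10, which is consistent with the output above. The class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the transformer's per-item logic (no DataKernel types involved).
final class LengthFilterSketch {
	// Assumed value: the text does not state MAX_LENGTH; 10 is consistent with the output [8, 9].
	static final int MAX_LENGTH = 10;

	static List<Integer> lengthsBelowMax(List<String> items) {
		List<Integer> out = new ArrayList<>();
		for (String item : items) {
			int len = item.length();
			if (len < MAX_LENGTH) { // same strict comparison as in produce()
				out.add(len);
			}
		}
		return out;
	}

	public static void main(String[] args) {
		// "testdata" has 8 characters, "testdata1" has 9, "testdata1000" has 12
		System.out.println(lengthsBelowMax(Arrays.asList("testdata", "testdata1", "testdata1000"))); // [8, 9]
	}
}
```

Note that the comparison is strict, so under this assumption a string of exactly MAX_LENGTH characters would be dropped as well.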

Built-in Stream Nodes

BuiltinStreamNodesExample demonstrates several built-in datastream nodes. If you run the example, you’ll see the following output:

[1 times ten = 10, 2 times ten = 20, 3 times ten = 30, 4 times ten = 40, 5 times ten = 50, 6 times ten = 60, 7 times ten = 70, 8 times ten = 80, 9 times ten = 90, 10 times ten = 100]
third: [2, 5, 8]
second: [1, 4, 7, 10]
first: [3, 6, 9]
[1, 3, 5, 7, 9]

The first line is the result of StreamMapper:

private static void mapper() {
	//creating a supplier of 10 numbers
	StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

	//creating a mapper for the numbers
	StreamMapper<Integer, String> simpleMap = StreamMapper.create(x -> x + " times ten = " + x * 10);

	//creating a consumer which converts received values to list
	StreamConsumerToList<String> consumer = StreamConsumerToList.create();

	//applying the mapper to supplier and streaming the result to consumer
	supplier.transformWith(simpleMap).streamTo(consumer);

	//when consumer completes receiving values, the result is printed out
	consumer.getResult().whenResult(System.out::println);
}

The next three lines of the output are the results of StreamSharder, which distributes the numbers among three consumers:

private static void sharder() {
	StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

	//creating a sharder of three parts for three consumers
	StreamSharder<Integer> sharder = StreamSharder.create(new HashSharder<>(3));

	StreamConsumerToList<Integer> first = StreamConsumerToList.create();
	StreamConsumerToList<Integer> second = StreamConsumerToList.create();
	StreamConsumerToList<Integer> third = StreamConsumerToList.create();

	sharder.newOutput().streamTo(first);
	sharder.newOutput().streamTo(second);
	sharder.newOutput().streamTo(third);

	supplier.streamTo(sharder.getInput());

	first.getResult().whenResult(x -> System.out.println("first: " + x));
	second.getResult().whenResult(x -> System.out.println("second: " + x));
	third.getResult().whenResult(x -> System.out.println("third: " + x));
}
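
The distribution in the output can be reproduced with a plain-Java sketch. It assumes that the output index is the non-negative hash code of the item modulo the number of outputs; this assumption matches the output above but is not taken from the HashSharder source. The names ShardingSketch, shardOf and shard are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of hash-based sharding (no DataKernel types involved).
final class ShardingSketch {

	// Assumption: output index = non-negative hash code modulo the number of outputs.
	static int shardOf(Object item, int partitions) {
		return (item.hashCode() & Integer.MAX_VALUE) % partitions;
	}

	static List<List<Integer>> shard(List<Integer> items, int partitions) {
		List<List<Integer>> outputs = new ArrayList<>();
		for (int i = 0; i < partitions; i++) {
			outputs.add(new ArrayList<>());
		}
		for (Integer item : items) {
			outputs.get(shardOf(item, partitions)).add(item);
		}
		return outputs;
	}

	public static void main(String[] args) {
		List<List<Integer>> outputs = shard(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3);
		System.out.println("first: " + outputs.get(0));  // first: [3, 6, 9]
		System.out.println("second: " + outputs.get(1)); // second: [1, 4, 7, 10]
		System.out.println("third: " + outputs.get(2));  // third: [2, 5, 8]
	}
}
```

Since the hash code of an Integer is its value, the item 3 goes to index 0 (the first output), 1 goes to index 1 and 2 goes to index 2, and the pattern repeats.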

The last line of the output is a result of utilizing StreamFilter, which filters numbers and leaves only odd ones:

private static void filter() {
	StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

	StreamFilter<Integer> filter = StreamFilter.create(input -> input % 2 == 1);

	StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();

	supplier.transformWith(filter).streamTo(consumer);

	consumer.getResult().whenResult(System.out::println);
}
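
One detail of the predicate is worth a short illustration. In Java, the remainder of a negative odd number divided by 2 is -1, so input % 2 == 1 keeps only positive odd numbers. That is fine for the input 1 to 10 used here. The plain-Java sketch below compares it with a sign-safe variant; the class and constant names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java comparison of two "is odd" predicates (no DataKernel types involved).
final class OddPredicateSketch {
	// The predicate used in the filter example.
	static final Predicate<Integer> AS_IN_EXAMPLE = input -> input % 2 == 1;
	// A variant that also accepts negative odd numbers.
	static final Predicate<Integer> SIGN_SAFE = input -> input % 2 != 0;

	static List<Integer> keep(List<Integer> items, Predicate<Integer> predicate) {
		List<Integer> out = new ArrayList<>();
		for (Integer item : items) {
			if (predicate.test(item)) {
				out.add(item);
			}
		}
		return out;
	}

	public static void main(String[] args) {
		List<Integer> numbers = Arrays.asList(-3, -2, -1, 0, 1, 2, 3);
		System.out.println(keep(numbers, AS_IN_EXAMPLE)); // [1, 3]
		System.out.println(keep(numbers, SIGN_SAFE));     // [-3, -1, 1, 3]
	}
}
```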