CSP Module

CSP (Communicating Sequential Processes) provides I/O communication between channels and was inspired by the approach used in the Go language.

Features:

  • High performance and throughput
  • Optimized for working with medium-sized objects (like ByteBufs)
  • A rich DSL that provides a simple programming model
  • Asynchronous backpressure management

Channel Supplier and Channel Consumer

CSP communication is conducted with ChannelSupplier and ChannelConsumer, which provide and accept data, respectively. Each subsequent request to these channels should be made only after the previous request has completed, and Promises are used to manage this.

ChannelSupplier has a get() method that returns a Promise of the provided value. Until this Promise is completed, either with a result or with an exception, the method shouldn't be called again. Also note that if get() returns a Promise of null, this represents the end of the stream, and no additional data should be requested from this supplier.

ChannelConsumer has an accept(@Nullable T value) method, which returns a Promise<Void> (a Promise of null) that marks the completion of the accepting. Until this Promise is completed, accept() should not be called again. By analogy with ChannelSupplier, if a null value is accepted, it represents the end of the stream.
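The get()/accept() rules above can be modelled with nothing but the JDK. The sketch below is not the DataKernel API: CompletableFuture stands in for Promise, and Source, Sink, ofIterator, toList and streamTo are hypothetical names. What it shows is the shape of the protocol: the next get() is issued only after the previous accept() has completed, and a null value is forwarded as the end of the stream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stdlib-only model of the get()/accept() contract; CompletableFuture stands in for Promise.
// Source, Sink, ofIterator, toList and streamTo are hypothetical names, not the DataKernel API.
final class ChannelContractSketch {

    // get() returns a future of the next value; a null value marks the end of the stream.
    interface Source<T> {
        CompletableFuture<T> get();
    }

    // accept() returns a future that completes once the value is accepted; null marks the end of the stream.
    interface Sink<T> {
        CompletableFuture<Void> accept(T value);
    }

    static <T> Source<T> ofIterator(Iterator<T> it) {
        return () -> CompletableFuture.completedFuture(it.hasNext() ? it.next() : null);
    }

    static <T> Sink<T> toList(List<T> out) {
        return value -> {
            if (value != null) {
                out.add(value);
            }
            return CompletableFuture.completedFuture(null);
        };
    }

    // Issues the next get() only after the previous get() and accept() have both completed.
    static <T> CompletableFuture<Void> streamTo(Source<T> source, Sink<T> sink) {
        return source.get().thenCompose(value -> {
            if (value == null) {
                return sink.accept(null); // forward the end of stream and stop
            }
            return sink.accept(value).thenCompose(ignored -> streamTo(source, sink));
        });
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        streamTo(ofIterator(List.of("a", "b", "c").iterator()), toList(out)).join();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b, c]
    }
}
```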

Here is an example of communication between a ChannelSupplier (input) and a ChannelConsumer (output), taken from the CSP Example below:

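protected void doProcess() {
	input.get()
			.whenComplete((data, e) -> {
				if (e != null) {
					// the input failed: close the process instead of treating the error as end of stream
					close(e);
				} else if (data == null) {
					// null marks the end of the stream: pass it on and complete the process
					output.accept(null)
							.whenResult($ -> completeProcess());
				} else {
					data = data.toUpperCase() + '(' + data.length() + ')';

					// request the next item only after the output has accepted this one
					output.accept(data)
							.whenResult($ -> doProcess());
				}
			});
}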

Channel Queue

Another important concept of CSP is the ChannelQueue interface and its implementations, ChannelBuffer and ChannelZeroBuffer. They connect Consumers and Suppliers and let you build chains of such pipes if needed. Essentially, these buffers pass the objects accepted on the Consumer side to the Supplier side, and a new object can be put into the queue only when the queue has free space. This process is controlled by Promises. You can set the size of a ChannelBuffer manually, while ChannelZeroBuffer doesn't store any values and simply passes them one by one from the Consumer to the Supplier.

Here is a simplified fragment that works with a buffer of items:

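// Schematic fragment: buffer, getSupplier(), send() and sendEndOfStream()
// belong to the surrounding class and are not shown here.
public void accept(T item) {
	buffer.add(item);
	if (buffer.isSaturated()) {
		// the buffer is full: suspend the upstream supplier (backpressure)
		getSupplier().suspend();
	}
}

void produce() {
	while (!buffer.isEmpty()) {
		T item = buffer.poll();
		if (item != null) {
			send(item);
		} else {
			// a null item marks the end of the stream
			sendEndOfStream();
		}
	}
}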
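To see how Promise-driven backpressure can work in a buffer, here is a single-threaded, JDK-only sketch of a bounded queue. It is a model of the idea under simplifying assumptions, not ChannelBuffer's actual implementation: put() returns an already completed future while there is room and a pending one once the buffer is full, and take() releases the waiting writer when it frees a slot. BoundedBufferSketch is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Single-threaded model of a bounded queue with promise-style backpressure. It is a stand-in
// for the ChannelBuffer idea, not its implementation. Items must be non-null, and at most one
// put() and one take() may be pending at a time (the one-request-at-a-time rule).
final class BoundedBufferSketch<T> {
    private final int capacity;
    private final Deque<T> items = new ArrayDeque<>();
    private CompletableFuture<Void> waitingPut;  // writer held back because the buffer is full
    private CompletableFuture<T> waitingTake;    // reader waiting because the buffer is empty

    BoundedBufferSketch(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    // Completes immediately while the buffer still has room; otherwise it completes
    // only after a take() frees a slot, which is what slows the writer down.
    CompletableFuture<Void> put(T item) {
        if (waitingTake != null) {               // a reader is waiting: hand the item over directly
            CompletableFuture<T> reader = waitingTake;
            waitingTake = null;
            reader.complete(item);
            return CompletableFuture.completedFuture(null);
        }
        items.addLast(item);
        if (items.size() < capacity) {
            return CompletableFuture.completedFuture(null);
        }
        waitingPut = new CompletableFuture<>();
        return waitingPut;
    }

    // Completes immediately if an item is buffered; otherwise it completes on the next put().
    CompletableFuture<T> take() {
        if (items.isEmpty()) {
            waitingTake = new CompletableFuture<>();
            return waitingTake;
        }
        T item = items.pollFirst();
        if (waitingPut != null) {                // a slot was freed: release the waiting writer
            CompletableFuture<Void> writer = waitingPut;
            waitingPut = null;
            writer.complete(null);
        }
        return CompletableFuture.completedFuture(item);
    }

    public static void main(String[] args) {
        BoundedBufferSketch<Integer> plate = new BoundedBufferSketch<>(2);
        CompletableFuture<Void> first = plate.put(1);
        CompletableFuture<Void> second = plate.put(2);                 // this put fills the buffer
        System.out.println(first.isDone() + " " + second.isDone());    // prints "true false"
        plate.take().thenAccept(i -> System.out.println("took " + i)); // prints "took 1"
        System.out.println(second.isDone());                           // prints "true"
    }
}
```

In main, the second put() fills a buffer of capacity 2, so its future stays pending until take() removes an item; a writer that waits on each put() future before the next one is therefore paced by the reader.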

Comparison to Datastream

CSP has a lot in common with the Datastream module. Although they were both designed for I/O processing, there are several important distinctions:

  • Overhead. Datastream: extremely low; a stream can be started with 1 virtual call, and short-circuit evaluation optimizes performance. CSP: no short-circuit evaluation, so the overhead is higher.
  • Throughput speed. Datastream: extremely fast. CSP: fast, but slower than Datastream.
  • Programming model. Datastream: more complicated. CSP: simple and convenient.

To provide maximum efficiency, our framework makes wide use of CSP and Datastream in combination. For this purpose, ChannelSupplier, ChannelConsumer, StreamSupplier and StreamConsumer have transformWith() methods and special Transformer interfaces. Using them, you can seamlessly transform channels into other channels or datastreams and vice versa, creating chains of such transformations.
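The idea behind transformWith() can be illustrated with a small synchronous sketch: a transformer is simply a function from one source to another, so applying transformers one after another builds a chain. Everything here (Source, of, map, filter, drain) is hypothetical and much simpler than the real ChannelSupplier and Transformer interfaces; it only shows the chaining pattern, using the filter and map operations that also appear in the Basic Channel Example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Synchronous, stdlib-only illustration of the transformWith() idea: a transformer is
// a function from one source to another, so transformations chain fluently.
// Source, of(), map(), filter() and drain() are hypothetical, not DataKernel's interfaces.
final class TransformSketch {

    interface Source<T> {
        T next(); // returns null at the end of the stream

        default <R> R transformWith(Function<? super Source<T>, ? extends R> transformer) {
            return transformer.apply(this);
        }
    }

    static <T> Source<T> of(List<T> values) {
        Iterator<T> it = values.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Transformer that applies fn to every item and passes the end of stream through.
    static <T, R> Function<Source<T>, Source<R>> map(Function<? super T, ? extends R> fn) {
        return upstream -> () -> {
            T value = upstream.next();
            return value == null ? null : fn.apply(value);
        };
    }

    // Transformer that skips the items rejected by keep.
    static <T> Function<Source<T>, Source<T>> filter(Predicate<? super T> keep) {
        return upstream -> () -> {
            T value;
            do {
                value = upstream.next();
            } while (value != null && !keep.test(value));
            return value;
        };
    }

    static <T> List<T> drain(Source<T> source) {
        List<T> out = new ArrayList<>();
        for (T value = source.next(); value != null; value = source.next()) {
            out.add(value);
        }
        return out;
    }

    static List<String> demo() {
        Function<Source<Integer>, Source<Integer>> evens = filter(i -> i % 2 == 0);
        Function<Source<Integer>, Source<String>> describe = map(i -> i + " times 10 = " + i * 10);
        Source<String> chained = of(List.of(1, 2, 3, 4, 5, 6))
                .transformWith(evens)
                .transformWith(describe);
        return drain(chained);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2 times 10 = 20, 4 times 10 = 40, 6 times 10 = 60]
    }
}
```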

Benchmark

We’ve measured CSP performance in a scenario where a ChannelSupplier streams 50M Integer objects to a ChannelConsumer, and got the following result:

Time: 4720ms; Average time: 472.0ms; Best time: 469ms; Worst time: 475ms; Operations per second: 105932203 
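The reported numbers are consistent with each other: dividing the 50M objects by the 472 ms average time gives the operations-per-second figure, and the total time is exactly ten times the average, which suggests ten measured runs:

```latex
\frac{5 \times 10^{7}\ \text{objects}}{0.472\ \text{s}} \approx 1.0593 \times 10^{8}\ \text{ops/s} \approx 105\,932\,203\ \text{ops/s},
\qquad
\frac{4720\ \text{ms}}{472.0\ \text{ms}} = 10
```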

We’ve also measured the performance of a TCP server that uses both CSP and Datastream and got an average result of 47,495,905 requests per second.

Integration

You can add the CSP module to your project by inserting the following dependency into pom.xml:

<dependency>
    <groupId>io.datakernel</groupId>
    <artifactId>datakernel-csp</artifactId>
    <version>3.1.0</version>
</dependency>

Examples

Note: To run the examples, you need to clone DataKernel from GitHub:
$ git clone https://github.com/softindex/datakernel
Then import it as a Maven project, check out branch v3.1, and build the project before running the examples.
The examples are located in datakernel -> examples -> core -> csp.

Basic Channel Example

The Channel Example shows the interaction between suppliers and consumers using streamTo() and some helper methods:

private static void supplierOfValues() {
	ChannelSupplier.of("1", "2", "3", "4", "5")
			.streamTo(ChannelConsumer.ofConsumer(System.out::println));
}

private static void supplierOfList(List<String> list) {
	ChannelSupplier.ofIterable(list)
			.streamTo(ChannelConsumer.ofConsumer(System.out::println));
}

private static void map() {
	ChannelSupplier.of(1, 2, 3, 4, 5)
			.map(integer -> integer + " times 10 = " + integer * 10)
			.streamTo(ChannelConsumer.ofConsumer(System.out::println));
}

private static void toCollector() {
	ChannelSupplier.of(1, 2, 3, 4, 5)
			.toCollector(Collectors.toList())
			.whenResult(System.out::println);
}

private static void filter() {
	ChannelSupplier.of(1, 2, 3, 4, 5, 6)
			.filter(integer -> integer % 2 == 0)
			.streamTo(ChannelConsumer.ofConsumer(System.out::println));
}

If you run this example, you’ll receive the following output (the One, Two and Three lines are printed by supplierOfList()):

1
2
3
4
5
One
Two
Three
1 times 10 = 10
2 times 10 = 20
3 times 10 = 30
4 times 10 = 40
5 times 10 = 50
[1, 2, 3, 4, 5]
2
4
6
See full example on GitHub

CSP Example

This example implements an AsyncProcess between a ChannelSupplier, which represents the input, and a ChannelConsumer, which represents the output:

public final class CspExample extends AbstractCommunicatingProcess implements WithChannelTransformer<CspExample, String, String> {
	private ChannelSupplier<String> input;
	private ChannelConsumer<String> output;

	@Override
	public ChannelOutput<String> getOutput() {
		return output -> {
			this.output = output;
			if (this.input != null && this.output != null) startProcess();
		};
	}

	@Override
	public ChannelInput<String> getInput() {
		return input -> {
			this.input = input;
			if (this.input != null && this.output != null) startProcess();
			return getProcessCompletion();
		};
	}

	@Override
	//[START REGION_1]
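	protected void doProcess() {
		input.get()
				.whenComplete((data, e) -> {
					if (e != null) {
						// the input failed: close the process instead of treating the error as end of stream
						close(e);
					} else if (data == null) {
						// null marks the end of the stream: pass it on and complete the process
						output.accept(null)
								.whenResult($ -> completeProcess());
					} else {
						data = data.toUpperCase() + '(' + data.length() + ')';

						// request the next item only after the output has accepted this one
						output.accept(data)
								.whenResult($ -> doProcess());
					}
				});
	}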
	//[END REGION_1]

	@Override
	protected void doClose(Throwable e) {
		System.out.println("Process has been closed with exception: " + e);
		input.close(e);
		output.close(e);
	}

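	public static void main(String[] args) {
		Eventloop eventloop = Eventloop.create().withCurrentThread();

		CspExample process = new CspExample();
		ChannelSupplier.of("hello", "world", "nice", "to", "see", "you")
				.transformWith(process)
				.streamTo(ChannelConsumer.ofConsumer(System.out::println));

		// the process runs once the eventloop is started
		eventloop.run();
	}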
}

This process takes each string, converts it to upper case and appends the string’s length in parentheses:

HELLO(5)
WORLD(5)
NICE(4)
TO(2)
SEE(3)
YOU(3)
See this example on GitHub

Channel Buffer Example

As mentioned before, there are two ChannelQueue implementations, ChannelBuffer and ChannelZeroBuffer, and both of them manage communication between Consumers and Suppliers. You can set the size of a ChannelBuffer manually, whereas the size of a ChannelZeroBuffer is always 0.

To give you a better understanding of how these buffers work, let’s consider a simple example. Assume there is a Granny who wants to give her Grandson 25 Apples. That’s quite a lot, so she first puts the Apples on a big Plate, which can hold up to 10 Apples at a time. When the Plate is full, the Grandson has to take at least one Apple before Granny can put a new one on the Plate:

static final class ChannelBufferStream {
	public static void main(String[] args) {
		Eventloop eventloop = Eventloop.create().withCurrentThread();

		ChannelBuffer<Integer> plate = new ChannelBuffer<>(5, 10);
		ChannelSupplier<Integer> granny = plate.getSupplier();
		Promises.loop(0,
				apple -> apple < 25,
				apple -> plate.put(apple).map($ -> {
					System.out.println("Granny gives apple   #" + apple);
					return apple + 1;
				}));
		granny.streamTo(ChannelConsumer.ofConsumer(apple -> System.out.println("Grandson takes apple #" + apple)));
		eventloop.run();
	}
}

The next day Granny wants to give Apples to her Grandson again, but this time there are only 10 of them. So there is no real need for the Plate: Granny can simply pass the Apples to her Grandson one by one:

static final class ChannelBufferZeroExample {
	public static void main(String[] args) {
		Eventloop eventloop = Eventloop.create().withCurrentThread();

		ChannelQueue<Integer> buffer = new ChannelZeroBuffer<>();
		ChannelSupplier<Integer> granny = buffer.getSupplier();

		Promises.loop(0,
				apple -> apple < 10,
				apple -> buffer.put(apple).map($ -> {
					System.out.println("Granny gives apple   #" + apple);
					return apple + 1;
				}));

		granny.streamTo(ChannelConsumer.<Integer>ofConsumer((apple) ->
				System.out.println("Grandson takes apple #" + apple)).async());

		eventloop.run();
	}
}
See full example on GitHub
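The zero-buffer hand-off can likewise be sketched with the JDK alone. In this hypothetical model (not ChannelZeroBuffer's implementation), put() completes only when a reader takes the item, so nothing is stored beyond the single item being handed over. The demo replays the apple story; note that Granny's line is logged only after each apple has been taken, because her put() completes at the moment of the hand-off.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Single-threaded model of a zero-capacity hand-off: put() completes only when a reader
// takes the item. A stand-in for the ChannelZeroBuffer idea, not its implementation;
// it assumes at most one pending put() and one pending take() at a time.
final class ZeroBufferSketch<T> {
    private T offeredItem;                        // item of a writer that is still waiting
    private CompletableFuture<Void> waitingPut;   // completes when offeredItem is taken
    private CompletableFuture<T> waitingTake;     // reader waiting for the next item

    CompletableFuture<Void> put(T item) {
        if (waitingTake != null) {                // a reader is waiting: hand the item over now
            CompletableFuture<T> reader = waitingTake;
            waitingTake = null;
            reader.complete(item);
            return CompletableFuture.completedFuture(null);
        }
        offeredItem = item;                       // nobody is reading yet: the writer has to wait
        waitingPut = new CompletableFuture<>();
        return waitingPut;
    }

    CompletableFuture<T> take() {
        if (waitingPut != null) {                 // a writer is waiting: take its item and release it
            T item = offeredItem;
            CompletableFuture<Void> writer = waitingPut;
            offeredItem = null;
            waitingPut = null;
            writer.complete(null);
            return CompletableFuture.completedFuture(item);
        }
        waitingTake = new CompletableFuture<>();
        return waitingTake;
    }

    // Granny puts apples one by one; each put completes only once the Grandson takes the apple.
    static List<String> demo(int apples) {
        ZeroBufferSketch<Integer> hands = new ZeroBufferSketch<>();
        List<String> log = new ArrayList<>();
        give(0, apples, hands, log);
        takeAll(apples, hands, log);
        return log;
    }

    private static void give(int apple, int total, ZeroBufferSketch<Integer> hands, List<String> log) {
        if (apple == total) {
            return;
        }
        hands.put(apple).thenRun(() -> {
            log.add("Granny gives apple #" + apple);
            give(apple + 1, total, hands, log);
        });
    }

    private static void takeAll(int remaining, ZeroBufferSketch<Integer> hands, List<String> log) {
        if (remaining == 0) {
            return;
        }
        hands.take().thenAccept(apple -> {
            log.add("Grandson takes apple #" + apple);
            takeAll(remaining - 1, hands, log);
        });
    }

    public static void main(String[] args) {
        demo(3).forEach(System.out::println);
    }
}
```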


ChannelSplitter Example

In this example we use the predefined ChannelSplitter, which splits data from one input into several outputs. In our case, the input is split into three ChannelConsumers:

public class SplitterExample {
	public static void main(String[] args) {
		Eventloop eventloop = Eventloop.create().withCurrentThread();
		List<Integer> integers = Stream.iterate(1, (i) -> i + 1)
				.limit(5)
				.collect(Collectors.toList());

		Queue<Integer> result = new ConcurrentLinkedQueue<>();
		ChannelSplitter<Integer> splitter = ChannelSplitter.create(ChannelSupplier.ofIterable(integers));

		for (int i = 0; i < 3; i++) {
			splitter.addOutput()
					.set(ChannelConsumer.of(AsyncConsumer.of(result::offer)).async());
		}

		eventloop.run();
		while (!result.isEmpty()) {
			System.out.println(result.poll());
		}
	}
}
See full example on GitHub

CSP Transformations

You can create chains of transformations of the data provided by a ChannelSupplier using the transformWith() method and predefined CSP chunkers, compressors, decompressors, etc. In this example we take a supplier of ByteBufs, then chunk, compress, re-chunk and decompress them, and finally check that the result matches the original data:

// createRandomByteBuf() and await() are helpers from the full example (not shown here)
int buffersCount = 100;

List<ByteBuf> buffers = IntStream.range(0, buffersCount).mapToObj($ -> createRandomByteBuf()).collect(toList());
// the expected result is the concatenation of all original bytes
byte[] expected = buffers.stream().map(ByteBuf::slice).collect(ByteBufQueue.collector()).asArray();

// chunk -> compress -> re-chunk -> decompress
ChannelSupplier<ByteBuf> supplier = ChannelSupplier.ofIterable(buffers)
		.transformWith(ChannelByteChunker.create(MemSize.of(64), MemSize.of(128)))
		.transformWith(ChannelLZ4Compressor.createFastCompressor())
		.transformWith(ChannelByteChunker.create(MemSize.of(64), MemSize.of(128)))
		.transformWith(ChannelLZ4Decompressor.create());

// the decompressed output must match the original data
ByteBuf collected = await(supplier.toCollector(ByteBufQueue.collector()));
assertArrayEquals(expected, collected.asArray());
See full example on GitHub
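Re-chunking between stages turns arbitrarily sized buffers into chunks of a bounded size. Assuming the two MemSize arguments of ChannelByteChunker.create() are the minimum and maximum chunk sizes, the idea can be sketched on plain byte arrays as follows; ByteChunkerSketch and its methods are hypothetical, and the real chunker works on ByteBufs and may split data differently.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified re-chunking model: every emitted chunk except possibly the last one has
// between minSize and maxSize bytes. A hypothetical stand-in for the idea behind
// ChannelByteChunker, not its implementation.
final class ByteChunkerSketch {
    private final int minSize;
    private final int maxSize;
    private byte[] pending = new byte[0];

    ByteChunkerSketch(int minSize, int maxSize) {
        if (minSize < 1 || maxSize < minSize) {
            throw new IllegalArgumentException("expected 1 <= minSize <= maxSize");
        }
        this.minSize = minSize;
        this.maxSize = maxSize;
    }

    // Buffers the input and returns the chunks that are ready to be sent downstream.
    List<byte[]> accept(byte[] input) {
        byte[] joined = Arrays.copyOf(pending, pending.length + input.length);
        System.arraycopy(input, 0, joined, pending.length, input.length);
        List<byte[]> ready = new ArrayList<>();
        int offset = 0;
        while (joined.length - offset >= minSize) {
            int size = Math.min(joined.length - offset, maxSize);
            ready.add(Arrays.copyOfRange(joined, offset, offset + size));
            offset += size;
        }
        pending = Arrays.copyOfRange(joined, offset, joined.length);
        return ready;
    }

    // End of stream: flush the remainder, which may be shorter than minSize.
    List<byte[]> finish() {
        List<byte[]> rest = new ArrayList<>();
        if (pending.length > 0) {
            rest.add(pending);
        }
        pending = new byte[0];
        return rest;
    }

    static List<byte[]> rechunk(List<byte[]> inputs, int minSize, int maxSize) {
        ByteChunkerSketch chunker = new ByteChunkerSketch(minSize, maxSize);
        List<byte[]> out = new ArrayList<>();
        for (byte[] input : inputs) {
            out.addAll(chunker.accept(input));
        }
        out.addAll(chunker.finish());
        return out;
    }

    static byte[] concat(List<byte[]> parts) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        for (byte[] part : parts) {
            bytes.write(part, 0, part.length);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        List<byte[]> chunks = rechunk(List.of(new byte[10], new byte[200], new byte[30]), 64, 128);
        chunks.forEach(chunk -> System.out.println(chunk.length)); // prints 128, 82 and 30
    }
}
```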

Channel File Example

This example demonstrates how to work with files asynchronously using Promises and CSP built-in consumers and suppliers. It writes two lines to a file with ChannelFileWriter, and then reads and prints them with ChannelFileReader:

@NotNull
private static Promise<Void> writeToFile() {
	return ChannelSupplier.of(
			ByteBufStrings.wrapAscii("Hello, this is example file\n"),
			ByteBufStrings.wrapAscii("This is the second line of file\n"))
			.streamTo(ChannelFileWriter.open(executor, PATH, WRITE));
}

@NotNull
private static Promise<Void> readFile() {
	return ChannelFileReader.open(executor, PATH)
			.map(cfr -> cfr.withBufferSize(MemSize.bytes(10)))
			.then(cfr -> cfr.streamTo(ChannelConsumer.ofConsumer(buf -> System.out.print(buf.asString(UTF_8)))));
}

If you run the example, you’ll see the content of the created file:

Hello, this is example file
This is the second line of file
See full example on GitHub
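Both ChannelFileWriter.open() and ChannelFileReader.open() take an executor. A plausible reading is that blocking file operations run on that executor while the caller only handles promises. The JDK-only sketch below models that pattern with CompletableFuture; AsyncFileSketch, writeLines, readLines and the temporary file are illustrative assumptions, not part of DataKernel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stdlib-only model of offloading blocking file calls to an executor so that callers only
// deal with futures. Not the ChannelFileWriter/ChannelFileReader API.
final class AsyncFileSketch {

    static CompletableFuture<Void> writeLines(ExecutorService executor, Path path, List<String> lines) {
        return CompletableFuture.runAsync(() -> {
            try {
                Files.write(path, lines, StandardCharsets.UTF_8); // blocking call, runs on the executor
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, executor);
    }

    static CompletableFuture<List<String>> readLines(ExecutorService executor, Path path) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return Files.readAllLines(path, StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, executor);
    }

    static List<String> demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Path path = Files.createTempFile("csp-sketch", ".txt");
            try {
                return writeLines(executor, path,
                        List.of("Hello, this is example file", "This is the second line of file"))
                        .thenCompose(ignored -> readLines(executor, path)) // read only after the write finished
                        .join();
            } finally {
                Files.deleteIfExists(path);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```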

Custom CSP

This example describes how to create a custom communicating process with one input channel and two output channels. It duplicates each input item and sends it to both output channels.

To create a custom CSP, you need a class that extends AbstractCommunicatingProcess and has one input and two outputs:

public class ChannelBifurcator<T> extends AbstractCommunicatingProcess
		implements WithChannelInput<ChannelBifurcator<T>, T> {

	ChannelConsumer<T> first;
	ChannelConsumer<T> second;

	ChannelSupplier<T> input;

	private ChannelBifurcator() {
	}

	public static <T> ChannelBifurcator<T> create() {
		return new ChannelBifurcator<>();
	}

	// TODO: support more than two outputs.
	public static <T> ChannelBifurcator<T> create(ChannelSupplier<T> supplier, ChannelConsumer<T> first, ChannelConsumer<T> second) {
		return new ChannelBifurcator<T>().withInput(supplier).withOutputs(first, second);
	}

Next, we want to be able to initialize the input and output channels with chained calls. First, we need to implement the withOutputs method:

public ChannelBifurcator<T> withOutputs(ChannelConsumer<T> firstOutput, ChannelConsumer<T> secondOutput) {
	this.first = sanitize(firstOutput);
	this.second = sanitize(secondOutput);
	tryStart();
	return this;
}

After this we can create a bifurcator in the following way:

bifurcator = ChannelBifurcator.create().withInput(input).withOutputs(first, second);

Next, we implement getInput() in our class:

@Override
public ChannelInput<T> getInput() {
	return input -> {
		checkState(!isProcessStarted(), "Can't configure bifurcator while it is running");
		this.input = sanitize(input);
		tryStart();
		return getProcessCompletion();
	};
}

doProcess is the main method, where the item delivery logic is described. In our case, it duplicates each input item and sends it to both outputs:

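@Override
protected void doProcess() {
	if (isProcessComplete()) {
		return;
	}

	input.get()
			.whenComplete((item, e) -> {
				if (e != null) {
					// the input failed: close the process and all its channels
					close(e);
				} else if (item != null) {
					// send a slice to each output; read the next item only after both have accepted
					first.accept(trySlice(item)).both(second.accept(trySlice(item)))
							.whenComplete(($, e1) -> {
								if (e1 == null) {
									doProcess();
								} else {
									close(e1);
								}
							});
					// release the original item, since the outputs received slices
					tryRecycle(item);
				} else {
					// end of stream: propagate it to both outputs, then complete the process
					first.accept(null).both(second.accept(null))
							.whenComplete(($, e2) -> completeProcess(e2));
				}
			});
}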
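The key property of this doProcess() is that the next item is requested only after both outputs have accepted the current one, so the slower output sets the pace. The JDK-only model below reproduces that loop, with CompletableFuture.allOf() in the role of both(); BroadcastSketch, Out and broadcast are hypothetical names, and the ByteBuf-specific trySlice()/tryRecycle() calls are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stdlib-only model of the bifurcator loop: every item goes to both outputs, and the next
// item is read only after both accept() futures complete. CompletableFuture.allOf() plays
// the role of both(); Out is a hypothetical stand-in for ChannelConsumer.
final class BroadcastSketch {

    interface Out<T> {
        CompletableFuture<Void> accept(T item); // null marks the end of the stream
    }

    static <T> CompletableFuture<Void> broadcast(Iterator<T> input, Out<T> first, Out<T> second) {
        T item = input.hasNext() ? input.next() : null;
        // allOf completes exceptionally if either output fails, so an error stops the loop
        CompletableFuture<Void> both = CompletableFuture.allOf(first.accept(item), second.accept(item));
        if (item == null) {
            return both;                          // the end of stream was sent to both outputs
        }
        return both.thenCompose(ignored -> broadcast(input, first, second));
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        List<CompletableFuture<Void>> pendingAcks = new ArrayList<>();
        Out<String> fast = item -> {
            events.add("first " + (item == null ? "<end>" : item));
            return CompletableFuture.completedFuture(null);
        };
        Out<String> slow = item -> {              // acknowledges only when its future is completed
            events.add("second " + (item == null ? "<end>" : item));
            CompletableFuture<Void> ack = new CompletableFuture<>();
            pendingAcks.add(ack);
            return ack;
        };
        CompletableFuture<Void> done = broadcast(List.of("a", "b").iterator(), fast, slow);
        events.add("waiting");                    // "b" has not been read yet: the slow output holds it back
        while (!done.isDone()) {
            pendingAcks.remove(0).complete(null); // acknowledge the slow output
        }
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In the demo, "first b" is logged only after "waiting", that is, after the slow output has acknowledged "a".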

The doClose method is called when the communicating process is closed, for example because of an error. We must remember to close all the channels:

@Override
protected void doClose(Throwable e) {
	input.close(e);
	first.close(e);
	second.close(e);
}

Finally, we define how the bifurcator starts:

private void tryStart() {
	if (input != null && first != null && second != null) {
		getCurrentEventloop().post(this::startProcess);
	}
}
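tryStart() starts the process only when the input and both outputs are set, and it posts startProcess() to the eventloop instead of calling it directly, so processing begins only when the eventloop runs, after the whole configuration chain has returned. The sketch below models this with a tiny task queue; DeferredStartSketch, MiniLoop and this simplified Bifurcator are hypothetical and use strings in place of channels.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Stdlib-only model of tryStart(): the process starts only once every channel is set,
// and the start is posted to a task queue rather than run inside the configuring call.
// MiniLoop and Bifurcator are hypothetical stand-ins, not DataKernel classes.
final class DeferredStartSketch {

    // A tiny single-threaded task queue in place of an eventloop.
    static final class MiniLoop {
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        void post(Runnable task) {
            tasks.add(task);
        }

        void run() {
            for (Runnable task = tasks.poll(); task != null; task = tasks.poll()) {
                task.run();
            }
        }
    }

    static final class Bifurcator {
        private final MiniLoop loop;
        private final List<String> log;
        private String input;
        private String first;
        private String second;
        private boolean started;

        Bifurcator(MiniLoop loop, List<String> log) {
            this.loop = loop;
            this.log = log;
        }

        Bifurcator withInput(String input) {
            this.input = input;
            tryStart();
            return this;
        }

        Bifurcator withOutputs(String first, String second) {
            this.first = first;
            this.second = second;
            tryStart();
            return this;
        }

        private void tryStart() {
            if (input != null && first != null && second != null) {
                loop.post(this::startProcess);    // defer: do not start inside the caller's chain
            }
        }

        private void startProcess() {
            if (started) {
                return;                           // ignore a second posted start
            }
            started = true;
            log.add("started with " + input + " -> " + first + ", " + second);
        }
    }

    static List<String> demo() {
        MiniLoop loop = new MiniLoop();
        List<String> log = new ArrayList<>();
        new Bifurcator(loop, log).withInput("in").withOutputs("out1", "out2");
        log.add("configured");                    // runs before the process starts
        loop.run();
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```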

That's it: your own CSP process is ready!

See full example on GitHub

Now let’s test our bifurcator out:

@Test
public void simpleCase() {
	List<String> expected = new ArrayList<>();
	expected.add("1st");
	expected.add("2nd");
	expected.add("3rd");
	expected.add("4th");
	expected.add("5th");

	Eventloop eventloop = Eventloop.create().withCurrentThread();

	List<String> firstResults = new ArrayList<>();
	List<String> secondResults = new ArrayList<>();

	ChannelBifurcator.<String>create()
			.withInput(ChannelSupplier.ofIterable(expected))
			.withOutputs(ChannelConsumer.of(AsyncConsumer.<String>of(firstResults::add)).async(),
					ChannelConsumer.of(AsyncConsumer.<String>of(secondResults::add)).async())
			.getProcessCompletion()
			.whenComplete(assertComplete());

	eventloop.run();

	assertEquals(expected, firstResults);
	assertEquals(firstResults, secondResults);
	assertEquals(expected, secondResults);
}