To run the examples, execute these lines in the console in the appropriate folder:
Or these lines:
Note that for the network demo you should first launch the server and then the client.
There are dozens of built-in primitives that you only have to wire to each other (the example in the picture above shows this).
Here is a list of them with short descriptions:
StreamProducer.idle() - neither sends any data nor closes itself.
StreamProducer.closing() - does not send any data but closes itself immediately after binding.
StreamProducer.closingWithError(Throwable) - closes with given error after binding.
StreamProducer.endOfStreamOnError() - a wrapper which closes itself whether given stream closes with error or not.
StreamProducer.noEndOfStream() - a wrapper which never closes.
StreamProducer.of(values…) - sends given values and then closes.
StreamProducer.ofIterator(iterator) - sends values from given iterator until it finishes and then closes.
StreamProducer.ofIterable(iterable) - same as above.
StreamProducer.ofStage(stage) - wrapper which unwraps a producer from a CompletionStage (starts sending data from that producer once the stage is complete).
StreamProducer.ofStageWithResult(CompletionStage) - same as above but for producers that have a result.
StreamProducer.withEndOfStreamAsResult(StreamProducer) - wrapper which returns empty result when stream closes.
StreamProducer.withResult(StreamProducer, CompletionStage) - wrapper which assigns given CompletionStage as a result to given producer.
StreamProducer.concat(StreamProducer…) - wrapper which concatenates given producers.
StreamFileReader - producer which reads data from a file without blocking.
StreamConsumer.idle() - does nothing; when the wired producer finishes, it sets its own status to finished too.
StreamConsumer.errorDecorator(StreamConsumer, Predicate, Supplier) - wrapper which closes with given error when predicate fails on consumed item.
StreamConsumer.suspendDecorator(StreamConsumer, Predicate, …) - wrapper which suspends when predicate fails on consumed item.
StreamFileWriter - consumer which writes data to a file without blocking.
CountingStreamForwarder - does not transform data, but counts how many items passed through it.
StreamBinarySerializer/Deserializer - transforms data to/from ByteBufs using given serializer.
StreamByteChunker - transforms only ByteBufs into ByteBufs, slicing them into smaller chunks.
StreamFilter - passes through only those items which matched given predicate.
StreamForwarder - not really a transformer, does nothing to data, but wires together producer and consumer wrapped in CompletionStages.
StreamFunction - converts given items into others using given function (equivalent of the map operation).
StreamJoin - complicated transformer which joins more than one producer into one consumer with strategies and mapping functions.
StreamLZ4Compressor/Decompressor - transforms only ByteBufs into ByteBufs, compressing/decompressing them with the LZ4 algorithm.
StreamMap - smarter version of StreamFunction which can transform one item into a variable number of other items (equivalent of the flatMap operation).
StreamMerger - Merges streams sorted by keys and streams their sorted union.
StreamReducer - Performs aggregate functions on the elements from input streams sorted by keys. Finds the key of each item with the key function, selects elements with the same key, reduces them and streams the results sorted by key.
StreamReducerSimple - Performs a reduction on the elements of input streams using the key function.
StreamSharder - Divides the input stream into groups with some key function, and sends the resulting streams to consumers.
StreamSorter - Receives data and saves it, and on end of stream sorts it and streams to the destination.
StreamSplitter - Sends received items to multiple consumers at once.
StreamUnion - Unions all input streams and streams their items to the destination in the order they are received.
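
To make the semantics of a few transformers from this list more concrete, here is a minimal sketch that uses only the plain JDK (`java.util.stream` and collections). It is not the library's API: the class `TransformerAnalogues` and all of its method names are hypothetical, everything runs synchronously on in-memory lists, and there is no backpressure. It only illustrates what "filter", "map", "flatMap", "merge sorted" and "reduce by key" mean for the items themselves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-JDK analogues of a few transformers described above.
// None of this is the library's API; all names here are hypothetical.
public class TransformerAnalogues {

    // StreamFilter analogue: pass through only items matching a predicate.
    public static List<Integer> evens(List<Integer> items) {
        return items.stream().filter(x -> x % 2 == 0).collect(Collectors.toList());
    }

    // StreamFunction analogue (map): exactly one output item per input item.
    public static List<Integer> squares(List<Integer> items) {
        return items.stream().map(x -> x * x).collect(Collectors.toList());
    }

    // StreamMap analogue (flatMap): a variable number of outputs per input.
    public static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    // StreamMerger analogue: merge two lists that are already sorted ascending.
    // Assumption: duplicates are kept; the real transformer may behave differently.
    public static List<Integer> mergeSorted(List<Integer> a, List<Integer> b) {
        List<Integer> out = new ArrayList<>(a.size() + b.size());
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            out.add(a.get(i) <= b.get(j) ? a.get(i++) : b.get(j++));
        }
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }

    // StreamReducer analogue: find each item's key, reduce items sharing a key,
    // and return the results sorted by key (TreeMap keeps keys ordered).
    public static Map<String, Integer> countByFirstLetter(List<String> items) {
        return items.stream().collect(Collectors.groupingBy(
                s -> s.substring(0, 1), TreeMap::new, Collectors.summingInt(s -> 1)));
    }

    public static void main(String[] args) {
        System.out.println(evens(Arrays.asList(1, 2, 3, 4)));
        System.out.println(squares(Arrays.asList(1, 2, 3)));
        System.out.println(words(Arrays.asList("a b", "c")));
        System.out.println(mergeSorted(Arrays.asList(1, 4, 6), Arrays.asList(2, 4, 9)));
        System.out.println(countByFirstLetter(Arrays.asList("beta", "alpha", "bravo")));
    }
}
```

The difference that matters in practice is that the primitives listed above work on asynchronous streams and propagate suspend/resume between producer and consumer, whereas this sketch processes complete lists in one pass.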
We have measured the performance of our streams under various use scenarios.
Results are shown in the table below.
In every scenario the producer generates 1 million numbers, from 1 to 1,000,000.
Columns describe the different behaviour of the consumer (backpressure): whether it suspends and how often.
Numbers denote how many items have been processed by each stream graph per second (on a single core).
* Typically, suspend/resume occurs very infrequently, only when consumers are saturated or during network congestion. In most cases intermediate buffering alleviates the suspend/resume cost and brings the amortized performance of your data processing pipeline close to the maximum throughput figures shown here.
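
The effect of intermediate buffering on suspend/resume frequency can be illustrated with a toy, single-threaded simulation. This is only a sketch under stated assumptions and not a benchmark or the library's implementation: the class `BackpressureSketch`, the tick model, the consumer that stalls on every `stallEvery`-th tick, and the "resume once the buffer is drained" policy are all hypothetical choices made for illustration.

```java
// Toy model of backpressure: counts how often a producer must suspend
// while delivering items through a bounded buffer to a consumer that
// occasionally stalls. All names and the tick model are hypothetical.
public class BackpressureSketch {

    /** Returns how many times the producer suspended while delivering `total` items. */
    public static int countSuspends(int total, int bufferCapacity, int stallEvery) {
        if (bufferCapacity < 1 || stallEvery < 2) {
            throw new IllegalArgumentException("need bufferCapacity >= 1 and stallEvery >= 2");
        }
        int produced = 0, consumed = 0, buffered = 0, suspends = 0;
        boolean suspended = false;
        for (int tick = 0; consumed < total; tick++) {
            // Producer: emits one item per tick unless suspended or finished.
            if (!suspended && produced < total) {
                produced++;
                buffered++;
                if (buffered == bufferCapacity) { // buffer full: suspend the producer
                    suspended = true;
                    suspends++;
                }
            }
            // Consumer: stalls on every stallEvery-th tick, otherwise takes up to two items.
            boolean stalled = tick % stallEvery == 0;
            if (!stalled) {
                int take = Math.min(2, buffered);
                buffered -= take;
                consumed += take;
                if (suspended && buffered == 0) {
                    suspended = false; // resume once the buffer is drained
                }
            }
        }
        return suspends;
    }

    public static void main(String[] args) {
        // Same workload, different buffer sizes.
        System.out.println("capacity 1:  " + countSuspends(1000, 1, 10) + " suspends");
        System.out.println("capacity 2:  " + countSuspends(1000, 2, 10) + " suspends");
        System.out.println("capacity 64: " + countSuspends(1000, 64, 10) + " suspends");
    }
}
```

In this model a buffer of capacity 1 forces a suspend on every item, while a buffer large enough to absorb the consumer's short stalls never suspends at all, which is the amortization effect the note above describes.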