Promise Module

Features

Promises are primary building blocks in the DataKernel async programming model which can be compared to Java Futures (CompletionStage to be more exact).

If you are not familiar with the Promises concept, the following paragraph is for you. If otherwise, you can skip this part and move directly to the next section.

Promises Basics

  • In general, Promise represents the result of an operation that hasn’t been completed yet, but will be at some undetermined point of time in the future. It is used for deferred and asynchronous computations.
  • Promise is a high-performance Java Future alternative. It doesn’t only represent a future result of an asynchronous computation, but also allows to transform and process the unspecified incoming result using a chaining mechanism. Moreover, such results can be combined with the help of the provided combinators.
  • Unlike Java Future, Promises were naturally designed to work within a single eventloop thread. They are extremely lightweight, have no multithreading overhead, and thus are capable of processing millions of calls per second.

Creating Promises

Using DataKernel, we can primarily manage Promises with the basic methods:

  • of(T value) - creates a successfully completed promise, like CompletableFuture.completedFuture().
  • ofException() - creates an exceptionally completed promise.
  • complete() - creates a successfully completed Promise<Void>, a shortcut to Promise.of(null).
Promise<Integer> firstNumber = Promise.of(10);
Promise.of("Hello World");
Promise.ofException(new Exception("Something went wrong"));

Chaining Promises

Promise will succeed or fail at some unspecified time and you need to chain methods that will be executed in both cases:

  • then() - returns a new Promise which, when this Promise completes successfully, is executed with this Promise as an argument, like CompletionStage.thenCompose().
  • map() - returns a new Promise which, when this Promise completes successfully, is executed with its result as an argument, like CompletionStage.thenApply().
  • whenResult() - subscribes to execute given action after this Promise completes successfully, like CompletionStage.thenAccept().

In addition, to handle errors the following methods are provided:

  • thenEx() - returns a new Promise which is executed with the Promise result as the argument when Promise completes either successfully or with an exception.
  • whenException() - subscribe to execute given action after this Promise completes exceptionally and returns a new Promise.

When we have multiple asynchronous calls, we need to execute them in order. That way we can just chain methods together to create a sequence.

doSomeProcess()
		.whenResult(result -> System.out.println(String.format("Result of some process is '%s'", result)))
		.whenException(e -> System.out.println(String.format("Exception after some process is '%s'", e.getMessage())))
		.map(String::toLowerCase)
		.mapEx((result, e) -> e == null ? String.format("The mapped result is '%s'", result) : e.getMessage())
		.whenResult(System.out::println);

See full example on GitHub.

Combine Promises

There are cases when you need to execute some Promises and combine their results. For this purpose, consider the following methods:

  • combine() - returns a new Promise that, when both Promises are completed, is executed with the two results as arguments.
  • all() - returns a Promise that completes when all of the provided promises are completed.
  • any() - returns one of the first completed Promises.
Promise<Integer> firstNumber = Promise.of(10);
Promise<Integer> secondNumber = Promises.delay(2000, 100);

Promise<Integer> result = firstNumber.combine(secondNumber, Integer::sum);
result.whenResult(res -> System.out.println("The first result is " + res));

See full example on GitHub.

  • delay() - delays completion of provided Promise for a defined period of time.
Promise<String> strPromise = Promises.delay("result", Duration.seconds(10))

Optimization Features

DataKernel Promises are heavily GC-optimized:

  • An internal representation of a typical Promise consists of 1-2 objects with a bare minimum of fields inside
  • After it is fulfilled, the result is passed to their subscribers and discarded afterwards

In order to optimize Promises, there are several implementations of the Promise interface:

graph TD Promise --> AbstractPromise Promise --> CompleteExceptionallyPromise Promise --> CompletePromise AbstractPromise --> NextPromise AbstractPromise --> SettablePromise CompletePromise --> CompleteResultPromise CompletePromise --> CompleteNullPromise
  • Promise - root interface which represents promises behaviour.
  • SettablePromise - a class which can be used as a root for a chain of Promises. Allows to wrap operations in Promises, can be completed manually even before actual completion.
  • AbstractPromise, NextPromise - helper classes which enable creating chains of stateless Promises. You can treat these chains as pipes which pass values through, but don’t store them.
  • CompletePromise - an abstract class which represents a successfully completed Promise.
  • CompleteExceptionallyPromise, CompleteResultPromise, CompleteNullPromise - helper classes.

You can add a Promise module to your project by inserting dependency in pom.xml:

<dependency>
    <groupId>io.datakernel</groupId>
    <artifactId>datakernel-promise</artifactId>
    <version>3.1.0</version>
</dependency>

Benchmarks

We’ve compared DataKernel Promise to Java CompletableFuture in different scenarios:

  1. DataKernel Promise/Java CompletableFuture executes operations with one promise/future.
  2. DataKernel Promise/Java CompletableFuture combines several promises/futures.

We used JMH as the benchmark tool and ran benchmarks in AverageTime mode. All the measurements are represented in nanoseconds.

DataKernel Promise oneCallMeasure 
Cnt: 10; Score: 12.952; Error: ± 0.693; Units: ns/op;

DataKernel Promise combineMeasure 
Cnt: 10; Score: 34.112; Error: ± 1.869; Units: ns/op;

Java CompletableFuture oneCallMeasure 
Cnt: 10; Score: 85.151; Error: ± 1.781; Units: ns/op;

Java CompletableFuture combineMeasure
Cnt: 10; Score: 153.645; Error: ± 4.491; Units: ns/op;

You can find benchmark sources on GitHub.

Examples

Note: To run the examples, you need to clone DataKernel from GitHub:
$ git clone https://github.com/softindex/datakernel
And import it as a Maven project. Check out branch v3.1. Before running the examples, build the project.
These examples are located at datakernel -> examples -> core -> promise

PromiseChainExample

You can create chains of Promises even before they are completed and you don’t know yet if they will complete successfully or with an exception. In this example we have a doSomeProcess which returns a Promise that has equal chances to complete either successfully or with an exception. So we create a chain which will handle both cases:

public class PromiseChainExample {
	private static final Eventloop eventloop = Eventloop.create().withCurrentThread();

	public static void main(String[] args) {
		//[START REGION_1]
		doSomeProcess()
				.whenResult(result -> System.out.println(String.format("Result of some process is '%s'", result)))
				.whenException(e -> System.out.println(String.format("Exception after some process is '%s'", e.getMessage())))
				.map(String::toLowerCase)
				.mapEx((result, e) -> e == null ? String.format("The mapped result is '%s'", result) : e.getMessage())
				.whenResult(System.out::println);
		//[END REGION_1]
		Promise.complete()
				.then($ -> loadData())
				.whenResult(result -> System.out.println(String.format("Loaded data is '%s'", result)));
		eventloop.run();
	}

	private static Promise<String> loadData() {
		return Promise.of("Hello World");
	}

	public static Promise<String> doSomeProcess() {
		return Promises.delay(1000, Math.random() > 0.5 ?
				Promise.of("Hello World") :
				Promise.ofException(new RuntimeException("Something went wrong")));
	}
}

If you run the example, you will receive either this output (if doSomeProcess finishes successfully):

Loaded data is 'Hello World'
Result of some process is 'Hello World'
The mapped result is 'hello world'

Or this, if it finishes with an exception:

Loaded data is 'Hello World'
Exception after some process is 'Something went wrong'
Something went wrong

Note that the first line is

Loaded data is 'Hello World'

This is due to the 1 second delay we set up in doSomeProcess.

PromiseAdvanceExample

You can combine several Promises, for example:

Promise<Integer> firstNumber = Promise.of(10);
Promise<Integer> secondNumber = Promises.delay(2000, 100);

Promise<Integer> result = firstNumber.combine(secondNumber, Integer::sum);
result.whenResult(res -> System.out.println("The first result is " + res));

There are also several ways to delay Promise:

int someValue = 1000;
int delay = 1000;     // in milliseconds
int interval = 2000;  // also in milliseconds
Promise<Integer> intervalPromise = Promises.interval(interval, Promise.of(someValue));
Promise<Integer> schedulePromise = Promises.schedule(someValue * 2, Instant.now());
Promise<Integer> delayPromise = Promises.delay(delay, someValue);

Promise<Integer> result = intervalPromise
		.combine(schedulePromise, (first, second) -> first - second)
		.combine(delayPromise, Integer::sum);

result.whenResult(res -> System.out.println("The second result is " + res));

PromisesExamples

Promises is a helper class which allows to efficiently manage multiple Promises. This example will demonstrate three use cases.

1.In the following example we use the Promises loop, which resembles Java for loop, but has async capabilities, which are provided by Promise:

Promises.loop(0,
		i -> i < 5,
		i -> {
			System.out.println("This is iteration #" + i);
			return Promise.of(i + 1);
		});

The output is:

Looping with condition:
This is iteration #1
This is iteration #2
This is iteration #3
This is iteration #4
This is iteration #5

2.Another example creates a list of Promises results using Promises toList method:

Promises.toList(Promise.of(1), Promise.of(2), Promise.of(3), Promise.of(4), Promise.of(5), Promise.of(6))
		.whenResult(list -> System.out.println("Size of collected list: " + list.size() + "\nList: " + list));

The output is:

Collecting group of **Promises** to list of **Promises**' results:
Size of collected list: 6
List: [1, 2, 3, 4, 5, 6]

3.In the last example Promises toArray method is utilized, which reduces promises to array of provided data type (in this case, Integers):

Promises.toArray(Integer.class, Promise.of(1), Promise.of(2), Promise.of(3), Promise.of(4), Promise.of(5), Promise.of(6))
		.whenResult(array -> System.out.println("Size of collected array: " + array.length + "\nArray: " + Arrays.toString(array)));

And the final output is:

Collecting group of **Promises** to array of **Promises**' results:
Size of collected array: 6
Array: [1, 2, 3, 4, 5, 6]

See full example on GitHub.

AsyncFileServiceExample

Also, you can use Promises to work with file system. When you run this example :

@NotNull
private static Promise<Void> writeToFile() {
	try {
		FileChannel channel = FileChannel.open(PATH, set(WRITE, APPEND));

		byte[] message1 = "Hello\n".getBytes();
		byte[] message2 = "This is test file\n".getBytes();
		byte[] message3 = "This is the 3rd line in file".getBytes();

		return fileService.write(channel, 0, message1, 0, message1.length)
				.then($ -> fileService.write(channel, 0, message2, 0, message2.length))
				.then($ -> fileService.write(channel, 0, message3, 0, message3.length))
				.toVoid();
	} catch (IOException e) {
		return Promise.ofException(e);
	}
}

@NotNull
private static Promise<ByteBuf> readFromFile() {
	byte[] array = new byte[1024];
	FileChannel channel;
	try {
		channel = FileChannel.open(PATH, set(READ));
	} catch (IOException e) {
		return Promise.ofException(e);
	}

	return fileService.read(channel, 0, array, 0, array.length)
			.map(bytesRead -> {
				ByteBuf buf = ByteBuf.wrap(array, 0, bytesRead);
				System.out.println(buf.getString(UTF_8));
				return buf;
			});
}

… you’ll receive the following output, which represents content of the created file:

Hello
This is test file
This is the 3rd line in file

See full example on GitHub