Promise Module

Features

Promises are the primary building blocks of the DataKernel async programming model and can be compared to Java Futures (CompletionStages, to be more exact).

If you are not familiar with the concept of Promises, the following section is for you. Otherwise, you can skip it and move directly to the next section.

Promises Basics

  • In general, a Promise represents the result of an operation that hasn’t completed yet but will complete at some undetermined point in the future. It is used for deferred and asynchronous computations.
  • Promise is a high-performance alternative to Java Future. It not only represents the future result of an asynchronous computation, but also allows you to transform and process the not-yet-available result using a chaining mechanism. Moreover, such results can be combined with the help of the provided combinators.
  • Unlike Java Future, Promises were designed from the start to work within a single eventloop thread. They are extremely lightweight, have no multithreading overhead, and are capable of processing millions of calls per second.

Creating Promises

With DataKernel, Promises are primarily created with the following basic methods:

  • of(T value) - creates a successfully completed Promise, like CompletableFuture.completedFuture().
  • ofException() - creates an exceptionally completed Promise.
  • complete() - creates a successfully completed Promise<Void>, a shortcut to Promise.of(null).
Promise<Integer> firstNumber = Promise.of(10);
Promise.of("Hello World");
Promise.ofException(new Exception("Something went wrong"));
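
If you know CompletableFuture better than Promise, the three factory methods above map roughly onto the standard-library calls below. This is only an analogy built on java.util.concurrent, not DataKernel code, and the class name CompletedFutures is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Analogy only: standard-library counterparts of Promise.of, Promise.ofException and Promise.complete.
final class CompletedFutures {
	// Counterpart of Promise.of(value): a future that is already completed with a value.
	static <T> CompletableFuture<T> of(T value) {
		return CompletableFuture.completedFuture(value);
	}

	// Counterpart of Promise.ofException(e): a future that is already completed exceptionally.
	static <T> CompletableFuture<T> ofException(Throwable e) {
		CompletableFuture<T> future = new CompletableFuture<>();
		future.completeExceptionally(e);
		return future;
	}

	// Counterpart of Promise.complete(): a completed future that carries no value.
	static CompletableFuture<Void> complete() {
		return CompletableFuture.completedFuture(null);
	}

	public static void main(String[] args) {
		System.out.println(of(10).join());
		System.out.println(ofException(new Exception("Something went wrong")).isCompletedExceptionally());
		System.out.println(complete().isDone());
	}
}
```

In all three cases no waiting is involved: the returned object is already in its final state when the factory method returns.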

Chaining Promises

A Promise will succeed or fail at some unspecified time, so you need to chain methods that will be executed in either case:

  • then() - returns a new Promise which, when this Promise completes successfully, is executed with its result as an argument to a function that returns another Promise, like CompletionStage.thenCompose().
  • map() - returns a new Promise which, when this Promise completes successfully, is executed with its result as an argument, like CompletionStage.thenApply().
  • whenResult() - subscribes to execute the given action after this Promise completes successfully, like CompletionStage.thenAccept().

In addition, to handle errors the following methods are provided:

  • thenEx() - returns a new Promise which is executed with this Promise’s result and exception as arguments when this Promise completes either successfully or with an exception.
  • whenException() - subscribes to execute the given action after this Promise completes exceptionally, and returns a new Promise.


When we have multiple asynchronous calls that must run in order, we can simply chain these methods together to create a sequence.

doSomeProcess()
		.whenResult(result -> System.out.println(String.format("Result of some process is '%s'", result)))
		.whenException(e -> System.out.println(String.format("Exception after some process is '%s'",e.getMessage())))
		.map(String::toLowerCase)
		.mapEx((result, e) -> e == null ? String.format("The mapped result is '%s'", result) : e.getMessage())
		.whenResult(System.out::println);
See full example on GitHub.
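
The CompletionStage methods named in the list above can be chained in the same way. The sketch below is an analogy written against java.util.concurrent only; ChainingSketch and loadGreeting are hypothetical names, and handle() is used here as the nearest standard-library counterpart of the (result, e) callback in the chain above.

```java
import java.util.concurrent.CompletableFuture;

// Analogy only: the CompletionStage methods that then(), map() and whenResult() are compared to.
final class ChainingSketch {
	// Hypothetical helper standing in for an asynchronous lookup.
	static CompletableFuture<String> loadGreeting(String name) {
		return CompletableFuture.completedFuture("Hello " + name);
	}

	static String run(String name) {
		StringBuilder log = new StringBuilder();
		CompletableFuture.completedFuture(name)
				// like then(): the function returns another future
				.thenCompose(ChainingSketch::loadGreeting)
				// like map(): the function returns a plain value
				.thenApply(String::toLowerCase)
				// receives both the result and the exception; exactly one of them is null
				.handle((result, e) -> e == null ? result : e.getMessage())
				// like whenResult(): consumes the value
				.thenAccept(log::append);
		// Every stage was already complete, so the callbacks have run by now.
		return log.toString();
	}

	public static void main(String[] args) {
		System.out.println(run("World"));
	}
}
```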

Combining Promises

There are cases when you need to execute some Promises and combine their results. For this purpose, consider the following methods:

  • combine() - returns a new Promise that, when both Promises are completed, is executed with the two results as arguments.
  • all() - returns a Promise that completes when all of the provided promises are completed.
  • any() - returns a Promise that completes with one of the first completed Promises.
Promise<Integer> firstNumber = Promise.of(10);
Promise<Integer> secondNumber = Promises.delay(2000, 100);

Promise<Integer> result = firstNumber.combine(secondNumber, Integer::sum);
result.whenResult(res -> System.out.println("The first result is " + res));
See full example on GitHub.
  • delay() - delays completion of the provided Promise for the defined period of time.
Promise<String> strPromise = Promises.delay("result", Duration.ofSeconds(10));
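
For comparison, CompletableFuture offers thenCombine(), allOf() and anyOf(), which behave similarly to combine(), all() and any(). The following sketch uses only java.util.concurrent and is not DataKernel code; CombiningSketch and its method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Analogy only: standard-library counterparts of combine(), all() and any().
final class CombiningSketch {
	// Like combine(): the function runs once both results are available.
	static int combineDemo(int a, int b) {
		CompletableFuture<Integer> first = CompletableFuture.completedFuture(a);
		CompletableFuture<Integer> second = CompletableFuture.completedFuture(b);
		return first.thenCombine(second, Integer::sum).join();
	}

	// Like all(): the combined future is done only after every input is done.
	static boolean allDemo() {
		CompletableFuture<Integer> pending = new CompletableFuture<>();
		CompletableFuture<Void> all = CompletableFuture.allOf(CompletableFuture.completedFuture(1), pending);
		boolean doneBefore = all.isDone(); // one input is still pending
		pending.complete(2);
		return !doneBefore && all.isDone();
	}

	// Like any(): the combined future is done as soon as one input is done.
	static Object anyDemo() {
		CompletableFuture<Integer> pending = new CompletableFuture<>();
		return CompletableFuture.anyOf(pending, CompletableFuture.completedFuture(7)).join();
	}

	public static void main(String[] args) {
		System.out.println(combineDemo(10, 100));
		System.out.println(allDemo());
		System.out.println(anyDemo());
	}
}
```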

Features

DataKernel Promises are heavily GC-optimized:

  • internal representation of typical Promise consists of 1-2 objects with bare minimum of fields inside
  • all intermediate Promises are disposable and do not even store computation results inside
  • once a Promise is fulfilled, its result is passed to its subscribers and discarded afterwards


In order to optimise Promises, there are several implementations of the Promise interface:

                                        Promise
                                          | |
                                          | |
                         AbstractPromise _| |_ MaterializedPromise
                               | |                   | | |
                               | |                   | | |
                  NextPromise _| |_ SettablePromise _| | |_ CompleteExceptionallyPromise
                                                       |
                                                       |
                                                CompletePromise
                                                      | |
                                                      | |
                                CompleteResultPromise_| |_CompleteNullPromise
  • Promise - the root interface, which represents Promise behaviour.
  • MaterializedPromise - an interface which has getResult() and getException() methods and a special container for the result. This allows you to materialize intermediate stateless Promises and get their values once they are completed.
  • SettablePromise - a class which can be used as the root of a chain of Promises. It allows you to wrap operations in Promises and can be completed manually, even before the actual completion.
  • AbstractPromise, NextPromise - helper classes which enable creating chains of stateless Promises. You can treat these chains as pipes which pass values through but don’t store them.
  • CompletePromise - an abstract class which represents a successfully completed Promise.
  • CompleteExceptionallyPromise, CompleteResultPromise, CompleteNullPromise - helper classes.
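
To illustrate the idea behind a manually completed root of a chain, as described for SettablePromise, here is a minimal sketch that wraps a callback-style operation in a CompletableFuture and completes it by hand. It says nothing about how SettablePromise is implemented; ManualCompletionSketch and the measure() callback API are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Analogy only: wrapping a callback-style operation in a future that is completed manually.
final class ManualCompletionSketch {
	// Hypothetical callback-style API: reports the length of the text to the callback.
	static void measure(String text, Consumer<Integer> callback) {
		callback.accept(text.length());
	}

	// The future is created empty and is completed by the callback.
	static CompletableFuture<Integer> measureAsync(String text) {
		CompletableFuture<Integer> future = new CompletableFuture<>();
		measure(text, future::complete);
		return future;
	}

	public static void main(String[] args) {
		measureAsync("Hello").thenAccept(length -> System.out.println("Length is " + length));
	}
}
```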

You can add the Promise module to your project by inserting the following dependency in pom.xml:

<dependency>
    <groupId>io.datakernel</groupId>
    <artifactId>datakernel-promise</artifactId>
    <version>3.0.0-SNAPSHOT</version>
</dependency>


Examples

Note: To run the examples, you need to clone DataKernel from GitHub:
$ git clone https://github.com/softindex/datakernel
Then import it as a Maven project and build it before running the examples.
The examples are located at datakernel -> examples -> core -> promise.

PromiseChainExample

You can create chains of Promises before they are completed, without knowing whether they will complete successfully or with an exception. In this example, the doSomeProcess method returns a Promise that has an equal chance of completing successfully or with an exception, so we create a chain which handles both cases:

public class PromiseChainExample {
	private static final Eventloop eventloop = Eventloop.create().withCurrentThread();
	public static void main(String[] args) {
		//[START REGION_1]
		doSomeProcess()
				.whenResult(result -> System.out.println(String.format("Result of some process is '%s'", result)))
				.whenException(e -> System.out.println(String.format("Exception after some process is '%s'",e.getMessage())))
				.map(String::toLowerCase)
				.mapEx((result, e) -> e == null ? String.format("The mapped result is '%s'", result) : e.getMessage())
				.whenResult(System.out::println);
		//[END REGION_1]
		Promise.complete()
				.then($ -> loadData())
				.whenResult(result -> System.out.println(String.format("Loaded data is '%s'", result)));
		eventloop.run();
	}

	private static Promise<String> loadData() {
		return Promise.of("Hello World");
	}

	public static Promise<String> doSomeProcess() {
		return Promises.delay(1000, Math.random() > 0.5 ?
				Promise.of("Hello World") :
				Promise.ofException(new RuntimeException("Something went wrong")));
	}
}

If you run the example, you will receive either this output (if doSomeProcess finishes successfully):

Loaded data is 'Hello World'
Result of some process is 'Hello World'
The mapped result is 'hello world'

Or this, if it finishes with an exception:

Loaded data is 'Hello World'
Exception after some process is 'Something went wrong'
Something went wrong

Note that in both cases the first line is

Loaded data is 'Hello World'

This is because doSomeProcess completes only after the 1 second delay we set up, while the chain that calls loadData completes immediately.
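
The same ordering can be reproduced with plain CompletableFuture objects. In this sketch, which is an analogy and not the example's code, the delay is simulated by completing a pending future by hand after the other chain has already run; CallbackOrderSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Analogy only: a callback on a pending future runs later than one on a completed future.
final class CallbackOrderSketch {
	static List<String> run() {
		List<String> output = new ArrayList<>();

		// Stands in for doSomeProcess(): still pending when the callback is attached.
		CompletableFuture<String> delayed = new CompletableFuture<>();
		delayed.thenAccept(result -> output.add("Result of some process is '" + result + "'"));

		// Stands in for loadData(): already completed, so its callback runs at once.
		CompletableFuture.completedFuture("Hello World")
				.thenAccept(result -> output.add("Loaded data is '" + result + "'"));

		// The simulated delay elapses only now.
		delayed.complete("Hello World");
		return output;
	}

	public static void main(String[] args) {
		run().forEach(System.out::println);
	}
}
```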

PromiseAdvanceExample

You can combine several Promises, for example:

Promise<Integer> firstNumber = Promise.of(10);
Promise<Integer> secondNumber = Promises.delay(2000, 100);

Promise<Integer> result = firstNumber.combine(secondNumber, Integer::sum);
result.whenResult(res -> System.out.println("The first result is " + res));

There are also several ways to delay a Promise:

Promise<Integer> intervalPromise = Promises.interval(2000, Promise.of(1000));
Promise<Integer> schedulePromise = Promises.schedule(2000, Instant.now());
Promise<Integer> delayPromise = Promises.delay(1000, 1000);

Promise<Integer> result = intervalPromise
		.combine(schedulePromise, (first, second) -> first - second)
		.combine(delayPromise, Integer::sum);

result.whenResult(res -> System.out.println("The second result is " + res));
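
As a rough standard-library illustration of delayed completion, the sketch below completes a CompletableFuture from a ScheduledExecutorService and combines it with an already completed one. Note the difference in design: this analogy needs a separate scheduler thread, whereas DataKernel Promises are described above as working within a single eventloop thread. DelaySketch and its delay() helper are hypothetical and are not the Promises.delay implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Analogy only: a future that is completed with a value after a delay.
final class DelaySketch {
	static <T> CompletableFuture<T> delay(long millis, T value, ScheduledExecutorService scheduler) {
		CompletableFuture<T> future = new CompletableFuture<>();
		scheduler.schedule(() -> future.complete(value), millis, TimeUnit.MILLISECONDS);
		return future;
	}

	static int demo() {
		ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
		try {
			CompletableFuture<Integer> first = CompletableFuture.completedFuture(10);
			CompletableFuture<Integer> second = delay(50, 100, scheduler);
			// join() blocks the calling thread until the delayed value arrives.
			return first.thenCombine(second, Integer::sum).join();
		} finally {
			scheduler.shutdown();
		}
	}

	public static void main(String[] args) {
		System.out.println("The result is " + demo());
	}
}
```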

PromisesExamples

The following example uses Promises.loop, which resembles a Java for loop but has the asynchronous capabilities provided by Promise:

Promises.loop(0, AsyncPredicate.of(i -> i < 5), i -> {
	System.out.println("This is iteration #" + ++i);
	return Promise.of(i);
});

The output is:

Looping with condition:
This is iteration #1
This is iteration #2
This is iteration #3
This is iteration #4
This is iteration #5
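
One way to picture such an asynchronous loop is as a recursive chain in which each step schedules the next one. The sketch below expresses that idea with CompletableFuture.thenCompose(). It is an assumption-laden analogy, not the Promises.loop implementation: AsyncLoopSketch is a hypothetical name, and returning the final value is a choice made for this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

// Analogy only: a loop whose body is asynchronous, built from thenCompose().
final class AsyncLoopSketch {
	// Repeats the asynchronous step while the condition holds for the current value.
	// With already completed futures this recurses on the call stack, so keep iteration counts small.
	static <T> CompletableFuture<T> loop(T seed, Predicate<T> condition, Function<T, CompletableFuture<T>> step) {
		if (!condition.test(seed)) {
			return CompletableFuture.completedFuture(seed);
		}
		return step.apply(seed).thenCompose(next -> loop(next, condition, step));
	}

	static int countTo(int limit) {
		return loop(0, i -> i < limit, i -> CompletableFuture.completedFuture(i + 1)).join();
	}

	public static void main(String[] args) {
		System.out.println("Final value: " + countTo(5));
	}
}
```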

Another example collects the results of several Promises into a list using the Promises.toList method:

Promises.toList(Promise.of(1), Promise.of(2), Promise.of(3), Promise.of(4), Promise.of(5), Promise.of(6))
		.whenResult(list -> System.out.println("Size of collected list: " + list.size() + "\nList: " + list));

The output is:

Collecting group of Promises to list of Promises' results:
Size of collected list: 6
List: [1, 2, 3, 4, 5, 6]
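
CompletableFuture has no direct equivalent of toList, but a similar reduction can be sketched with allOf(). The code below is an analogy using only the standard library; ToListSketch and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Analogy only: reducing several futures to one future of a list.
final class ToListSketch {
	// Completes with the results of all futures, in the order the futures were given.
	static <T> CompletableFuture<List<T>> toList(List<CompletableFuture<T>> futures) {
		return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
				.thenApply($ -> futures.stream()
						.map(CompletableFuture::join) // does not block: every future is done here
						.collect(Collectors.toList()));
	}

	static List<Integer> demo() {
		List<CompletableFuture<Integer>> futures = Arrays.asList(
				CompletableFuture.completedFuture(1),
				CompletableFuture.completedFuture(2),
				CompletableFuture.completedFuture(3));
		return toList(futures).join();
	}

	public static void main(String[] args) {
		List<Integer> list = demo();
		System.out.println("Size of collected list: " + list.size() + "\nList: " + list);
	}
}
```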

The last example uses the Promises.toArray method, which reduces Promises to an array of the provided data type (in this case, Integer):

Promises.toArray(Integer.class, Promise.of(1), Promise.of(2), Promise.of(3), Promise.of(4), Promise.of(5), Promise.of(6))
		.whenResult(array -> System.out.println("Size of collected array: " + array.length + "\nArray: " + Arrays.toString(array)));

And the final output is:

Collecting group of Promises to array of Promises' results:
Size of collected array: 6
Array: [1, 2, 3, 4, 5, 6]

See full example on GitHub.

AsyncFileServiceExample

When you run AsyncFileServiceExample, you’ll receive the following output, which is the content of the created file:

Hello
This is test file
This is the 3rd line in file

In this example, Promise’s AsyncFile (which represents a file with asynchronous capabilities) is used, along with several of its methods:

  • open() - opens a file synchronously.
  • write() - writes all bytes of the provided ByteBuf into the file asynchronously.
  • read() - reads all bytes from the file into a ByteBuf asynchronously.
@NotNull
private static Promise<Void> writeToFile() {
	try {
		FileChannel channel = FileChannel.open(PATH, set(WRITE, APPEND));

		byte[] message1 = "Hello\n".getBytes();
		byte[] message2 = "This is test file\n".getBytes();
		byte[] message3 = "This is the 3rd line in file".getBytes();

		return fileService.write(channel, 0, message1, 0, message1.length)
				.then($ -> fileService.write(channel, 0, message2, 0, message2.length))
				.then($ -> fileService.write(channel, 0, message3, 0, message3.length))
				.toVoid();
	} catch (IOException e) {
		return Promise.ofException(e);
	}
}

@NotNull
private static Promise<ByteBuf> readFromFile() {
	byte[] array = new byte[1024];
	FileChannel channel;
	try {
		channel = FileChannel.open(PATH, set(READ));
	} catch (IOException e) {
		return Promise.ofException(e);
	}

	return fileService.read(channel, 0, array, 0, array.length)
			.map($ -> {
				ByteBuf buf = ByteBuf.wrapForReading(array);
				System.out.println(buf.getString(UTF_8));
				return buf;
			});
}

See full example on GitHub.
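
For readers without the DataKernel sources at hand, the general pattern of chained asynchronous file writes followed by a read can be sketched with the JDK's AsynchronousFileChannel. This is an analogy, not the example's fileService: AsyncFileSketch and its helpers are hypothetical, the file is a temporary one, and the sketch assumes each small write and read completes in a single operation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

// Analogy only: JDK asynchronous file I/O adapted to futures and chained.
final class AsyncFileSketch {
	// Adapts the callback-based channel API to a future passed as the attachment.
	private static CompletionHandler<Integer, CompletableFuture<Integer>> handler() {
		return new CompletionHandler<Integer, CompletableFuture<Integer>>() {
			@Override
			public void completed(Integer bytes, CompletableFuture<Integer> future) {
				future.complete(bytes);
			}

			@Override
			public void failed(Throwable e, CompletableFuture<Integer> future) {
				future.completeExceptionally(e);
			}
		};
	}

	static CompletableFuture<Integer> write(AsynchronousFileChannel channel, long position, byte[] bytes) {
		CompletableFuture<Integer> future = new CompletableFuture<>();
		channel.write(ByteBuffer.wrap(bytes), position, future, handler());
		return future;
	}

	static CompletableFuture<Integer> read(AsynchronousFileChannel channel, long position, ByteBuffer buffer) {
		CompletableFuture<Integer> future = new CompletableFuture<>();
		channel.read(buffer, position, future, handler());
		return future;
	}

	// Writes two lines one after the other, reads them back and returns the file content.
	static String roundTrip() {
		try {
			Path path = Files.createTempFile("promise-sketch", ".txt");
			byte[] line1 = "Hello\n".getBytes(StandardCharsets.UTF_8);
			byte[] line2 = "This is test file\n".getBytes(StandardCharsets.UTF_8);
			ByteBuffer buffer = ByteBuffer.allocate(1024);
			try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
					path, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
				int bytesRead = write(channel, 0, line1)
						// the second write starts where the first one ended
						.thenCompose(written -> write(channel, written, line2))
						.thenCompose($ -> read(channel, 0, buffer))
						.join();
				return new String(buffer.array(), 0, bytesRead, StandardCharsets.UTF_8);
			} finally {
				Files.deleteIfExists(path);
			}
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
	}

	public static void main(String[] args) {
		System.out.print(roundTrip());
	}
}
```

Unlike the example above, which returns a Promise to the eventloop, this sketch calls join() and therefore blocks the calling thread until the I/O has finished.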