RPC Module

Features

The RPC module lets you build distributed applications that require efficient client-server interconnections.

  • Ideal for creating near-real-time (i.e. memcache-like) servers with application-specific business logic
  • Up to ~15M requests per second on a single core
  • Pluggable high-performance asynchronous binary RPC streaming protocol
  • Consistent hashing and round-robin distribution strategies
  • Fault tolerance with reconnection to fallback and replica servers

You can add the RPC module to your project by adding the following dependency to your pom.xml:

<dependency>
    <groupId>io.datakernel</groupId>
    <artifactId>datakernel-rpc</artifactId>
    <version>3.0.0-beta1</version>
</dependency>
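
If your project uses Gradle instead of Maven, the equivalent dependency declaration (same coordinates) would be:

```groovy
dependencies {
    implementation 'io.datakernel:datakernel-rpc:3.0.0-beta1'
}
```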

Benchmarks

We’ve measured RPC performance using JMH as the benchmark tool:

Time: 6724ms; Average time: 672.4ms; Best time: 657ms; Worst time: 694ms; Requests per second: 14872099

And MemcacheRPC performance:

Put
Time: 7019ms; Average time: 1002.7ms; Best time: 954ms; Worst time: 1180ms; Requests per second: 9479697
Get
Time: 6805ms; Average time: 972.1ms; Best time: 931ms; Worst time: 1064ms; Requests per second: 9714915

You can find benchmark sources on GitHub.

Examples

Simple RPC Example

Note: to run the example, you need to clone DataKernel from GitHub:
$ git clone https://github.com/softindex/datakernel
and import it as a Maven project. Build the project before running the example.
The example is located at datakernel -> examples -> cloud -> rpc.

In the “Hello World” client and server RPC example, the client sends a request containing the word “World” to the server. When the server receives it, it replies with a response that prepends “Hello ” to the request. If everything completes successfully, we get the following output:

RPC result: Hello World

Let’s have a look at the implementation:

public class RpcExample extends Launcher {
	private static final int SERVICE_PORT = 34765;

	@Inject
	private RpcClient client;

	@Inject
	private RpcServer server;

	@Inject
	private Eventloop eventloop;

	@Provides
	Eventloop eventloop() {
		return Eventloop.create();
	}

	@Provides
	RpcServer rpcServer(Eventloop eventloop) {
		return RpcServer.create(eventloop)
				.withMessageTypes(String.class)
				.withHandler(String.class, String.class,
						request -> Promise.of("Hello " + request))
				.withListenPort(SERVICE_PORT);
	}

	@Provides
	RpcClient rpcClient(Eventloop eventloop) {
		return RpcClient.create(eventloop)
				.withMessageTypes(String.class)
				.withStrategy(server(new InetSocketAddress(SERVICE_PORT)));
	}

	@Override
	protected Module getModule() {
		return ServiceGraphModule.create();
	}

	@Override
	protected void run() throws ExecutionException, InterruptedException {
		CompletableFuture<Object> future = eventloop.submit(() ->
				client.sendRequest("World", 1000)
		);
		System.out.println("RPC result: " + future.get());
	}

	public static void main(String[] args) throws Exception {
		RpcExample example = new RpcExample();
		example.launch(args);
	}
}

The RpcExample class extends Launcher, which helps us manage the application lifecycle.

We need to provide RpcServer and RpcClient with the relevant configurations and required dependencies using DataKernel DI. The RpcClient sends requests to the specified server according to the provided RpcStrategies (here, a single RPC service). For the RpcServer we define the type of messages it will process, the corresponding RpcRequestHandler, and the listen port.

Since we extend Launcher, we also override two methods: getModule, which provides ServiceGraphModule, and run, which contains the main logic of the example.

Finally, we define the main method, which launches our example.

Round-Robin Strategy

The RPC module contains pre-defined strategies for distributing requests between RPC servers or shards of servers. Round-Robin is one of the simplest strategies: it goes through the servers or shards cyclically, one by one. In this example we create an RPC pool with 5 equal connections and set the roundRobin strategy for them. Next, we create a sender for the pool with the previously defined strategy. And that's it: 100 requests will be distributed equally between the servers:

public void roundRobinTest() {
	RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
	RpcSenderStub connection1 = new RpcSenderStub();
	RpcSenderStub connection2 = new RpcSenderStub();
	RpcSenderStub connection3 = new RpcSenderStub();
	RpcSenderStub connection4 = new RpcSenderStub();
	RpcSenderStub connection5 = new RpcSenderStub();
	pool.put(ADDRESS_1, connection1);
	pool.put(ADDRESS_2, connection2);
	pool.put(ADDRESS_3, connection3);
	pool.put(ADDRESS_4, connection4);
	pool.put(ADDRESS_5, connection5);
	int iterations = 100;
	RpcStrategy strategy = roundRobin(servers(ADDRESS_1, ADDRESS_2, ADDRESS_3, ADDRESS_4, ADDRESS_5));

	RpcSender sender = strategy.createSender(pool);
	for (int i = 0; i < iterations; i++) {
		sender.sendRequest(new Object(), 50, ignore());
	}

	List<RpcSenderStub> connections =
			asList(connection1, connection2, connection3, connection4, connection5);
	for (int i = 0; i < 5; i++) {
		assertEquals(iterations / 5, connections.get(i).getRequests());
	}
}
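
Stripped of the RPC machinery, round-robin selection reduces to a cursor that cycles over the connection list; a minimal self-contained sketch (the RoundRobinSketch class is invented for illustration, not part of the library):

```java
import java.util.List;

public class RoundRobinSketch {
	private final List<String> targets;
	private int cursor = 0;

	RoundRobinSketch(List<String> targets) {
		this.targets = targets;
	}

	// Pick the next target cyclically, one per request
	String next() {
		String target = targets.get(cursor);
		cursor = (cursor + 1) % targets.size();
		return target;
	}

	public static void main(String[] args) {
		RoundRobinSketch rr = new RoundRobinSketch(List.of("s1", "s2", "s3"));
		int[] counts = new int[3];
		for (int i = 0; i < 99; i++) {
			counts[rr.next().charAt(1) - '1']++;
		}
		// 99 requests over 3 servers: 33 each
		System.out.println(counts[0] + " " + counts[1] + " " + counts[2]);
	}
}
```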

Round-Robin and First Available Strategies Combined

You can easily combine RPC strategies. In this example we combine the roundRobin and firstAvailable strategies. First, we create 4 connections but don't put connection3 into the pool. Then we send 20 requests. As a result, all requests are distributed equally between connection1 (as it is always the first available in its group) and connection4 (as connection3 isn't available in the pool):

public void roundRobinAndFirstAvailableTest() {
	RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
	RpcSenderStub connection1 = new RpcSenderStub();
	RpcSenderStub connection2 = new RpcSenderStub();
	RpcSenderStub connection3 = new RpcSenderStub();
	RpcSenderStub connection4 = new RpcSenderStub();
	pool.put(ADDRESS_1, connection1);
	pool.put(ADDRESS_2, connection2);
	// we don't put connection3
	pool.put(ADDRESS_4, connection4);
	int iterations = 20;
	RpcStrategy strategy = roundRobin(
			firstAvailable(servers(ADDRESS_1, ADDRESS_2)),
			firstAvailable(servers(ADDRESS_3, ADDRESS_4)));

	RpcSender sender = strategy.createSender(pool);
	for (int i = 0; i < iterations; i++) {
		sender.sendRequest(new Object(), 50, assertNoCalls());
	}

	assertEquals(iterations / 2, connection1.getRequests());
	assertEquals(0, connection2.getRequests());
	assertEquals(0, connection3.getRequests());
	assertEquals(iterations / 2, connection4.getRequests());
}
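
Conceptually, firstAvailable just scans the server list in order and takes the first address that has a live connection in the pool; a toy sketch (FirstAvailableSketch is illustrative only, with a plain Map standing in for the connection pool):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class FirstAvailableSketch {
	// Return the first address (in the given order) that has a live connection in the pool
	static Optional<String> firstAvailable(List<String> addresses, Map<String, ?> pool) {
		return addresses.stream()
				.filter(pool::containsKey)
				.findFirst();
	}

	public static void main(String[] args) {
		Map<String, Object> pool = new LinkedHashMap<>();
		pool.put("address2", new Object());
		pool.put("address4", new Object());
		// address1 and address3 are "offline" (not in the pool)
		System.out.println(firstAvailable(List.of("address1", "address2"), pool)); // Optional[address2]
		System.out.println(firstAvailable(List.of("address3", "address4"), pool)); // Optional[address4]
	}
}
```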

Sharding and First Valid Strategies Combined

You can also create your own sharding functions and combine them with other strategies if needed. In this example we create 5 equal connections but don't add connection2 to the pool. Next, we provide a simple sharding function which distributes requests between shards according to the content of the request. We split the connections into two shards and set the firstValidResult strategy for both of them. This strategy sends each request to all available servers of the shard. Now we manually send 7 requests:

  • 4 with message 0, which go to the first shard's connection1 (connection2 isn't in the pool)
  • 3 with message 1, each of which goes to all three connections of the second shard

public void shardingAndFirstValidTest() {
	RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
	RpcSenderStub connection1 = new RpcSenderStub();
	RpcSenderStub connection2 = new RpcSenderStub();
	RpcSenderStub connection3 = new RpcSenderStub();
	RpcSenderStub connection4 = new RpcSenderStub();
	RpcSenderStub connection5 = new RpcSenderStub();
	pool.put(ADDRESS_1, connection1);
	// we don't put connection2
	pool.put(ADDRESS_3, connection3);
	pool.put(ADDRESS_4, connection4);
	pool.put(ADDRESS_5, connection5);
	int shardsCount = 2;
	ShardingFunction<Integer> shardingFunction = item -> item % shardsCount;
	RpcStrategy strategy = sharding(shardingFunction,
			firstValidResult(servers(ADDRESS_1, ADDRESS_2)),
			firstValidResult(servers(ADDRESS_3, ADDRESS_4, ADDRESS_5)));

	RpcSender sender = strategy.createSender(pool);
	sender.sendRequest(0, 50, assertNoCalls());
	sender.sendRequest(0, 50, assertNoCalls());
	sender.sendRequest(1, 50, assertNoCalls());
	sender.sendRequest(1, 50, assertNoCalls());
	sender.sendRequest(0, 50, assertNoCalls());
	sender.sendRequest(0, 50, assertNoCalls());
	sender.sendRequest(1, 50, assertNoCalls());

	assertEquals(4, connection1.getRequests());
	assertEquals(0, connection2.getRequests());
	assertEquals(3, connection3.getRequests());
	assertEquals(3, connection4.getRequests());
	assertEquals(3, connection5.getRequests());
}
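
The arithmetic behind the distribution above is plain modulo; a standalone sketch (countPerShard is an invented helper) that tallies how the seven request keys map onto the two shards:

```java
import java.util.HashMap;
import java.util.Map;

public class ShardingSketch {
	// Apply the sharding function (key % shardsCount) and count requests per shard
	static Map<Integer, Integer> countPerShard(int[] keys, int shardsCount) {
		Map<Integer, Integer> perShard = new HashMap<>();
		for (int key : keys) {
			perShard.merge(key % shardsCount, 1, Integer::sum);
		}
		return perShard;
	}

	public static void main(String[] args) {
		// same request keys as in the test above
		int[] requests = {0, 0, 1, 1, 0, 0, 1};
		// shard 0 receives 4 requests, shard 1 receives 3
		System.out.println(countPerShard(requests, 2));
	}
}
```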

Rendezvous Hashing Strategy

The rendezvous hashing strategy pre-calculates a hash function for the RpcSender and creates a map of the RPC servers. The map is cached and re-calculated only when servers go online or offline.
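
The library's internals aren't shown here, but the core idea of rendezvous (highest-random-weight) hashing can be sketched in a few lines; Objects.hash below is an illustrative stand-in for the actual hash function:

```java
import java.util.List;
import java.util.Objects;

public class RendezvousSketch {
	// Pick the node with the highest combined hash of (node, key);
	// Objects.hash is an illustrative stand-in for the library's hash function
	static String pickNode(List<String> nodes, int key) {
		String best = null;
		int bestWeight = Integer.MIN_VALUE;
		for (String node : nodes) {
			int weight = Objects.hash(node, key);
			if (weight > bestWeight) {
				bestWeight = weight;
				best = node;
			}
		}
		return best;
	}

	public static void main(String[] args) {
		List<String> nodes = List.of("shard1", "shard2", "shard3");
		// A key maps to the same node as long as that node stays online...
		String before = pickNode(nodes, 42);
		// ...and only keys owned by a removed node get remapped
		String after = pickNode(List.of("shard1", "shard3"), 42);
		System.out.println(before + " -> " + after);
	}
}
```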

In this example requests will be distributed between connection1, connection3 and connection5, the first available or only connections of their respective shards:

public void rendezvousHashingTest() {
	RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
	RpcSenderStub connection1 = new RpcSenderStub();
	RpcSenderStub connection2 = new RpcSenderStub();
	RpcSenderStub connection3 = new RpcSenderStub();
	RpcSenderStub connection4 = new RpcSenderStub();
	RpcSenderStub connection5 = new RpcSenderStub();
	HashFunction<Integer> hashFunction = item -> item;
	RpcStrategy strategy = rendezvousHashing(hashFunction)
			.withShard(1, firstAvailable(servers(ADDRESS_1, ADDRESS_2)))
			.withShard(2, firstAvailable(servers(ADDRESS_3, ADDRESS_4)))
			.withShard(3, server(ADDRESS_5));
	int iterationsPerLoop = 1000;
	RpcSender sender;

	pool.put(ADDRESS_1, connection1);
	pool.put(ADDRESS_2, connection2);
	pool.put(ADDRESS_3, connection3);
	pool.put(ADDRESS_4, connection4);
	pool.put(ADDRESS_5, connection5);
	sender = strategy.createSender(pool);
	for (int i = 0; i < iterationsPerLoop; i++) {
		sender.sendRequest(i, 50, ignore());
	}

When we remove some of the connections from the pool, the mapping is recalculated:

	pool.remove(ADDRESS_3);
	pool.remove(ADDRESS_4);
	sender = strategy.createSender(pool);
	for (int i = 0; i < iterationsPerLoop; i++) {
		sender.sendRequest(i, 50, ignore());
	}

	double acceptableError = iterationsPerLoop / 10.0;
	assertEquals(iterationsPerLoop / 3 + iterationsPerLoop / 2, connection1.getRequests(), acceptableError);
	assertEquals(0, connection2.getRequests());
	assertEquals(iterationsPerLoop / 3, connection3.getRequests(), acceptableError);
	assertEquals(0, connection4.getRequests());
	assertEquals(iterationsPerLoop / 3 + iterationsPerLoop / 2, connection5.getRequests(), acceptableError);
}

Type Dispatch Strategy

This strategy distributes requests among shards according to the type of the request. In this example, all String requests are sent to the first shard, which uses the firstValidResult strategy for its servers. Requests of all other types are sent to the second shard, which uses the firstAvailable strategy. As a result, connection1 and connection2 each process 35 requests, connection3 processes 25 requests, while connection4 and connection5 process 0 requests, as connection3 is always the first available:

public void typeDispatchTest() {
	RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
	RpcSenderStub connection1 = new RpcSenderStub();
	RpcSenderStub connection2 = new RpcSenderStub();
	RpcSenderStub connection3 = new RpcSenderStub();
	RpcSenderStub connection4 = new RpcSenderStub();
	RpcSenderStub connection5 = new RpcSenderStub();
	pool.put(ADDRESS_1, connection1);
	pool.put(ADDRESS_2, connection2);
	pool.put(ADDRESS_3, connection3);
	pool.put(ADDRESS_4, connection4);
	pool.put(ADDRESS_5, connection5);
	int timeout = 50;
	int iterationsPerDataStub = 25;
	int iterationsPerDataStubWithKey = 35;
	RpcSender sender;
	RpcStrategy strategy = typeDispatching()
			.on(String.class,
					firstValidResult(servers(ADDRESS_1, ADDRESS_2)))
			.onDefault(
					firstAvailable(servers(ADDRESS_3, ADDRESS_4, ADDRESS_5)));

	sender = strategy.createSender(pool);
	for (int i = 0; i < iterationsPerDataStub; i++) {
		sender.sendRequest(new Object(), timeout, assertNoCalls());
	}
	for (int i = 0; i < iterationsPerDataStubWithKey; i++) {
		sender.sendRequest("request", timeout, assertNoCalls());
	}

	assertEquals(iterationsPerDataStubWithKey, connection1.getRequests());
	assertEquals(iterationsPerDataStubWithKey, connection2.getRequests());
	assertEquals(iterationsPerDataStub, connection3.getRequests());
	assertEquals(0, connection4.getRequests());
	assertEquals(0, connection5.getRequests());
}
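
Independent of the RPC machinery, type dispatching boils down to a lookup keyed by the request's runtime class, with a default fallback; a minimal sketch (TypeDispatchSketch and its names are invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class TypeDispatchSketch {
	private final Map<Class<?>, String> routes = new HashMap<>();
	private final String defaultRoute;

	TypeDispatchSketch(String defaultRoute) {
		this.defaultRoute = defaultRoute;
	}

	// Register a route for a concrete request type
	TypeDispatchSketch on(Class<?> type, String route) {
		routes.put(type, route);
		return this;
	}

	// Route by the exact runtime class of the request, falling back to the default
	String route(Object request) {
		return routes.getOrDefault(request.getClass(), defaultRoute);
	}

	public static void main(String[] args) {
		TypeDispatchSketch dispatcher = new TypeDispatchSketch("shard2")
				.on(String.class, "shard1");
		System.out.println(dispatcher.route("request"));    // String requests go to shard1
		System.out.println(dispatcher.route(new Object())); // everything else goes to shard2
	}
}
```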