Remote Key-Value Storage

Purpose

In this guide, we will create a remote key-value storage using the DataKernel RPC module. The app will support two basic operations, put and get, and will use RPC as its communication protocol.

Introduction

When writing a distributed application, a common concern is which protocol to use for communication. There are two main options:

  • HTTP/REST
  • RPC

While HTTP is more popular and well specified, it adds some overhead. When performance is a significant concern for an application, you may want something faster than HTTP. For this purpose, the DataKernel framework provides an RPC module based on fast serializers and a custom, optimized communication protocol, which can significantly improve application performance.
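As a rough illustration of where that overhead comes from, the sketch below encodes the same put operation twice: once as a compact, length-prefixed binary frame and once as a minimal HTTP/1.1 request with a JSON body. The frame layout (4-byte length, 1-byte type tag, then the fields written with DataOutputStream.writeUTF), the PUT_TAG constant and the /kv/ URL are hypothetical choices made for this illustration; this is not DataKernel's actual wire format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class FramingSketch {
	// Hypothetical message type tag for a "put" request.
	static final byte PUT_TAG = 1;

	// Encodes a put as: [int body length][byte tag][UTF key][UTF value].
	static byte[] binaryPut(String key, String value) {
		try {
			ByteArrayOutputStream body = new ByteArrayOutputStream();
			DataOutputStream out = new DataOutputStream(body);
			out.writeByte(PUT_TAG);
			out.writeUTF(key);   // 2-byte length prefix + modified UTF-8 bytes
			out.writeUTF(value);
			out.flush();

			ByteArrayOutputStream frame = new ByteArrayOutputStream();
			DataOutputStream framed = new DataOutputStream(frame);
			framed.writeInt(body.size()); // length prefix tells the reader where the frame ends
			framed.write(body.toByteArray());
			framed.flush();
			return frame.toByteArray();
		} catch (IOException e) {
			// In-memory streams do not fail in practice; rethrow unchecked just in case.
			throw new UncheckedIOException(e);
		}
	}

	// The same operation as a minimal HTTP/1.1 request with a JSON body.
	static byte[] httpPut(String key, String value) {
		byte[] json = ("{\"value\":\"" + value + "\"}").getBytes(StandardCharsets.UTF_8);
		String head = "PUT /kv/" + key + " HTTP/1.1\r\n"
				+ "Host: localhost:8080\r\n"
				+ "Content-Type: application/json\r\n"
				+ "Content-Length: " + json.length + "\r\n"
				+ "\r\n";
		byte[] headBytes = head.getBytes(StandardCharsets.UTF_8);
		byte[] request = new byte[headBytes.length + json.length];
		System.arraycopy(headBytes, 0, request, 0, headBytes.length);
		System.arraycopy(json, 0, request, headBytes.length, json.length);
		return request;
	}

	public static void main(String[] args) {
		System.out.println("binary frame: " + binaryPut("key1", "value1").length + " bytes");
		System.out.println("HTTP request: " + httpPut("key1", "value1").length + " bytes");
	}
}
```

For key1/value1, the binary frame takes 19 bytes, while the HTTP version spends most of its size on the request line and headers before any payload.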

What you will need:

  • About 20 minutes
  • IDE or terminal
  • JDK 1.8+
  • Maven 3.0+

To proceed with this guide, you have two options: run the working example, or build the app yourself by following the step-by-step guide.

Working Example

To run the example in an IDE, first clone DataKernel locally:

$ git clone https://github.com/softindex/datakernel.git

Then import it as a Maven project and check out branch v3.0.

Before running the example, build the project (Ctrl + F9 for IntelliJ IDEA).

Then go to the Testing section.

Step-by-step guide

1. Set up the project

First, create a folder for the application and set up the following project structure:

remote-key-value-storage
├── pom.xml
└── src
    └── main
        └── java
            ├── GetRequest.java
            ├── GetResponse.java
            ├── PutRequest.java
            ├── PutResponse.java
            ├── KeyValueStore.java
            ├── ServerModule.java
            ├── ServerLauncher.java
            ├── ClientModule.java
            └── ClientLauncher.java

Next, configure your pom.xml file like this:

    
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>io.datakernel</groupId>
  <artifactId>rpc-kv-storage</artifactId>
  <version>3.0.0</version>

  <name>Examples : Tutorials : Rpc-KV-Storage</name>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>io.datakernel</groupId>
      <artifactId>datakernel-boot</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>io.datakernel</groupId>
      <artifactId>datakernel-rpc</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.3</version>
    </dependency>
  </dependencies>

</project>

2. Define basic app functionality

Since we have two basic operations to implement (put and get), let’s start by writing the classes that the client and the server will use to communicate: PutRequest, PutResponse, GetRequest and GetResponse. Instances of these classes will be serialized with the fast DataKernel Serializer, which needs some meta information about these classes. This information is provided through annotations. The basic rules are:

  • Use the @Serialize annotation with an order number on the property’s getter. Ordering provides better compatibility in case the classes change.
  • Use the @Deserialize annotation with the property name (the same name as in the getter) on the corresponding constructor parameter.
  • Use the @SerializeNullable annotation on properties that can have null values.

Therefore, the classes for communication should look like this:

public class PutRequest {

	private final String key;
	private final String value;

	public PutRequest(@Deserialize("key") String key, @Deserialize("value") String value) {
		this.key = key;
		this.value = value;
	}

	@Serialize(order = 0)
	public String getKey() {
		return key;
	}

	@Serialize(order = 1)
	public String getValue() {
		return value;
	}
}

public class PutResponse {
	private final String previousValue;

	public PutResponse(@Deserialize("previousValue") String previousValue) {
		this.previousValue = previousValue;
	}

	@Serialize(order = 0)
	@SerializeNullable
	public String getPreviousValue() {
		return previousValue;
	}

	@Override
	public String toString() {
		return "{previousValue='" + previousValue + '\'' + '}';
	}
}

public class GetRequest {

	private final String key;

	public GetRequest(@Deserialize("key") String key) {
		this.key = key;
	}

	@Serialize(order = 0)
	public String getKey() {
		return key;
	}
}

public class GetResponse {
	private final String value;

	public GetResponse(@Deserialize("value") String value) {
		this.value = value;
	}

	@Serialize(order = 0)
	@SerializeNullable
	public String getValue() {
		return value;
	}

	@Override
	public String toString() {
		return "{value='" + value + '\'' + '}';
	}
}
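To make the annotation rules more concrete, here is a hand-written sketch of what an ordered, null-aware serializer conceptually does: fields are written and read back in their declared order, a nullable field is preceded by a presence flag, and the decoded values are passed to the constructor. It uses plain java.io streams and a hypothetical Entry class (key at order 0, nullable value at order 1); it is not the binary format the DataKernel Serializer actually produces.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class OrderedSerializationSketch {
	// Hypothetical two-field message: key has order 0, value has order 1 and may be null.
	static final class Entry {
		final String key;
		final String value;

		Entry(String key, String value) {
			this.key = key;
			this.value = value;
		}
	}

	// Writes the fields strictly in their "order" positions.
	static byte[] serialize(Entry entry) {
		try {
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			DataOutputStream out = new DataOutputStream(bytes);
			out.writeUTF(entry.key);               // order = 0
			out.writeBoolean(entry.value != null); // presence flag, needed only because value may be null
			if (entry.value != null) {
				out.writeUTF(entry.value);         // order = 1
			}
			out.flush();
			return bytes.toByteArray();
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
	}

	// Reads the fields back in the same order and hands them to the constructor,
	// which is the role @Deserialize("...") describes for constructor parameters.
	static Entry deserialize(byte[] data) {
		try {
			DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
			String key = in.readUTF();
			String value = in.readBoolean() ? in.readUTF() : null;
			return new Entry(key, value);
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
	}

	public static void main(String[] args) {
		Entry copy = deserialize(serialize(new Entry("key1", null)));
		System.out.println(copy.key + " -> " + copy.value);
	}
}
```

Because the reader relies on the field order rather than on field names, keeping the order numbers stable is what lets both sides agree on the layout.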

Next, let’s write a simple implementation of the key-value storage:

import java.util.HashMap;
import java.util.Map;

public class KeyValueStore {

	private final Map<String, String> store = new HashMap<>();

	public String put(String key, String value) {
		return store.put(key, value);
	}

	public String get(String key) {
		return store.get(key);
	}
}
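Note that put() returns whatever Map.put returns: the previous value for the key, or null if there was none. That is exactly what PutResponse carries back to the client, and why previousValue is marked @SerializeNullable. The sketch below exercises this behavior with the same logic, parameterized by the backing map. The ConcurrentHashMap variant is a hypothetical option for a store shared between threads; in this tutorial the RpcServer runs on a single Eventloop, so a plain HashMap should be sufficient. Keep in mind that ConcurrentHashMap rejects null keys and values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class KeyValueStoreDemo {
	// Same logic as KeyValueStore, but the backing map is supplied by the caller.
	static final class Store {
		private final Map<String, String> store;

		Store(Map<String, String> backing) {
			this.store = backing;
		}

		// Returns the previous value for the key, or null if the key was absent.
		String put(String key, String value) {
			return store.put(key, value);
		}

		String get(String key) {
			return store.get(key);
		}
	}

	public static void main(String[] args) {
		Store single = new Store(new HashMap<>());           // fine for single-threaded access
		Store shared = new Store(new ConcurrentHashMap<>()); // hypothetical thread-safe variant

		for (Store s : new Store[]{single, shared}) {
			System.out.println(s.put("key1", "value1")); // null: no previous value
			System.out.println(s.put("key1", "value2")); // value1
			System.out.println(s.get("key1"));           // value2
		}
	}
}
```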

3. Create client and server

Now, let’s write an AbstractModule for the RPC server, using DataKernel Boot, that handles the “get” and “put” requests:

public class ServerModule extends AbstractModule {
	private static final int RPC_SERVER_PORT = 5353;

	@Provides
	Eventloop eventloop() {
		return Eventloop.create()
				.withFatalErrorHandler(rethrowOnAnyError());
	}

	@Provides
	KeyValueStore keyValueStore() {
		return new KeyValueStore();
	}

	@Provides
	RpcServer rpcServer(Eventloop eventloop, KeyValueStore store) {
		return RpcServer.create(eventloop)
				.withSerializerBuilder(SerializerBuilder.create(Thread.currentThread().getContextClassLoader()))
				.withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)
				.withHandler(PutRequest.class, PutResponse.class, req -> Promise.of(new PutResponse(store.put(req.getKey(), req.getValue()))))
				.withHandler(GetRequest.class, GetResponse.class, req -> Promise.of(new GetResponse(store.get(req.getKey()))))
				.withListenPort(RPC_SERVER_PORT);
	}
}

As you can see, to create an RpcServer properly, we must list all classes that will be sent between the client and the server, and specify an appropriate request handler for each request class.

We pass each handler as the third argument of withHandler(), written as a Java 8 lambda:

.withHandler(PutRequest.class, PutResponse.class, req -> Promise.of(new PutResponse(store.put(req.getKey(), req.getValue()))))
.withHandler(GetRequest.class, GetResponse.class, req -> Promise.of(new GetResponse(store.get(req.getKey()))))
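Conceptually, each withHandler() call registers one entry in a lookup table keyed by request class. When a request arrives, the matching handler is found by the request's runtime class and produces the reply asynchronously. The stdlib-only sketch below models that idea with a Map, using CompletableFuture as a stand-in for DataKernel's Promise. HandlerRegistry, its methods and the Put/Get request types are hypothetical names for illustration, not part of the RPC module's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class HandlerRegistry {
	// Hypothetical request types for the sketch.
	static final class Put {
		final String key;
		final String value;

		Put(String key, String value) {
			this.key = key;
			this.value = value;
		}
	}

	static final class Get {
		final String key;

		Get(String key) {
			this.key = key;
		}
	}

	private final Map<Class<?>, Function<Object, CompletableFuture<?>>> handlers = new HashMap<>();

	// Registers a handler for one request class; responseClass is unused and only mirrors withHandler()'s shape.
	<I, O> HandlerRegistry withHandler(Class<I> requestClass, Class<O> responseClass,
			Function<I, CompletableFuture<O>> handler) {
		handlers.put(requestClass, request -> handler.apply(requestClass.cast(request)));
		return this;
	}

	// Looks up the handler by the runtime class of the incoming request.
	CompletableFuture<?> dispatch(Object request) {
		Function<Object, CompletableFuture<?>> handler = handlers.get(request.getClass());
		if (handler == null) {
			CompletableFuture<Object> failed = new CompletableFuture<>();
			failed.completeExceptionally(
					new IllegalArgumentException("No handler for " + request.getClass().getName()));
			return failed;
		}
		return handler.apply(request);
	}

	public static void main(String[] args) {
		Map<String, String> store = new HashMap<>();
		HandlerRegistry server = new HandlerRegistry()
				.withHandler(Put.class, String.class,
						req -> CompletableFuture.completedFuture(store.put(req.key, req.value)))
				.withHandler(Get.class, String.class,
						req -> CompletableFuture.completedFuture(store.get(req.key)));

		System.out.println(server.dispatch(new Put("key1", "value1")).join()); // null
		System.out.println(server.dispatch(new Get("key1")).join());           // value1
	}
}
```

This is also why every message class must be declared up front: a request whose class has no registered handler cannot be answered.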

Next, create a launcher for the RPC server:

public class ServerLauncher extends Launcher {
	@Inject
	private RpcServer server;

	@Override
	protected Module getModule() {
		return combine(
				ServiceGraphModule.create(),
				new ServerModule());
	}

	@Override
	protected void run() throws Exception {
		awaitShutdown();
	}

	public static void main(String[] args) throws Exception {
		ServerLauncher launcher = new ServerLauncher();
		launcher.launch(args);
	}
}

Now, let’s write the RPC client. To create it, we again need to list all classes that will be used for communication and specify an RpcStrategy. The RPC module provides a whole range of strategies (such as single-server, first-available, round-robin, sharding and so on). The nice thing about them is that all strategies can be combined.

For example, if you want to dispatch requests between two shards, where each shard consists of a main and a reserve server, you can tell the RPC client to dispatch requests accordingly with code like the following (hashFunction and the server names are placeholders):

RpcStrategy strategy = sharding(hashFunction,
    firstAvailable(shard_1_main_server, shard_1_reserve_server),
    firstAvailable(shard_2_main_server, shard_2_reserve_server)
);
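The composability comes from the fact that every strategy, simple or combined, answers the same question: given a request, which server should receive it? The stdlib-only sketch below models that idea with a hypothetical Strategy interface, a firstAvailable combinator that falls back to the next alive server, and a sharding combinator that picks a sub-strategy by key hash. Server addresses are plain strings, and the names only echo the RPC module's terminology; they are not its real classes or signatures.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.ToIntFunction;

public class StrategySketch {
	// A strategy chooses a target server for a request key, given the currently alive servers.
	interface Strategy {
		Optional<String> pick(String key, Set<String> alive);
	}

	// Leaf strategy: always targets one server, if it is alive.
	static Strategy server(String address) {
		return (key, alive) -> Optional.of(address).filter(alive::contains);
	}

	// Tries each sub-strategy in order and returns the first server found.
	static Strategy firstAvailable(Strategy... candidates) {
		return (key, alive) -> {
			for (Strategy candidate : candidates) {
				Optional<String> target = candidate.pick(key, alive);
				if (target.isPresent()) {
					return target;
				}
			}
			return Optional.empty();
		};
	}

	// Routes each key to one shard; Math.floorMod keeps the index non-negative.
	static Strategy sharding(ToIntFunction<String> hashFunction, Strategy... shards) {
		return (key, alive) ->
				shards[Math.floorMod(hashFunction.applyAsInt(key), shards.length)].pick(key, alive);
	}

	public static void main(String[] args) {
		Strategy strategy = sharding(String::hashCode,
				firstAvailable(server("shard1-main"), server("shard1-reserve")),
				firstAvailable(server("shard2-main"), server("shard2-reserve")));

		// Simulate shard 1's main server being down.
		Set<String> alive = new HashSet<>(Arrays.asList("shard1-reserve", "shard2-main"));
		for (String key : Arrays.asList("a", "b", "c", "d")) {
			System.out.println(key + " -> " + strategy.pick(key, alive).orElse("no server"));
		}
	}
}
```

Since combinators accept and return the same Strategy type, they nest freely, which mirrors the sharding(firstAvailable(...), firstAvailable(...)) shape above.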

But since we only have one server, we will just use the single-server strategy:

public class ClientModule extends AbstractModule {
	private static final int RPC_SERVER_PORT = 5353;

	@Provides
	Eventloop eventloop() {
		return Eventloop.create()
				.withFatalErrorHandler(rethrowOnAnyError())
				.withCurrentThread();
	}

	@Provides
	RpcClient rpcClient(Eventloop eventloop) {
		return RpcClient.create(eventloop)
				.withConnectTimeout(Duration.ofSeconds(1))
				.withSerializerBuilder(SerializerBuilder.create(Thread.currentThread().getContextClassLoader()))
				.withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)
				.withStrategy(RpcStrategies.server(new InetSocketAddress("localhost", RPC_SERVER_PORT)));
	}
}

Let’s also build the ClientLauncher. In run(), we parse the command-line arguments and send the corresponding requests to the RpcServer.

public class ClientLauncher extends Launcher {
	private static final int TIMEOUT = 1000;

	@Inject
	private RpcClient client;

	@Inject
	Eventloop eventloop;

	@Override
	protected Module getModule() {
		return combine(
				ServiceGraphModule.create(),
				new ClientModule());
	}

	@Override
	protected void run() throws Exception {
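		if (args.length < 2) {
			System.err.println("Command line args:\n\t--put key value\n\t--get key");
			return;
		}

		switch (args[0]) {
			case "--put":
				// "--put" needs both a key and a value, so check for a third argument
				if (args.length < 3) {
					System.err.println("Command line args:\n\t--put key value");
					return;
				}
				CompletableFuture<PutResponse> future1 = eventloop.submit(() ->
						client.sendRequest(new PutRequest(args[1], args[2]), TIMEOUT)
				);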
				PutResponse putResponse = future1.get();
				System.out.println("PutResponse: " + putResponse);
				break;
			case "--get":
				CompletableFuture<GetResponse> future2 = eventloop.submit(() ->
						client.sendRequest(new GetRequest(args[1]), TIMEOUT)
				);
				GetResponse getResponse = future2.get();
				System.out.println("GetResponse: " + getResponse);
				break;
			default:
				throw new RuntimeException("Unsupported option: " + args[0]);
		}
	}

	public static void main(String[] args) throws Exception {
		ClientLauncher launcher = new ClientLauncher();
		launcher.launch(args);
	}
}

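As you can see, sendRequest() returns a Promise of the response. Because run() is not executed on the eventloop thread, we submit the call with eventloop.submit(), which returns a CompletableFuture that we can block on with get(). Code running inside the eventloop could instead listen for the result asynchronously with lambdas.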
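The threading shape used in run() can be illustrated without DataKernel: the work is handed to the event loop thread, the caller receives a CompletableFuture, and the caller blocks on it from its own thread. In the sketch below, a single-threaded ExecutorService is a hypothetical stand-in for the Eventloop, and fakeSendRequest() is a hypothetical, synchronous stand-in for client.sendRequest(). It shows the pattern only, not how DataKernel implements Eventloop.submit().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubmitAndWaitSketch {
	// Stand-in for the single event loop thread that owns the client.
	private final ExecutorService loop = Executors.newSingleThreadExecutor();

	// Hypothetical stand-in for client.sendRequest(); only ever called on the loop thread.
	private String fakeSendRequest(String key) {
		return "value-for-" + key;
	}

	// Runs the request on the loop thread and exposes the result as a CompletableFuture.
	CompletableFuture<String> submit(String key) {
		return CompletableFuture.supplyAsync(() -> fakeSendRequest(key), loop);
	}

	void shutdown() {
		loop.shutdown();
	}

	public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
		SubmitAndWaitSketch sketch = new SubmitAndWaitSketch();
		try {
			// Blocking is safe here because main() is not the loop thread.
			String response = sketch.submit("key1").get(1, TimeUnit.SECONDS);
			System.out.println("response: " + response);
		} finally {
			sketch.shutdown();
		}
	}
}
```

Blocking on the future must never happen on the loop thread itself: that thread would then be waiting for work that only it can perform.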

Congratulations! We’ve finished writing the code for this app.

Testing

First, launch the server.

Open the ServerLauncher class and run its main() method.

Then make a “put” request.

Open the ClientLauncher class, located at datakernel -> examples -> remote-key-value-storage, and set the program arguments to --put key1 value1. In IntelliJ IDEA: Run -> Edit Configurations -> Run/Debug Configurations -> Program arguments -> --put key1 value1. Then run the launcher’s main() method.

You will see the following output:

PutResponse: {previousValue='null'}

Finally, make a “get” request.

Open the ClientLauncher class again and set the program arguments to --get key1. In IntelliJ IDEA: Run -> Edit Configurations -> Run/Debug Configurations -> Program arguments -> --get key1. Then run the client launcher’s main() method.

You will see the following output:

GetResponse: {value='value1'}

Congratulations, you’ve just created a remote key-value storage with RPC communication protocol!