Net Module

Features

Tiny abstraction layer on top of Eventloop and Java NIO Adapters for AsyncTcpSocket, AsyncUdpSocket:

  • support of Promises for read and write operations
  • compatibility with CSP ChannelSupplier and ChannelConsumer. AsyncTcpSocket can work as a CSP channel with built-in back pressure propagation, and can be plugged into CSP/Datastream pipeline with all its features (like buffering, compression, serialization/deserialization, data transformations, data filtering, reducing etc.)
  • extensively optimized and has almost no performance overhead, uses ByteBufPool widely

AbstractServer class serves as a foundation for building Eventloop-aware TCP servers (HTTP servers, RPC servers, TCP file services, etc.):

  • support of start/stop semantics
  • implements EventloopServer, with listen/close capabilities
  • implements WorkerServer interface, so all subclasses of AbstractServer can be readily used as worker servers
  • support of ServerSocketSettings, SocketSettings

Ready-to-use PrimaryServer implementation which works in primary Eventloops as balancer: it redistributes external accept requests to WorkerServers, which do actual accepts in their corresponding worker Eventloop threads

You can add Net module to your project by inserting dependency in pom.xml:

<dependency>
    <groupId>io.datakernel</groupId>
    <artifactId>datakernel-net</artifactId>
    <version>3.0.0-beta1</version>
</dependency>

Examples

Note: To run the examples, you need to clone DataKernel from GitHub:
$ git clone https://github.com/softindex/datakernel
And import it as a Maven project. Before running the examples, build the project.
These examples are located at datakernel -> examples-> core -> net.

Ping-Pong Socket Connection

In this example we are using an implementation of AbstractServer - SimpleServer which receives a message and sends a response (PONG). We also use AsyncTcpSocketImpl to send 3 request messages (PING).

public static void main(String[] args) throws IOException {
	Eventloop eventloop = Eventloop.create().withCurrentThread();

	SimpleServer server = SimpleServer.create(
			socket -> {
				BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));
				repeat(() ->
						bufsSupplier.parse(PARSER)
								.whenResult(System.out::println)
								.then($ -> socket.write(wrapAscii(RESPONSE_MSG))))
						.whenComplete(($, e) -> socket.close());
			})
			.withListenAddress(ADDRESS)
			.withAcceptOnce();

	server.listen();

	AsyncTcpSocketImpl.connect(ADDRESS)
			.whenResult(socket -> {
				BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));
				loop(0, AsyncPredicate.of(i -> i < ITERATIONS),
						i -> socket.write(wrapAscii(REQUEST_MSG))
								.then($ -> bufsSupplier.parse(PARSER)
										.whenResult(System.out::println)
										.map($2 -> i + 1)))
						.whenComplete(($, e) -> socket.close());
			})
			.whenException(e -> { throw new RuntimeException(e); });

	eventloop.run();
}
See full example on GitHub

CSP TCP Client

A simple TCP console client which connects to TCP server:

private void run() {
	System.out.println("Connecting to server at localhost (port 9922)...");
	eventloop.connect(new InetSocketAddress("localhost", 9922), new ConnectCallback() {
		@Override
		public void onConnect(@NotNull SocketChannel socketChannel) {
			System.out.println("Connected to server, enter some text and send it by pressing 'Enter'.");
			socket = AsyncTcpSocket.ofSocketChannel(socketChannel);

			BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))
					.parseStream(ByteBufsParser.ofCrlfTerminatedBytes())
					.streamTo(ChannelConsumer.ofConsumer(buf -> System.out.println(buf.asString(UTF_8))));

			getScannerThread().start();
		}

		@Override
		public void onException(@NotNull Throwable e) {
			System.out.printf("Could not connect to server, make sure it is started: %s\n", e);
		}
	});

	eventloop.run();
}

public static void main(String[] args) {
	new TcpClientExample().run();
}

It sends characters, receives some data back through CSP channel, parses it and then prints out to console.

See full example on GitHub

CSP TCP Server

Simple TCP echo server which runs in an eventloop:

public static void main(String[] args) throws Exception {
	Eventloop eventloop = Eventloop.create().withCurrentThread();

	SimpleServer server = SimpleServer.create(socket ->
			BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))
					.parseStream(ByteBufsParser.ofCrlfTerminatedBytes())
					.peek(buf -> System.out.println("client:" + buf.getString(UTF_8)))
					.map(buf -> {
						ByteBuf serverBuf = ByteBufStrings.wrapUtf8("Server> ");
						return ByteBufPool.append(serverBuf, buf);
					})
					.map(buf -> ByteBufPool.append(buf, CRLF))
					.streamTo(ChannelConsumer.ofSocket(socket)))
			.withListenPort(PORT);

	server.listen();

	System.out.println("Server is running");
	System.out.println("You can connect from telnet with command: telnet localhost 9922 or by running csp.TcpClientExample");

	eventloop.run();
}

This server listens for connections and when client connects, it parses its message and sends it back as CSP channel via socket.

See full example on GitHub

Datastream TCP Client

This image illustrates communication and transformations between two Datastream servers. Datastream TCP client represents Server#1:

public final class TcpClientExample {
	public static final int PORT = 9922;

	public static void main(String[] args) {
		Eventloop eventloop = Eventloop.create().withFatalErrorHandler(rethrowOnAnyError());

		eventloop.connect(new InetSocketAddress("localhost", PORT), new ConnectCallback() {
			@Override
			public void onConnect(@NotNull SocketChannel socketChannel) {
				AsyncTcpSocketImpl socket = AsyncTcpSocketImpl.wrapChannel(eventloop, socketChannel, null);

				StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
						.transformWith(ChannelSerializer.create(INT_SERIALIZER))
						.streamTo(ChannelConsumer.ofSocket(socket));

				StreamConsumerToList<String> consumer = StreamConsumerToList.create();

				ChannelSupplier.ofSocket(socket)
						.transformWith(ChannelDeserializer.create(UTF8_SERIALIZER))
						.streamTo(consumer);

				consumer.getResult()
						.whenResult(list -> list.forEach(System.out::println));
			}

			@Override
			public void onException(@NotNull Throwable e) {
				System.out.printf("Could not connect to server, make sure it is started: %s\n", e);
			}
		});

		eventloop.run();
	}
}
See this example on GitHub

Datastream TCP Server

This server represents Server#2 from the illustration above:

public final class TcpServerExample {

	public static void main(String[] args) throws IOException {
		Eventloop eventloop = Eventloop.create();

		eventloop.listen(new InetSocketAddress("localhost", TcpClientExample.PORT), ServerSocketSettings.create(100), channel -> {
			AsyncTcpSocketImpl socket = AsyncTcpSocketImpl.wrapChannel(eventloop, channel, null);

			try {
				System.out.println("Client connected: " + channel.getRemoteAddress());
			} catch (IOException e) {
				e.printStackTrace();
			}

			ChannelSupplier.ofSocket(socket)
					.transformWith(ChannelDeserializer.create(INT_SERIALIZER))
					.transformWith(StreamMapper.create(x -> x + " times 10 = " + x * 10))
					.transformWith(ChannelSerializer.create(UTF8_SERIALIZER))
					.streamTo(ChannelConsumer.ofSocket(socket));
		});

		System.out.println("Connect to the server by running datastream.TcpClientExample");

		eventloop.run();
	}
}
See this example on GitHub