ZIO Async for Java NIO

Technical Overview

ZIO Quartz H2 provides an asynchronous I/O implementation built on Java NIO. It combines Java's standard NIO.2 asynchronous channel API with ZIO's effect system to create a fully non-blocking I/O stack.

This implementation offers a cross-platform alternative to the Linux-specific io_uring implementation, making it suitable for deployment on any platform that supports Java.

Architecture Overview

The Java NIO implementation in ZIO Quartz H2 is built on three key components:

  1. ZIO's asynchronous effect system for managing I/O operations
  2. Java's AsynchronousChannelGroup and AsynchronousSocketChannel for non-blocking I/O
  3. A custom TCPChannel wrapper that bridges Java NIO and ZIO

The architecture follows a layered approach:

  • ZIO Layer: Provides functional effects, resource management, and concurrency
  • Wrapper Layer: TCPChannel and related classes that adapt Java NIO to ZIO
  • Java NIO Layer: Standard Java asynchronous I/O APIs

TCPChannel Implementation

The TCPChannel class provides a ZIO-friendly wrapper around Java's AsynchronousSocketChannel:

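import java.nio.ByteBuffer
import java.nio.channels.AsynchronousSocketChannel
import java.util.concurrent.TimeUnit
import zio.{Chunk, Task, ZIO}

class TCPChannel(val ch: AsynchronousSocketChannel) extends IOChannel {
  // Reads at most HTTP_READ_PACKET bytes; timeOut is in milliseconds.
  def read(timeOut: Int): Task[Chunk[Byte]] = {
    for {
      bb <- ZIO.attempt(ByteBuffer.allocate(TCPChannel.HTTP_READ_PACKET))
      n <- TCPChannel.effectAsyncChannel[AsynchronousSocketChannel, Integer](ch)(
        c => c.read(bb, timeOut, TimeUnit.MILLISECONDS, (), _)
      )
      // A negative count signals end of stream: the peer closed the connection.
      _ <- ZIO.fail(new java.nio.channels.ClosedChannelException).when(n < 0)
      chunk <- ZIO.attempt(Chunk.fromByteBuffer(bb.flip))
    } yield chunk
  }

  def write(chunk: Chunk[Byte]): Task[Int] = {
    val bb = ByteBuffer.wrap(chunk.toArray)
    write(bb)
  }

  // Repeats the write until the buffer is drained. The result is the byte
  // count of the final write call, not the total number of bytes written.
  def write(buffer: ByteBuffer): Task[Int] = {
    TCPChannel
      .effectAsyncChannel[AsynchronousSocketChannel, Integer](ch)(
        c => c.write(buffer, (), _)
      )
      .map(_.intValue)
      .repeatWhile(_ => buffer.remaining() > 0)
  }
}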

The TCPChannel implementation provides several key methods:

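  • read: Asynchronously reads up to HTTP_READ_PACKET bytes from the channel and returns them as a Chunk[Byte]; fails with ClosedChannelException at end of stream
  • write: Asynchronously writes a ByteBuffer or Chunk to the channel, repeating until the buffer is drained
  • readBuffer: Lower-level read operation that works with a provided ByteBuffer (not shown in the excerpt above)
  • close: Closes the underlying channel (not shown in the excerpt above)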
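
The repeatWhile loop in write exists because a single asynchronous write may accept only part of the buffer. The sketch below illustrates that loop without a socket. It is not Quartz H2 code: PartialWriteSketch, writeSome, and writeAll are hypothetical names, writeSome is a stand-in that consumes at most three bytes per call, and ZIO 2.x is assumed to be on the classpath.

```scala
import zio._
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object PartialWriteSketch extends ZIOAppDefault {

  // Hypothetical stand-in for a channel write: consumes at most `max` bytes
  // per call and advances the buffer position, as a real channel write would.
  def writeSome(buffer: ByteBuffer, max: Int, sink: Ref[Chunk[Byte]]): UIO[Int] =
    ZIO.suspendSucceed {
      val n   = math.min(max, buffer.remaining())
      val out = new Array[Byte](n)
      buffer.get(out)
      sink.update(_ ++ Chunk.fromArray(out)).as(n)
    }

  // Same shape as the loop in write: repeat while bytes remain in the buffer.
  def writeAll(buffer: ByteBuffer, sink: Ref[Chunk[Byte]]): UIO[Int] =
    writeSome(buffer, 3, sink).repeatWhile(_ => buffer.remaining() > 0)

  val run =
    for {
      sink <- Ref.make(Chunk.empty[Byte])
      data  = ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8))
      last <- writeAll(data, sink)
      all  <- sink.get
      _    <- Console.printLine(s"last write: $last bytes, total written: ${all.size} bytes")
    } yield ()
}
```

With the 11-byte input the stand-in is called four times (3, 3, 3, and 2 bytes), so the value produced by writeAll is 2 while 11 bytes reach the sink. Callers that need the total should compute it from the buffer size rather than from the returned count.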

ZIO Integration with Java NIO

The key to integrating Java NIO with ZIO is the effectAsyncChannel function, which converts Java's callback-based asynchronous operations into ZIO effects:

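import java.nio.channels.{Channel, CompletionHandler}
import zio.{Task, ZIO}

def effectAsyncChannel[C <: Channel, A](ch: C)(
    op: C => CompletionHandler[A, Any] => Unit
): Task[A] = {
  ZIO.asyncZIO[Any, Throwable, A](cb =>
    // Bind the operation to the channel, leaving only the handler to supply.
    ZIO.attempt(op(ch))
      .flatMap(handler =>
        // Start the NIO operation; the handler resumes the fiber through cb.
        ZIO.attempt(handler(new CompletionHandler[A, Any] {
          def completed(result: A, u: Any): Unit = {
            cb(ZIO.succeed(result))
          }
          def failed(t: Throwable, u: Any): Unit = {
            t match {
              case e: Exception => cb(ZIO.fail(e)) // recoverable, typed failure
              case _            => cb(ZIO.die(t))  // fatal error, surfaced as a defect
            }
          }
        }))
      )
  )
}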

This function:

  1. Takes a channel and an operation that requires a CompletionHandler
  2. Creates a ZIO.asyncZIO effect that completes when the CompletionHandler is called
  3. Adapts Java's callback-based API to ZIO's functional effect system
  4. Maps outcomes explicitly: a successful result completes the effect, an Exception becomes a typed failure (ZIO.fail), and any other Throwable becomes a defect (ZIO.die)
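
The same bridging idea can be tried in isolation. The sketch below is a simplified variant, not the library's code: it uses ZIO.async rather than ZIO.asyncZIO, and it reads from an AsynchronousFileChannel so that it runs without a network peer. CallbackBridge, readAt, and readHead are hypothetical names, and ZIO 2.x is assumed to be on the classpath.

```scala
import zio._
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}

object CallbackBridge extends ZIOAppDefault {

  // Hypothetical helper: one callback-based read exposed as a Task.
  // The attachment is unused, so null is passed with an AnyRef attachment type.
  def readAt(ch: AsynchronousFileChannel, bb: ByteBuffer, pos: Long): Task[Int] =
    ZIO.async[Any, Throwable, Int] { cb =>
      ch.read[AnyRef](bb, pos, null, new CompletionHandler[Integer, AnyRef] {
        def completed(n: Integer, att: AnyRef): Unit = cb(ZIO.succeed(n.intValue))
        def failed(t: Throwable, att: AnyRef): Unit  = cb(ZIO.fail(t))
      })
    }

  // Reads up to 64 bytes from the start of the file and decodes them as UTF-8.
  // The channel is closed when the scope ends, whether the read succeeds or not.
  def readHead(path: Path): Task[String] =
    ZIO.scoped {
      for {
        ch <- ZIO.fromAutoCloseable(
                ZIO.attempt(AsynchronousFileChannel.open(path, StandardOpenOption.READ)))
        bb <- ZIO.succeed(ByteBuffer.allocate(64))
        n  <- readAt(ch, bb, 0L)
      } yield new String(bb.array(), 0, math.max(n, 0), StandardCharsets.UTF_8)
    }

  val run =
    for {
      path <- ZIO.attempt(Files.createTempFile("bridge", ".txt"))
      _    <- ZIO.attempt(Files.write(path, "hello nio".getBytes(StandardCharsets.UTF_8)))
      text <- readHead(path).ensuring(ZIO.attempt(Files.deleteIfExists(path)).ignore)
      _    <- Console.printLine(s"read: $text")
    } yield ()
}
```

ZIO.async is enough here because starting the read is a plain method call. ZIO.asyncZIO, as used by effectAsyncChannel, additionally lets the registration step itself be an effect, so an exception thrown while starting the operation is captured as a failure of the resulting Task.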

Connection and Acceptance

The TCPChannel object provides methods for creating and accepting connections:

def accept(
    sch: AsynchronousServerSocketChannel
): Task[TCPChannel] =
  effectAsyncChannel[AsynchronousServerSocketChannel, AsynchronousSocketChannel](sch)(
    c => h => { c.accept(null, h) }
  ).map(new TCPChannel(_))

// When group is null, the channel is opened without an explicit group.
def connect(
    host: String,
    port: Int,
    group: AsynchronousChannelGroup = null
): Task[TCPChannel] =
  for {
    address <- ZIO.attempt(new InetSocketAddress(host, port))
    ch <- if (group == null) ZIO.attempt(AsynchronousSocketChannel.open())
          else ZIO.attempt(AsynchronousSocketChannel.open(group))
    _ <- effectAsyncChannel[AsynchronousSocketChannel, Void](ch)(
           c => c.connect(address, (), _)
         )
  } yield new TCPChannel(ch)
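
To see connect and accept working together, the following sketch runs both ends on the loopback interface inside one process. It is a standalone illustration, not Quartz H2 code: LoopbackDemo, awaitNio, and roundTrip are hypothetical names, the helper is a reduced form of the callback bridge, and ZIO 2.x is assumed to be on the classpath.

```scala
import zio._
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousServerSocketChannel, AsynchronousSocketChannel, CompletionHandler}
import java.nio.charset.StandardCharsets

object LoopbackDemo extends ZIOAppDefault {

  // Hypothetical helper: start one callback-based NIO call and await its result.
  def awaitNio[A](start: CompletionHandler[A, AnyRef] => Unit): Task[A] =
    ZIO.async[Any, Throwable, A] { cb =>
      start(new CompletionHandler[A, AnyRef] {
        def completed(result: A, att: AnyRef): Unit = cb(ZIO.succeed(result))
        def failed(t: Throwable, att: AnyRef): Unit = cb(ZIO.fail(t))
      })
    }

  // Sends msg from a client channel and returns what the accepted channel reads.
  // Assumes msg is short enough to arrive in a single read of at most 64 bytes.
  def roundTrip(msg: String): Task[String] =
    ZIO.scoped {
      for {
        // Port 0 asks the OS for any free port.
        server <- ZIO.fromAutoCloseable(ZIO.attempt(
                    AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0))))
        addr   <- ZIO.attempt(server.getLocalAddress())
        client <- ZIO.fromAutoCloseable(ZIO.attempt(AsynchronousSocketChannel.open()))
        // The listening socket queues the connection until accept is called.
        _      <- awaitNio[Void](h => client.connect[AnyRef](addr, null, h))
        peer   <- ZIO.fromAutoCloseable(
                    awaitNio[AsynchronousSocketChannel](h => server.accept[AnyRef](null, h)))
        out     = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8))
        _      <- awaitNio[Integer](h => client.write[AnyRef](out, null, h))
        in     <- ZIO.succeed(ByteBuffer.allocate(64))
        n      <- awaitNio[Integer](h => peer.read[AnyRef](in, null, h)).map(_.intValue)
      } yield new String(in.array(), 0, math.max(n, 0), StandardCharsets.UTF_8)
    }

  val run = roundTrip("ping").flatMap(s => Console.printLine(s"server side received: $s"))
}
```

All three channels are registered with ZIO.fromAutoCloseable, so they are closed when the scope ends even if a step in the middle fails. Production code would also loop on partial reads and writes, as TCPChannel.write does.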

Server Implementation

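The QuartzH2Server class provides multiple server implementations, including a Java NIO-based one. In the excerpt below, HOST, PORT, shutdownFlag, hostName, doConnect, and errorHandler are defined outside the excerpt: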

def run3(
    e: ExecutorService,
    R: HttpRoute[Env],
    maxThreadNum: Int,
    maxStreams: Int,
    keepAliveMs: Int
): ZIO[Env, Throwable, ExitCode] = {
  for {
    addr <- ZIO.attempt(new InetSocketAddress(HOST, PORT))
    _ <- ZIO.logInfo("HTTP/2 h2c: QuartzH2 (async - Java NIO)")
    
    conId <- Ref.make(0L)
    group <- ZIO.attempt(AsynchronousChannelGroup.withThreadPool(e))
    server_ch <- ZIO.attempt(
      group.provider().openAsynchronousServerSocketChannel(group).bind(addr)
    )
    
    accept = ZIO.logDebug("Wait on accept") *> TCPChannel
      .accept(server_ch)
      .tap(c => ZIO.logInfo(
        s"Connect from remote peer: ${hostName(c.ch.getRemoteAddress())}"
      ))
    
    ch0 <- accept
      .tap(_ => conId.update(_ + 1))
      .flatMap(ch1 =>
        ZIO.scoped {
          ZIO.acquireRelease(ZIO.succeed(ch1))(t => t.close().ignore)
            .flatMap(t =>
              doConnect(t, conId, maxStreams, keepAliveMs, R, Chunk.empty[Byte])
                .catchAll(e => errorHandler(e).ignore)
            )
        }.fork
      )
      .catchAll(e => errorHandler(e).ignore)
      .repeatUntil(_ => shutdownFlag)
    
    _ <- ZIO.attempt(server_ch.close())
  } yield (ExitCode.success)
}

The server implementation:

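  1. Creates an AsynchronousChannelGroup backed by the supplied ExecutorService
  2. Opens an AsynchronousServerSocketChannel in that group and binds it to HOST and PORT
  3. Repeatedly accepts connections using TCPChannel.accept, incrementing the connection counter for each one
  4. For each connection, forks a fiber that handles the connection through doConnect
  5. Uses ZIO.scoped with acquireRelease so that each connection's channel is closed when its handler finishes or fails
  6. Continues accepting connections until shutdownFlag is set, then closes the server channel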
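
The control flow in these steps (accept, fork a scoped handler, repeat until told to stop) can be sketched on its own. The example below is an illustration under stated assumptions, not the server's code: AcceptLoopSketch, acceptOne, handle, and serve are hypothetical names, a connection count stands in for the shutdown flag, the default channel group is used instead of a custom thread pool, and ZIO 2.x is assumed to be on the classpath.

```scala
import zio._
import java.net.{InetSocketAddress, Socket}
import java.nio.channels.{AsynchronousServerSocketChannel, AsynchronousSocketChannel, CompletionHandler}

object AcceptLoopSketch extends ZIOAppDefault {

  // Hypothetical helper: one asynchronous accept exposed as a Task.
  def acceptOne(server: AsynchronousServerSocketChannel): Task[AsynchronousSocketChannel] =
    ZIO.async[Any, Throwable, AsynchronousSocketChannel] { cb =>
      server.accept[AnyRef](null, new CompletionHandler[AsynchronousSocketChannel, AnyRef] {
        def completed(ch: AsynchronousSocketChannel, att: AnyRef): Unit = cb(ZIO.succeed(ch))
        def failed(t: Throwable, att: AnyRef): Unit = cb(ZIO.fail(t))
      })
    }

  // Hypothetical per-connection handler: the channel is closed when the scope
  // ends, and the connection id is reported on `done` afterwards.
  def handle(id: Long, ch: AsynchronousSocketChannel, done: Queue[Long]): UIO[Unit] =
    ZIO.scoped {
      ZIO.acquireRelease(ZIO.succeed(ch))(c => ZIO.attempt(c.close()).ignore) *>
        ZIO.logInfo(s"handling connection $id")
    }.ensuring(done.offer(id))

  // Accepts n loopback connections (n >= 1), one forked handler each, then stops.
  def serve(n: Int): Task[List[Long]] =
    ZIO.scoped {
      for {
        server <- ZIO.fromAutoCloseable(ZIO.attempt(
                    AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0))))
        port   <- ZIO.attempt(server.getLocalAddress().asInstanceOf[InetSocketAddress].getPort)
        conId  <- Ref.make(0L)
        done   <- Queue.unbounded[Long]
        // Test clients: connect and immediately close, in a background fiber.
        _      <- ZIO.foreachDiscard(1 to n)(_ =>
                    ZIO.attemptBlocking(new Socket("127.0.0.1", port).close())).fork
        // The accept loop: each accepted channel is handed to its own fiber.
        _      <- acceptOne(server)
                    .flatMap(ch => conId.updateAndGet(_ + 1).flatMap(id => handle(id, ch, done).fork))
                    .repeatUntilZIO(_ => conId.get.map(_ >= n))
        // Wait until every handler has reported completion.
        ids    <- done.takeBetween(n, n)
      } yield ids.toList.sorted
    }

  val run = serve(3).flatMap(ids => Console.printLine(s"handled connections: ${ids.mkString(", ")}"))
}
```

Forking the handler keeps the accept loop free to take the next connection while earlier ones are still being served. Waiting on the done queue before leaving the scope matters in this sketch because child fibers are interrupted when their parent fiber exits.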

Comparison with io_uring

Both the Java NIO and io_uring implementations provide asynchronous I/O capabilities, but with some key differences:

Feature                 Java NIO                               io_uring
----------------------  -------------------------------------  ------------------------------------------------------
Platform Support        Cross-platform (all Java platforms)    Linux-specific (kernel 5.1+)
Performance             Good                                   Excellent (lower overhead)
Implementation          Pure Java                              JNI with native code
Completion Processing   One completion per callback            Multiple completions can be processed in a single call
Memory Management       JVM-managed                            Direct buffers with manual reference counting

ZIO Integration Benefits

Both implementations benefit from ZIO integration in similar ways:

  • Composable I/O operations with ZIO's functional effects
  • Resource safety through ZIO's scoped resources
  • Structured concurrency with ZIO's fiber system
  • Unified error handling across the application
  • Seamless integration with ZIO's runtime and ecosystem

When to Use Java NIO vs io_uring

Use Java NIO when:

  • Cross-platform compatibility is required
  • Running on older Linux kernels (pre-5.1)
  • Simpler deployment without native dependencies is preferred
  • The application doesn't require extreme I/O performance

Use io_uring when:

  • Maximum I/O performance is critical
  • Running on modern Linux systems
  • Handling high-throughput workloads
  • Native library dependencies are acceptable
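
These criteria can be turned into a startup check. The sketch below is purely illustrative and not part of Quartz H2: BackendChoice, Backend, kernelAtLeast, and choose are hypothetical names, and it assumes the JVM reports the kernel release in the os.version system property, as Linux JVMs commonly do.

```scala
object BackendChoice {

  sealed trait Backend
  case object JavaNio extends Backend
  case object IoUring extends Backend

  // Compares the leading "major.minor" of strings such as "5.15.0-91-generic".
  // Anything that does not start with two numeric components counts as too old.
  def kernelAtLeast(version: String, major: Int, minor: Int): Boolean = {
    val parts = version.split("[.\\-]").take(2).flatMap(s => scala.util.Try(s.toInt).toOption)
    parts match {
      case Array(ma, mi) => ma > major || (ma == major && mi >= minor)
      case _             => false
    }
  }

  // io_uring only when native code is acceptable and the OS is Linux 5.1+;
  // Java NIO in every other case.
  def choose(osName: String, osVersion: String, nativeAllowed: Boolean): Backend =
    if (nativeAllowed && osName.toLowerCase.contains("linux") && kernelAtLeast(osVersion, 5, 1)) IoUring
    else JavaNio

  def main(args: Array[String]): Unit = {
    val backend = choose(
      System.getProperty("os.name"),
      System.getProperty("os.version"),
      nativeAllowed = true
    )
    println(s"selected backend: $backend")
  }
}
```

Falling back to Java NIO whenever a condition is not met keeps the check conservative: an unparseable version string or an unknown platform never selects the native path.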

Requirements

  • JDK 11 or later
  • ZIO 2.x