ZIO Quartz H2 provides a Java NIO-based asynchronous I/O implementation that combines Java's standard NIO.2 asynchronous channel API with ZIO's effect system to create a fully non-blocking I/O stack.
This implementation offers a cross-platform alternative to the Linux-specific io_uring implementation, making it suitable for deployment on any platform that supports Java.
The Java NIO implementation in ZIO Quartz H2 is built on three key components:
The architecture follows a layered approach:
The TCPChannel class provides a ZIO-friendly wrapper around Java's AsynchronousSocketChannel:
class TCPChannel(val ch: AsynchronousSocketChannel) extends IOChannel {

  def read(timeOut: Int): Task[Chunk[Byte]] = {
    for {
      bb <- ZIO.attempt(ByteBuffer.allocate(TCPChannel.HTTP_READ_PACKET))
      n <- TCPChannel.effectAsyncChannel[AsynchronousSocketChannel, Integer](ch)(
        c => c.read(bb, timeOut, TimeUnit.MILLISECONDS, (), _)
      )
      // A negative count means the peer has closed the connection.
      _ <- ZIO.fail(new java.nio.channels.ClosedChannelException).when(n < 0)
      chunk <- ZIO.attempt(Chunk.fromByteBuffer(bb.flip))
    } yield (chunk)
  }

  def write(chunk: Chunk[Byte]): Task[Int] = {
    val bb = ByteBuffer.wrap(chunk.toArray)
    write(bb)
  }

  def write(buffer: ByteBuffer): Task[Int] = {
    TCPChannel
      .effectAsyncChannel[AsynchronousSocketChannel, Integer](ch)(
        c => c.write(buffer, (), _)
      )
      .map(_.intValue)
      // A single write may be partial, so repeat until the buffer is drained.
      // The resulting Int is the byte count of the last write, not the total.
      .repeatWhile(_ => buffer.remaining() > 0)
  }
}
The TCPChannel implementation provides several key methods:
- read: Asynchronously reads data from the channel into a ByteBuffer
- write: Asynchronously writes data from a ByteBuffer or Chunk to the channel
- readBuffer: Lower-level read operation that works with a provided ByteBuffer
- close: Closes the underlying channel
The key to integrating Java NIO with ZIO is the effectAsyncChannel function, which converts Java's callback-based asynchronous operations into ZIO effects:
def effectAsyncChannel[C <: Channel, A](ch: C)(
    op: C => CompletionHandler[A, Any] => Unit
): Task[A] = {
  ZIO.asyncZIO[Any, Throwable, A](cb =>
    ZIO.attempt(op(ch))
      .flatMap(handler =>
        ZIO.attempt(handler(new CompletionHandler[A, Any] {
          def completed(result: A, u: Any): Unit = {
            cb(ZIO.succeed(result))
          }
          def failed(t: Throwable, u: Any): Unit = {
            t match {
              case e: Exception => cb(ZIO.fail(e))
              case _            => cb(ZIO.die(t))
            }
          }
        }))
      )
  )
}
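This function:
- Wraps the callback-based operation in ZIO.asyncZIO, which supplies a callback (cb) for completing the effect
- Passes the operation a CompletionHandler whose completed method resumes the effect with the result
- Maps failures in the failed method: an Exception becomes a typed failure (ZIO.fail), and any other Throwable becomes a defect (ZIO.die)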
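The same callback-to-effect bridging can be tried without any dependencies. The sketch below is not the library's code: it swaps ZIO for scala.concurrent.Promise and reads from an AsynchronousFileChannel instead of a socket, purely to show how completed and failed feed a single result. The names CallbackBridge, fromHandler, and readDemo are hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Files, StandardOpenOption}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object CallbackBridge {

  // Hypothetical helper: adapts a CompletionHandler-based call to a Future.
  // It plays the role effectAsyncChannel plays for ZIO, with Promise as the
  // completion mechanism instead of the ZIO callback.
  def fromHandler[A](op: CompletionHandler[A, Any] => Unit): Future[A] = {
    val p = Promise[A]()
    try {
      op(new CompletionHandler[A, Any] {
        def completed(result: A, u: Any): Unit = p.success(result)
        def failed(t: Throwable, u: Any): Unit = p.failure(t)
      })
    } catch {
      // The operation may also throw synchronously before any callback fires.
      case NonFatal(t) => p.tryFailure(t); ()
    }
    p.future
  }

  // Writes "hello" to a temp file, reads it back through the async channel,
  // and returns the decoded text. Blocking with Await is for the demo only.
  def readDemo(): String = {
    val path = Files.createTempFile("nio-demo", ".txt")
    Files.write(path, "hello".getBytes("UTF-8"))
    val ch = AsynchronousFileChannel.open(path, StandardOpenOption.READ)
    try {
      val bb = ByteBuffer.allocate(16)
      val n = Await.result(fromHandler[Integer](h => ch.read(bb, 0L, (), h)), 5.seconds)
      new String(bb.array(), 0, n.intValue, "UTF-8")
    } finally {
      ch.close()
      Files.delete(path)
    }
  }
}
```

Unlike the ZIO version, this sketch does not distinguish typed failures from defects; every Throwable simply fails the Future.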
The TCPChannel object provides methods for creating and accepting connections:
def accept(
    sch: AsynchronousServerSocketChannel
): Task[TCPChannel] =
  effectAsyncChannel[AsynchronousServerSocketChannel, AsynchronousSocketChannel](sch)(
    c => h => { c.accept(null, h) }
  ).map(new TCPChannel(_))

def connect(
    host: String,
    port: Int,
    group: AsynchronousChannelGroup = null
): Task[TCPChannel] = {
  val T = for {
    address <- ZIO.attempt(new InetSocketAddress(host, port))
    ch <- if (group == null) ZIO.attempt(AsynchronousSocketChannel.open())
          else ZIO.attempt(AsynchronousSocketChannel.open(group))
    _ <- effectAsyncChannel[AsynchronousSocketChannel, Void](ch)(
      ch => ch.connect(address, (), _)
    )
  } yield (ch)
  T.map(c => new TCPChannel(c))
}
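To see what accept and connect wrap, the following sketch exercises the underlying JDK channels directly on the loopback interface. It is an illustration under stated assumptions rather than the library's code: it uses the Future-returning overloads of the JDK API and blocks on them, which the ZIO wrappers avoid, and the names LoopbackDemo and roundTrip are hypothetical. The write loop mirrors the repeatWhile(buffer.remaining() > 0) logic in TCPChannel.write.

```scala
import java.net.{InetAddress, InetSocketAddress}
import java.nio.ByteBuffer
import java.nio.channels.{
  AsynchronousServerSocketChannel,
  AsynchronousSocketChannel,
  ClosedChannelException
}
import java.util.concurrent.TimeUnit

object LoopbackDemo {

  // Sends msg from a client channel to an accepted peer channel and returns
  // what the peer received.
  def roundTrip(msg: String): String = {
    val loopback = InetAddress.getLoopbackAddress
    // Port 0 lets the OS pick a free port.
    val server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(loopback, 0))
    val port = server.getLocalAddress.asInstanceOf[InetSocketAddress].getPort
    val pendingAccept = server.accept() // completes once a client connects
    val client = AsynchronousSocketChannel.open()
    try {
      client.connect(new InetSocketAddress(loopback, port)).get(5, TimeUnit.SECONDS)
      val peer = pendingAccept.get(5, TimeUnit.SECONDS)
      try {
        val outBuf = ByteBuffer.wrap(msg.getBytes("UTF-8"))
        // A single write may be partial, so loop until the buffer is drained.
        while (outBuf.hasRemaining) client.write(outBuf).get(5, TimeUnit.SECONDS)

        val inBuf = ByteBuffer.allocate(outBuf.capacity())
        while (inBuf.hasRemaining) {
          val n = peer.read(inBuf).get(5, TimeUnit.SECONDS)
          // A negative count signals end of stream, as in TCPChannel.read.
          if (n < 0) throw new ClosedChannelException
        }
        inBuf.flip()
        new String(inBuf.array(), 0, inBuf.limit(), "UTF-8")
      } finally peer.close()
    } finally {
      client.close()
      server.close()
    }
  }
}
```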
The QuartzH2Server class provides multiple server implementations, including a Java NIO-based implementation:
def run3(
    e: ExecutorService,
    R: HttpRoute[Env],
    maxThreadNum: Int,
    maxStreams: Int,
    keepAliveMs: Int
): ZIO[Env, Throwable, ExitCode] = {
  for {
    addr <- ZIO.attempt(new InetSocketAddress(HOST, PORT))
    _ <- ZIO.logInfo("HTTP/2 h2c: QuartzH2 (async - Java NIO)")
    conId <- Ref.make(0L)
    group <- ZIO.attempt(AsynchronousChannelGroup.withThreadPool(e))
    server_ch <- ZIO.attempt(
      group.provider().openAsynchronousServerSocketChannel(group).bind(addr)
    )
    accept = ZIO.logDebug("Wait on accept") *> TCPChannel
      .accept(server_ch)
      .tap(c => ZIO.logInfo(
        s"Connect from remote peer: ${hostName(c.ch.getRemoteAddress())}"
      ))
    ch0 <- accept
      .tap(_ => conId.update(_ + 1))
      .flatMap(ch1 =>
        ZIO.scoped {
          ZIO.acquireRelease(ZIO.succeed(ch1))(t => t.close().ignore)
            .flatMap(t =>
              doConnect(t, conId, maxStreams, keepAliveMs, R, Chunk.empty[Byte])
                .catchAll(e => errorHandler(e).ignore)
            )
        }.fork
      )
      .catchAll(e => errorHandler(e).ignore)
      .repeatUntil(_ => shutdownFlag)
    _ <- ZIO.attempt(server_ch.close())
  } yield (ExitCode.success)
}
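The server implementation:
- Creates an AsynchronousChannelGroup backed by the supplied ExecutorService
- Opens an AsynchronousServerSocketChannel in that group and binds it to HOST and PORT
- Accepts connections in a loop until shutdownFlag is set, incrementing the connection counter for each one
- Handles each connection in its own forked fiber inside ZIO.scoped, so the channel is closed when the handler finishes or fails
- Routes errors through errorHandler and closes the server channel once the accept loop exits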
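The per-connection pattern in run3 (acquire the channel, run the handler, always release) can be checked in isolation. The sketch below assumes ZIO 2 on the classpath and replaces the real channel with a hypothetical FakeConn that only records whether close ran; ScopedConnectionDemo, handle, and program are likewise illustrative names, not part of the library.

```scala
import zio._

object ScopedConnectionDemo {

  // Hypothetical stand-in for a channel: records whether close() has run.
  final class FakeConn(val closed: Ref[Boolean]) {
    def close(): Task[Unit] = closed.set(true)
  }

  // Same shape as the body forked in run3: the release action is registered
  // with the scope and runs when the scope closes, even if the work fails.
  def handle(conn: FakeConn, work: FakeConn => Task[Unit]): UIO[Unit] =
    ZIO.scoped {
      ZIO
        .acquireRelease(ZIO.succeed(conn))(c => c.close().ignore)
        .flatMap(c => work(c).catchAll(_ => ZIO.unit))
    }

  // Runs a failing handler on its own fiber and reports whether the
  // connection was closed afterwards.
  val program: UIO[Boolean] =
    for {
      flag   <- Ref.make(false)
      conn    = new FakeConn(flag)
      fiber  <- handle(conn, _ => ZIO.fail(new RuntimeException("boom"))).fork
      _      <- fiber.join
      closed <- flag.get
    } yield closed
}
```

Because the release is tied to the scope rather than to the handler's success, a connection that fails mid-request still has its channel closed.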
Both the Java NIO and io_uring implementations provide asynchronous I/O capabilities, but with some key differences:
| Feature | Java NIO | io_uring |
|---|---|---|
| Platform Support | Cross-platform (all Java platforms) | Linux-specific (kernel 5.1+) |
| Performance | Good | Excellent (lower overhead) |
| Implementation | Pure Java | JNI with native code |
| Completion Processing | One completion per callback | Multiple completions can be processed in a single call |
| Memory Management | JVM-managed | Direct buffers with manual reference counting |
Both implementations benefit from ZIO integration in similar ways:
Use Java NIO when:
Use io_uring when: