The Quartz H2 Server implements high-performance, non-blocking network I/O on top of Linux's io_uring interface and wraps each operation in a ZIO effect. io_uring keeps system-call overhead low: the server places many requests in a submission queue shared with the kernel and collects their results from a completion queue. ZIO's effect system then composes those operations asynchronously.
The IoUringTbl class manages a collection of IoUring instances and balances load across the rings. Each ring is encapsulated in an IoUringEntry, which contains:
- q: a queue of pending I/O operations, each wrapped as a Task[Unit] to be executed later
- cntr: an AtomicInteger counter
- ring: the IoUring instance
The table provides methods to:
Each IoUringEntry represents a single io_uring instance with its associated resources:
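import java.util.concurrent.atomic.AtomicInteger
import zio.{Queue, Task}
import io.quartz.iouring.IoUring

case class IoUringEntry(
  q: Queue[Task[Unit]], // I/O operations wrapped as Task[Unit], executed later by submitProcessor
  cntr: AtomicInteger,
  ring: IoUring // the underlying io_uring instance
)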
It provides synchronized wrappers for io_uring operations, which go through its queue rather than calling the ring directly:
- queueRead: enqueues a read operation
- queueWrite: enqueues a write operation
The IOURingChannel class implements the IOChannel interface using io_uring for I/O operations. It provides ZIO-wrapped methods for:
The class uses ZIO.async to convert callback-based io_uring operations into ZIO effects.
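ZIO.async and ZIO.asyncZIO turn a callback-registering API into an effect that completes when the callback fires. For readers more familiar with the JVM standard library, the same adaptation can be sketched with java.util.concurrent.CompletableFuture. This is an analogy, not the project's code: fromCallback and fakeAccept are hypothetical names, and fakeAccept merely stands in for a callback-based native call such as ioUringAccept.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class CallbackAdapter {
    // Turns "register a callback" into a future, the way ZIO.async turns it into an effect:
    // the future completes when the callback runs, and an exception during registration fails it.
    static <T> CompletableFuture<T> fromCallback(Consumer<Consumer<T>> register) {
        CompletableFuture<T> future = new CompletableFuture<>();
        try {
            register.accept(future::complete);
        } catch (RuntimeException e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    // Hypothetical callback-based API: delivers its result later on another thread,
    // the way a completion loop would.
    static void fakeAccept(ExecutorService loop, int fd, Consumer<Integer> onAccept) {
        loop.submit(() -> onAccept.accept(fd));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Integer> accepted = fromCallback(cb -> fakeAccept(loop, 42, cb));
            System.out.println("accepted fd " + accepted.get());
        } finally {
            loop.shutdown();
        }
    }
}
```

A failure thrown while registering fails the future, much as an exception inside the ZIO.attempt registration effect fails the effect returned by ZIO.asyncZIO.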
The core of the implementation uses ZIO's asynchronous programming model to wrap the callback-based io_uring API:
def accept(ring: IoUring, serverSocket: IoUringServerSocket): Task[(IoUring, IoUringSocket)] =
  ZIO.asyncZIO[Any, Throwable, (IoUring, IoUringSocket)] { cb =>
    // Callback handed to the native layer: completes the effect with the accepted socket
    val onAccept = (r: IoUring, socket: IoUringSocket) => cb(ZIO.succeed((r, socket)))
    // Registration effect: starts the accept; asyncZIO ignores its result value
    ZIO.attempt(ioUringAccept(ring, serverSocket, onAccept)).unit
  }
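This pattern is used throughout the codebase to convert callback-based operations into ZIO effects:
- ZIO.asyncZIO creates an asynchronous effect that completes when the callback is invoked
- the registration effect starts the native operation (here ioUringAccept) and hands it a function that calls cb with the result
Two key processors handle the io_uring event loop: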
def submitProcessor(entry: IoUringEntry): Task[Unit]
def getCqesProcessor(entry: IoUringEntry): Task[Unit]
Both processors run in separate fibers and continue until shutdown is requested.
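Both processors share one shape: a loop on its own fiber that re-checks a shared shutdown flag after every iteration (repeatUntil in the Scala code). The plain-Java sketch below mirrors that shape with threads in place of fibers and a BlockingQueue in place of the ZIO Queue. It is a model, not the real processors: the submit and getCqes steps are placeholders (a counter and a short sleep), and RingLoops is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class RingLoops {
    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); // stands in for IoUringEntry.q
    final AtomicBoolean shutdown = new AtomicBoolean(false);          // stands in for IoUringTbl.shutdown
    final AtomicInteger submitted = new AtomicInteger();              // counts placeholder submits

    // Submit loop: run the next queued op (in the real code it prepares an SQE), then "submit".
    void submitLoop() {
        while (!shutdown.get()) {
            try {
                Runnable op = queue.poll(50, TimeUnit.MILLISECONDS);
                if (op == null) continue;    // idle: loop back and re-check the flag
                op.run();
                submitted.incrementAndGet(); // placeholder for ring.submit()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                shutdown.set(true);          // any failure stops both loops
            }
        }
    }

    // Completion loop: wait for completions with a timeout until shutdown is requested.
    void completionLoop() {
        while (!shutdown.get()) {
            try {
                Thread.sleep(10);            // placeholder for ring.getCqes(timeoutMs)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Starts both loops, queues `ops` operations, waits until they are submitted, then shuts down.
    static int runDemo(int ops) {
        RingLoops loops = new RingLoops();
        AtomicInteger executed = new AtomicInteger();
        Thread submitter = new Thread(loops::submitLoop, "submit-loop");
        Thread completer = new Thread(loops::completionLoop, "cqe-loop");
        submitter.start();
        completer.start();
        for (int i = 0; i < ops; i++) {
            loops.queue.add(executed::incrementAndGet);
        }
        try {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (loops.submitted.get() < ops && System.nanoTime() < deadline) {
                Thread.sleep(5);
            }
            loops.shutdown.set(true);
            submitter.join();
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("operations executed: " + runDemo(3));
    }
}
```

One design difference: the sketch polls the queue with a timeout so the flag is noticed even when no work arrives, whereas the Scala submitProcessor blocks in q.take and only re-checks the flag after the next operation.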
The number of io_uring instances is configured when starting the server. This is a critical performance parameter that should be tuned based on your system's resources and workload.
In your application code, set the number of ring instances:
// Define the number of ring instances to use
val NUMBER_OF_RING_INSTANCES = 4 // Adjust based on your needs
// Start the server with the specified number of ring instances
new QuartzH2Server(
"localhost",
8443,
16000,
ctx,
onConnect = onConnect,
onDisconnect = onDisconnect
).startIO_linuxOnly(NUMBER_OF_RING_INSTANCES, R, filter)
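With several rings, IoUringTbl balances load across them, but the policy itself is not shown here. One common approach, assumed for this sketch, is to give each new connection the ring whose cntr counter is currently lowest and to decrement that counter when the connection closes. LeastLoadedRings and its methods are hypothetical names; one AtomicInteger per ring stands in for IoUringEntry.cntr.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LeastLoadedRings {
    // One counter per ring, standing in for IoUringEntry.cntr (assumed to count active connections).
    private final AtomicInteger[] counters;

    LeastLoadedRings(int rings) {
        if (rings <= 0) throw new IllegalArgumentException("rings must be > 0");
        counters = new AtomicInteger[rings];
        for (int i = 0; i < rings; i++) counters[i] = new AtomicInteger();
    }

    // Picks the ring with the fewest active connections (lowest index on ties) and counts the
    // new connection against it. The scan and the increment are not one atomic step, so
    // concurrent callers may occasionally pick the same ring; the balance is approximate.
    int acquire() {
        int best = 0;
        for (int i = 1; i < counters.length; i++) {
            if (counters[i].get() < counters[best].get()) best = i;
        }
        counters[best].incrementAndGet();
        return best;
    }

    // Called when a connection served by `ring` closes.
    void release(int ring) {
        counters[ring].decrementAndGet();
    }

    int load(int ring) {
        return counters[ring].get();
    }

    public static void main(String[] args) {
        LeastLoadedRings table = new LeastLoadedRings(4);
        StringBuilder picks = new StringBuilder();
        for (int i = 0; i < 6; i++) picks.append(table.acquire()).append(' ');
        System.out.println("rings chosen: " + picks.toString().trim()); // 0 1 2 3 0 1
    }
}
```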
The size of each ring (the number of entries in its submission and completion queues) can be configured when creating the IoUringTbl:
// Default ring size is 1024, but can be customized
IoUringTbl(server, count, ringSize = 2048)
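The kernel does not necessarily use the requested size verbatim. Assuming the ring is created through liburing's io_uring_queue_init without extra setup flags, io_uring_setup(2) rounds the submission queue size up to the next power of two and, by default, gives the completion queue twice as many entries. Sizes above the kernel limit IORING_MAX_ENTRIES (32768 on kernels since 5.4) are rejected with EINVAL unless IORING_SETUP_CLAMP is set. A small hypothetical helper mirroring those rules:

```java
final class RingSizing {
    static final int IORING_MAX_ENTRIES = 32768; // kernel limit on SQ entries (Linux 5.4+)

    // Submission-queue size the kernel allocates for a requested size (no IORING_SETUP_CLAMP).
    static int sqEntries(int requested) {
        if (requested <= 0 || requested > IORING_MAX_ENTRIES) {
            throw new IllegalArgumentException("io_uring_setup would fail with EINVAL: " + requested);
        }
        // Round up to the next power of two.
        int highest = Integer.highestOneBit(requested);
        return highest == requested ? requested : highest << 1;
    }

    // Default completion-queue size (no IORING_SETUP_CQSIZE): twice the submission queue.
    static int cqEntries(int requested) {
        return 2 * sqEntries(requested);
    }

    public static void main(String[] args) {
        for (int size : new int[] {1024, 2048, 3000}) {
            System.out.println(size + " -> sq=" + sqEntries(size) + ", cq=" + cqEntries(size));
        }
    }
}
```

So ringSize = 2048 yields 2048 submission entries and 4096 completion entries, while a non-power-of-two value such as 3000 would be rounded up to 4096.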
The optimal number of ring instances depends on your system's resources and workload:
When using io_uring, configure your thread pools appropriately:
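override val bootstrap = zio.Runtime.removeDefaultLoggers ++ SLF4J.slf4j ++ zio.Runtime.setExecutor(
  zio.Executor.fromJavaExecutor(
    // java.lang.Runtime is named explicitly so it cannot resolve to zio.Runtime under `import zio._`;
    // math.max keeps at least one worker when cores <= NUMBER_OF_RING_INSTANCES
    Executors.newWorkStealingPool(
      math.max(1, java.lang.Runtime.getRuntime().availableProcessors() - NUMBER_OF_RING_INSTANCES)
    )
  )
) ++ zio.Runtime.setBlockingExecutor(zio.Executor.fromJavaExecutor(Executors.newCachedThreadPool()))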
Reserve CPU cores for io_uring event processing by subtracting NUMBER_OF_RING_INSTANCES from the available processors.
Here's a complete example of configuring and starting a server with io_uring:
object MyApp extends ZIOAppDefault {
// Configure the number of ring instances
val NUMBER_OF_RING_INSTANCES = 4
// Configure thread pools to account for io_uring rings
  override val bootstrap = zio.Runtime.removeDefaultLoggers ++ SLF4J.slf4j ++ zio.Runtime.setExecutor(
    zio.Executor.fromJavaExecutor(
      // Explicit java.lang.Runtime avoids a clash with zio.Runtime; keep at least one worker thread
      Executors.newWorkStealingPool(
        math.max(1, java.lang.Runtime.getRuntime().availableProcessors() - NUMBER_OF_RING_INSTANCES)
      )
    )
  ) ++ zio.Runtime.setBlockingExecutor(zio.Executor.fromJavaExecutor(Executors.newCachedThreadPool()))
// Define your routes and handlers
val R: HttpRouteIO[String] = { /* your routes here */ }
def run = {
val env = ZLayer.fromZIO(ZIO.succeed("Hello ZIO World!"))
(for {
ctx <- QuartzH2Server.buildSSLContext("TLS", "keystore.jks", "password")
exitCode <- new QuartzH2Server(
"localhost",
8443,
16000,
ctx,
onConnect = id => ZIO.logTrace(s"connected - $id"),
onDisconnect = id => ZIO.logTrace(s"disconnected - $id")
).startIO_linuxOnly(NUMBER_OF_RING_INSTANCES, R, WebFilter.identity)
} yield (exitCode)).provideSomeLayer(env)
}
}
To observe how your io_uring configuration behaves at runtime, enable trace-level logging:
QuartzH2Server.setLoggingLevel(Level.TRACE)
The ZIO Quartz H2 server integrates with Linux's io_uring API through a custom JNI (Java Native Interface) layer. This section explains how the high-level Scala code interacts with the low-level native Linux io_uring implementation.
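The integration follows a layered approach:
- Scala/ZIO: IoUringTbl, IoUringEntry and the submit and completion processors queue operations and expose them as ZIO effects
- Java: the io.quartz.iouring.IoUring class tracks sockets and buffers and declares the native methods
- C/JNI: native functions built on liburing drive the actual io_uring instance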
The queueRead and queueWrite functions in IoUringEntry wrap the native io_uring submission queue operations:
def queueRead(consumer: Consumer[ByteBuffer], channel: IoUringSocket, buffer: java.nio.ByteBuffer): Task[Unit] =
  for {
    // Register the callback that receives the buffer when the read completes
    _ <- ZIO.succeed(channel.onRead(consumer))
    // Enqueue the native call; submitProcessor executes it and then submits the ring
    _ <- q.offer(ZIO.succeed(ring.queueRead(channel, buffer)).unit)
  } yield ()

def queueWrite(consumer: Consumer[ByteBuffer], channel: IoUringSocket, buffer: java.nio.ByteBuffer): Task[Unit] =
  for {
    // Register the callback invoked when the write completes
    _ <- ZIO.succeed(channel.onWrite(consumer))
    // Enqueue the native call; submitProcessor executes it and then submits the ring
    _ <- q.offer(ZIO.succeed(ring.queueWrite(channel, buffer)).unit)
  } yield ()
public IoUring queueRead(AbstractIoUringChannel channel, ByteBuffer buffer) {
// Validation and preparation
fdToSocket.put(channel.fd(), channel);
long bufferAddress = IoUring.queueRead(ring, channel.fd(), buffer, buffer.position(),
buffer.limit() - buffer.position(), offset);
// Reference counting for buffer management
ReferenceCounter refCounter = channel.readBufferMap().get(bufferAddress);
if (refCounter == null) {
refCounter = new ReferenceCounter<>(buffer);
channel.readBufferMap().put(bufferAddress, refCounter);
}
refCounter.incrementReferenceCount();
return this;
}
public IoUring queueWrite(AbstractIoUringChannel channel, ByteBuffer buffer) {
// Validation and preparation
fdToSocket.put(channel.fd(), channel);
long bufferAddress = IoUring.queueWrite(ring, channel.fd(), buffer, buffer.position(),
buffer.limit() - buffer.position(), offset);
// Reference counting for buffer management
ReferenceCounter refCounter = channel.writeBufferMap().get(bufferAddress);
if (refCounter == null) {
refCounter = new ReferenceCounter<>(buffer);
channel.writeBufferMap().put(bufferAddress, refCounter);
}
refCounter.incrementReferenceCount();
return this;
}
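queueRead and queueWrite keep one ReferenceCounter per native buffer address, so a buffer with several operations in flight stays tracked until its last completion. The ReferenceCounter class itself is not shown; the sketch below is a hypothetical version of the same bookkeeping. It assumes the count is decremented when a completion for that address arrives and the entry is dropped at zero, and it uses a plain HashMap, so it also assumes a single thread owns the map (a concurrent map with atomic counts would be needed otherwise).

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class BufferRefs {
    // Hypothetical stand-in for the project's ReferenceCounter.
    static final class RefCounter {
        final ByteBuffer buffer;
        int count;
        RefCounter(ByteBuffer buffer) { this.buffer = buffer; }
    }

    // Keyed by native buffer address, like readBufferMap()/writeBufferMap().
    private final Map<Long, RefCounter> byAddress = new HashMap<>();

    // Called when an operation on `buffer` is queued (mirrors queueRead/queueWrite).
    void retain(long address, ByteBuffer buffer) {
        byAddress.computeIfAbsent(address, a -> new RefCounter(buffer)).count++;
    }

    // Called when a completion for `address` arrives: returns the buffer for the callback
    // and forgets the entry once no queued operation uses it any more.
    ByteBuffer release(long address) {
        RefCounter ref = byAddress.get(address);
        if (ref == null) throw new IllegalStateException("unknown buffer address " + address);
        if (--ref.count == 0) byAddress.remove(address);
        return ref.buffer;
    }

    boolean isTracked(long address) {
        return byAddress.containsKey(address);
    }

    public static void main(String[] args) {
        BufferRefs refs = new BufferRefs();
        ByteBuffer buf = ByteBuffer.allocateDirect(4096);
        refs.retain(0x1000L, buf); // two operations queued on the same buffer
        refs.retain(0x1000L, buf);
        refs.release(0x1000L);
        System.out.println("tracked after first completion: " + refs.isTracked(0x1000L));
        refs.release(0x1000L);
        System.out.println("tracked after second completion: " + refs.isTracked(0x1000L));
    }
}
```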
JNIEXPORT jlong JNICALL
Java_io_quartz_iouring_IoUring_queueRead(JNIEnv *env, jclass cls, jlong ring_address,
                                         jint fd, jobject byte_buffer,
                                         jint buffer_pos, jint buffer_len, jlong io_offset) {
    // Get direct buffer address (char * so that offsetting by buffer_pos is valid C)
    char *buffer = (*env)->GetDirectBufferAddress(env, byte_buffer);
    if (buffer == NULL) { // not a direct ByteBuffer
        throw_exception(env, "GetDirectBufferAddress", -EINVAL);
        return -1;
    }
    // Get io_uring instance and check space
    struct io_uring *ring = (struct io_uring *) ring_address;
    if (io_uring_sq_space_left(ring) <= 1) {
        throw_exception(env, "io_uring_sq_space_left", -EBUSY);
        return -1;
    }
    // Prepare request metadata before taking an SQE, so a failed malloc leaves the ring untouched
    struct request *req = malloc(sizeof(*req));
    if (req == NULL) {
        throw_exception(env, "malloc", -ENOMEM);
        return -1;
    }
    req->event_type = EVENT_TYPE_READ;
    req->buffer_addr = (int64_t) buffer;
    req->fd = fd;
    // Get a submission queue entry and prepare the read at the buffer's position
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_read(sqe, fd, buffer + buffer_pos, buffer_len, (uint64_t) io_offset);
    io_uring_sqe_set_data(sqe, req);
    return (jlong) buffer;
}
JNIEXPORT jlong JNICALL
Java_io_quartz_iouring_IoUring_queueWrite(JNIEnv *env, jclass cls, jlong ring_address,
                                          jint fd, jobject byte_buffer,
                                          jint buffer_pos, jint buffer_len, jlong io_offset) {
    // Get direct buffer address (char * so that offsetting by buffer_pos is valid C)
    char *buffer = (*env)->GetDirectBufferAddress(env, byte_buffer);
    if (buffer == NULL) { // not a direct ByteBuffer
        throw_exception(env, "GetDirectBufferAddress", -EINVAL);
        return -1;
    }
    // Get io_uring instance and check space
    struct io_uring *ring = (struct io_uring *) ring_address;
    if (io_uring_sq_space_left(ring) <= 1) {
        throw_exception(env, "io_uring_sq_space_left", -EBUSY);
        return -1;
    }
    // Prepare request metadata before taking an SQE, so a failed malloc leaves the ring untouched
    struct request *req = malloc(sizeof(*req));
    if (req == NULL) {
        throw_exception(env, "malloc", -ENOMEM);
        return -1;
    }
    req->event_type = EVENT_TYPE_WRITE;
    req->buffer_addr = (int64_t) buffer;
    req->fd = fd;
    // Get a submission queue entry and prepare the write from the buffer's position
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_write(sqe, fd, buffer + buffer_pos, buffer_len, (uint64_t) io_offset);
    io_uring_sqe_set_data(sqe, req);
    return (jlong) buffer;
}
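The native implementation uses the liburing library to:
- check that the submission queue has room (io_uring_sq_space_left)
- obtain a submission queue entry (io_uring_get_sqe)
- prepare the read or write on that entry (io_uring_prep_read / io_uring_prep_write)
- attach request metadata (event type, buffer address, fd) so the completion can be matched later (io_uring_sqe_set_data)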
The getCqes method processes completion events from the io_uring instance:
def getCqesProcessor(entry: IoUringEntry): Task[Unit] = {
val processCqes = ZIO.attemptBlocking(entry.ring.getCqes(server.timeout)).catchAll { case _: Throwable =>
ZIO.logError("IoUring: ring shutdown") *> ZIO.succeed(IoUringTbl.shutdown = true) *> server.shutdown
}
// Continue until shutdown becomes true
processCqes
.repeatUntil(_ => IoUringTbl.shutdown || server.shutdownFlag)
.unit
}
public int getCqes(long timeoutMs) {
return doGetCqes(true, timeoutMs);
}
private int doGetCqes(boolean shouldWait, long timeoutMs) {
int didReadEverHappen = 0;
// Check for completion events
int count = IoUring.getCqes(ring, resultBuffer, cqes, ringSize, shouldWait, timeoutMs);
// Process each completion event
for (int i = 0; i < count && i < ringSize; i++) {
try {
int eventType = handleEventCompletion(cqes, resultBuffer, i);
if (eventType == EVENT_TYPE_READ) didReadEverHappen = 1;
} finally {
IoUring.markCqeSeen(ring, cqes, i);
}
}
return didReadEverHappen;
}
JNIEXPORT jint JNICALL
Java_io_quartz_iouring_IoUring_getCqes(JNIEnv *env, jclass cls, jlong ring_address,
jobject byte_buffer, jlong cqes_address,
jint cqes_size, jboolean should_wait, jlong timeoutMs) {
struct io_uring *ring = (struct io_uring *) ring_address;
struct io_uring_cqe **cqes = (struct io_uring_cqe **) cqes_address;
// First try to peek for completions without waiting
int32_t ret = io_uring_peek_batch_cqe(ring, cqes, cqes_size);
// If no completions and we should wait, use timeout
if (ret == 0 && should_wait) {
struct __kernel_timespec ts;
ts.tv_sec = timeoutMs / 1000;
ts.tv_nsec = (timeoutMs % 1000) * 1000000; // Convert ms to ns
ret = io_uring_wait_cqe_timeout(ring, cqes, &ts);
if (ret < 0) {
return 0; // Timeout or error
}
ret = 1;
}
// Process completion events and copy data to Java buffer
char *buffer = (*env)->GetDirectBufferAddress(env, byte_buffer);
int32_t cqe_index = 0;
int32_t buf_index = 0;
while (cqe_index < ret) {
struct io_uring_cqe *cqe = cqes[cqe_index];
struct request *req = (struct request *) cqe->user_data;
// Copy result, file descriptor, and event type to buffer
buffer[buf_index++] = cqe->res >> 24;
buffer[buf_index++] = cqe->res >> 16;
buffer[buf_index++] = cqe->res >> 8;
buffer[buf_index++] = cqe->res;
buffer[buf_index++] = req->fd >> 24;
buffer[buf_index++] = req->fd >> 16;
buffer[buf_index++] = req->fd >> 8;
buffer[buf_index++] = req->fd;
buffer[buf_index++] = req->event_type;
// For read/write operations, also include buffer address
if (req->event_type == EVENT_TYPE_READ || req->event_type == EVENT_TYPE_WRITE) {
// Copy 8-byte buffer address
buffer[buf_index++] = req->buffer_addr >> 56;
// ... (remaining bytes)
}
cqe_index++;
}
return (int32_t) ret;
}
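The native implementation:
- first peeks at up to cqes_size completions without blocking (io_uring_peek_batch_cqe)
- if none are ready and should_wait is set, blocks for at most timeoutMs milliseconds (io_uring_wait_cqe_timeout) and returns 0 on timeout or error
- serializes each completion into the shared direct buffer as result (4 bytes), file descriptor (4 bytes) and event type (1 byte), followed by an 8-byte buffer address for read and write events, most significant byte first
- returns the number of completions, which the Java side processes with handleEventCompletion before marking each one seen with markCqeSeen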
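Each record that the native getCqes writes into the shared direct buffer can be read back in Java with ByteBuffer, whose big-endian order matches the most-significant-byte-first packing in the C code. This is a sketch rather than the project's handleEventCompletion: CqeRecords is a hypothetical name, the EVENT_TYPE_* values are placeholders (the real constants live in the native code), and the decoder assumes the elided address bytes continue in the same big-endian order. The encode helper mirrors the C packing so the decoder can be exercised without a ring.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;

final class CqeRecords {
    // Placeholder values: the real EVENT_TYPE_* constants are defined in the native code.
    static final byte EVENT_TYPE_ACCEPT = 0;
    static final byte EVENT_TYPE_READ = 1;
    static final byte EVENT_TYPE_WRITE = 2;

    static final class Completion {
        final int res;            // bytes for read/write, new fd for accept, or a negative errno
        final int fd;             // req->fd recorded when the operation was queued
        final byte eventType;
        final long bufferAddress; // 0 for events that carry no buffer

        Completion(int res, int fd, byte eventType, long bufferAddress) {
            this.res = res;
            this.fd = fd;
            this.eventType = eventType;
            this.bufferAddress = bufferAddress;
        }

        @Override
        public String toString() {
            return "Completion(res=" + res + ", fd=" + fd + ", type=" + eventType
                + ", buffer=0x" + Long.toHexString(bufferAddress) + ")";
        }
    }

    // Decodes `count` records starting at offset 0, like buf_index in the C code.
    static List<Completion> decode(ByteBuffer shared, int count) {
        ByteBuffer in = shared.duplicate();
        in.order(ByteOrder.BIG_ENDIAN); // C writes the most significant byte first
        List<Completion> out = new ArrayList<>(count);
        int pos = 0;
        for (int i = 0; i < count; i++) {
            int res = in.getInt(pos);
            int fd = in.getInt(pos + 4);
            byte type = in.get(pos + 8);
            pos += 9;
            long address = 0L;
            if (type == EVENT_TYPE_READ || type == EVENT_TYPE_WRITE) {
                address = in.getLong(pos);
                pos += 8;
            }
            out.add(new Completion(res, fd, type, address));
        }
        return out;
    }

    // Mirrors the C packing for testing: appends one record at the buffer's current position.
    static void encode(ByteBuffer out, int res, int fd, byte type, long address) {
        out.order(ByteOrder.BIG_ENDIAN);
        out.putInt(res).putInt(fd).put(type);
        if (type == EVENT_TYPE_READ || type == EVENT_TYPE_WRITE) {
            out.putLong(address);
        }
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.allocateDirect(64);
        encode(shared, 512, 7, EVENT_TYPE_READ, 0x7f00_1234_5678L);
        encode(shared, 9, 3, EVENT_TYPE_ACCEPT, 0L);
        for (Completion c : decode(shared, 2)) System.out.println(c);
    }
}
```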
The submitProcessor function processes operations from the queue and submits them to the io_uring instance:
def submitProcessor(entry: IoUringEntry): Task[Unit] = {
  val processSubmit = for {
    queueOpIO <- entry.q.take                          // wait for the next queued operation
    _ <- queueOpIO *> ZIO.succeed(entry.ring.submit()) // prepare its SQE, then submit to the kernel
  } yield ()

  processSubmit
    .catchAll { case e: Throwable =>
      ZIO.logError(s"${e.toString()} - IoUring: submission queue shutdown") *>
        ZIO.succeed(IoUringTbl.shutdown = true) *> server.shutdown
    }
    .catchAllDefect { defect =>
      ZIO.logError("Server halted, cannot exit from callback, critical error") *>
        ZIO.succeed(defect.printStackTrace()) *>
        ZIO.succeed(IoUringTbl.shutdown = true) *> server.shutdown
    }
    .repeatUntil(_ => IoUringTbl.shutdown || server.shutdownFlag)
    .unit
}
JNIEXPORT jint JNICALL
Java_io_quartz_iouring_IoUring_submit(JNIEnv *env, jclass cls, jlong ring_address) {
struct io_uring *ring = (struct io_uring *) ring_address;
int32_t ret = io_uring_submit(ring);
if (ret < 0) {
if (ret != -EBUSY) { // if busy, continue handling completions
throw_exception(env, "io_uring_submit", ret);
return -1;
}
}
return ret;
}
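The native submit wrapper raises a Java exception for every negative result except -EBUSY, which it returns unchanged. The io_uring_enter(2) man page documents EBUSY as a signal to reap some completions before submitting more, so the caller can keep the completion loop running and submit again later. A hypothetical Java helper that interprets the return value that way (EBUSY is 16 on Linux):

```java
final class SubmitResult {
    static final int EBUSY = 16; // errno value of EBUSY on Linux

    enum Outcome { SUBMITTED, BUSY }

    // Interprets the value returned by the native submit().
    static Outcome classify(int ret) {
        if (ret >= 0) return Outcome.SUBMITTED; // number of SQEs handed to the kernel
        if (ret == -EBUSY) return Outcome.BUSY; // reap completions first, then submit again
        // Any other error was already raised as a Java exception by the native code.
        throw new IllegalStateException("unexpected io_uring_submit result: " + ret);
    }

    public static void main(String[] args) {
        for (int ret : new int[] {3, 0, -EBUSY}) {
            System.out.println(ret + " -> " + classify(ret));
        }
    }
}
```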
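The submission process:
- submitProcessor takes the next queued operation from the entry's queue (q.take suspends the fiber until one is available)
- running that Task calls ring.queueRead or ring.queueWrite, which prepares a submission queue entry through JNI
- ring.submit() then calls io_uring_submit; -EBUSY is tolerated so completions can still be drained, while any other error throws
- an error or defect sets IoUringTbl.shutdown and shuts the server down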
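The complete workflow for an asynchronous I/O operation is:
1. queueRead or queueWrite registers a consumer on the socket and enqueues the native call on the ring's queue
2. submitProcessor runs that call, which fills a submission queue entry through JNI, and then submits it with io_uring_submit
3. the kernel performs the I/O and posts a completion queue entry
4. getCqesProcessor collects completions through getCqes, and handleEventCompletion processes each one, presumably invoking the consumer registered for that socket
5. the consumer completes the ZIO effect created with ZIO.async or ZIO.asyncZIO, resuming the fiber that was waiting on it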
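The JNI implementation is designed for high performance:
- buffers are direct ByteBuffers, so native code reads and writes them in place through GetDirectBufferAddress instead of copying them across the JNI boundary
- completions are fetched in batches with io_uring_peek_batch_cqe and packed into one shared buffer, so a single JNI call can return many events
- the completion call blocks only when nothing is ready, and then only up to the configured timeout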
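Errors are handled at multiple levels:
- native: failures are raised as Java exceptions through throw_exception, except -EBUSY from io_uring_submit, which is returned to the caller
- Java: doGetCqes marks every completion seen in a finally block, even when handling it throws
- Scala/ZIO: submitProcessor and getCqesProcessor log the failure (submitProcessor also catches defects), set IoUringTbl.shutdown and call server.shutdown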