ZIO Async for IoUring

Technical Overview

IoUring Architecture in Quartz H2 Server

The Quartz H2 Server implements non-blocking network I/O on top of Linux's io_uring interface and wraps each ring operation in a ZIO effect. io_uring exchanges requests and results with the kernel through shared submission and completion queues. ZIO's effect system then turns the resulting completion callbacks into composable tasks that run on fibers.

Key Components

IoUringTbl

The IoUringTbl class manages a collection of IoUring instances and balances load across multiple rings. Each ring is encapsulated in an IoUringEntry, which contains:

  • A bounded queue for task submission
  • An atomic counter for reference counting
  • The actual IoUring instance

The table provides methods to:

  • Get the least-used ring based on reference counters
  • Release a ring when it's no longer needed
  • Close all rings during shutdown
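The least-used selection can be sketched in plain Java. This is a minimal, hypothetical model: RingTable, Slot, acquire and release are illustrative names, not the IoUringTbl API. Each slot pairs a ring handle with an AtomicInteger that counts its current users. acquire picks the slot with the smallest count and increments it, and release decrements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of IoUringTbl's least-used ring selection.
// R stands in for the real IoUring type.
final class RingTable<R> {
    // One slot per ring: the ring handle plus a count of its current users.
    static final class Slot<T> {
        final T ring;
        final AtomicInteger users = new AtomicInteger();

        Slot(T ring) {
            this.ring = ring;
        }
    }

    private final List<Slot<R>> slots = new ArrayList<>();

    RingTable(List<R> rings) {
        if (rings.isEmpty()) throw new IllegalArgumentException("need at least one ring");
        for (R r : rings) slots.add(new Slot<>(r));
    }

    // Pick the slot with the fewest users (the first one wins on ties) and bump its count.
    // synchronized makes the scan plus increment a single step across callers.
    synchronized Slot<R> acquire() {
        Slot<R> best = slots.get(0);
        for (Slot<R> s : slots) {
            if (s.users.get() < best.users.get()) best = s;
        }
        best.users.incrementAndGet();
        return best;
    }

    // Called when a connection that used this ring goes away.
    void release(Slot<R> slot) {
        slot.users.decrementAndGet();
    }
}
```

With three idle rings, three acquisitions spread one connection onto each ring. After a release, the freed ring is the next one chosen.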

IoUringEntry

Each IoUringEntry represents a single io_uring instance with its associated resources:

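case class IoUringEntry(
    q: Queue[Task[Unit]], // deferred ring operations (Task[Unit]), run later by the submit fiber
    cntr: AtomicInteger,  // reference count, used to pick the least-used ring
    ring: IoUring         // the underlying io_uring instance
)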

It provides wrappers that route io_uring operations through the entry's queue, so only the submit fiber touches the ring's submission queue:

  • queueRead: Enqueues a read operation
  • queueWrite: Enqueues a write operation

IOURingChannel

The IOURingChannel class implements the IOChannel interface using io_uring for I/O operations. It provides ZIO-wrapped methods for:

  • Reading data from a socket
  • Writing data to a socket
  • Closing connections

The class uses ZIO.async to convert callback-based io_uring operations into ZIO effects.

ZIO.async Implementation

The core of the implementation uses ZIO's asynchronous programming model to wrap the callback-based io_uring API:

def accept(ring: IoUring, serverSocket: IoUringServerSocket): Task[(IoUring, IoUringSocket)] =
  ZIO.asyncZIO[Any, Throwable, (IoUring, IoUringSocket)] { cb =>
    // Callback that io_uring invokes on completion; it resumes the suspended effect
    val onAccept = (r: IoUring, socket: IoUringSocket) => cb(ZIO.succeed((r, socket)))
    // Register the accept with the ring; an exception here fails the effect
    ZIO.attempt(ioUringAccept(ring, serverSocket, onAccept)).unit
  }

This pattern is used throughout the codebase to convert callback-based operations into ZIO effects:

  1. ZIO.asyncZIO creates an asynchronous effect that will be completed when the callback is invoked
  2. A callback function is created that will complete the ZIO effect when invoked
  3. The io_uring operation is initiated with the callback
  4. The ZIO effect is returned, which will complete when the io_uring operation completes
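For readers more familiar with Java, the same four steps map onto CompletableFuture:

1. Create an incomplete result.
2. Build a callback that completes it.
3. Start the operation with that callback.
4. Return the result.

This is an analogy, not the ZIO mechanism, because ZIO suspends a fiber instead of handing back a future. CallbackOp is a hypothetical callback-style interface standing in for ioUringAccept.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class CallbackAdapter {
    // Hypothetical callback-style operation standing in for ioUringAccept:
    // start() registers onDone, which is invoked later, possibly on another thread.
    interface CallbackOp<T> {
        void start(Consumer<T> onDone) throws Exception;
    }

    static <T> CompletableFuture<T> toFuture(CallbackOp<T> op) {
        // 1. A result that stays incomplete until the callback fires
        CompletableFuture<T> result = new CompletableFuture<>();
        // 2. The callback completes the result with the operation's value
        Consumer<T> onDone = result::complete;
        try {
            // 3. Start the operation, handing it the callback
            op.start(onDone);
        } catch (Exception e) {
            // A failure while registering fails the result (like ZIO.attempt inside asyncZIO)
            result.completeExceptionally(e);
        }
        // 4. Return at once; the caller composes on or waits for the result
        return result;
    }
}
```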

Event Processing

Two key processors handle the io_uring event loop:

  1. Submit Processor: Takes operations from the queue and submits them to the io_uring instance
    def submitProcessor(entry: IoUringEntry): Task[Unit]
  2. CQEs Processor: Processes completion queue events from the io_uring instance
    def getCqesProcessor(entry: IoUringEntry): Task[Unit]

Both processors run in separate fibers and continue until shutdown is requested.
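The two loops can be modeled with plain Java threads. This is a hypothetical stand-in, not the project's code, built from three pieces:

- A BlockingQueue plays the role of the entry's bounded queue.
- FakeRing replaces io_uring.
- A shared AtomicBoolean plays the role of the IoUringTbl.shutdown flag checked by repeatUntil.

Both loops wait with a timeout, so they notice shutdown even when no work arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

// Hypothetical stand-in for an io_uring instance: prepare() fills a fake SQ,
// submit() "completes" everything at once into a fake CQ.
final class FakeRing {
    private final List<Integer> prepared = new ArrayList<>();
    final BlockingQueue<Integer> completions = new LinkedBlockingQueue<>();

    synchronized void prepare(int opId) {
        prepared.add(opId);
    }

    synchronized int submit() {
        int n = prepared.size();
        completions.addAll(prepared);
        prepared.clear();
        return n;
    }
}

final class RingLoops {
    // Starts the submit and completion loops; both stop once `shutdown` is set.
    static Thread[] start(FakeRing ring, BlockingQueue<Runnable> ops,
                          AtomicBoolean shutdown, IntConsumer onCompletion) {
        Thread submitter = new Thread(() -> {
            try {
                while (!shutdown.get()) {
                    // submitProcessor blocks on q.take; this toy loop polls with a
                    // timeout instead so it can notice the shutdown flag while idle.
                    Runnable op = ops.poll(50, TimeUnit.MILLISECONDS);
                    if (op == null) continue;
                    op.run();      // runs the deferred op, which prepares the "SQE"
                    ring.submit(); // hands prepared entries to the "kernel"
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "submit-loop");
        Thread reaper = new Thread(() -> {
            try {
                while (!shutdown.get()) {
                    // Like getCqes(timeout): wait a bounded time, then re-check shutdown.
                    Integer done = ring.completions.poll(50, TimeUnit.MILLISECONDS);
                    if (done != null) onCompletion.accept(done);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "cqe-loop");
        submitter.setDaemon(true);
        reaper.setDaemon(true);
        submitter.start();
        reaper.start();
        return new Thread[] {submitter, reaper};
    }
}
```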

User Guide

Configuring IoUring Rings

Setting the Number of Ring Instances

The number of io_uring instances is configured when starting the server. This is a critical performance parameter that should be tuned based on your system's resources and workload.

In your application code, set the number of ring instances:

// Define the number of ring instances to use
val NUMBER_OF_RING_INSTANCES = 4  // Adjust based on your needs

// Start the server with the specified number of ring instances
new QuartzH2Server(
  "localhost",
  8443,
  16000,
  ctx,
  onConnect = onConnect,
  onDisconnect = onDisconnect
).startIO_linuxOnly(NUMBER_OF_RING_INSTANCES, R, filter)

Ring Size Configuration

The size of each ring (number of entries in the submission and completion queues) can be configured when creating the IoUringTbl:

// Default ring size is 1024, but can be customized
IoUringTbl(server, count, ringSize = 2048)

Performance Considerations

Optimal Number of Rings

The optimal number of ring instances depends on your system's resources and workload:

  • CPU Cores: Generally, set NUMBER_OF_RING_INSTANCES to (total cores - 1) or less
  • Memory: Each ring consumes memory for its queues and buffers
  • Workload: I/O-bound applications may benefit from more rings

Thread Pool Configuration

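When using io_uring, size the main executor so that some cores remain available for the rings:

override val bootstrap = zio.Runtime.removeDefaultLoggers ++ SLF4J.slf4j ++ zio.Runtime.setExecutor(
  zio.Executor.fromJavaExecutor(
    // Leave cores for the rings, but never request fewer than one worker thread
    Executors.newWorkStealingPool(math.max(1, java.lang.Runtime.getRuntime().availableProcessors() - NUMBER_OF_RING_INSTANCES))
  )
) ++ zio.Runtime.setBlockingExecutor(zio.Executor.fromJavaExecutor(Executors.newCachedThreadPool()))

Subtracting NUMBER_OF_RING_INSTANCES from the available processors reserves CPU cores for io_uring event processing. The math.max(1, ...) guard keeps the pool valid when the ring count is equal to or larger than the processor count. The completion loops call getCqes through ZIO.attemptBlocking, so they run on the blocking executor configured here.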
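The sizing rule can be made explicit with a small helper. This is a hypothetical sketch, and PoolSizing and its methods are not part of Quartz H2. It caps the ring count at cores - 1, following the guideline under Optimal Number of Rings. It also never returns a worker count below one, because Executors.newWorkStealingPool throws IllegalArgumentException for a parallelism of zero or less.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: split CPU cores between io_uring rings and ZIO worker threads.
final class PoolSizing {
    // Clamp the requested ring count to [1, cores - 1]; a single-core machine still gets one ring.
    static int ringCount(int requested, int cores) {
        int max = Math.max(1, cores - 1);
        return Math.min(Math.max(1, requested), max);
    }

    // Threads left for the main executor; never below one, since
    // newWorkStealingPool rejects a parallelism of zero or less.
    static int workerThreads(int rings, int cores) {
        return Math.max(1, cores - rings);
    }

    // Builds the main worker pool for a requested ring count on this machine.
    static ExecutorService workerPool(int requestedRings) {
        int cores = Runtime.getRuntime().availableProcessors();
        int rings = ringCount(requestedRings, cores);
        return Executors.newWorkStealingPool(workerThreads(rings, cores));
    }
}
```

On an 8-core machine, a request for 4 rings leaves 4 worker threads. A request for 16 rings is capped at 7.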

Example Configuration

Here's a complete example of configuring and starting a server with io_uring:

object MyApp extends ZIOAppDefault {
  // Configure the number of ring instances
  val NUMBER_OF_RING_INSTANCES = 4

  // Configure thread pools to account for io_uring rings
  override val bootstrap = zio.Runtime.removeDefaultLoggers ++ SLF4J.slf4j ++ zio.Runtime.setExecutor(
    zio.Executor.fromJavaExecutor(
      // Leave cores for the rings, but never request fewer than one worker thread
      Executors.newWorkStealingPool(math.max(1, java.lang.Runtime.getRuntime().availableProcessors() - NUMBER_OF_RING_INSTANCES))
    )
  ) ++ zio.Runtime.setBlockingExecutor(zio.Executor.fromJavaExecutor(Executors.newCachedThreadPool()))

  // Define your routes and handlers
  val R: HttpRouteIO[String] = { /* your routes here */ }

  def run = {
    val env = ZLayer.fromZIO(ZIO.succeed("Hello ZIO World!"))
    (for {
      ctx <- QuartzH2Server.buildSSLContext("TLS", "keystore.jks", "password")
      exitCode <- new QuartzH2Server(
        "localhost",
        8443,
        16000,
        ctx,
        onConnect = id => ZIO.logTrace(s"connected - $id"),
        onDisconnect = id => ZIO.logTrace(s"disconnected - $id")
      ).startIO_linuxOnly(NUMBER_OF_RING_INSTANCES, R, WebFilter.identity)
    } yield (exitCode)).provideSomeLayer(env)
  }
}

Monitoring and Debugging

To monitor the performance of your io_uring configuration:

  1. Enable trace logging to see connection and io_uring events:
    QuartzH2Server.setLoggingLevel(Level.TRACE)
  2. Watch for log messages related to ring operations and completion events
  3. Monitor system metrics like CPU usage and I/O wait times

Low-Level JNI Implementation

Native Linux io_uring Integration

The ZIO Quartz H2 server integrates with Linux's io_uring API through a custom JNI (Java Native Interface) layer. This section explains how the high-level Scala code interacts with the low-level native Linux io_uring implementation.

Architecture Overview

The integration follows a layered approach:

  1. Scala Layer: High-level ZIO effects and abstractions in IoUringTbl.scala
  2. Java Layer: JNI interface classes in the io.quartz.iouring package
  3. Native Layer: C implementation in liburing_provider.c that directly interfaces with the Linux kernel's io_uring API

Key Native Functions

1. Queue Operations (queueRead/queueWrite)

The queueRead and queueWrite functions in IoUringEntry wrap the native io_uring submission queue operations:

Scala Layer (IoUringEntry)
def queueRead(consumer: Consumer[ByteBuffer], channel: IoUringSocket, buffer: java.nio.ByteBuffer): Task[Unit] =
  for {
    // Register the completion callback for this socket
    _ <- ZIO.succeed(channel.onRead(consumer))
    // Enqueue the ring call as a deferred effect; the submit fiber runs it later
    _ <- q.offer(ZIO.succeed(ring.queueRead(channel, buffer)).unit)
  } yield ()

def queueWrite(consumer: Consumer[ByteBuffer], channel: IoUringSocket, buffer: java.nio.ByteBuffer): Task[Unit] =
  for {
    _ <- ZIO.succeed(channel.onWrite(consumer))
    _ <- q.offer(ZIO.succeed(ring.queueWrite(channel, buffer)).unit)
  } yield ()

Java Layer (IoUring.java)
public IoUring queueRead(AbstractIoUringChannel channel, ByteBuffer buffer) {
    // Validation and preparation
    fdToSocket.put(channel.fd(), channel);
    long bufferAddress = IoUring.queueRead(ring, channel.fd(), buffer, buffer.position(),
            buffer.limit() - buffer.position(), offset);
    // Reference counting for buffer management
    ReferenceCounter refCounter = channel.readBufferMap().get(bufferAddress);
    if (refCounter == null) {
        refCounter = new ReferenceCounter<>(buffer);
        channel.readBufferMap().put(bufferAddress, refCounter);
    }
    refCounter.incrementReferenceCount();
    return this;
}

public IoUring queueWrite(AbstractIoUringChannel channel, ByteBuffer buffer) {
    // Validation and preparation
    fdToSocket.put(channel.fd(), channel);
    long bufferAddress = IoUring.queueWrite(ring, channel.fd(), buffer, buffer.position(),
            buffer.limit() - buffer.position(), offset);
    // Reference counting for buffer management
    ReferenceCounter refCounter = channel.writeBufferMap().get(bufferAddress);
    if (refCounter == null) {
        refCounter = new ReferenceCounter<>(buffer);
        channel.writeBufferMap().put(bufferAddress, refCounter);
    }
    refCounter.incrementReferenceCount();
    return this;
}

Native Layer (liburing_provider.c)
JNIEXPORT jlong JNICALL
Java_io_quartz_iouring_IoUring_queueRead(JNIEnv *env, jclass cls, jlong ring_address, 
                                        jint fd, jobject byte_buffer, 
                                        jint buffer_pos, jint buffer_len, jlong io_offset) {
    // Get direct buffer address
    void *buffer = (*env)->GetDirectBufferAddress(env, byte_buffer);
    
    // Get io_uring instance and check space
    struct io_uring *ring = (struct io_uring *) ring_address;
    if (io_uring_sq_space_left(ring) <= 1) {
        throw_exception(env, "io_uring_sq_space_left", -EBUSY);
        return -1;
    }
    
    // Get submission queue entry
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    
    // Prepare request metadata
    struct request *req = malloc(sizeof(*req));
    req->event_type = EVENT_TYPE_READ;
    req->buffer_addr = (int64_t) buffer;
    req->fd = fd;
    
    // Prepare read operation
    io_uring_prep_read(sqe, fd, (char *) buffer + buffer_pos, buffer_len, (uint64_t) io_offset); // char * cast: arithmetic on void * is non-standard C
    io_uring_sqe_set_data(sqe, req);
    
    return (uint64_t) buffer;
}

JNIEXPORT jlong JNICALL
Java_io_quartz_iouring_IoUring_queueWrite(JNIEnv *env, jclass cls, jlong ring_address, 
                                        jint fd, jobject byte_buffer, 
                                        jint buffer_pos, jint buffer_len, jlong io_offset) {
    // Get direct buffer address
    void *buffer = (*env)->GetDirectBufferAddress(env, byte_buffer);
    
    // Get io_uring instance and check space
    struct io_uring *ring = (struct io_uring *) ring_address;
    if (io_uring_sq_space_left(ring) <= 1) {
        throw_exception(env, "io_uring_sq_space_left", -EBUSY);
        return -1;
    }
    
    // Get submission queue entry
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    
    // Prepare request metadata
    struct request *req = malloc(sizeof(*req));
    req->event_type = EVENT_TYPE_WRITE;
    req->buffer_addr = (int64_t) buffer;
    req->fd = fd;
    
    // Prepare write operation
    io_uring_prep_write(sqe, fd, (char *) buffer + buffer_pos, buffer_len, (uint64_t) io_offset); // char * cast: arithmetic on void * is non-standard C
    io_uring_sqe_set_data(sqe, req);
    
    return (uint64_t) buffer;
}

The native implementation uses the liburing library to:

  1. Obtain a submission queue entry (SQE) from the io_uring instance
  2. Prepare the read/write operation with the appropriate file descriptor, buffer, and offset
  3. Associate user data with the SQE for later identification when the operation completes
  4. Return the buffer address for reference counting in the Java layer
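The Java side of step 4 can be modeled as a map from native buffer address to a counted entry. This is an illustrative sketch, not the project's ReferenceCounter class; BufferTracker, onQueued and onCompleted are hypothetical names. Queuing increments the count for the address that native code returned, and each completion decrements it. When the count reaches zero, the entry is removed and the ByteBuffer is handed back.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker for buffers that have io_uring operations in flight.
final class BufferTracker {
    private static final class Entry {
        final ByteBuffer buffer; // keeps the buffer reachable while the kernel uses it
        int refs;

        Entry(ByteBuffer buffer) {
            this.buffer = buffer;
        }
    }

    private final Map<Long, Entry> byAddress = new HashMap<>();

    // Called after native queueRead/queueWrite returns the buffer address.
    synchronized void onQueued(long address, ByteBuffer buffer) {
        byAddress.computeIfAbsent(address, a -> new Entry(buffer)).refs++;
    }

    // Called for each completion event carrying this address.
    // Returns the buffer once no operation references it any more, otherwise null.
    synchronized ByteBuffer onCompleted(long address) {
        Entry e = byAddress.get(address);
        if (e == null) throw new IllegalStateException("unknown buffer address " + address);
        if (--e.refs > 0) return null;
        byAddress.remove(address);
        return e.buffer;
    }

    synchronized int inFlight() {
        return byAddress.size();
    }
}
```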

2. Completion Queue Processing (getCqes)

The getCqes method processes completion events from the io_uring instance:

Scala Layer (IoUringTbl)
def getCqesProcessor(entry: IoUringEntry): Task[Unit] = {
  val processCqes = ZIO.attemptBlocking(entry.ring.getCqes(server.timeout)).catchAll { case _: Throwable =>
    ZIO.logError("IoUring: ring shutdown") *> ZIO.succeed(IoUringTbl.shutdown = true) *> server.shutdown
  }
  // Continue until shutdown becomes true
  processCqes
    .repeatUntil(_ => IoUringTbl.shutdown || server.shutdownFlag)
    .unit
}

Java Layer (IoUring.java)
public int getCqes(long timeoutMs) {
    return doGetCqes(true, timeoutMs);
}

private int doGetCqes(boolean shouldWait, long timeoutMs) {
    int didReadEverHappen = 0;    
    
    // Check for completion events
    int count = IoUring.getCqes(ring, resultBuffer, cqes, ringSize, shouldWait, timeoutMs);
    
    // Process each completion event
    for (int i = 0; i < count && i < ringSize; i++) {
        try {
            int eventType = handleEventCompletion(cqes, resultBuffer, i);
            if (eventType == EVENT_TYPE_READ) didReadEverHappen = 1;
        } finally {
            IoUring.markCqeSeen(ring, cqes, i);
        }
    }
    
    return didReadEverHappen;
}

Native Layer (liburing_provider.c)
JNIEXPORT jint JNICALL
Java_io_quartz_iouring_IoUring_getCqes(JNIEnv *env, jclass cls, jlong ring_address, 
                                      jobject byte_buffer, jlong cqes_address, 
                                      jint cqes_size, jboolean should_wait, jlong timeoutMs) {
    struct io_uring *ring = (struct io_uring *) ring_address;
    struct io_uring_cqe **cqes = (struct io_uring_cqe **) cqes_address;
    
    // First try to peek for completions without waiting
    int32_t ret = io_uring_peek_batch_cqe(ring, cqes, cqes_size);
    
    // If no completions and we should wait, use timeout
    if (ret == 0 && should_wait) {
        struct __kernel_timespec ts;
        ts.tv_sec = timeoutMs / 1000;
        ts.tv_nsec = (timeoutMs % 1000) * 1000000;  // Convert ms to ns
        ret = io_uring_wait_cqe_timeout(ring, cqes, &ts);
        if (ret < 0) {
            return 0; // Timeout or error
        }
        ret = 1;
    }
    
    // Process completion events and copy data to Java buffer
    char *buffer = (*env)->GetDirectBufferAddress(env, byte_buffer);
    int32_t cqe_index = 0;
    int32_t buf_index = 0;
    
    while (cqe_index < ret) {
        struct io_uring_cqe *cqe = cqes[cqe_index];
        struct request *req = (struct request *) cqe->user_data;
        
        // Copy result, file descriptor, and event type to buffer
        buffer[buf_index++] = cqe->res >> 24;
        buffer[buf_index++] = cqe->res >> 16;
        buffer[buf_index++] = cqe->res >> 8;
        buffer[buf_index++] = cqe->res;
        
        buffer[buf_index++] = req->fd >> 24;
        buffer[buf_index++] = req->fd >> 16;
        buffer[buf_index++] = req->fd >> 8;
        buffer[buf_index++] = req->fd;
        
        buffer[buf_index++] = req->event_type;
        
        // For read/write operations, also include buffer address
        if (req->event_type == EVENT_TYPE_READ || req->event_type == EVENT_TYPE_WRITE) {
            // Copy 8-byte buffer address
            buffer[buf_index++] = req->buffer_addr >> 56;
            // ... (remaining bytes)
        }
        
        cqe_index++;
    }
    
    return (int32_t) ret;
}

The native implementation:

  1. First tries to peek for any completed events without waiting
  2. If instructed to wait and no events are available, uses io_uring_wait_cqe_timeout to wait with a timeout
  3. For each completion event, extracts the result code, file descriptor, and event type
  4. Serializes this information into a byte buffer that's passed back to Java
  5. For read/write operations, also includes the buffer address for reference counting
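The record layout from steps 3 to 5 can be decoded on the Java side with a ByteBuffer. Each record is written most-significant byte first:

- res: 4 bytes
- fd: 4 bytes
- event type: 1 byte
- buffer address: 8 bytes, for reads and writes only

This sketch assumes that layout, and the decoder class is hypothetical. The numeric EVENT_TYPE_* values are not shown in the source, so the constants below are placeholders.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;

// Hypothetical Java-side decoder for the records the native getCqes writes.
final class CqeDecoder {
    // Placeholder tags: the real EVENT_TYPE_* values live in the project's sources.
    static final byte EVENT_TYPE_ACCEPT = 0;
    static final byte EVENT_TYPE_READ = 1;
    static final byte EVENT_TYPE_WRITE = 2;

    static final class Cqe {
        final int res;         // cqe->res: byte count, or a negative errno
        final int fd;          // req->fd
        final byte eventType;  // req->event_type
        final long bufferAddr; // req->buffer_addr for read/write, otherwise 0

        Cqe(int res, int fd, byte eventType, long bufferAddr) {
            this.res = res;
            this.fd = fd;
            this.eventType = eventType;
            this.bufferAddr = bufferAddr;
        }
    }

    // Decode `count` records starting at offset 0. Records are variable length:
    // only reads and writes carry the trailing 8-byte buffer address.
    static List<Cqe> decode(ByteBuffer raw, int count) {
        ByteBuffer in = raw.duplicate().order(ByteOrder.BIG_ENDIAN);
        in.clear(); // position 0, limit = capacity; the native side writes from the start
        List<Cqe> out = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            int res = in.getInt();
            int fd = in.getInt();
            byte type = in.get();
            boolean hasAddr = type == EVENT_TYPE_READ || type == EVENT_TYPE_WRITE;
            long addr = hasAddr ? in.getLong() : 0L;
            out.add(new Cqe(res, fd, type, addr));
        }
        return out;
    }
}
```

Big-endian order matches the C code, which shifts the most significant byte out first (cqe->res >> 24, then >> 16, and so on).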

3. Submission Processing (submitProcessor)

The submitProcessor function processes operations from the queue and submits them to the io_uring instance:

Scala Implementation
def submitProcessor(entry: IoUringEntry): Task[Unit] = {
  // One iteration: wait for a queued op, run it (this prepares the SQE), then submit to the kernel
  val processSubmit = for {
    queueOpIO <- entry.q.take
    _         <- queueOpIO *> ZIO.succeed(entry.ring.submit())
  } yield ()

  processSubmit
    // Typed failures stop the ring and the server
    .catchAll { case e: Throwable =>
      ZIO.logError(s"${e.toString()} - IoUring: submission queue shutdown") *>
        ZIO.succeed(IoUringTbl.shutdown = true) *> server.shutdown
    }
    // Defects (exceptions thrown inside ZIO.succeed, e.g. by a native call) are fatal as well
    .catchAllDefect { defect =>
      ZIO.logError("Server halted, cannot exit from callback, critical error") *>
        ZIO.succeed(defect.printStackTrace()) *>
        ZIO.succeed(IoUringTbl.shutdown = true) *> server.shutdown
    }
    .repeatUntil(_ => IoUringTbl.shutdown || server.shutdownFlag)
}

Native Submit Implementation
JNIEXPORT jint JNICALL
Java_io_quartz_iouring_IoUring_submit(JNIEnv *env, jclass cls, jlong ring_address) {
    struct io_uring *ring = (struct io_uring *) ring_address;

    int32_t ret = io_uring_submit(ring);
    if (ret < 0) {
        if (ret != -EBUSY) { // if busy, continue handling completions
            throw_exception(env, "io_uring_submit", ret);
            return -1;
        }
    }
    return ret;
}

The submission process:

  1. Takes queued operations from the bounded queue
  2. Executes the operation, which prepares an SQE as shown in the queueRead/queueWrite sections
  3. Calls the native submit method to submit all prepared operations to the kernel
  4. Handles errors and continues processing until shutdown is requested

Complete Asynchronous Workflow

The complete workflow for an asynchronous I/O operation is:

  1. Operation Queuing: A ZIO effect calls queueRead/queueWrite, which registers the completion callback on the socket and adds a deferred operation to the entry's queue
  2. Submission Processing: The submitProcessor fiber takes each operation from the queue, runs it (preparing the SQE), and submits it to the kernel via io_uring_submit
  3. Kernel Processing: The Linux kernel processes the operations asynchronously
  4. Completion Processing: The getCqesProcessor fiber waits for and processes completion events
  5. Callback Execution: When a completion is detected, the appropriate callback (onRead/onWrite) is invoked with the result
  6. ZIO Effect Completion: The ZIO.async effect completes, allowing the original caller to continue

Performance Considerations

The JNI implementation is designed for high performance:

  • Direct ByteBuffers: All I/O operations use direct ByteBuffers. Their native address (from GetDirectBufferAddress) is passed straight to io_uring, so no copy between the Java heap and native memory is needed
  • Reference Counting: The implementation uses reference counting to manage buffer lifetimes and prevent memory leaks
  • Batch Processing: A single getCqes call drains up to ringSize completion events with io_uring_peek_batch_cqe
  • Non-blocking I/O: Queuing a read or write never blocks an OS thread; only the completion loop waits, with a timeout, inside ZIO.attemptBlocking
  • Timeout Control: The getCqes method supports timeouts to prevent indefinite blocking
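Before calling io_uring_wait_cqe_timeout, the native getCqes splits the millisecond timeout into the seconds and nanoseconds of a __kernel_timespec. The same arithmetic in Java shows why both fields are needed: 1500 ms becomes 1 s plus 500,000,000 ns. The helper is hypothetical. Its negative-value check is an added assumption, since a negative input would otherwise produce a negative tv_nsec.

```java
// Hypothetical helper mirroring the native ms -> __kernel_timespec conversion.
final class KernelTimespec {
    final long tvSec;
    final long tvNsec;

    private KernelTimespec(long tvSec, long tvNsec) {
        this.tvSec = tvSec;
        this.tvNsec = tvNsec;
    }

    static KernelTimespec fromMillis(long timeoutMs) {
        if (timeoutMs < 0) throw new IllegalArgumentException("timeout must be >= 0");
        // Whole seconds, then the leftover milliseconds scaled to nanoseconds
        return new KernelTimespec(timeoutMs / 1000, (timeoutMs % 1000) * 1_000_000L);
    }
}
```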

Error Handling

Errors are handled at multiple levels:

  • Native Level: Errors from io_uring operations are converted to Java exceptions
  • Java Level: Exceptions are caught and passed to registered exception handlers
  • Scala Level: ZIO's error handling captures exceptions and converts them to ZIO failures
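The native-to-Java step can be illustrated with the rule used by the native submit:

- A negative return value is -errno.
- -EBUSY is tolerated, because completions still need to be reaped.
- Any other error becomes an exception.

This sketch is hypothetical. SubmitResult and check are illustrative names, and EBUSY is hardcoded to its Linux value, 16. Unlike the C code, which returns -EBUSY unchanged, this version maps that case to zero submitted entries. In the real code the exception is raised from C via throw_exception and then captured on the Scala side by ZIO.

```java
import java.io.IOException;

// Hypothetical Java-side version of the io_uring_submit return-code policy.
final class SubmitResult {
    static final int EBUSY = 16; // Linux errno value for "Device or resource busy"

    // Returns the number of submitted SQEs; -EBUSY is treated as zero submitted.
    static int check(String op, int ret) throws IOException {
        if (ret >= 0) return ret;
        if (ret == -EBUSY) return 0; // ring is busy: keep processing completions
        throw new IOException(op + " failed with errno " + (-ret));
    }
}
```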

Requirements

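  • Linux kernel 5.6 or later: io_uring first appeared in 5.1, but the read/write operations used here (io_uring_prep_read/io_uring_prep_write, i.e. IORING_OP_READ/IORING_OP_WRITE) require 5.6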
  • JDK 11 or later
  • ZIO 2.x
  • Native io_uring libraries properly installed