Quartz-H2 implements a cutting-edge reactive flow control system that seamlessly integrates with the fs2 streaming ecosystem. This implementation goes beyond the standard HTTP/2 flow control requirements to deliver exceptional performance, stability, and resource efficiency.
The inbound data flow in Quartz-H2 is regulated by a sophisticated backpressure mechanism that adapts to application processing capabilities in real-time:
bytesOfPendingInboundData
and inboundWindow
) creates a feedback loop that ensures optimal resource utilization even under variable load conditions.The outbound data flow is equally well-regulated, ensuring efficient data delivery without overwhelming receivers:
While most HTTP/2 implementations merely satisfy the specification requirements, Quartz-H2 takes flow control to the next level:
Quartz-H2 offers unparalleled ease of integration with your fs2-based applications:
HTTP/2 flow control is a mechanism that prevents senders from overwhelming receivers with data. It operates at two levels:
Quartz-H2 implements both levels with a sophisticated reactive approach that ensures optimal performance.
Quartz-H2 uses several reference counters to track window sizes and data consumption:
// Connection level
globalTransmitWindow: Ref[IO, Long]
globalBytesOfPendingInboundData: Ref[IO, Long]
globalInboundWindow: Ref[IO, Long]
// Stream level (in Http2StreamCommon)
bytesOfPendingInboundData: Ref[IO, Long]
inboundWindow: Ref[IO, Long]
transmitWindow: Ref[IO, Long]
Quartz-H2 uses specialized queues to coordinate flow control operations in a non-blocking, reactive manner:
// Flow control synchronization queues
xFlowSync <- Queue.unbounded[IO, Boolean] // Flow control sync queue
updSyncQ <- Queue.dropping[IO, Boolean](1) // Window update queue
The xFlowSync
queue is used to signal when data transmission can proceed, while the updSyncQ
queue coordinates window updates to prevent excessive WINDOW_UPDATE frames.
The windowsUpdate
method is the core of the flow control implementation:
private[this] def windowsUpdate(
c: Http2ConnectionCommon,
streamId: Int,
received: Ref[IO, Long],
window: Ref[IO, Long],
len: Int
) =
for {
bytes_received <- received.getAndUpdate(_ - len)
bytes_available <- window.getAndUpdate(_ - len)
send_update <- IO(
bytes_received < c.INITIAL_WINDOW_SIZE * 0.7 && bytes_available < c.INITIAL_WINDOW_SIZE * 0.3
)
upd = c.INITIAL_WINDOW_SIZE - bytes_available.toInt
_ <- (c.sendFrame(Frames.mkWindowUpdateFrame(streamId, upd)) *> window
.update(_ + upd) *> Logger[IO].debug(s"Send UPDATE_WINDOW $upd streamId= $streamId")).whenA(send_update)
} yield (send_update)
This method implements a sophisticated threshold-based approach to window updates:
received
counter (tracking consumed data) and the window
counter (tracking available window)The integration with fs2 streams is elegant and efficient:
dataEvalEffectProducer
methodStream
via makeDataStream
Quartz-H2 implements a sophisticated cancellation mechanism using a double offer(false)
pattern:
// In dropStreams()
_ <- streams.traverse(s0 => s0.outXFlowSync.offer(false) *> s0.outXFlowSync.offer(false))
This pattern ensures proper cleanup of resources when streams are terminated or when the connection is shut down:
offer(false)
unblocks transmissions waiting to send remaining dataoffer(false)
unblocks transmissions waiting for window creditfalse
value is received, it raises a QH2InterruptException
to properly terminate the operationZIO Quartz H2 implements a cutting-edge reactive flow control system that seamlessly integrates with the ZIO streaming ecosystem. This implementation goes beyond the standard HTTP/2 flow control requirements to deliver exceptional performance, stability, and resource efficiency.
The server maintains several types of flow control windows, implemented as Ref[Long]
values, which are atomic references that can be safely updated in a concurrent environment:
ZIO Queues play a central role in the flow control mechanism of ZIO Quartz H2. They provide a thread-safe way to handle data packets and control the flow of information between different parts of the system.
inDataQ
queue that holds incoming data frames. When data is received, it's placed in this queue:
_ <- c.inDataQ.offer(bb)
This queue acts as a buffer between the network layer and the application, allowing for asynchronous processing of data.
b <- stream.outXFlowSync.take
_ <- ZIO.when(b == false)(ZIO.fail(new java.lang.InterruptedException()))
_ <- txWindow_Transmit(stream, bb, data_len)
ZIO Queues provide natural backpressure capabilities. When a queue becomes full, any attempt to offer more elements will be suspended until space becomes available. This creates an automatic backpressure mechanism that propagates through the entire system.
In ZIO Quartz H2, this backpressure is integrated with the HTTP/2 flow control windows to create a complete end-to-end backpressure chain from the network socket to the application logic.
ZIO Streams are used extensively in ZIO Quartz H2 to process data in a functional and composable way. The flow control mechanism is tightly integrated with these streams.
The makePacketStream
method creates a ZStream that reads from the IOChannel and transforms raw bytes into HTTP/2 packets:
def makePacketStream(ch: IOChannel, keepAliveMs: Int, leftOver: Chunk[Byte]): ZStream[Any, Throwable, Chunk[Byte]]
This stream is then processed using the foreach
method to handle each packet:
Http2Connection
.makePacketStream(ch, HTTP2_KEEP_ALIVE_MS, leftOver)
.foreach(packet => { packet_handler(httpReq11, packet) })
The packetStreamPipe
is a ZPipeline that transforms a stream of bytes into a stream of HTTP/2 frames:
def packetStreamPipe: ZPipeline[Any, Exception, Byte, Chunk[Byte]]
This pipeline is a functional description of the transformation from raw bytes to HTTP/2 packets, not just a data container. It allows for composition with other transformations in a clean, functional way.
When data frames are received, the server:
incrementGlobalPendingInboundData
and bytesOfPendingInboundData.update
.inDataQ.offer
).The decision to send WINDOW_UPDATE frames is based on the following conditions:
send_update <- ZIO.succeed(
bytes_received < c.INITIAL_WINDOW_SIZE * 0.7 && bytes_available < c.INITIAL_WINDOW_SIZE * 0.3
)
This ensures that WINDOW_UPDATE frames are sent when the available window is less than 30% of the initial window size and the received bytes are less than 70% of the initial window size, optimizing network usage.
When sending data frames, the server:
The txWindow_Transmit
method handles this logic, ensuring that data is only sent when sufficient window space is available:
for {
tx_g <- globalTransmitWindow.get
tx_l <- stream.transmitWindow.get
bytesCredit <- ZIO.succeed(Math.min(tx_g, tx_l))
_ <-
if (bytesCredit > 0)
// Send data and update windows
else
// Wait for window update
} yield (bytesCredit)
The server processes WINDOW_UPDATE frames from peers through the updateWindow
method, which:
For stream-specific updates:
private[this] def updateWindowStream(streamId: Int, inc: Int) = {
streamTbl.get(streamId) match {
case None => ZIO.logDebug(s"Update window, streamId=$streamId invalid or closed already")
case Some(stream) => updateAndCheck(streamId, stream, inc)
}
}
For global updates, the server updates all stream windows as well:
if (streamId == 0)
updateAndCheckGlobalTx(streamId, inc) *>
ZIO.foreach(streamTbl.values.toSeq)(stream => updateAndCheck(streamId, stream, inc)).unit
else
updateWindowStream(streamId, inc)
The ZIO Quartz H2 server implements adaptive flow control that responds to application processing rates. This is achieved through:
outXFlowSync
queue is used to synchronize data transmission with window availability.The server provides end-to-end backpressure from the network socket to the application logic:
This complete chain ensures that all components of the system operate within their capacity, preventing resource exhaustion and optimizing performance.
The flow control implementation is tightly integrated with ZIO's resource management:
The ZIO Quartz H2 server's flow control implementation provides a robust, adaptive mechanism for managing data flow in HTTP/2 connections. By integrating with ZIO's concurrency primitives and resource management, it ensures efficient operation even under high load conditions.
The use of ZIO Queues and ZStreams creates a functional, composable system that is both powerful and easy to reason about. This approach allows for precise control over data flow while maintaining the benefits of functional programming.