ZIO Quartz H2 ZStreams Integration

ZStreams and Quartz H2: A Powerful Integration

ZIO Quartz H2 seamlessly integrates with ZIO's streaming ecosystem, providing a reactive, high-performance HTTP/2 server with sophisticated flow control and backpressure mechanisms. This integration offers significant advantages for building modern, resource-efficient web applications.

Key Benefits

  • End-to-End Backpressure: Complete backpressure chain from the network socket to your application logic
  • Reactive Flow Control: Adaptive system that responds to application processing capabilities in real-time
  • Resource Efficiency: Optimal memory and CPU utilization even under variable load conditions
  • Non-Blocking Operations: Leverages ZIO's concurrency primitives for maximum throughput without resource exhaustion

ZStream Examples from Run.scala

File Upload with ZStream

Using ZStream to handle file uploads efficiently:

case req @ POST -> Root / "upload" / StringVar(file) =>
  val FOLDER_PATH = "/Users/ostrygun/web_root/"
  val FILE = s"$file"
  for {
    jpath <- ZIO.attempt(new java.io.File(FOLDER_PATH + FILE))
    u <- req.stream.run(ZSink.fromFile(jpath))
  } yield (Response.Ok().asText("OK"))

Counting Stream Bytes

Using ZStream to count bytes in a request:

case req @ GET -> Root =>
  for {
    x <- req.stream.runCount
  } yield (Response.Ok().asText(s"OK bytes received: $x"))

Collecting Stream Data

Using ZStream to collect all data from a request:

case req @ POST -> Root =>
  for {
    u <- req.stream.runCollect
  } yield (Response.Ok().asText("OK:" + String(u.toArray)))

Draining a Stream

Using ZStream to properly drain incoming data:

case req @ GET -> Root / "snihost" =>
  for {
    _ <- req.stream.runDrain // properly ignore incoming data, we must flush it
    result_text <- ZIO.attempt(req.sniServerNames match {
      case Some(hosts) => s"Host names in TLS SNI extension: ${hosts.mkString(",")}"
      case None        => "No TLS SNI host names provided or unsecure connection"
    })
  } yield (Response.Ok().asText(result_text))

Streaming Response with Custom Chunks

Creating a stream from custom chunks for response:

case "localhost" ! GET -> Root / "example" =>
  // how to send data in separate H2 packets of various size.
  val ts = ZStream.fromChunks(Chunk.fromArray("Block1\n".getBytes()), Chunk.fromArray("Block22\n".getBytes()))
  ZIO.attempt(Response.Ok().asStream(ts))

Streaming File Response

Using ZStream to efficiently serve file contents:

case GET -> Root / StringVar(file) =>
  val FOLDER_PATH = "/home/ols/web_root/"
  val FILE = s"$file"
  val BLOCK_SIZE = 1024 * 14
  for {
    jpath <- ZIO.attempt(new java.io.File(FOLDER_PATH + FILE))
    present <- ZIO.attempt(jpath.exists())
    _ <- ZIO.fail(new java.io.FileNotFoundException(jpath.toString())).when(present == false)
  } yield (Response
    .Ok()
    .asStream(ZStream.fromFile(jpath, BLOCK_SIZE))
    .contentType(ContentType.contentTypeFromFileName(FILE)))