Quartz H2 Streaming Integration

FS2 Streams and Quartz H2: A Powerful Integration

Quartz H2 integrates with FS2, the streaming library built on Cats Effect. Request and response bodies are exposed as FS2 streams, so HTTP/2 flow control and backpressure extend from the network socket into application code. This makes it practical to process large or slow payloads without buffering them in memory.

Key Benefits

  • End-to-End Backpressure: Complete backpressure chain from the network socket to your application logic
  • Reactive Flow Control: Flow control adapts in real time to how fast the application actually consumes data
  • Resource Efficiency: Optimal memory and CPU utilization even under variable load conditions
  • Non-Blocking Operations: Leverages Cats Effect's concurrency primitives for maximum throughput without resource exhaustion
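
The backpressure described above rests on FS2 being pull-based: a producer runs only when a downstream consumer asks for more. The sketch below illustrates that property with plain FS2 and no Quartz H2 code. It assumes FS2 3.x and Cats Effect 3 on the classpath; `DemandDrivenDemo` and `producedFor` are hypothetical names used only for illustration.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream

object DemandDrivenDemo {
  // Reports how many times the producer effect ran when the
  // consumer asked for exactly `n` elements.
  def producedFor(n: Long): IO[Int] =
    for {
      produced <- Ref.of[IO, Int](0)
      // An endless producer: every element costs one effect evaluation.
      source = Stream.repeatEval(produced.updateAndGet(_ + 1))
      // The consumer takes only `n` elements and then stops pulling.
      _ <- source.take(n).compile.drain
      count <- produced.get
    } yield count

  def main(args: Array[String]): Unit =
    println(producedFor(3).unsafeRunSync())
}
```

Although the source is infinite, the producer effect runs only as many times as the consumer demands. The same demand-driven evaluation is what lets a slow request handler slow down the reads feeding it.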

FS2 Stream Examples from Run.scala

File Upload with FS2 Stream

Using FS2 Stream to handle file uploads efficiently:

case req @ POST -> Root / "upload" / StringVar(file) =>
  val FOLDER_PATH = "/home/ols/web_root/"
  val FILE = s"$file"
  for {
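    // Resolve the target path inside IO so a malformed path surfaces as a failed effect.
    jpath <- IO(java.nio.file.Paths.get(FOLDER_PATH + FILE))
    // Pipe the request body to the file chunk by chunk instead of buffering it.
    _ <- req.stream.through(fs2.io.file.writeAll[IO](jpath)).compile.drain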
  } yield (Response.Ok().asText("OK"))
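
For readers on newer FS2 releases, the same write-to-disk pattern can be sketched with the `Files[IO]` API. This is a standalone sketch, not the code from Run.scala: it assumes FS2 3.1 or later (where `fs2.io.file.Path` and `Files[IO].writeAll(path)` are available), and `UploadSketch`, `save`, and `roundTrip` are hypothetical names. An in-memory stream stands in for `req.stream`.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.{Chunk, Stream}
import fs2.io.file.{Files, Path}

object UploadSketch {
  // Streams `body` into `target` chunk by chunk; the full payload
  // is never held in memory at once.
  def save(body: Stream[IO, Byte], target: Path): IO[Unit] =
    body.through(Files[IO].writeAll(target)).compile.drain

  // Writes the payload to a temporary file and reports the size on disk.
  // The temporary file is deleted when the resource is released.
  def roundTrip(payload: Array[Byte]): IO[Long] =
    Files[IO].tempFile.use { tmp =>
      save(Stream.chunk(Chunk.array(payload)), tmp) *> Files[IO].size(tmp)
    }

  def main(args: Array[String]): Unit =
    println(roundTrip("hello".getBytes("UTF-8")).unsafeRunSync())
}
```

In a real handler, `save(req.stream, Path(...))` would take the place of the in-memory stream used here.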

Counting Stream Bytes

Using FS2 Stream to count bytes in a request:

case req @ GET -> Root =>
  for {
    x <- req.stream.compile.count
  } yield (Response.Ok().asText(s"OK bytes received: $x"))

Collecting Stream Data

Using FS2 Stream to collect the entire request body in memory (suitable for small payloads only):

case req @ POST -> Root =>
  for {
    bytes <- req.stream.compile.toList
    data = bytes.toArray
  } yield (Response.Ok().asText("OK:" + new String(data)))
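
When the body is text, one alternative is to decode it as UTF-8 inside the stream rather than building a byte list and calling `new String`, which uses the platform default charset. The sketch below assumes FS2 3.1 or later (for `fs2.text.utf8.decode`); `BodyDecoding` and `asUtf8` are hypothetical names, and the in-memory stream stands in for `req.stream`.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.{Chunk, Stream}
import fs2.text

object BodyDecoding {
  // Decodes a byte stream as UTF-8 and concatenates the result.
  // The decoder handles multi-byte characters split across chunks.
  def asUtf8(body: Stream[IO, Byte]): IO[String] =
    body.through(text.utf8.decode).compile.string

  // Splits the encoded bytes into two chunks at `at` to mimic
  // a body that arrives in separate pieces.
  def splitAt(s: String, at: Int): Stream[IO, Byte] = {
    val (a, b) = s.getBytes("UTF-8").splitAt(at)
    Stream.chunk(Chunk.array(a)) ++ Stream.chunk(Chunk.array(b))
  }

  def main(args: Array[String]): Unit =
    println(asUtf8(splitAt("h\u00e9llo", 2)).unsafeRunSync())
}
```

Like the example above, this still holds the whole body in memory as a single `String`, so it suits small payloads.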

Draining a Stream

Using FS2 Stream to properly drain incoming data:

case req @ GET -> Root / "snihost" =>
  for {
    _ <- req.stream.compile.drain // consume and discard the request body; it must be read to the end
    result_text <- IO(req.sniServerNames match {
      case Some(hosts) => s"Host names in TLS SNI extension: ${hosts.mkString(",")}"
      case None        => "No TLS SNI host names provided or unsecure connection"
    })
  } yield (Response.Ok().asText(result_text))

Streaming Response with Custom Chunks

Creating a stream from custom chunks for response:

case "localhost" ! GET -> Root / "example" =>
  // Shows how to send data as separate H2 packets of different sizes.
  val ts = Stream.emits("Block1\n".getBytes())
  val ts2 = ts ++ Stream.emits("Block22\n".getBytes())
  IO(Response.Ok().asStream(ts2))
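
To see why the two blocks stay separate, it helps to inspect the chunk structure of the stream. The sketch below uses only FS2 (3.x assumed) and the hypothetical name `ChunkBoundaries`. It shows that `++` keeps each emitted block as its own chunk; how those chunks are framed on the wire is up to the server.

```scala
import fs2.Stream

object ChunkBoundaries {
  val ts  = Stream.emits("Block1\n".getBytes("UTF-8"))
  val ts2 = ts ++ Stream.emits("Block22\n".getBytes("UTF-8"))

  // Appending streams does not merge their chunks, so the sizes of
  // the two original blocks are still visible downstream.
  val chunkSizes: List[Int] = ts2.chunks.map(_.size).toList

  def main(args: Array[String]): Unit =
    println(chunkSizes) // List(7, 8)
}
```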

Streaming File Response

Using FS2 Stream to efficiently serve file contents:

case GET -> Root / StringVar(file) =>
  val FOLDER_PATH = "/home/ols/web_root/"
  val FILE = s"$file"
  val BLOCK_SIZE = 1024 * 14
  for {
    jpath <- IO(new java.io.File(FOLDER_PATH + FILE))
    present <- IO(jpath.exists())
    _ <- IO.raiseWhen(!present)(new java.io.FileNotFoundException(jpath.toString()))
  } yield (Response
    .Ok()
    .asStream(fs2.io.readInputStream(IO(new java.io.FileInputStream(jpath)), BLOCK_SIZE, true))
    .contentType(ContentType.contentTypeFromFileName(FILE)))
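
The effect of `BLOCK_SIZE` can be sketched in isolation: `fs2.io.readInputStream` reads at most that many bytes per chunk, so memory use per response stays bounded regardless of file size. The example below assumes FS2 3.x and uses a `ByteArrayInputStream` in place of a file; `BlockReads` and `chunkSizes` are hypothetical names.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import java.io.{ByteArrayInputStream, InputStream}

object BlockReads {
  // Reads `data` through an InputStream in blocks of at most `blockSize`
  // bytes and returns the size of every chunk that was emitted.
  def chunkSizes(data: Array[Byte], blockSize: Int): IO[List[Int]] =
    fs2.io
      .readInputStream(
        IO(new ByteArrayInputStream(data): InputStream),
        blockSize,
        closeAfterUse = true // close the stream once it is fully read
      )
      .chunks
      .map(_.size)
      .compile
      .toList

  def main(args: Array[String]): Unit =
    println(chunkSizes(Array.fill[Byte](10)(1), 4).unsafeRunSync())
}
```

No chunk exceeds the requested block size, and the sizes add up to the length of the input.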