Skip to main content

Streaming Pipelines

Build Spark Structured Streaming applications with the same configuration-driven approach as batch pipelines. Connect sources to sinks with optional transformations and automatic watermarking.

Quick Start

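Create a streaming pipeline by extending StreamingPipeline, and give it a companion object implementing ConfigurableInstance that builds the pipeline from configuration: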

import io.github.dwsmith1983.spark.pipeline.streaming._
import io.github.dwsmith1983.spark.pipeline.config.ConfigurableInstance
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.OutputMode
import pureconfig._
import pureconfig.generic.auto._

/** Companion factory that builds [[MyStreamingPipeline]] instances from Typesafe configuration. */
object MyStreamingPipeline extends ConfigurableInstance {

  /**
   * Settings decoded from the configuration passed to `createFromConfig`.
   *
   * @param kafkaServers   Kafka bootstrap servers to connect to
   * @param inputTopic     Kafka topic to subscribe to
   * @param outputPath     directory the Parquet output is written to
   * @param checkpointPath checkpoint directory for the streaming query
   */
  case class Config(
    kafkaServers: String,
    inputTopic: String,
    outputPath: String,
    checkpointPath: String
  )

  override def createFromConfig(conf: com.typesafe.config.Config): MyStreamingPipeline = {
    // Fails fast (throws) if a required setting is missing or malformed.
    val settings: Config = ConfigSource.fromConfig(conf).loadOrThrow[Config]
    new MyStreamingPipeline(settings)
  }
}

/**
 * Quick-start pipeline: reads records from a Kafka topic and appends them to Parquet files.
 *
 * @param conf Kafka connection, output path and checkpoint settings
 */
class MyStreamingPipeline(conf: MyStreamingPipeline.Config) extends StreamingPipeline {

  /** Parquet sink in `Append` mode, checkpointing to `conf.checkpointPath`. */
  override def sink: StreamingSink = new StreamingSink {
    override def outputMode: OutputMode = OutputMode.Append()
    override def checkpointLocation: String = conf.checkpointPath

    override def writeStream(df: DataFrame) = {
      val parquetWriter = df.writeStream.format("parquet")
      parquetWriter.option("path", conf.outputPath)
    }
  }

  /** Kafka source subscribed to `conf.inputTopic` on the `conf.kafkaServers` brokers. */
  override def source: StreamingSource = new StreamingSource {
    override def readStream(): DataFrame = {
      val kafkaReader = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", conf.kafkaServers)
        .option("subscribe", conf.inputTopic)
      kafkaReader.load()
    }
  }
}

Core Components

StreamingPipeline

The main orchestrator that connects sources to sinks:

| Method | Description |
|---|---|
| `source` | Returns the StreamingSource to read from |
| `sink` | Returns the StreamingSink to write to |
| `transform(df)` | Optional transformation applied between source and sink |
| `trigger` | Trigger configuration (defaults to ProcessingTime 0s) |
| `startStream()` | Starts the query and returns its StreamingQuery handle without waiting for it to finish |
| `run()` | Starts the query and blocks until it terminates |

StreamingSource

Base trait for streaming data sources:

/**
 * A source of streaming data for a [[StreamingPipeline]].
 *
 * The pipeline applies a watermark to the DataFrame returned by `readStream()`
 * only when both `watermarkColumn` and `watermarkDelay` are defined.
 */
trait StreamingSource extends DataFlow {
  /** Builds the streaming DataFrame, typically via `spark.readStream`. */
  def readStream(): DataFrame
  /** Event-time column to watermark on; `None` (the default) means no watermark. */
  def watermarkColumn: Option[String] = None
  /** Allowed lateness for the watermark, e.g. `"10 minutes"`; `None` (the default) means no watermark. */
  def watermarkDelay: Option[String] = None
}

StreamingSink

Base trait for streaming data sinks:

/**
 * A destination for a [[StreamingPipeline]]'s output.
 *
 * In the examples, `writeStream` only configures the writer (format, path, options)
 * and does not start it. Presumably the pipeline applies `outputMode`,
 * `checkpointLocation`, `queryName` and the trigger itself before starting the
 * query. TODO: confirm against StreamingPipeline.
 */
trait StreamingSink extends DataFlow {
  /** Configures (but does not start) the writer for the transformed DataFrame. */
  def writeStream(df: DataFrame): DataStreamWriter[Row]
  /** Output mode of the query: Append, Complete or Update. */
  def outputMode: OutputMode
  /** Checkpoint directory; must be unique to this query. */
  def checkpointLocation: String
  /** Optional query name; `None` by default. */
  def queryName: Option[String] = None
}

Trigger Configuration

Control how often micro-batches execute:

import io.github.dwsmith1983.spark.pipeline.streaming.TriggerConfig

// The overrides below are alternatives: a pipeline defines `trigger` only once.

// Start a micro-batch every 10 seconds (most common)
override def trigger = TriggerConfig.ProcessingTime("10 seconds")

// Process the available data in a single micro-batch, then stop (testing or batch-like runs).
// NOTE(review): Spark 3.4+ deprecates Trigger.Once in favor of AvailableNow; prefer that where available.
override def trigger = TriggerConfig.Once

// Process all available data, possibly over several micro-batches, then stop (Spark 3.3+)
override def trigger = TriggerConfig.AvailableNow

// Low-latency continuous processing (experimental). In Spark's Trigger.Continuous the
// interval is the checkpoint interval, not a batch interval (presumably passed through here).
override def trigger = TriggerConfig.Continuous("1 second")
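| Trigger type | Use case |
|---|---|
| ProcessingTime | Regular streaming workloads |
| Once | Testing, one-time migrations |
| AvailableNow | Catching up on a backlog, then stopping |
| Continuous | End-to-end latency as low as ~1 ms (experimental; limited source, sink, and operation support) |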

Watermarking

Enable event-time processing with automatic watermark application:

/** Example source whose rows carry an `event_time` column used for watermarking. */
class MySource extends StreamingSource {
  // Watermarking is applied only because BOTH of these are defined.
  override def watermarkDelay: Option[String] = Some("10 minutes")
  override def watermarkColumn: Option[String] = Some("event_time")

  // Placeholder: build and return the streaming DataFrame here.
  override def readStream(): DataFrame = ???
}

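The StreamingPipeline automatically applies watermarking when both watermarkColumn and watermarkDelay are defined. A watermark only affects stateful operations such as windowed aggregations, stream-stream joins, and deduplication. It bounds how late data may arrive and how long Spark keeps the related state.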

Output Modes

Spark Structured Streaming supports three output modes:

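| Mode | Description | Use when |
|---|---|---|
| Append | Only new rows are written | Most streaming workloads (Spark's default mode) |
| Complete | The entire result table is written on every trigger | Aggregation queries (the only queries that support it) |
| Update | Only rows that changed since the last trigger are written | Aggregations and other stateful operations |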
import org.apache.spark.sql.streaming.OutputMode

// Alternatives: a sink defines `outputMode` only once.
override def outputMode = OutputMode.Append() // Only new rows; Spark's default mode
override def outputMode = OutputMode.Complete() // Whole result table each trigger; aggregation queries only
override def outputMode = OutputMode.Update() // Only rows changed since the last trigger; stateful ops

Checkpointing

Every streaming sink requires a checkpoint location for fault tolerance:

// Durable storage, and a path used by this query only: never share it with another query.
override def checkpointLocation = "s3://bucket/checkpoints/my-pipeline"

Best practices:

  • Use durable storage (HDFS, S3, GCS) for production
  • Each streaming query needs a unique checkpoint path
  • Never share checkpoint locations between queries

Lifecycle Hooks

Monitor streaming queries with StreamingHooks:

import io.github.dwsmith1983.spark.pipeline.streaming.StreamingHooks

/** Hooks that print query lifecycle events and send an alert when a query fails. */
val myHooks = new StreamingHooks {
  override def onQueryStart(queryName: String, queryId: String): Unit = {
    val startedMessage = s"Query started: $queryName ($queryId)"
    println(startedMessage)
  }

  override def onBatchProgress(queryName: String, batchId: Long, numInputRows: Long, durationMs: Long): Unit = {
    val progressMessage = s"Batch $batchId: $numInputRows rows in ${durationMs}ms"
    println(progressMessage)
  }

  // `alerting` stands for your own alerting client; it is not defined in this snippet.
  // `exception` is None when the query stopped normally, so only failures alert.
  override def onQueryTerminated(queryName: String, queryId: String, exception: Option[Throwable]): Unit =
    for (failure <- exception) alerting.send(s"Query $queryName failed: ${failure.getMessage}")
}

// Compose multiple hooks (loggingHooks, metricsHooks and alertingHooks are defined elsewhere)
val compositeHooks = StreamingHooks.compose(loggingHooks, metricsHooks, alertingHooks)

Complete Example

A Kafka-to-Delta streaming pipeline:

import io.github.dwsmith1983.spark.pipeline.streaming._
import io.github.dwsmith1983.spark.pipeline.config.ConfigurableInstance
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.OutputMode
import pureconfig._
import pureconfig.generic.auto._

/** Companion factory that builds [[KafkaToDeltaPipeline]] instances from Typesafe configuration. */
object KafkaToDeltaPipeline extends ConfigurableInstance {

  /**
   * Settings for the Kafka-to-Delta pipeline.
   *
   * @param kafkaServers    Kafka bootstrap servers
   * @param topic           Kafka topic to subscribe to
   * @param deltaTablePath  path of the Delta table the stream is written to
   * @param checkpointPath  checkpoint directory for the streaming query
   * @param triggerInterval processing-time trigger interval; defaults to "30 seconds"
   */
  case class Config(
    kafkaServers: String,
    topic: String,
    deltaTablePath: String,
    checkpointPath: String,
    triggerInterval: String = "30 seconds"
  )

  override def createFromConfig(conf: com.typesafe.config.Config): KafkaToDeltaPipeline = {
    val configSource = ConfigSource.fromConfig(conf)
    new KafkaToDeltaPipeline(configSource.loadOrThrow[Config])
  }
}

/**
 * Streams records from a Kafka topic into a Delta table.
 *
 * Kafka keys and values are cast to strings, records with a null value are dropped,
 * and a micro-batch is triggered every `conf.triggerInterval`.
 *
 * @param conf Kafka, Delta table, checkpoint and trigger settings
 */
class KafkaToDeltaPipeline(conf: KafkaToDeltaPipeline.Config) extends StreamingPipeline {

  /** Kafka source subscribed to `conf.topic`, watermarked on the Kafka record timestamp. */
  override def source: StreamingSource = new StreamingSource {
    override def readStream(): DataFrame =
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", conf.kafkaServers)
        .option("subscribe", conf.topic)
        // Only consulted when the query starts without a checkpoint; a restarted
        // query resumes from its checkpointed offsets instead.
        .option("startingOffsets", "latest")
        .load()

    // Annotated as Option[String]: an unannotated override infers the narrower Some[String] in Scala 2.
    // The watermark only takes effect once the query has stateful operations
    // (aggregations, stream-stream joins, deduplication).
    override def watermarkColumn: Option[String] = Some("timestamp")
    override def watermarkDelay: Option[String] = Some("5 minutes")
  }

  /** Delta sink that appends to `conf.deltaTablePath`, allowing additive schema changes. */
  override def sink: StreamingSink = new StreamingSink {
    override def writeStream(df: DataFrame) =
      df.writeStream
        .format("delta")
        .option("path", conf.deltaTablePath)
        .option("mergeSchema", "true")

    override def outputMode: OutputMode = OutputMode.Append()
    override def checkpointLocation: String = conf.checkpointPath
    override def queryName: Option[String] = Some("kafka-to-delta")
  }

  /**
   * Casts the binary Kafka key/value to strings, keeps the timestamp/partition/offset
   * metadata columns, and drops records whose value is null.
   */
  override def transform(df: DataFrame): DataFrame = {
    import spark.implicits._
    df.selectExpr(
        "CAST(key AS STRING) as key",
        "CAST(value AS STRING) as value",
        "timestamp",
        "partition",
        "offset"
      )
      .filter($"value".isNotNull)
  }

  override def trigger = TriggerConfig.ProcessingTime(conf.triggerInterval)
}

Configuration:

pipeline {
  pipeline-name = "Kafka to Delta Pipeline"
  pipeline-components = [
    {
      # Presumably resolved to the KafkaToDeltaPipeline companion (a ConfigurableInstance); confirm.
      instance-type = "com.example.KafkaToDeltaPipeline"
      instance-name = "kafka-to-delta"
      # Decoded into KafkaToDeltaPipeline.Config (presumably the value passed to createFromConfig).
      # pureconfig maps kebab-case keys to camelCase fields by default: kafka-servers -> kafkaServers.
      instance-config {
        kafka-servers = "kafka:9092"
        topic = "events"
        delta-table-path = "/data/delta/events"
        checkpoint-path = "/checkpoints/kafka-to-delta"
        # Optional: Config.triggerInterval defaults to "30 seconds"
        trigger-interval = "30 seconds"
      }
    }
  ]
}

Testing

Use TriggerConfig.Once in tests so the query processes the data available at start-up and then terminates on its own:

/** Runs MyStreamingPipeline once over test input and checks its output. */
class StreamingPipelineSpec extends AnyFlatSpec {
  "MyStreamingPipeline" should "process data correctly" in {
    // testConfig: a MyStreamingPipeline.Config pointing at test fixtures (not shown).
    // TriggerConfig.Once makes the query stop by itself after processing the available data.
    val testPipeline = new MyStreamingPipeline(testConfig) {
      override def trigger = TriggerConfig.Once
    }

    val query = testPipeline.startStream()
    try {
      // Bounded wait: a stalled source fails the test instead of hanging the build forever.
      // awaitTermination rethrows the query's exception if the query failed.
      val finished = query.awaitTermination(60000L)
      assert(finished, "streaming query did not terminate within 60 seconds")

      // Assert on output
    } finally {
      // Ensure no query keeps running after a failed assertion (a no-op if already stopped).
      query.stop()
    }
  }
}

To test a sink independently of a real source, substitute Spark's built-in rate source, which generates (timestamp, value) rows at a fixed rate:

// Stand-in source for sink tests: Spark's rate source emits (timestamp, value) rows.
val testSource = new StreamingSource {
  override def readStream(): DataFrame = {
    val rateReader = spark.readStream.format("rate")
    rateReader.option("rowsPerSecond", 10).load()
  }
}