Getting Started

Spark Pipeline Framework is a configuration-driven framework for building Apache Spark pipelines using HOCON config files and PureConfig.

Features

  • Type-safe configuration via PureConfig with automatic case class binding
  • SparkSession management from config files
  • Dynamic component instantiation via reflection (no compile-time coupling)
  • Lifecycle hooks for monitoring, metrics, and custom error handling
  • Dry-run validation to verify configs without executing pipelines
  • Flexible error handling with fail-fast or continue-on-error modes
  • Cross-compilation support for Spark 3.x and 4.x
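To illustrate the reflection-based instantiation mentioned above, here is a minimal, self-contained sketch of the general technique (a config-driven runner loading a class by its fully qualified name). The `PipelineComponent` trait, `LogComponent` class, and `ReflectionDemo` object are hypothetical names invented for this sketch, not part of the framework's API:

```scala
// Sketch of reflection-based instantiation: the class name would normally
// come from a config file, so there is no compile-time coupling to it.
trait PipelineComponent { def run(): Unit }

class LogComponent(msg: String) extends PipelineComponent {
  def run(): Unit = println(msg)
}

object ReflectionDemo {
  // Instantiate a component by fully qualified class name, as a
  // config-driven runner would, calling its (String) constructor.
  def instantiate(className: String, arg: String): PipelineComponent =
    Class.forName(className)
      .getDeclaredConstructor(classOf[String])
      .newInstance(arg)
      .asInstanceOf[PipelineComponent]

  def main(args: Array[String]): Unit =
    instantiate("LogComponent", "hello from reflection").run()
}
```

The framework's actual mechanism goes through `ConfigurableInstance.createFromConfig` (shown in the Quick Example below) rather than a raw constructor, but the reflective lookup follows the same idea.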

Installation

Add the dependency to your build.sbt:

// For Spark 3.5.x with Scala 2.13
libraryDependencies += "io.github.dwsmith1983" %% "spark-pipeline-runtime-spark3" % "<version>"

// For Spark 4.0.x with Scala 2.13
libraryDependencies += "io.github.dwsmith1983" %% "spark-pipeline-runtime-spark4" % "<version>"

No resolver is needed; the artifacts are published to Maven Central.

Cross-Compilation Matrix

Artifact          Spark    Scala     Java
*-spark3_2.12     3.5.7    2.12.18   17+
*-spark3_2.13     3.5.7    2.13.12   17+
*-spark4_2.13     4.0.1    2.13.12   17+

Modules

Module                   Description                            Spark Dependency
spark-pipeline-core      Traits, config models, instantiation   None
spark-pipeline-runtime   SparkSessionWrapper, DataFlow trait    Yes (provided)
spark-pipeline-runner    SimplePipelineRunner entry point       Yes (provided)
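Because spark-pipeline-core carries no Spark dependency, it can be depended on by itself where only the traits and config models are needed (for example, in a library of reusable components). This assumes the core artifact follows the same coordinates as the runtime artifacts above:

```scala
// build.sbt sketch: depend on the Spark-free core module only.
// Coordinates assumed to follow the pattern of the runtime artifacts.
libraryDependencies += "io.github.dwsmith1983" %% "spark-pipeline-core" % "<version>"
```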

Quick Example

1. Create a pipeline component

import io.github.dwsmith1983.spark.pipeline.config.ConfigurableInstance
import io.github.dwsmith1983.spark.pipeline.runtime.DataFlow
import pureconfig._
import pureconfig.generic.auto._

object MyComponent extends ConfigurableInstance {
  case class Config(inputTable: String, outputPath: String)

  override def createFromConfig(conf: com.typesafe.config.Config): MyComponent = {
    new MyComponent(ConfigSource.fromConfig(conf).loadOrThrow[Config])
  }
}

class MyComponent(conf: MyComponent.Config) extends DataFlow {
  override def run(): Unit = {
    logger.info(s"Processing ${conf.inputTable}")
    spark.table(conf.inputTable).write.parquet(conf.outputPath)
  }
}

2. Create a pipeline config file

# pipeline.conf
spark {
  app-name = "My Pipeline"
  config {
    "spark.executor.memory" = "4g"
  }
}

pipeline {
  pipeline-name = "My Data Pipeline"
  pipeline-components = [
    {
      instance-type = "com.mycompany.MyComponent"
      instance-name = "MyComponent(prod)"
      instance-config {
        input-table = "raw_data"
        output-path = "/data/processed"
      }
    }
  ]
}
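Note that the kebab-case keys in the config (`input-table`, `output-path`) bind to the camelCase fields of `MyComponent.Config` (`inputTable`, `outputPath`) via PureConfig's default kebab-case naming convention. A minimal sketch of that mapping, for illustration only (this is not PureConfig's actual implementation):

```scala
// Illustrative only: how kebab-case config keys correspond to
// camelCase case-class field names under PureConfig's default convention.
object KebabCase {
  def toCamel(key: String): String = {
    val parts = key.split("-")
    (parts.head +: parts.tail.map(_.capitalize)).mkString
  }
}
```

So `input-table` resolves to the field `inputTable`, with no manual key mapping required.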

3. Run with spark-submit

spark-submit \
  --class io.github.dwsmith1983.spark.pipeline.runner.SimplePipelineRunner \
  --jars /path/to/my-pipeline.jar \
  /path/to/spark-pipeline-runner-spark3_2.13.jar \
  -Dconfig.file=/path/to/pipeline.conf

Next Steps