package runner
Runner package for spark-pipeline-framework.
This package provides the entry point for executing pipelines:
- runner.PipelineRunner - Trait defining the pipeline execution interface
- runner.SimplePipelineRunner - Main class for spark-submit (sequential execution)
Lifecycle Hooks
Use PipelineHooks from the config package to monitor
or customize pipeline execution:
```scala
import io.github.dwsmith1983.spark.pipeline.config._

val hooks = new PipelineHooks {
  override def afterComponent(config: ComponentConfig, index: Int,
                              total: Int, durationMs: Long): Unit =
    println(s"Component ${config.instanceName} completed in ${durationMs}ms")
}
SimplePipelineRunner.run(config, hooks)
```
Deployment
The runner JAR should be used as the main artifact for spark-submit, with user pipeline JARs added via --jars:
```bash
spark-submit \
  --class io.github.dwsmith1983.spark.pipeline.runner.SimplePipelineRunner \
  --jars /path/to/user-pipeline.jar \
  /path/to/spark-pipeline-runner-spark3_2.12.jar \
  -Dconfig.file=/path/to/pipeline.conf
```
Configuration Loading
Configuration is loaded via Typesafe Config in this order:
1. System properties (-Dconfig.file, -Dconfig.resource, -Dconfig.url)
2. application.conf on the classpath
3. reference.conf on the classpath
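As a minimal sketch of the same resolution order (assuming the Typesafe Config library is on the classpath, as it must be for this framework; the config path read here is illustrative), a plain `ConfigFactory.load()` call applies exactly this precedence:

```scala
import com.typesafe.config.{Config, ConfigFactory}

// load() resolves in the documented order: system properties
// (-Dconfig.file / -Dconfig.resource / -Dconfig.url) take precedence
// over application.conf, which takes precedence over reference.conf.
val config: Config = ConfigFactory.load()

// Reading a value from the resolved tree; this path matches the
// pipeline config shown later on this page.
val pipelineName: String = config.getString("pipeline.pipeline-name")
```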
Type Members
- trait PipelineRunner extends AnyRef
Trait defining the interface for pipeline execution.
Implementations of this trait are responsible for:
- Loading and parsing pipeline configuration
- Configuring the SparkSession
- Instantiating and executing pipeline components
- Invoking lifecycle hooks at appropriate points
Standard Implementation
The framework provides SimplePipelineRunner as the standard implementation, which executes components sequentially in the order they appear in the configuration.
Custom Implementations
You can create custom runners for specialized behavior:
- Parallel execution of independent components
- Dry-run mode that validates without executing
- Conditional execution based on external state
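As a hedged sketch of the second option, a dry-run runner might look like the following. This assumes PipelineRunner exposes the same `run(config, hooks)` entry point shown in the examples on this page, and that the parsed configuration exposes the component list and class names; the `PipelineConfig`, `pipelineComponents`, and `instanceType` names are assumptions based on the HOCON keys documented below, not confirmed API:

```scala
import io.github.dwsmith1983.spark.pipeline.config._

// Hypothetical dry-run runner: checks that every configured component
// class can be loaded, without instantiating or executing anything.
object DryRunPipelineRunner extends PipelineRunner {
  override def run(config: PipelineConfig, hooks: PipelineHooks): Unit =
    config.pipelineComponents.foreach { component =>
      // Fails fast with ClassNotFoundException if a class name is wrong
      // or a user JAR is missing from the classpath.
      Class.forName(component.instanceType)
    }
}
```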
Example
```scala
// Using the standard runner with custom hooks
val hooks = new PipelineHooks {
  override def afterComponent(config: ComponentConfig, index: Int,
                              total: Int, durationMs: Long): Unit =
    println(s"Component ${config.instanceName} completed in ${durationMs}ms")
}
SimplePipelineRunner.run(config, hooks)
```
- class SchemaContractViolationException extends RuntimeException
Exception thrown when schema validation fails between components.
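A minimal sketch of handling this exception at the call site, assuming it propagates out of `run`. The `notifyOps` helper is hypothetical, not part of the framework:

```scala
try SimplePipelineRunner.run(config, hooks)
catch {
  case e: SchemaContractViolationException =>
    // Surface the contract violation for operators, then rethrow
    // so the spark-submit job still exits with a failure.
    notifyOps(s"Schema contract violated: ${e.getMessage}")
    throw e
}
```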
Value Members
- object SimplePipelineRunner extends PipelineRunner
Main entry point for running pipelines.
SimplePipelineRunner reads a HOCON configuration file, configures the SparkSession, and executes pipeline components in sequence.
Usage
Via spark-submit:
```bash
spark-submit \
  --class io.github.dwsmith1983.spark.pipeline.runner.SimplePipelineRunner \
  --jars /path/to/user-pipeline.jar \
  /path/to/spark-pipeline-runner.jar \
  -Dconfig.file=/path/to/pipeline.conf
```

Configuration
The configuration file should contain:
```hocon
spark {
  master = "yarn"          # Optional, defaults to spark-submit setting
  app-name = "MyPipeline"  # Optional
  config {
    "spark.executor.memory" = "4g"
    "spark.executor.cores" = "2"
  }
}
pipeline {
  pipeline-name = "My Data Pipeline"
  pipeline-components = [
    {
      instance-type = "com.mycompany.MyComponent"
      instance-name = "MyComponent(prod)"
      instance-config {
        input-table = "raw_data"
        output-path = "/data/processed"
      }
    }
  ]
}
```

Lifecycle Hooks
You can provide custom PipelineHooks to monitor or customize execution:

```scala
val hooks = new PipelineHooks {
  override def afterComponent(config: ComponentConfig, index: Int,
                              total: Int, durationMs: Long): Unit =
    metrics.recordDuration(config.instanceName, durationMs)
}
SimplePipelineRunner.run(config, hooks)
```