object SimplePipelineRunner extends PipelineRunner
Main entry point for running pipelines.
SimplePipelineRunner reads a HOCON configuration file, configures the SparkSession, and executes pipeline components in sequence.
Usage
Via spark-submit:
spark-submit \
--class io.github.dwsmith1983.spark.pipeline.runner.SimplePipelineRunner \
--jars /path/to/user-pipeline.jar \
/path/to/spark-pipeline-runner.jar \
-Dconfig.file=/path/to/pipeline.conf
Configuration
The configuration file should contain:
spark {
  master = "yarn" # Optional, defaults to spark-submit setting
  app-name = "MyPipeline" # Optional
  config {
    "spark.executor.memory" = "4g"
    "spark.executor.cores" = "2"
  }
}
pipeline {
  pipeline-name = "My Data Pipeline"
  pipeline-components = [
    {
      instance-type = "com.mycompany.MyComponent"
      instance-name = "MyComponent(prod)"
      instance-config {
        input-table = "raw_data"
        output-path = "/data/processed"
      }
    }
  ]
}
Lifecycle Hooks
You can provide custom PipelineHooks to monitor or customize execution:
val hooks = new PipelineHooks {
  override def afterComponent(config: ComponentConfig, index: Int, total: Int, durationMs: Long): Unit =
    metrics.recordDuration(config.instanceName, durationMs)
}
SimplePipelineRunner.run(config, hooks)
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @IntrinsicCandidate() @native()
- def dryRun(config: Config, hooks: PipelineHooks): DryRunResult
Validates the pipeline configuration without executing components.
This method parses the configuration and instantiates all components to verify they can be created, but does not call their run() methods.
- config
The loaded HOCON configuration
- hooks
Lifecycle hooks to invoke (only beforePipeline/afterPipeline are called)
- returns
Validation result indicating success or listing errors
- Definition Classes
- SimplePipelineRunner → PipelineRunner
- def dryRun(config: Config): DryRunResult
Validates the pipeline configuration without executing components.
This method parses the configuration and instantiates all components to verify they can be created, but does not call their run() methods. Useful for CI/CD pipelines to validate configurations before deployment.
This is a convenience method that uses PipelineHooks.NoOp.
- config
The loaded HOCON configuration containing pipeline and optionally spark blocks
- returns
Validation result indicating success or listing errors
- Definition Classes
- PipelineRunner
- def dryRunFromFile(configPath: String, hooks: PipelineHooks): DryRunResult
Validates the pipeline from a config file path with lifecycle hooks.
- configPath
Path to the HOCON configuration file
- hooks
Lifecycle hooks to invoke
- returns
Validation result indicating success or listing errors
- def dryRunFromFile(configPath: String): DryRunResult
Validates the pipeline from a config file path.
- configPath
Path to the HOCON configuration file
- returns
Validation result indicating success or listing errors
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @IntrinsicCandidate() @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @IntrinsicCandidate() @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def listResumableCheckpoints(pipelineName: String, store: CheckpointStore): List[CheckpointState]
Lists all incomplete (resumable) checkpoints for a pipeline.
- pipelineName
Name of the pipeline
- store
The checkpoint store to query
- returns
List of checkpoint states that can be resumed
- def main(args: Array[String]): Unit
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @IntrinsicCandidate() @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @IntrinsicCandidate() @native()
- def resume(configPath: String, runId: String, store: Option[CheckpointStore] = None, additionalHooks: PipelineHooks = PipelineHooks.NoOp): Unit
Resumes a failed pipeline run from its checkpoint.
Loads the checkpoint state for the specified run ID and continues execution from the next component after the last successful one.
Example
// Resume from a specific failed run
SimplePipelineRunner.resume(
  configPath = "/path/to/pipeline.conf",
  runId = "abc123-def456"
)
- configPath
Path to the HOCON configuration file
- runId
The run ID to resume from
- store
Optional checkpoint store (uses config-based store if not provided)
- additionalHooks
Additional hooks to compose with checkpoint hooks
- Note
Throws CheckpointException if checkpoint not found or config mismatch
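The rule "continue from the next component after the last successful one" can be pictured with a small model. This is only an illustrative sketch using the standard library: the `Checkpoint` case class and its `lastSuccessfulIndex` field are hypothetical stand-ins, and the real `CheckpointState` and resume logic may be structured differently.

```scala
object ResumeSketch {
  // Hypothetical stand-in for a stored checkpoint; not the library's CheckpointState.
  // lastSuccessfulIndex is -1 when no component completed.
  final case class Checkpoint(runId: String, lastSuccessfulIndex: Int)

  // Components that still need to run: everything after the last successful index.
  def remaining[A](components: Vector[A], checkpoint: Checkpoint): Vector[A] =
    components.drop(checkpoint.lastSuccessfulIndex + 1)

  def main(args: Array[String]): Unit = {
    val components = Vector("extract", "transform", "load")
    val checkpoint = Checkpoint(runId = "abc123-def456", lastSuccessfulIndex = 0)
    // Only the components after index 0 are scheduled again.
    println(remaining(components, checkpoint).mkString(", "))
  }
}
```

In this model, a run that failed in the second of three components would re-run the second and third, while the first is skipped.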
- def resumeFromConfig(config: Config, runId: String, store: Option[CheckpointStore] = None, additionalHooks: PipelineHooks = PipelineHooks.NoOp): Unit
Resumes a failed pipeline run using a Config object.
- config
The loaded HOCON configuration
- runId
The run ID to resume from
- store
Optional checkpoint store
- additionalHooks
Additional hooks to compose with checkpoint hooks
- def run(config: Config, hooks: PipelineHooks): Unit
Runs the pipeline defined in the given configuration with lifecycle hooks.
- config
The loaded HOCON configuration
- hooks
Lifecycle hooks to invoke during execution
- Definition Classes
- SimplePipelineRunner → PipelineRunner
- def run(config: Config): Unit
Runs the pipeline defined in the given configuration.
This is a convenience method that uses PipelineHooks.NoOp.
May throw ComponentInstantiationException if a component cannot be instantiated, or ConfigurationException if the configuration is invalid.
- config
The loaded HOCON configuration containing pipeline and optionally spark blocks
- Definition Classes
- PipelineRunner
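The Config-based overloads accept any loaded HOCON configuration, so one can also be built in code. The sketch below assumes the Typesafe Config library (`com.typesafe.config`) is on the classpath and reuses the sample values from the Configuration section; the object name `InlineConfigExample` is hypothetical. The call to the runner is left as a comment because it needs the runner jar and a Spark environment.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object InlineConfigExample {
  // HOCON mirroring the layout shown in the Configuration section.
  val hocon: String =
    """
      |pipeline {
      |  pipeline-name = "My Data Pipeline"
      |  pipeline-components = [
      |    {
      |      instance-type = "com.mycompany.MyComponent"
      |      instance-name = "MyComponent(prod)"
      |      instance-config {
      |        input-table = "raw_data"
      |        output-path = "/data/processed"
      |      }
      |    }
      |  ]
      |}
      |""".stripMargin

  // Parse the text and resolve any substitutions.
  val config: Config = ConfigFactory.parseString(hocon).resolve()

  // Inspect a few values before handing the config to the runner.
  val pipelineName: String = config.getString("pipeline.pipeline-name")
  val componentCount: Int = config.getConfigList("pipeline.pipeline-components").size()

  // With the runner on the classpath, the config could then be passed on:
  // SimplePipelineRunner.run(config)
}
```

Building the configuration from a string can be convenient in tests, where writing a temporary file just to call runFromFile would add noise.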
- def runFromFile(configPath: String, hooks: PipelineHooks): Unit
Runs the pipeline from a config file path with lifecycle hooks.
- configPath
Path to the HOCON configuration file
- hooks
Lifecycle hooks to invoke during execution
- def runFromFile(configPath: String): Unit
Runs the pipeline from a config file path.
- configPath
Path to the HOCON configuration file
- def runWithCheckpointing(configPath: String, additionalHooks: PipelineHooks = PipelineHooks.NoOp): Unit
Runs a pipeline with automatic checkpoint support.
If checkpointing is enabled in the configuration and auto-resume is true, this method will automatically resume from the last failed checkpoint if one exists.
Example
// Run with checkpointing enabled (configured in pipeline.conf)
SimplePipelineRunner.runWithCheckpointing("/path/to/pipeline.conf")
- configPath
Path to the HOCON configuration file
- additionalHooks
Additional hooks to compose with checkpoint hooks
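How the additional hooks are composed with the checkpoint hooks is not spelled out here. One plausible reading is that each callback is forwarded to both hook sets in turn; the sketch below models that idea with a simplified, hypothetical `Hooks` trait (a single callback, not the real PipelineHooks signature) and should not be taken as the library's implementation.

```scala
import scala.collection.mutable.ListBuffer

object ComposeHooksSketch {
  // Hypothetical, simplified stand-in for PipelineHooks with a single callback.
  trait Hooks {
    def afterComponent(name: String, durationMs: Long): Unit = ()
  }

  // Forwards each callback to `first`, then to `second`.
  def compose(first: Hooks, second: Hooks): Hooks = new Hooks {
    override def afterComponent(name: String, durationMs: Long): Unit = {
      first.afterComponent(name, durationMs)
      second.afterComponent(name, durationMs)
    }
  }

  // Records the order in which the composed hooks fire.
  def demo(): List[String] = {
    val log = ListBuffer.empty[String]
    val checkpointHooks = new Hooks {
      override def afterComponent(name: String, durationMs: Long): Unit =
        log += s"checkpoint:$name"
    }
    val metricsHooks = new Hooks {
      override def afterComponent(name: String, durationMs: Long): Unit =
        log += s"metrics:$name:${durationMs}ms"
    }
    compose(checkpointHooks, metricsHooks).afterComponent("MyComponent(prod)", 1200L)
    log.toList
  }
}
```

Under this assumption, user-supplied hooks such as metrics recording would run alongside checkpoint bookkeeping rather than replacing it.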
- def runWithCheckpointingFromConfig(config: Config, additionalHooks: PipelineHooks = PipelineHooks.NoOp): Unit
Runs a pipeline with automatic checkpoint support using a Config object.
- config
The loaded HOCON configuration
- additionalHooks
Additional hooks to compose with checkpoint hooks
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- def validate(config: Config, options: ValidationOptions): ValidationResult
Validates the pipeline configuration with options.
- config
The loaded HOCON configuration
- options
Validation options (e.g., resource checking)
- returns
Validation result with all errors and warnings
- def validate(config: Config): ValidationResult
Validates the pipeline configuration without instantiating components.
This method performs lightweight validation by checking:
- HOCON syntax correctness
- Required fields (pipelineName, pipelineComponents)
- Component class existence (Class.forName)
- Companion object existence and ConfigurableInstance trait
Unlike dryRun, this does NOT call createFromConfig on components, making it faster and free of initialization side effects.
Use validate for quick pre-flight checks, and dryRun when you need to verify that components can be fully instantiated with their configs.
- config
The loaded HOCON configuration
- returns
Validation result with all errors and warnings
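The class-existence check listed above can be pictured with the standard library alone. This is a minimal sketch of the general Class.forName technique, not the runner's actual validation code; `ClassCheckSketch` and `classExists` are hypothetical names.

```scala
import scala.util.Try

object ClassCheckSketch {
  // Returns true when the named class can be loaded from the current classpath.
  // A missing class raises ClassNotFoundException, which Try captures as a Failure.
  def classExists(className: String): Boolean =
    Try(Class.forName(className)).isSuccess

  def main(args: Array[String]): Unit = {
    println(classExists("java.lang.String"))
    println(classExists("com.mycompany.MyComponent"))
  }
}
```

A check of this kind only confirms that the class is loadable; it says nothing about whether the component can be constructed from its instance-config, which is the gap dryRun is described as covering.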
- def validateFromFile(configPath: String, options: ValidationOptions): ValidationResult
Validates a pipeline configuration from a file path with options.
- configPath
Path to the HOCON configuration file
- options
Validation options (e.g., resource checking)
- returns
Validation result with all errors and warnings
- def validateFromFile(configPath: String): ValidationResult
Validates a pipeline configuration from a file path.
- configPath
Path to the HOCON configuration file
- returns
Validation result with all errors and warnings
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
Deprecated Value Members
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable]) @Deprecated
- Deprecated
(Since version 9)