Spark Pipeline Framework

package io.github.dwsmith1983.spark.pipeline.runner

    Runner package for spark-pipeline-framework.

    This package provides the entry point for executing pipelines:

    - runner.PipelineRunner - Trait defining the pipeline execution interface
    - runner.SimplePipelineRunner - Main class for spark-submit (sequential execution)

    Lifecycle Hooks

    Use PipelineHooks from the config package to monitor or customize pipeline execution:

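    import com.typesafe.config.ConfigFactory
    import io.github.dwsmith1983.spark.pipeline.config._
    import io.github.dwsmith1983.spark.pipeline.runner.SimplePipelineRunner
    
    val hooks = new PipelineHooks {
      override def afterComponent(config: ComponentConfig, index: Int, total: Int, durationMs: Long): Unit =
        println(s"Component ${config.instanceName} completed in ${durationMs}ms")
    }
    
    // Loads application.conf, or the file named by -Dconfig.file
    val pipelineConfig = ConfigFactory.load()
    SimplePipelineRunner.run(pipelineConfig, hooks)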

    Deployment

    The runner JAR should be used as the main artifact for spark-submit, with user pipeline JARs added via --jars:

    spark-submit \
      --class io.github.dwsmith1983.spark.pipeline.runner.SimplePipelineRunner \
      --jars /path/to/user-pipeline.jar \
      /path/to/spark-pipeline-runner-spark3_2.12.jar \
      -Dconfig.file=/path/to/pipeline.conf

    Configuration Loading

    Configuration is loaded via Typesafe Config, with sources taking precedence in this order (highest first):

    1. System properties (-Dconfig.file, -Dconfig.resource, -Dconfig.url)
    2. application.conf on the classpath
    3. reference.conf on the classpath
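
    The ordering above can be illustrated with Typesafe Config's withFallback, where the receiver's values win and fallbacks only fill in missing keys. This is a sketch assuming com.typesafe:config is on the classpath; the three inline strings are stand-ins for system properties, application.conf and reference.conf, and the code does not claim to mirror the runner's own loading logic. In Typesafe Config, -Dconfig.file, -Dconfig.resource and -Dconfig.url select a file to load in place of application.conf, while other -D keys override individual settings.

    ```scala
    import com.typesafe.config.{Config, ConfigFactory}

    object ConfigPrecedenceSketch {
      // Stand-in for -D overrides (highest precedence).
      val systemLayer: Config =
        ConfigFactory.parseString("pipeline.pipeline-name = \"From system properties\"")

      // Stand-in for application.conf on the classpath.
      val applicationLayer: Config = ConfigFactory.parseString(
        """pipeline.pipeline-name = "From application.conf"
          |spark.app-name = "MyPipeline"
          |""".stripMargin)

      // Stand-in for reference.conf (library defaults, lowest precedence).
      val referenceLayer: Config = ConfigFactory.parseString(
        """spark.app-name = "default-app"
          |spark.master = "local[*]"
          |""".stripMargin)

      // withFallback keeps the receiver's values and only fills in keys it lacks.
      val merged: Config =
        systemLayer.withFallback(applicationLayer).withFallback(referenceLayer).resolve()

      def main(args: Array[String]): Unit = {
        println(merged.getString("pipeline.pipeline-name")) // From system properties
        println(merged.getString("spark.app-name"))         // MyPipeline
        println(merged.getString("spark.master"))           // local[*]
      }
    }
    ```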

  • PipelineRunner
  • SchemaContractViolationException
  • SimplePipelineRunner

object SimplePipelineRunner extends PipelineRunner

Main entry point for running pipelines.

SimplePipelineRunner reads a HOCON configuration file, configures the SparkSession, and executes pipeline components in sequence.

Usage

Via spark-submit:

spark-submit \
  --class io.github.dwsmith1983.spark.pipeline.runner.SimplePipelineRunner \
  --jars /path/to/user-pipeline.jar \
  /path/to/spark-pipeline-runner.jar \
  -Dconfig.file=/path/to/pipeline.conf

Configuration

The configuration file should contain:

spark {
  master = "yarn"           # Optional, defaults to spark-submit setting
  app-name = "MyPipeline"   # Optional
  config {
    "spark.executor.memory" = "4g"
    "spark.executor.cores" = "2"
  }
}

pipeline {
  pipeline-name = "My Data Pipeline"
  pipeline-components = [
    {
      instance-type = "com.mycompany.MyComponent"
      instance-name = "MyComponent(prod)"
      instance-config {
        input-table = "raw_data"
        output-path = "/data/processed"
      }
    }
  ]
}
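
To see how the runner-facing keys nest, the pipeline block above can be parsed and inspected with Typesafe Config directly. A minimal sketch, assuming com.typesafe:config and Scala 2.13+ (for scala.jdk.CollectionConverters) are available; InspectPipelineConfig is a hypothetical name used only for illustration.

```scala
import com.typesafe.config.{Config, ConfigFactory}
import scala.jdk.CollectionConverters._

object InspectPipelineConfig {
  // The pipeline block of the sample configuration above.
  val hocon: String =
    """pipeline {
      |  pipeline-name = "My Data Pipeline"
      |  pipeline-components = [
      |    {
      |      instance-type = "com.mycompany.MyComponent"
      |      instance-name = "MyComponent(prod)"
      |      instance-config {
      |        input-table = "raw_data"
      |        output-path = "/data/processed"
      |      }
      |    }
      |  ]
      |}
      |""".stripMargin

  val config: Config = ConfigFactory.parseString(hocon)

  def main(args: Array[String]): Unit = {
    println(config.getString("pipeline.pipeline-name"))
    // Each list entry is itself a Config holding one component's settings.
    for (component <- config.getConfigList("pipeline.pipeline-components").asScala) {
      val name = component.getString("instance-name")
      val clazz = component.getString("instance-type")
      val inputTable = component.getString("instance-config.input-table")
      println(s"$name -> $clazz (input-table = $inputTable)")
    }
  }
}
```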

Lifecycle Hooks

You can provide custom PipelineHooks to monitor or customize execution:

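import com.typesafe.config.ConfigFactory
import io.github.dwsmith1983.spark.pipeline.config._
import io.github.dwsmith1983.spark.pipeline.runner.SimplePipelineRunner

// `metrics` is a placeholder for your own metrics client
val hooks = new PipelineHooks {
  override def afterComponent(config: ComponentConfig, index: Int, total: Int, durationMs: Long): Unit =
    metrics.recordDuration(config.instanceName, durationMs)
}

SimplePipelineRunner.run(ConfigFactory.load(), hooks)
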
Linear Supertypes
PipelineRunner, AnyRef, Any

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##: Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  4. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  5. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.CloneNotSupportedException]) @IntrinsicCandidate() @native()
  6. def dryRun(config: Config, hooks: PipelineHooks): DryRunResult

    Validates the pipeline configuration without executing components.

    This method parses the configuration and instantiates all components to verify they can be created, but does not call their run() methods.

    config

    The loaded HOCON configuration

    hooks

    Lifecycle hooks to invoke (only beforePipeline/afterPipeline are called)

    returns

    Validation result indicating success or listing errors

    Definition Classes
    SimplePipelineRunner → PipelineRunner
  7. def dryRun(config: Config): DryRunResult

    Validates the pipeline configuration without executing components.

    This method parses the configuration and instantiates all components to verify they can be created, but does not call their run() methods. Useful for CI/CD pipelines to validate configurations before deployment.

    This is a convenience method that uses PipelineHooks.NoOp.

    config

    The loaded HOCON configuration containing pipeline and optionally spark blocks

    returns

    Validation result indicating success or listing errors

    Definition Classes
    PipelineRunner
  8. def dryRunFromFile(configPath: String, hooks: PipelineHooks): DryRunResult

    Validates the pipeline from a config file path with lifecycle hooks.

    configPath

    Path to the HOCON configuration file

    hooks

    Lifecycle hooks to invoke

    returns

    Validation result indicating success or listing errors

  9. def dryRunFromFile(configPath: String): DryRunResult

    Validates the pipeline from a config file path.

    configPath

    Path to the HOCON configuration file

    returns

    Validation result indicating success or listing errors

  10. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  11. def equals(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef → Any
  12. final def getClass(): Class[_ <: AnyRef]
    Definition Classes
    AnyRef → Any
    Annotations
    @IntrinsicCandidate() @native()
  13. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @IntrinsicCandidate() @native()
  14. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  15. def listResumableCheckpoints(pipelineName: String, store: CheckpointStore): List[CheckpointState]

    Lists all incomplete (resumable) checkpoints for a pipeline.

    pipelineName

    Name of the pipeline

    store

    The checkpoint store to query

    returns

    List of checkpoint states that can be resumed

  16. def main(args: Array[String]): Unit
  17. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  18. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @IntrinsicCandidate() @native()
  19. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @IntrinsicCandidate() @native()
  20. def resume(configPath: String, runId: String, store: Option[CheckpointStore] = None, additionalHooks: PipelineHooks = PipelineHooks.NoOp): Unit

    Resumes a failed pipeline run from its checkpoint.

    Loads the checkpoint state for the specified run ID and continues execution from the next component after the last successful one.

    Example

    // Resume from a specific failed run
    SimplePipelineRunner.resume(
      configPath = "/path/to/pipeline.conf",
      runId = "abc123-def456"
    )

    configPath

    Path to the HOCON configuration file

    runId

    The run ID to resume from

    store

    Optional checkpoint store (uses config-based store if not provided)

    additionalHooks

    Additional hooks to compose with checkpoint hooks

    Note

    Throws CheckpointException if the checkpoint is not found or if the configuration does not match the checkpoint.
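
    The resume behaviour described above (continue with the component after the last successful one) can be modelled as plain list arithmetic. This is a conceptual sketch only, not the framework's checkpoint implementation: it assumes components run in list order and that a checkpoint can be reduced to a hypothetical lastCompletedIndex, which is -1 when no component completed.

    ```scala
    object ResumeModelSketch {
      // Hypothetical model: components run in list order, and lastCompletedIndex is
      // the position of the last component that finished successfully (-1 if none).
      def remainingComponents[A](components: List[A], lastCompletedIndex: Int): List[A] =
        components.drop(lastCompletedIndex + 1)

      def main(args: Array[String]): Unit = {
        val components = List("Extract", "Transform", "Load")
        println(remainingComponents(components, lastCompletedIndex = 0))  // List(Transform, Load)
        println(remainingComponents(components, lastCompletedIndex = -1)) // List(Extract, Transform, Load)
        println(remainingComponents(components, lastCompletedIndex = 2))  // List()
      }
    }
    ```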

  21. def resumeFromConfig(config: Config, runId: String, store: Option[CheckpointStore] = None, additionalHooks: PipelineHooks = PipelineHooks.NoOp): Unit

    Resumes a failed pipeline run using a Config object.

    config

    The loaded HOCON configuration

    runId

    The run ID to resume from

    store

    Optional checkpoint store

    additionalHooks

    Additional hooks to compose with checkpoint hooks

  22. def run(config: Config, hooks: PipelineHooks): Unit

    Runs the pipeline defined in the given configuration with lifecycle hooks.

    config

    The loaded HOCON configuration

    hooks

    Lifecycle hooks to invoke during execution

    Definition Classes
    SimplePipelineRunner → PipelineRunner
  23. def run(config: Config): Unit

    Runs the pipeline defined in the given configuration.

    This is a convenience method that uses PipelineHooks.NoOp.

    May throw ComponentInstantiationException if a component cannot be instantiated, or ConfigurationException if the configuration is invalid.

    config

    The loaded HOCON configuration containing pipeline and optionally spark blocks

    Definition Classes
    PipelineRunner
  24. def runFromFile(configPath: String, hooks: PipelineHooks): Unit

    Runs the pipeline from a config file path with lifecycle hooks.

    configPath

    Path to the HOCON configuration file

    hooks

    Lifecycle hooks to invoke during execution

  25. def runFromFile(configPath: String): Unit

    Runs the pipeline from a config file path.

    configPath

    Path to the HOCON configuration file

  26. def runWithCheckpointing(configPath: String, additionalHooks: PipelineHooks = PipelineHooks.NoOp): Unit

    Runs a pipeline with automatic checkpoint support.

    If checkpointing is enabled in the configuration and auto-resume is true, this method will automatically resume from the last failed checkpoint if one exists.

    Example

    // Run with checkpointing enabled (configured in pipeline.conf)
    SimplePipelineRunner.runWithCheckpointing("/path/to/pipeline.conf")

    configPath

    Path to the HOCON configuration file

    additionalHooks

    Additional hooks to compose with checkpoint hooks

  27. def runWithCheckpointingFromConfig(config: Config, additionalHooks: PipelineHooks = PipelineHooks.NoOp): Unit

    Runs a pipeline with automatic checkpoint support using a Config object.

    config

    The loaded HOCON configuration

    additionalHooks

    Additional hooks to compose with checkpoint hooks

  28. final def synchronized[T0](arg0: => T0): T0
    Definition Classes
    AnyRef
  29. def toString(): String
    Definition Classes
    AnyRef → Any
  30. def validate(config: Config, options: ValidationOptions): ValidationResult

    Validates the pipeline configuration with options.

    config

    The loaded HOCON configuration

    options

    Validation options (e.g., resource checking)

    returns

    Validation result with all errors and warnings

  31. def validate(config: Config): ValidationResult

    Validates the pipeline configuration without instantiating components.

    This method performs lightweight validation by checking:

    • HOCON syntax correctness
    • Required fields (pipeline-name, pipeline-components)
    • Component class existence (Class.forName)
    • Companion object existence, and that the companion implements the ConfigurableInstance trait

    Unlike dryRun, this does NOT call createFromConfig on components, making it faster and free of initialization side effects.

    Use validate for quick pre-flight checks, and dryRun when you need to verify that components can be fully instantiated with their configs.
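
    The class and companion-object checks listed above can be performed with standard JVM reflection. The sketch below shows that general technique and is not necessarily how validate implements them; confirming that the companion implements ConfigurableInstance would additionally need the framework on the classpath (for example, an isInstanceOf test on the returned object), so that step is left out. ClassCheckSketch is a hypothetical name.

    ```scala
    import scala.util.Try

    object ClassCheckSketch {
      // Checks that a class is on the classpath without running its static initializer.
      def classExists(fqcn: String): Boolean =
        Try(Class.forName(fqcn, false, getClass.getClassLoader)).isSuccess

      // A Scala `object Foo` compiles to a class named `Foo$` whose public static
      // MODULE$ field holds the singleton. Reading the field initializes the object.
      def companionObject(fqcn: String): Option[AnyRef] =
        Try(Class.forName(fqcn + "$").getField("MODULE$").get(null)).toOption

      def main(args: Array[String]): Unit = {
        println(classExists("scala.Option"))               // true
        println(companionObject("scala.Option").isDefined) // true
        println(classExists("com.mycompany.MyComponent"))  // false unless that JAR is on the classpath
      }
    }
    ```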

    config

    The loaded HOCON configuration

    returns

    Validation result with all errors and warnings

  32. def validateFromFile(configPath: String, options: ValidationOptions): ValidationResult

    Validates a pipeline configuration from a file path with options.

    configPath

    Path to the HOCON configuration file

    options

    Validation options (e.g., resource checking)

    returns

    Validation result with all errors and warnings

  33. def validateFromFile(configPath: String): ValidationResult

    Validates a pipeline configuration from a file path.

    configPath

    Path to the HOCON configuration file

    returns

    Validation result with all errors and warnings

  34. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  35. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException]) @native()
  36. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])

Deprecated Value Members

  1. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.Throwable]) @Deprecated
    Deprecated

    (Since version 9)
