package runner

Runner package for spark-pipeline-framework.

This package provides the entry point for executing pipelines:

- runner.PipelineRunner - Trait defining the pipeline execution interface
- runner.SimplePipelineRunner - Main class for spark-submit (sequential execution)

Lifecycle Hooks

Use PipelineHooks from the config package to monitor or customize pipeline execution:

import io.github.dwsmith1983.spark.pipeline.config._

val hooks = new PipelineHooks {
  override def afterComponent(config: ComponentConfig, index: Int, total: Int, durationMs: Long): Unit =
    println(s"Component ${config.instanceName} completed in ${durationMs}ms")
}

SimplePipelineRunner.run(config, hooks)
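Hooks can also accumulate state across a run. The sketch below collects per-component timings in execution order; the `ComponentConfig` and `PipelineHooks` stand-ins are simplified assumptions so the example is self-contained, not the framework's full API surface:

```scala
// Stand-ins for the framework's config types (assumed shape, simplified).
case class ComponentConfig(instanceName: String)

trait PipelineHooks {
  def afterComponent(config: ComponentConfig, index: Int, total: Int, durationMs: Long): Unit
}

// A hook that records each component's duration in run order.
class TimingHooks extends PipelineHooks {
  val durations = scala.collection.mutable.LinkedHashMap.empty[String, Long]

  override def afterComponent(config: ComponentConfig, index: Int, total: Int, durationMs: Long): Unit =
    durations(config.instanceName) = durationMs
}
```

After the run, `durations` holds one entry per component in execution order, ready to forward to logging or a metrics system.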

Deployment

The runner JAR should be used as the main artifact for spark-submit, with user pipeline JARs added via --jars:

spark-submit \
  --class io.github.dwsmith1983.spark.pipeline.runner.SimplePipelineRunner \
  --jars /path/to/user-pipeline.jar \
  /path/to/spark-pipeline-runner-spark3_2.12.jar \
  -Dconfig.file=/path/to/pipeline.conf

Configuration Loading

Configuration is loaded via Typesafe Config in this order:

1. System properties (-Dconfig.file, -Dconfig.resource, -Dconfig.url)
2. application.conf on the classpath
3. reference.conf on the classpath
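Resolution is first-match-wins: a key defined at a higher-priority level shadows the same key at any level below it. A minimal plain-Scala sketch of that lookup order (standing in for Typesafe Config's fallback chain, not its actual API):

```scala
// Config layers in priority order: system properties first, then
// application.conf, then reference.conf (library defaults).
val systemProps     = Map("spark.app-name" -> "FromSystemProps")
val applicationConf = Map("spark.app-name" -> "MyPipeline",
                          "spark.master"   -> "yarn")
val referenceConf   = Map("spark.master"    -> "local[*]",
                          "spark.log-level" -> "WARN")

val layers = List(systemProps, applicationConf, referenceConf)

// The first layer that defines the key wins.
def lookup(key: String): Option[String] =
  layers.collectFirst { case layer if layer.contains(key) => layer(key) }
```

Here `lookup("spark.app-name")` resolves from system properties even though application.conf also defines it, while `spark.log-level` falls through to the reference defaults.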

Linear Supertypes
AnyRef, Any

Type Members

  1. trait PipelineRunner extends AnyRef

    Trait defining the interface for pipeline execution.

    Implementations of this trait are responsible for:

    - Loading and parsing pipeline configuration
    - Configuring the SparkSession
    - Instantiating and executing pipeline components
    - Invoking lifecycle hooks at appropriate points

    Standard Implementation

    The framework provides SimplePipelineRunner as the standard implementation, which executes components sequentially in the order they appear in the configuration.

    Custom Implementations

    You can create custom runners for specialized behavior:

    - Parallel execution of independent components
    - Dry-run mode that validates without executing
    - Conditional execution based on external state
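    As one illustration, a dry-run runner might resolve every configured instance-type without executing anything. This is a self-contained sketch with simplified stand-in types (the real PipelineRunner signature and config classes may differ):

```scala
// Simplified stand-ins for the framework's config and runner types.
case class ComponentConfig(instanceType: String, instanceName: String)
case class PipelineConfig(pipelineName: String, components: List[ComponentConfig])

trait PipelineRunner {
  def run(config: PipelineConfig): Unit
}

// Dry-run: check that every instance-type names a loadable class,
// without instantiating or executing any component.
object DryRunRunner extends PipelineRunner {
  def validate(config: PipelineConfig): List[(String, Boolean)] =
    config.components.map { c =>
      val loadable =
        try { Class.forName(c.instanceType); true }
        catch { case _: ClassNotFoundException => false }
      (c.instanceName, loadable)
    }

  override def run(config: PipelineConfig): Unit =
    validate(config).foreach { case (name, ok) =>
      println(if (ok) s"OK   $name" else s"FAIL $name (class not found)")
    }
}
```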

    Example

    // Using the standard runner with custom hooks
    val hooks = new PipelineHooks {
      override def afterComponent(config: ComponentConfig, index: Int, total: Int, durationMs: Long): Unit =
        println(s"Component ${config.instanceName} completed in ${durationMs}ms")
    }
    
    SimplePipelineRunner.run(config, hooks)
  2. class SchemaContractViolationException extends RuntimeException

    Exception thrown when schema validation fails between components.

Value Members

  1. object SimplePipelineRunner extends PipelineRunner

    Main entry point for running pipelines.

    SimplePipelineRunner reads a HOCON configuration file, configures the SparkSession, and executes pipeline components in sequence.
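    The sequential loop at the heart of such a runner can be sketched as follows, with simplified stand-in types (assumptions for illustration, not the actual implementation, which also builds the SparkSession and instantiates components reflectively): each component runs in configuration order, is timed, and the after-component hook fires once it completes.

```scala
// Simplified stand-ins for the framework's config and hook types.
case class ComponentConfig(instanceName: String)

trait PipelineHooks {
  def afterComponent(config: ComponentConfig, index: Int, total: Int, durationMs: Long): Unit
}

// Execute components one by one, timing each and invoking the hook.
def runSequentially(components: List[ComponentConfig],
                    hooks: PipelineHooks)(exec: ComponentConfig => Unit): Unit = {
  val total = components.size
  components.zipWithIndex.foreach { case (component, i) =>
    val start = System.nanoTime()
    exec(component)
    val durationMs = (System.nanoTime() - start) / 1000000L
    hooks.afterComponent(component, i + 1, total, durationMs)
  }
}
```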

    Usage

    Via spark-submit:

    spark-submit \
      --class io.github.dwsmith1983.spark.pipeline.runner.SimplePipelineRunner \
      --jars /path/to/user-pipeline.jar \
      /path/to/spark-pipeline-runner.jar \
      -Dconfig.file=/path/to/pipeline.conf

    Configuration

    The configuration file should contain:

    spark {
      master = "yarn"           # Optional, defaults to spark-submit setting
      app-name = "MyPipeline"   # Optional
      config {
        "spark.executor.memory" = "4g"
        "spark.executor.cores" = "2"
      }
    }
    
    pipeline {
      pipeline-name = "My Data Pipeline"
      pipeline-components = [
        {
          instance-type = "com.mycompany.MyComponent"
          instance-name = "MyComponent(prod)"
          instance-config {
            input-table = "raw_data"
            output-path = "/data/processed"
          }
        }
      ]
    }

    Lifecycle Hooks

    You can provide custom PipelineHooks to monitor or customize execution:

    val hooks = new PipelineHooks {
      override def afterComponent(config: ComponentConfig, index: Int, total: Int, durationMs: Long): Unit =
        metrics.recordDuration(config.instanceName, durationMs)
    }
    
    SimplePipelineRunner.run(config, hooks)
