Components

Pipeline components are the building blocks of your data pipelines. Each component implements specific data processing logic and is instantiated dynamically from configuration.

Component Architecture

Components are built using three key traits:

ConfigurableInstance (companion object)
└── createFromConfig(Config): T

DataFlow (class) extends:
├── PipelineComponent → run(): Unit
├── SparkSessionWrapper → spark: SparkSession
└── logger: Logger → logger.info(), logger.error(), etc.
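The layering above can be sketched in plain Scala. The names below are illustrative stand-ins (the real traits live in io.github.dwsmith1983.spark.pipeline and expose an actual SparkSession and Log4j2 logger); the sketch only shows how the pieces compose:

```scala
// Sketch of the trait layering, with stand-ins so it compiles without
// Spark or Log4j on the classpath. Not the framework's actual code.
trait PipelineComponent { def run(): Unit }

final class FakeSession { def version: String = "sketch" }
final class FakeLogger  { def info(msg: String): Unit = println(s"INFO  $msg") }

trait SparkSessionWrapper { lazy val spark: FakeSession = new FakeSession }
trait Logging { protected lazy val logger: FakeLogger = new FakeLogger }

// DataFlow mixes all three capabilities into one base class.
abstract class DataFlow extends PipelineComponent with SparkSessionWrapper with Logging

class HelloFlow extends DataFlow {
  override def run(): Unit = logger.info(s"running on ${spark.version}")
}
```

A concrete component therefore only has to implement run(); the session and logger come for free from the mixins.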

Creating a Component

Step 1: Define the Companion Object

The companion object implements ConfigurableInstance and handles instantiation:

import io.github.dwsmith1983.spark.pipeline.config.ConfigurableInstance
import com.typesafe.config.{Config => TypesafeConfig}
import pureconfig._
import pureconfig.generic.auto._

object DataTransform extends ConfigurableInstance {
  // Type-safe configuration case class
  case class Config(
    inputPath: String,
    outputPath: String,
    filterColumn: String,
    filterValue: String
  )

  // The Typesafe Config is renamed on import so it does not clash with
  // the case class Config defined above.
  override def createFromConfig(conf: TypesafeConfig): DataTransform = {
    val config = ConfigSource.fromConfig(conf).loadOrThrow[Config]
    new DataTransform(config)
  }
}

Step 2: Implement the Class

The class extends DataFlow and implements the run() method:

import io.github.dwsmith1983.spark.pipeline.runtime.DataFlow
import org.apache.spark.sql.functions.col

class DataTransform(conf: DataTransform.Config) extends DataFlow {

  override def run(): Unit = {
    logger.info(s"Reading from ${conf.inputPath}")

    val data = spark.read.parquet(conf.inputPath)
      .filter(col(conf.filterColumn) === conf.filterValue)
      .select("id", "name", "timestamp")

    logger.info(s"Writing to ${conf.outputPath}")
    data.write.mode("overwrite").parquet(conf.outputPath)

    // Note: count() re-reads the input here; cache data first if the
    // extra scan matters.
    logger.info(s"Completed with ${data.count()} records")
  }
}

Step 3: Configure in HOCON

pipeline {
  pipeline-components = [
    {
      instance-type = "com.example.DataTransform"
      instance-name = "FilterActiveUsers"
      instance-config {
        input-path = "/data/users"
        output-path = "/data/active_users"
        filter-column = "status"
        filter-value = "active"
      }
    }
  ]
}

See Also: For complete, runnable examples, refer to BatchPipelineExample which demonstrates ETL, aggregation, and enrichment patterns.

SparkSession Access

The DataFlow trait provides access to a configured SparkSession:

class MyComponent(conf: MyComponent.Config) extends DataFlow {
  override def run(): Unit = {
    // SparkSession is available via 'spark'
    val df = spark.read.parquet(conf.inputPath)

    // SparkContext is available via spark.sparkContext
    val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3))

    // SQL queries run directly via spark.sql
    spark.sql("SELECT * FROM my_table")
  }
}

Logging

The DataFlow trait provides a Log4j2 logger instance:

class MyComponent(conf: MyComponent.Config) extends DataFlow {
  override def run(): Unit = {
    logger.info("Starting processing")
    logger.debug(s"Configuration: $conf")

    try {
      // processing...
    } catch {
      case e: Exception =>
        logger.error("Processing failed", e)
        throw e
    }

    logger.info("Processing complete")
  }
}

Available logging methods:

  • logger.trace(msg), logger.trace(msg, throwable)
  • logger.debug(msg), logger.debug(msg, throwable)
  • logger.info(msg), logger.info(msg, throwable)
  • logger.warn(msg), logger.warn(msg, throwable)
  • logger.error(msg), logger.error(msg, throwable)

Configuration Patterns

Optional Fields with Defaults

case class Config(
  inputPath: String,                      // Required
  outputPath: String,                     // Required
  partitionBy: List[String] = List.empty, // Optional with default
  coalesce: Option[Int] = None            // Optional, may be absent
)
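In HOCON, optional fields can simply be omitted and pureconfig falls back to the declared defaults. A sketch (paths are illustrative):

```
instance-config {
  input-path = "/data/input"
  output-path = "/data/output"
  # partition-by and coalesce omitted: the defaults
  # List.empty and None apply
}
```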

Nested Configuration

case class FilterConfig(column: String, value: String)

case class Config(
  inputPath: String,
  outputPath: String,
  filters: List[FilterConfig] = List.empty
)

HOCON:

instance-config {
  input-path = "/data/input"
  output-path = "/data/output"
  filters = [
    { column = "status", value = "active" },
    { column = "region", value = "US" }
  ]
}

Sealed Trait Configuration

sealed trait OutputFormat
case object Parquet extends OutputFormat
case class Csv(delimiter: String = ",") extends OutputFormat

case class Config(
  inputPath: String,
  outputPath: String,
  format: OutputFormat = Parquet
)
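With pureconfig's default coproduct encoding (FieldCoproductHint), the concrete subtype is selected by a "type" field whose value is the kebab-cased class name; check this against the pureconfig version you use. A sketch in HOCON:

```
instance-config {
  input-path = "/data/input"
  output-path = "/data/output"
  # "type" picks the OutputFormat subtype; remaining fields map to
  # the case class parameters (here Csv's delimiter).
  format { type = csv, delimiter = ";" }
}
```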

Error Handling

Throw exceptions to signal component failure:

class MyComponent(conf: MyComponent.Config) extends DataFlow {
  override def run(): Unit = {
    val df = spark.read.parquet(conf.inputPath)

    if (df.isEmpty) {
      throw new IllegalStateException(s"No data found at ${conf.inputPath}")
    }

    // Continue processing...
  }
}

The framework captures exceptions and reports them via lifecycle hooks.
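Conceptually, the runner wraps each component's run() along these lines. This is a sketch under assumed names (Component, runWithHooks, and the onFailure callback are illustrative, not the framework's actual API):

```scala
import scala.util.{Failure, Success, Try}

// Stand-in for a pipeline component; the real trait is PipelineComponent.
trait Component { def name: String; def run(): Unit }

// A runner might capture exceptions and forward them to a failure hook,
// returning whether the component succeeded.
def runWithHooks(c: Component, onFailure: (String, Throwable) => Unit): Boolean =
  Try(c.run()) match {
    case Success(_) => true
    case Failure(e) =>
      onFailure(c.name, e) // lifecycle hook sees the component name and cause
      false
  }
```

The component itself only throws; routing the failure to reporting hooks is the runner's job.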

Testing Components

import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.scalatest.flatspec.AnyFlatSpec

class DataTransformSpec extends AnyFlatSpec {
  implicit val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("test")
    .getOrCreate()

  "DataTransform" should "filter data correctly" in {
    val tempDir = Files.createTempDirectory("test")
    val inputPath = tempDir.resolve("input")
    val outputPath = tempDir.resolve("output")

    // Create test data; a timestamp column is included because
    // DataTransform selects "id", "name", "timestamp"
    import spark.implicits._
    val testData = Seq(
      ("1", "Alice", "active", "2024-01-01T00:00:00"),
      ("2", "Bob", "inactive", "2024-01-02T00:00:00"),
      ("3", "Charlie", "active", "2024-01-03T00:00:00")
    ).toDF("id", "name", "status", "timestamp")
    testData.write.parquet(inputPath.toString)

    val config = DataTransform.Config(
      inputPath = inputPath.toString,
      outputPath = outputPath.toString,
      filterColumn = "status",
      filterValue = "active"
    )

    val component = new DataTransform(config)
    component.run()

    val result = spark.read.parquet(outputPath.toString)
    assert(result.count() == 2)
  }
}

Next Steps