Features Pipeline

Extracts wavelet-based (and optionally raw) features from window Parquet files and writes a single aggregated Parquet dataset plus metadata. Supports checkpoints and incremental buffered writes.


Script

python scripts/run_features_pipeline.py --config configs/pipelines_config.yml

Config section: features_pipeline.


Purpose

  • Input: Window Parquet files in source_folder (output of the Windows pipeline).
  • Output: One Parquet dataset (e.g. data/features_dataset.parquet) and metadata (e.g. metadata/features_metadata.json, metadata/features_schema.json) in output_folder. If output_folder is not set, the runtime falls back to source_folder/features, although the CLI wrapper currently requires the key to be present. Checkpoints allow resuming after interruption.

Main configuration

| Key | Description |
| --- | --- |
| source_folder | Directory of window Parquet files. |
| output_folder | Base directory for features output (optional at runtime; currently required by the CLI wrapper). |
| feature_columns | Column names to use for feature extraction (e.g. pressure, flow). |
| label_column | Label column (e.g. LEAK). |
| wavelet | Wavelet name (e.g. db2). |
| wavelet_level | Decomposition level. |
| write_buffer_rows | Rows to buffer before writing to Parquet (e.g. 5000). |
| checkpoint_frequency | Batches between checkpoints (e.g. 1000). |
| parquet_row_group_size | Row group size in Parquet. |
| use_parallel | Use parallel file processing. |
| max_workers | Number of parallel workers (null = CPU count). |
| include_raw_vectorization | Include raw signal features in addition to wavelet features. |
| compression | Parquet compression (e.g. snappy). |

Configuration reference (each item explained)

| Item | Meaning | What it solves | Notes |
| --- | --- | --- | --- |
| source_folder | Path to window Parquet files (output of the Windows pipeline). | Defines the input for feature extraction; each file is one window. | Typically contains many small Parquet files. |
| output_folder | Base directory for the features dataset and metadata (e.g. data/features_dataset.parquet, metadata/). | Centralizes features and schema for training and Optuna. | If null, the runtime writes under source_folder/features/; the CLI wrapper currently requires this key. |
| feature_columns | List of signal column names to use for feature extraction (same naming as in windows). | Restricts which signals are transformed into wavelet/raw features; must match columns present in the window Parquet. | Order can affect schema; names must match exactly. |
| label_column | Name of the binary label column (e.g. LEAK); the last value of each window is taken as the label. | Defines the target for training; one label per window. | Must exist in the window data. |
| enforce_homogeneous_rows | If true, checks that every window has the same number of rows. | Ensures consistent feature length and avoids silent errors from misaligned windows. | The row count should match window_size from the Windows pipeline. |
| wavelet | Wavelet name for decomposition (e.g. db2). | Chooses the basis for time–frequency features; affects sensitivity to transients. | Common: db2, db4. |
| wavelet_level | Decomposition level (e.g. 3). | Controls how many frequency bands you get; higher = more levels, more features. | Must be compatible with window length. |
| save_metadata | If true, writes features_metadata.json and schema files. | Required for idempotency (config hash) and for downstream pipelines (schema). | |
| checkpoint_frequency | Save a checkpoint every N batches. | Allows resuming after interruption without reprocessing all files. | Larger = fewer checkpoints, less I/O; smaller = safer for long runs. |
| compression | Parquet compression (e.g. snappy, zstd). | Reduces size of the features dataset. | |
| write_buffer_rows | Number of rows to buffer in memory before writing to Parquet. | Balances memory use and write efficiency. | |
| parquet_row_group_size | Row group size in the output Parquet. | Affects read performance and compression; larger groups = fewer seeks. | |
| show_progress | If true, prints progress. | Lets you monitor long runs. | |
| progress_monitoring / progress_update_interval | Enables progress monitoring and sets the update interval in seconds. | Fine-grained progress for very long runs. | |
| include_raw_vectorization | If true, adds raw signal stats (e.g. mean, std) in addition to wavelet features. | Gives models access to simple time-domain stats; can help when wavelet features alone are not enough. | Increases feature count. |
| use_parallel | If true, processes files in parallel. | Speeds up extraction on multi-core machines; critical for 100K+ files. | |
| max_workers | Number of parallel workers; null = use all CPUs. | Lets you limit concurrency to avoid memory or I/O saturation. | |
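
The note that wavelet_level "must be compatible with window length" can be checked before a long run. The sketch below assumes the pipeline is bound by the same rule PyWavelets documents for dwt_max_level, i.e. the largest level L with (filter_length - 1) * 2^L <= window_length; whether the pipeline enforces this itself is not stated here. The helper names and the small filter-length table are illustrative only.

```python
# Filter lengths for Daubechies wavelets: dbN has 2 * N coefficients.
# Only the two wavelets named in this page are listed (illustrative table).
DB_FILTER_LENGTHS = {"db2": 4, "db4": 8}


def max_wavelet_level(window_length: int, filter_length: int) -> int:
    """Largest level L such that (filter_length - 1) * 2**L <= window_length."""
    if window_length < 1 or filter_length < 2:
        raise ValueError("window_length must be >= 1 and filter_length >= 2")
    level = 0
    # Integer arithmetic avoids floating-point edge cases in log2.
    while (filter_length - 1) * 2 ** (level + 1) <= window_length:
        level += 1
    return level


def level_fits(window_length: int, wavelet: str, wavelet_level: int) -> bool:
    """True if wavelet_level is usable for windows of the given row count."""
    limit = max_wavelet_level(window_length, DB_FILTER_LENGTHS[wavelet])
    return 1 <= wavelet_level <= limit


if __name__ == "__main__":
    # Hypothetical window sizes; substitute window_size from the Windows pipeline.
    for rows in (20, 64, 1000):
        print(rows, level_fits(rows, "db2", 3))
```

Under this rule, short windows are the usual cause of an incompatible level, so it is worth running the check against window_size whenever either value changes.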

Configuration template

Add this block to configs/pipelines_config.yml under the key features_pipeline:

features_pipeline:
  source_folder: "data/windows/SS/data"
  output_folder: "data/features/SS"

  feature_columns:
    - "PT 'POSITION:' 'POSITION_1378M' '(PA)' 'Pressure'"
    - "GT 'POSITION:' 'POSITION_1378M' '(KG/S)' 'Total mass flow'"
    # ... add all signal columns to extract features from

  label_column: "LEAK"
  enforce_homogeneous_rows: true

  wavelet: "db2"
  wavelet_level: 3
  save_metadata: true
  checkpoint_frequency: 1000
  compression: "snappy"
  write_buffer_rows: 5000
  parquet_row_group_size: 10000
  show_progress: true
  progress_monitoring: true
  progress_update_interval: 1.0
  include_raw_vectorization: false

  use_parallel: true
  max_workers: null   # null = use all CPUs

See the full file pipelines_config.yml in the repository for the complete template.


Default values applied by the script

run_features_pipeline.py applies FEATURES_PIPELINE_DEFAULTS before calling the ETL factory, so omitted optional keys are completed with these values:

| Key | Default |
| --- | --- |
| label_column | "LEAK" |
| enforce_homogeneous_rows | true |
| wavelet | "db2" |
| wavelet_level | 3 |
| save_metadata | true |
| checkpoint_frequency | 1000 |
| compression | "snappy" |
| write_buffer_rows | 5000 |
| parquet_row_group_size | 10000 |
| show_progress | true |
| progress_monitoring | true |
| progress_update_interval | 1.0 |
| include_raw_vectorization | false |
| use_parallel | true |
| max_workers | null (the feature extractor then uses cpu_count()) |

The ETL layer also keeps auxiliary defaults such as batch_size=5, output_format="parquet", memory_monitoring=true, gc_frequency=100, and use_prefect_progress=false.
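
As a rough illustration of how omitted keys get completed, the sketch below merges a user-supplied features_pipeline section over the documented defaults. The dictionary values are copied from the defaults listed above; the apply_defaults helper and its merge strategy are assumptions, and the actual script may merge differently.

```python
# Values copied from the documented defaults; None stands for YAML null.
FEATURES_PIPELINE_DEFAULTS = {
    "label_column": "LEAK",
    "enforce_homogeneous_rows": True,
    "wavelet": "db2",
    "wavelet_level": 3,
    "save_metadata": True,
    "checkpoint_frequency": 1000,
    "compression": "snappy",
    "write_buffer_rows": 5000,
    "parquet_row_group_size": 10000,
    "show_progress": True,
    "progress_monitoring": True,
    "progress_update_interval": 1.0,
    "include_raw_vectorization": False,
    "use_parallel": True,
    "max_workers": None,
}


def apply_defaults(user_section: dict) -> dict:
    """Hypothetical helper: user keys win, missing keys fall back to defaults."""
    merged = dict(FEATURES_PIPELINE_DEFAULTS)
    merged.update(user_section)
    return merged


if __name__ == "__main__":
    section = {
        "source_folder": "data/windows/SS/data",
        "output_folder": "data/features/SS",
        "wavelet": "db4",
    }
    for key, value in sorted(apply_defaults(section).items()):
        print(f"{key}: {value}")
```

With a plain dictionary merge like this one, a key explicitly set to null in the YAML overrides the default with null rather than falling back to it, so it is safer to omit a key than to null it.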

Important nuance: the runtime logic can derive the destination as <source>/features/{data,metadata} when output_folder is omitted, but the current CLI wrapper still validates output_folder as required before launching the pipeline. For now, keep output_folder explicit in pipelines_config.yml.
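
The fallback described above can be pictured as follows. This is a minimal sketch of the documented layout only; derive_output_dirs is a hypothetical name and is not the pipeline's resolve_config_paths.

```python
from pathlib import PurePosixPath
from typing import Optional, Tuple


def derive_output_dirs(
    source_folder: str, output_folder: Optional[str]
) -> Tuple[PurePosixPath, PurePosixPath]:
    """Return (data_dir, metadata_dir) following the documented layout.

    Hypothetical helper: when output_folder is omitted, the base becomes
    <source_folder>/features.
    """
    if output_folder:
        base = PurePosixPath(output_folder)
    else:
        base = PurePosixPath(source_folder) / "features"
    return base / "data", base / "metadata"


if __name__ == "__main__":
    print(derive_output_dirs("data/windows/SS/data", None))
    print(derive_output_dirs("data/windows/SS/data", "data/features/SS"))
```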


Idempotency

  • If metadata/features_metadata.json exists and its config_hash matches the current config, the pipeline exits without reprocessing and reports “Dataset already processed (idempotent)”.
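
One way such a check could work is sketched below. The hashing scheme (SHA-256 over a canonical JSON dump) and both function names are assumptions made for illustration; only the config_hash field and the metadata file name come from this page.

```python
import hashlib
import json
from pathlib import Path


def config_hash(config: dict) -> str:
    """Assumed scheme: SHA-256 of the config serialized with sorted keys."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def already_processed(metadata_path: Path, config: dict) -> bool:
    """True if the metadata file exists and stores a matching config_hash."""
    if not metadata_path.is_file():
        return False
    try:
        stored = json.loads(metadata_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return False  # unreadable metadata is treated as "not processed"
    return isinstance(stored, dict) and stored.get("config_hash") == config_hash(config)


if __name__ == "__main__":
    cfg = {"wavelet": "db2", "wavelet_level": 3}
    meta = Path("metadata/features_metadata.json")
    if already_processed(meta, cfg):
        print("Dataset already processed (idempotent)")
    else:
        print("Processing required")
```

Sorting keys before hashing means that reordering keys in the YAML does not trigger a rerun, while any change in a value does.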

Checkpointing

  • The loader writes checkpoints (e.g. checkpoints/features_checkpoint.parquet) periodically. On restart, it can resume from the checkpoint and append to the same run.
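
The resume behaviour can be sketched as below. This is not the loader's implementation: it uses a JSON file as a stand-in for the Parquet checkpoint, and the function names and checkpoint layout are assumptions. The batch_size and checkpoint_frequency defaults mirror the values documented on this page.

```python
import json
import os
from pathlib import Path
from typing import Callable, List, Set


def _save_checkpoint(checkpoint_path: Path, done: Set[str]) -> None:
    """Write the checkpoint to a temp file, then swap it into place."""
    checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
    tmp = checkpoint_path.with_suffix(".tmp")
    tmp.write_text(json.dumps({"processed_files": sorted(done)}), encoding="utf-8")
    os.replace(tmp, checkpoint_path)


def process_with_checkpoints(
    files: List[str],
    process_batch: Callable[[List[str]], None],
    checkpoint_path: Path,
    batch_size: int = 5,
    checkpoint_frequency: int = 1000,
) -> int:
    """Process files not yet recorded in the checkpoint; return how many ran."""
    done: Set[str] = set()
    if checkpoint_path.is_file():
        stored = json.loads(checkpoint_path.read_text(encoding="utf-8"))
        done = set(stored["processed_files"])
    pending = [f for f in files if f not in done]
    batches_since_save = 0
    for start in range(0, len(pending), batch_size):
        batch = pending[start:start + batch_size]
        process_batch(batch)
        done.update(batch)
        batches_since_save += 1
        if batches_since_save >= checkpoint_frequency:
            _save_checkpoint(checkpoint_path, done)
            batches_since_save = 0
    _save_checkpoint(checkpoint_path, done)  # final state after the last batch
    return len(pending)
```

In this sketch, an interruption loses at most checkpoint_frequency batches of progress, which is the trade-off the configuration reference describes: smaller values are safer, larger values cost less I/O.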

Storage and S3

  • Paths are resolved via resolve_config_paths.
  • Full S3 I/O for this ETL pipeline is not yet implemented; use local paths for now.