Features Pipeline¶
Extracts wavelet-based (and optional raw) features from window Parquet files and writes a single aggregated Parquet plus metadata. Supports checkpoints and incremental buffer writes.
Script¶
Config section: features_pipeline.
Purpose¶
- Input: Window Parquet files in `source_folder` (output of the Windows pipeline).
- Output: One Parquet dataset (e.g. `data/features_dataset.parquet`) and metadata (e.g. `metadata/features_metadata.json`, `metadata/features_schema.json`) in `output_folder` (or under `source_folder/features` if `output_folder` is not set). Checkpoints allow resuming after interruption.
Main configuration¶
| Key | Description |
|---|---|
| `source_folder` | Directory of window Parquet files. |
| `output_folder` | Base directory for features output (optional). |
| `feature_columns` | Column names to use for feature extraction (e.g. pressure, flow). |
| `label_column` | Label column (e.g. `LEAK`). |
| `wavelet` | Wavelet name (e.g. `db2`). |
| `wavelet_level` | Decomposition level. |
| `write_buffer_rows` | Rows to buffer before writing to Parquet (e.g. 5000). |
| `checkpoint_frequency` | Batches between checkpoints (e.g. 1000). |
| `parquet_row_group_size` | Row group size in Parquet. |
| `use_parallel` | Use parallel file processing. |
| `max_workers` | Number of parallel workers (null = CPU count). |
| `include_raw_vectorization` | Include raw signal features in addition to wavelet. |
| `compression` | Parquet compression (e.g. `snappy`). |
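
To make `write_buffer_rows` concrete, the following minimal sketch buffers rows in memory and flushes them in chunks. `RowBuffer` and its `sink` callback are hypothetical names used only for illustration; the real pipeline writes to Parquet, which is replaced here by a plain callback so the example needs only the standard library.

```python
from typing import Callable, Dict, List

Row = Dict[str, float]


class RowBuffer:
    """Collects feature rows and flushes them in chunks (hypothetical helper)."""

    def __init__(self, write_buffer_rows: int, sink: Callable[[List[Row]], None]):
        if write_buffer_rows < 1:
            raise ValueError("write_buffer_rows must be >= 1")
        self.write_buffer_rows = write_buffer_rows
        self.sink = sink  # stand-in for "append these rows to the Parquet output"
        self._rows: List[Row] = []

    def add(self, row: Row) -> None:
        """Buffer one row and flush as soon as the buffer is full."""
        self._rows.append(row)
        if len(self._rows) >= self.write_buffer_rows:
            self.flush()

    def flush(self) -> None:
        """Write whatever is buffered; call once more at the end of the run."""
        if self._rows:
            self.sink(self._rows)
            self._rows = []


# Record the size of each chunk that reaches the sink.
written_chunks: List[int] = []
buffer = RowBuffer(write_buffer_rows=3, sink=lambda rows: written_chunks.append(len(rows)))
for i in range(7):
    buffer.add({"feature_0": float(i)})
buffer.flush()  # flush the final partial chunk
```

With seven rows and a buffer of three, the sink receives chunks of 3, 3, and 1 rows. A larger buffer means fewer, bigger writes at the cost of more memory held between flushes.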
Configuration reference (each item explained)¶
| Item | Meaning | What it solves | Notes |
|---|---|---|---|
| `source_folder` | Path to window Parquet files (output of the Windows pipeline). | Defines the input for feature extraction; each file is one window. | Typically contains many small Parquet files. |
| `output_folder` | Base directory for the features dataset and metadata (e.g. `data/features_dataset.parquet`, `metadata/`). | Centralizes features and schema for training and Optuna; if null, output goes under `source_folder/features/`. | |
| `feature_columns` | List of signal column names to use for feature extraction (same naming as in windows). | Restricts which signals are transformed into wavelet/raw features; must match columns present in the window Parquet. | Order can affect schema; names must match exactly. |
| `label_column` | Name of the binary label column (e.g. `LEAK`); the last value of each window is taken as the label. | Defines the target for training; one label per window. | Must exist in the window data. |
| `enforce_homogeneous_rows` | If true, checks that every window has the same number of rows. | Ensures consistent feature length and avoids silent errors from misaligned windows. | Should match `window_size` from the Windows pipeline. |
| `wavelet` | Wavelet family for decomposition (e.g. `db2`). | Chooses the basis for time–frequency features; affects sensitivity to transients. | Common: `db2`, `db4`. |
| `wavelet_level` | Decomposition level (e.g. 3). | Controls how many frequency bands you get; higher = more levels, more features. | Must be compatible with window length. |
| `save_metadata` | If true, writes `features_metadata.json` and schema files. | Required for idempotency (config hash) and for downstream pipelines (schema). | |
| `checkpoint_frequency` | Save a checkpoint every N batches. | Allows resuming after interruption without reprocessing all files. | Larger = fewer checkpoints, less I/O; smaller = safer for long runs. |
| `compression` | Parquet compression (e.g. `snappy`, `zstd`). | Reduces size of the features dataset. | |
| `write_buffer_rows` | Number of rows to buffer in memory before writing to Parquet. | Balances memory use and write efficiency. | |
| `parquet_row_group_size` | Row group size in the output Parquet. | Affects read performance and compression; larger groups = fewer seeks. | |
| `show_progress` | If true, prints progress. | Lets you monitor long runs. | |
| `progress_monitoring` / `progress_update_interval` | Enable progress and interval in seconds for updates. | Fine-grained progress for very long runs. | |
| `include_raw_vectorization` | If true, adds raw signal stats (e.g. mean, std) in addition to wavelet features. | Gives models access to simple time-domain stats; can help when wavelet alone is not enough. | Increases feature count. |
| `use_parallel` | If true, processes files in parallel. | Speeds up extraction on multi-core machines; critical for 100K+ files. | |
| `max_workers` | Number of parallel workers; null = use all CPUs. | Lets you limit concurrency to avoid memory or I/O saturation. | |
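
The `label_column` and `enforce_homogeneous_rows` rows describe two small rules: the label is the last value in the window, and all windows must have the same row count. The sketch below shows both on plain Python dictionaries standing in for window Parquet files. The function names and the sample column `pressure` are illustrative assumptions, not the pipeline's actual API.

```python
from typing import Dict, List, Sequence

Window = Dict[str, Sequence[float]]


def check_homogeneous(windows: List[Window], column: str) -> int:
    """Return the common row count, or raise if windows differ in length."""
    lengths = {len(w[column]) for w in windows}
    if len(lengths) != 1:
        raise ValueError(f"windows have differing row counts: {sorted(lengths)}")
    return lengths.pop()


def window_label(window: Window, label_column: str = "LEAK") -> int:
    """Take the last value of the label column as the window's label."""
    return int(window[label_column][-1])


# Two toy windows of four rows each (made-up values).
windows: List[Window] = [
    {"pressure": [1.0, 1.1, 1.2, 1.3], "LEAK": [0, 0, 0, 1]},
    {"pressure": [2.0, 2.1, 2.2, 2.3], "LEAK": [0, 0, 0, 0]},
]
rows_per_window = check_homogeneous(windows, "pressure")
labels = [window_label(w) for w in windows]
```

Here the first window is labelled 1 because its final row is a leak, even though the earlier rows are not.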
Configuration template¶
Add this block to `configs/pipelines_config.yml` under the key `features_pipeline`:

```yaml
features_pipeline:
  source_folder: "data/windows/SS/data"
  output_folder: "data/features/SS"
  feature_columns:
    - "PT 'POSITION:' 'POSITION_1378M' '(PA)' 'Pressure'"
    - "GT 'POSITION:' 'POSITION_1378M' '(KG/S)' 'Total mass flow'"
    # ... add all signal columns to extract features from
  label_column: "LEAK"
  enforce_homogeneous_rows: true
  wavelet: "db2"
  wavelet_level: 3
  save_metadata: true
  checkpoint_frequency: 1000
  compression: "snappy"
  write_buffer_rows: 5000
  parquet_row_group_size: 10000
  show_progress: true
  progress_monitoring: true
  progress_update_interval: 1.0
  include_raw_vectorization: false
  use_parallel: true
  max_workers: null  # null = use all CPUs
```

See the full file pipelines_config.yml in the repository for the complete template.
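
Before settling on `wavelet` and `wavelet_level`, it can help to check that the level fits the window length. The sketch below assumes the convention used by PyWavelets' `dwt_max_level`, namely `floor(log2(data_len / (filter_len - 1)))`, and the fact that a Daubechies wavelet `dbN` has filter length `2 * N`. Whether this pipeline applies exactly that rule is an assumption. The helper names are hypothetical and the window lengths are made-up examples.

```python
import math


def db_filter_length(wavelet: str) -> int:
    """Filter length of a Daubechies wavelet named 'dbN' (length is 2 * N)."""
    order = wavelet[2:]
    if not wavelet.startswith("db") or not order.isdigit() or int(order) < 1:
        raise ValueError(f"only 'dbN' names are handled in this sketch: {wavelet!r}")
    return 2 * int(order)


def max_level(window_rows: int, filter_length: int) -> int:
    """Largest useful level: floor(log2(window_rows / (filter_length - 1)))."""
    if window_rows < filter_length - 1:
        return 0
    return int(math.floor(math.log2(window_rows / (filter_length - 1))))


def band_count(level: int) -> int:
    """A level-L decomposition yields 1 approximation band and L detail bands."""
    return level + 1


filter_length = db_filter_length("db2")    # 4 taps
level_for_64_rows = max_level(64, filter_length)
level_for_20_rows = max_level(20, filter_length)
bands_at_level_3 = band_count(3)
```

Under this rule a 64-row window supports up to level 4 with `db2`, while a 20-row window supports only level 2, so `wavelet_level: 3` would be too deep for it. At level 3 each signal is split into four bands.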
Default values applied by the script¶
`run_features_pipeline.py` applies `FEATURES_PIPELINE_DEFAULTS` before calling the ETL factory,
so omitted optional keys are completed with these values:
| Key | Default |
|---|---|
| `label_column` | `"LEAK"` |
| `enforce_homogeneous_rows` | `true` |
| `wavelet` | `"db2"` |
| `wavelet_level` | `3` |
| `save_metadata` | `true` |
| `checkpoint_frequency` | `1000` |
| `compression` | `"snappy"` |
| `write_buffer_rows` | `5000` |
| `parquet_row_group_size` | `10000` |
| `show_progress` | `true` |
| `progress_monitoring` | `true` |
| `progress_update_interval` | `1.0` |
| `include_raw_vectorization` | `false` |
| `use_parallel` | `true` |
| `max_workers` | `null` -> feature extractor uses `cpu_count()` |

The ETL layer also keeps auxiliary defaults such as `batch_size=5`, `output_format="parquet"`,
`memory_monitoring=true`, `gc_frequency=100`, and `use_prefect_progress=false`.
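
The default-filling step can be pictured as a dictionary merge in which explicitly configured keys win. The sketch below copies the values from the defaults table. The merge strategy, the helper names `apply_defaults` and `effective_workers`, and the choice of one worker when `use_parallel` is false are assumptions for illustration, not the script's confirmed implementation.

```python
import os
from typing import Any, Dict

# Values copied from the defaults table; YAML null becomes Python None.
FEATURES_PIPELINE_DEFAULTS: Dict[str, Any] = {
    "label_column": "LEAK",
    "enforce_homogeneous_rows": True,
    "wavelet": "db2",
    "wavelet_level": 3,
    "save_metadata": True,
    "checkpoint_frequency": 1000,
    "compression": "snappy",
    "write_buffer_rows": 5000,
    "parquet_row_group_size": 10000,
    "show_progress": True,
    "progress_monitoring": True,
    "progress_update_interval": 1.0,
    "include_raw_vectorization": False,
    "use_parallel": True,
    "max_workers": None,
}


def apply_defaults(user_config: Dict[str, Any]) -> Dict[str, Any]:
    """Fill omitted keys; keys set by the user override the defaults."""
    return {**FEATURES_PIPELINE_DEFAULTS, **user_config}


def effective_workers(config: Dict[str, Any]) -> int:
    """Resolve max_workers, treating None as 'use all CPUs' (assumed behavior)."""
    if not config["use_parallel"]:
        return 1
    return config["max_workers"] or os.cpu_count() or 1


config = apply_defaults({
    "source_folder": "data/windows/SS/data",
    "output_folder": "data/features/SS",
    "wavelet": "db4",   # overrides the "db2" default
    "max_workers": 2,
})
workers = effective_workers(config)
```

In this example `wavelet` becomes `db4`, `wavelet_level` falls back to 3, and two workers are used.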
Important nuance: the runtime logic can derive the destination as `<source>/features/{data,metadata}`
when `output_folder` is omitted, but the current CLI wrapper still validates `output_folder`
as required before launching the pipeline. For now, keep `output_folder` explicit in
`pipelines_config.yml`.
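
The difference between the two behaviors can be sketched as follows. `resolve_output_dirs` mirrors the runtime fallback described above, and `validate_cli_config` mirrors the stricter wrapper check. Both function names and the error message are hypothetical; only the directory layout comes from this page.

```python
from pathlib import PurePosixPath
from typing import Any, Dict, Optional


def resolve_output_dirs(source_folder: str, output_folder: Optional[str]) -> Dict[str, PurePosixPath]:
    """Runtime-style resolution: fall back to <source>/features when unset."""
    if output_folder:
        base = PurePosixPath(output_folder)
    else:
        base = PurePosixPath(source_folder) / "features"
    return {"data": base / "data", "metadata": base / "metadata"}


def validate_cli_config(config: Dict[str, Any]) -> None:
    """Wrapper-style check: reject a config without output_folder."""
    if not config.get("output_folder"):
        raise ValueError("features_pipeline.output_folder is required")


explicit = resolve_output_dirs("data/windows/SS/data", "data/features/SS")
fallback = resolve_output_dirs("data/windows/SS/data", None)
```

With an explicit `output_folder`, the data directory is `data/features/SS/data`. With the fallback it is `data/windows/SS/data/features/data`, but a config relying on that fallback would not pass the wrapper-style check.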
Idempotency¶
- If `metadata/features_metadata.json` exists and its `config_hash` matches the current config, the pipeline exits without reprocessing and reports “Dataset already processed (idempotent)”.
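
A minimal sketch of such a check is shown below. The hashing scheme (SHA-256 over JSON with sorted keys) and the helper names are assumptions; this page only states that a `config_hash` stored in `features_metadata.json` is compared with the current config.

```python
import hashlib
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def config_hash(config: Dict[str, Any]) -> str:
    """Hash a canonical JSON form so key order does not change the result."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def already_processed(metadata_path: Path, config: Dict[str, Any]) -> bool:
    """True when stored metadata carries the same hash as the current config."""
    if not metadata_path.exists():
        return False
    stored = json.loads(metadata_path.read_text(encoding="utf-8"))
    return stored.get("config_hash") == config_hash(config)


with tempfile.TemporaryDirectory() as tmp:
    metadata_path = Path(tmp) / "metadata" / "features_metadata.json"
    config = {"wavelet": "db2", "wavelet_level": 3}

    first_check = already_processed(metadata_path, config)   # no metadata yet

    # Simulate a finished run that recorded its config hash.
    metadata_path.parent.mkdir(parents=True)
    metadata_path.write_text(json.dumps({"config_hash": config_hash(config)}), encoding="utf-8")

    second_check = already_processed(metadata_path, config)  # same config
    third_check = already_processed(metadata_path, {**config, "wavelet_level": 4})
```

The first and third checks are false (no metadata, then a changed config), and the second is true, which corresponds to the case where the pipeline would exit early.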
Checkpointing¶
- The loader writes checkpoints (e.g. `checkpoints/features_checkpoint.parquet`) periodically. On restart, it can resume from the checkpoint and append to the same run.
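
The bookkeeping behind resumable runs can be sketched with the standard library alone. This is an illustration under stated assumptions: the checkpoint is a JSON list of finished file names rather than the Parquet checkpoint the loader actually writes, and `run`, `load_done`, and the simulated failure are hypothetical.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, Dict, List


def load_done(checkpoint: Path) -> List[str]:
    """Read the list of already processed files, or an empty list."""
    if checkpoint.exists():
        return json.loads(checkpoint.read_text(encoding="utf-8"))["done"]
    return []


def run(files: List[str], checkpoint: Path, process: Callable[[str], None],
        batch_size: int = 5, checkpoint_frequency: int = 1000) -> int:
    """Process files missing from the checkpoint; return how many were pending."""
    done = load_done(checkpoint)
    seen = set(done)
    pending = [f for f in files if f not in seen]
    for batch_index, start in enumerate(range(0, len(pending), batch_size), start=1):
        for name in pending[start:start + batch_size]:
            process(name)
            done.append(name)
        if batch_index % checkpoint_frequency == 0:  # every N batches
            checkpoint.write_text(json.dumps({"done": done}), encoding="utf-8")
    checkpoint.write_text(json.dumps({"done": done}), encoding="utf-8")  # final state
    return len(pending)


with tempfile.TemporaryDirectory() as tmp:
    checkpoint = Path(tmp) / "features_checkpoint.json"
    files = [f"window_{i}.parquet" for i in range(10)]
    processed: List[str] = []
    state: Dict[str, bool] = {"crashed": False}

    def flaky(name: str) -> None:
        """Fail once on window_7 to simulate an interruption."""
        if name == "window_7.parquet" and not state["crashed"]:
            state["crashed"] = True
            raise RuntimeError("simulated interruption")
        processed.append(name)

    try:
        run(files, checkpoint, flaky, batch_size=2, checkpoint_frequency=1)
    except RuntimeError:
        pass  # the last checkpoint covers window_0 .. window_5
    resumed_count = run(files, checkpoint, flaky, batch_size=2, checkpoint_frequency=1)
    final_done = load_done(checkpoint)
```

On restart only the four files after the last checkpoint are pending. `window_6.parquet` is processed twice because it finished after the last checkpoint was written, which is the trade-off behind a larger `checkpoint_frequency`: less I/O, but more work repeated after an interruption.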
Storage and S3¶
- Paths are resolved via `resolve_config_paths`.
- Full S3 I/O for this ETL pipeline is not yet implemented; use local paths for now.