Windows Pipeline

Builds fixed-size time windows from Parquet time-series data. Each input file (or each segment) produces one or more window Parquet files. Already-processed inputs are skipped.


Script

python scripts/run_windows_pipeline.py --config configs/pipelines_config.yml

Config section: windows_pipeline.


Purpose

  • Input: Parquet files (time-series) in source_folder (typically output of TPL/GENKEY).
  • Output: One Parquet file per window in output_folder (under data/ and metadata/ subdirectories when create_subdirectories is true). Every window has window_size rows; the step between consecutive windows is set by step_size.

Main configuration

Key | Description
--- | ---
source_folder | Directory of input Parquet time-series.
output_folder | Directory for window Parquet files and metadata.
window_size | Number of rows per window.
step_size | Rows between consecutive window starts (defaults to 2 when omitted).
selected_columns | Columns to keep in windows (must exist in the source files).
max_workers | Number of parallel workers.
batch_size | Number of input files processed per batch.

Configuration reference (each item explained)

Item | Meaning | What it solves | Notes
--- | --- | --- | ---
source_folder | Path to Parquet time-series files (usually the output of the TPL/GENKEY pipeline). | Defines the input for windowing; each file is one case or segment. | Must match the structure expected by the extractor (one time column + signal columns + optional LEAK).
output_folder | Directory where window Parquet files and metadata are written. | Destination for fixed-size windows; idempotency skips inputs that already have outputs here. | Subdirectories (e.g. data/, metadata/) are created if create_subdirectories is true.
window_size | Number of rows per window (e.g. 40). | Sets the temporal context length for ML; must match the feature/training pipeline (e.g. same as in features and test offline). | Larger = more context but fewer windows; smaller = more windows, less context.
step_size | Rows to advance between the start of consecutive windows (e.g. 2). | Controls overlap: step smaller than window_size gives overlapping windows; equal gives non-overlapping. | Smaller step = more windows, more compute.
previous_step_before_leak | Rows (or steps) to include before the leak start when building “leak” windows. | Ensures each leak window includes a short pre-leak segment so the model sees the transition. |
start_fraction_leak_points | Fraction of window_size that must be “leak” (e.g. LEAK=1) at the start of a leak window (e.g. 0.25 = 25%). | Avoids windows that are mostly no-leak; keeps labels representative. |
stop_fraction_leak_points | Fraction of window_size that limits how the window ends relative to leak (e.g. 0.25). | Controls where the window is cut so the leak segment is well represented. |
start_fraction_no_leak_points / stop_fraction_no_leak_points | Same idea for “no leak” windows. | Used when generating negative (no-leak) windows with specific composition. | Often 0 if you only care about leak windows.
leak_windows | Number of windows to generate per case that contain leak (after leak start). | Balances positive examples; more = more leak samples for training. |
windows_no_leak | Number of no-leak windows to sample per case (e.g. 15); null = sequential mode. | Balances negative examples; avoids an excess of leak-only data. | null disables random no-leak sampling.
leak_column | Name of the column that indicates leak (0/1 or false/true). | Tells the pipeline which column to use for leak start/stop and for filtering leak vs no-leak windows. | Must exist in the source Parquet.
batch_size | Number of input files to handle per batch. | Balances memory and progress granularity. |
show_progress | If true, prints progress. | Lets you monitor long runs. |
output_format | Output format (e.g. parquet). | Keeps format consistent with the rest of the stack. |
compression | Parquet compression (e.g. snappy, null). | Reduces size or maximizes speed. |
quality_checks | If true, runs checks on window data. | Catches inconsistent window sizes or missing columns. |
save_metadata | If true, writes metadata for the run. | Needed for idempotency and traceability. |
create_subdirectories | If true, creates subdirs (e.g. data/, metadata/) under output_folder. | Keeps window files and metadata organized. |
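
To make the interaction between window_size and step_size concrete, the sketch below computes where each window would start in plain sequential mode. It is an illustration only: window_starts is a hypothetical helper, not a function from the pipeline, and it ignores the leak-specific options (leak_windows, windows_no_leak, the fraction settings) that change which windows are actually kept.

```python
def window_starts(n_rows: int, window_size: int, step_size: int) -> list[int]:
    """Return the start row of every full window that fits in n_rows.

    Hypothetical helper for illustration; not part of the pipeline code.
    """
    if window_size <= 0 or step_size <= 0:
        raise ValueError("window_size and step_size must be positive")
    if n_rows < window_size:
        return []  # the series is too short for even one full window
    # The last valid start is n_rows - window_size (inclusive).
    return list(range(0, n_rows - window_size + 1, step_size))


if __name__ == "__main__":
    # 100 rows, template values window_size=40 and step_size=2 (overlapping).
    starts = window_starts(n_rows=100, window_size=40, step_size=2)
    print(len(starts), starts[0], starts[-1])  # prints: 31 0 60

    # step_size equal to window_size gives non-overlapping windows.
    print(window_starts(n_rows=100, window_size=40, step_size=40))  # prints: [0, 40]
```

In this simplified model the number of full windows is floor((n_rows - window_size) / step_size) + 1, which is why halving step_size roughly doubles the output volume.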

Configuration template

Add this block to configs/pipelines_config.yml under the key windows_pipeline:

windows_pipeline:
  source_folder: "data/processed/SS/data"
  output_folder: "data/windows/SS"

  window_size: 40
  step_size: 2

  previous_step_before_leak: 2
  start_fraction_leak_points: 0.25
  stop_fraction_leak_points: 0.25
  start_fraction_no_leak_points: 0.0
  stop_fraction_no_leak_points: 0.0

  leak_windows: 5
  windows_no_leak: 15
  leak_column: "LEAK"

  batch_size: 10
  show_progress: true

  output_format: "parquet"
  compression: null
  quality_checks: true
  save_metadata: true
  create_subdirectories: true

See the full file pipelines_config.yml in the repository for all options.


Default values applied by the script

run_windows_pipeline.py applies WINDOWS_PIPELINE_DEFAULTS before delegating to the ETL factory, so omitted optional keys are filled with these values:

Key | Default
--- | ---
step_size | 2
previous_step_before_leak | 2
start_fraction_leak_points | 0.25
stop_fraction_leak_points | 0.25
start_fraction_no_leak_points | 0.0
stop_fraction_no_leak_points | 0.0
leak_windows | 5
windows_no_leak | 15
windows_no_leak_vf | true
leak_column | "LEAK"
batch_size | 10
show_progress | true
use_parallel | true
max_workers | null (the extractor then uses cpu_count())
output_format | parquet
windows_storage_format | flat
compression | null
quality_checks | true
save_metadata | true
create_subdirectories | true

Additional ETL-layer defaults still come from the factory/template, for example progress_monitoring=true, progress_update_interval=1.0, memory_monitoring=true, and gc_frequency=100.

Although the factory template defines window_size=40, the CLI wrapper still expects window_size to be present explicitly in the config, so keep it declared in pipelines_config.yml.
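
The behaviour described above (defaults fill in omitted keys, but window_size must be given explicitly) can be sketched as follows. The dictionary values are copied from the defaults table; the merge function apply_defaults and its error message are assumptions made for illustration, since the actual code of run_windows_pipeline.py is not shown here.

```python
# Values taken from the defaults table above; YAML null is written as None.
WINDOWS_PIPELINE_DEFAULTS = {
    "step_size": 2,
    "previous_step_before_leak": 2,
    "start_fraction_leak_points": 0.25,
    "stop_fraction_leak_points": 0.25,
    "start_fraction_no_leak_points": 0.0,
    "stop_fraction_no_leak_points": 0.0,
    "leak_windows": 5,
    "windows_no_leak": 15,
    "windows_no_leak_vf": True,
    "leak_column": "LEAK",
    "batch_size": 10,
    "show_progress": True,
    "use_parallel": True,
    "max_workers": None,
    "output_format": "parquet",
    "windows_storage_format": "flat",
    "compression": None,
    "quality_checks": True,
    "save_metadata": True,
    "create_subdirectories": True,
}


def apply_defaults(user_cfg: dict) -> dict:
    """Hypothetical merge: user-supplied keys override the defaults."""
    if "window_size" not in user_cfg:
        # window_size has no entry in the defaults table, so it must be explicit.
        raise ValueError("windows_pipeline.window_size must be set in the config")
    return {**WINDOWS_PIPELINE_DEFAULTS, **user_cfg}


if __name__ == "__main__":
    cfg = apply_defaults({"window_size": 40, "step_size": 4})
    print(cfg["step_size"], cfg["leak_windows"])  # prints: 4 5
```

A shallow merge like this is enough because every key in the table is a scalar; nested options would need a recursive merge instead.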


Idempotency

  • The extractor checks existing windows in output_folder and skips input files that already have corresponding window outputs.
  • The run reports these stats: already_processed, skipped_files, pending_files.
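
The skip logic can be pictured with the sketch below. It assumes, purely for illustration, that every window file is named <input_stem>_w<index>.parquet; the real naming scheme and the extractor's matching rules are not documented on this page, and split_pending is a hypothetical helper.

```python
from pathlib import PurePath


def split_pending(input_names, output_names):
    """Split inputs into (pending, skipped) based on existing window files.

    Assumes the hypothetical naming scheme '<input_stem>_w<index>.parquet'.
    """
    # Recover the input stem from each window file name.
    done_stems = {PurePath(name).stem.rsplit("_w", 1)[0] for name in output_names}
    pending, skipped = [], []
    for name in input_names:
        if PurePath(name).stem in done_stems:
            skipped.append(name)  # at least one window already exists
        else:
            pending.append(name)
    return pending, skipped


if __name__ == "__main__":
    inputs = ["src/case_001.parquet", "src/case_002.parquet"]
    outputs = ["out/data/case_001_w0000.parquet", "out/data/case_001_w0001.parquet"]
    pending, skipped = split_pending(inputs, outputs)
    print(pending)  # prints: ['src/case_002.parquet']
    print(skipped)  # prints: ['src/case_001.parquet']
```

Note that a check of this kind treats an input as done as soon as any window exists for it, so a run that was interrupted midway through a file may need its partial outputs removed before re-running.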

Storage and S3

  • Paths are resolved via resolve_config_paths.
  • Full S3 I/O for this ETL pipeline is not yet implemented; S3-resolved paths will raise an error. Use local paths.
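
Because S3 paths fail only once the pipeline runs, a small pre-flight check on the config can surface the problem earlier. The sketch below is an assumption-laden illustration: require_local_path is a hypothetical helper, it only recognises the s3:// URI scheme, and it does not reproduce what resolve_config_paths does.

```python
def require_local_path(path: str) -> str:
    """Return path unchanged, or raise if it looks like an S3 URI.

    Hypothetical pre-flight check; not part of the pipeline.
    """
    if path.lower().startswith("s3://"):
        raise ValueError(f"S3 paths are not supported by the windows pipeline: {path}")
    return path


if __name__ == "__main__":
    # Paths from the configuration template pass through untouched.
    for key, value in {
        "source_folder": "data/processed/SS/data",
        "output_folder": "data/windows/SS",
    }.items():
        print(key, "->", require_local_path(value))
```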