
TPL/GENKEY Pipeline (OLGA)

Converts OLGA simulator outputs (.tpl and .genkey file pairs) into Parquet with selected columns, optional instrument noise, and metadata.


Script

python scripts/run_tpl_genkey_pipeline.py --config configs/pipelines_config.yml

Config section: tpl_genkey_pipeline.


Purpose

  • Input: Pairs of .tpl and .genkey files in source_folder.
  • Output: Parquet files (and optional metadata) in output_folder. Already-processed files are skipped (idempotent).
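
As an illustration of how input pairs might be discovered by name, here is a minimal sketch. It assumes a pair is two files in source_folder that share the same stem (for example case01.tpl and case01.genkey); the function name find_tpl_genkey_pairs is hypothetical and is not taken from the pipeline's code.

```python
from pathlib import Path


def find_tpl_genkey_pairs(source_folder):
    """Return (tpl_path, genkey_path) tuples for files that share a stem.

    Assumption: 'case01.tpl' pairs with 'case01.genkey' in the same folder.
    A .tpl without a matching .genkey (or the reverse) is ignored.
    """
    folder = Path(source_folder)
    pairs = []
    for tpl_path in sorted(folder.glob("*.tpl")):
        genkey_path = tpl_path.with_suffix(".genkey")
        if genkey_path.is_file():
            pairs.append((tpl_path, genkey_path))
    return pairs
```

Sorting the .tpl files keeps the processing order deterministic between runs, which makes logs and skip reports easier to compare.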

Main configuration

| Key | Description |
|---|---|
| source_folder | Directory containing .tpl/.genkey pairs. |
| output_folder | Directory for output Parquet and metadata. |
| selected_columns | List of column names to extract (e.g. pressure, flow, temperature). |
| instrument_noise | Optional list of objects. Each object has: column (exact name from selected_columns), error_pct (sensor error as decimal, e.g. 0.05 = 5%), repeatability_pct (repeatability as decimal, e.g. 0.0125 = 1.25%). Use [] to disable. |
| batch_size | Optional advanced parameter. Default: 10. |
| max_workers | Optional advanced parameter. Default: null -> CPU count. |
| encoding | Optional advanced parameter. Default: utf-8. |
| output_format | Optional advanced parameter. Default: parquet. |
| compression | Optional advanced parameter. Default: null. |
| quality_checks | Optional advanced parameter. Default: true. |
| save_metadata | Optional advanced parameter. Default: true. |

Configuration reference (each item explained)

| Item | Meaning | What it solves | Notes |
|---|---|---|---|
| source_folder | Path to the directory that contains pairs of .tpl and .genkey files (OLGA simulator output). | Defines where the pipeline reads raw data. Can be a local path or an S3 prefix, although S3 read/write is not yet implemented, so use local paths for now. | Must exist; the script discovers file pairs by name. |
| output_folder | Path where output Parquet files and optional metadata are written. | Defines the destination for processed time series. Idempotency skips inputs that already have a corresponding output here. | Created if missing. |
| selected_columns | List of column name strings to extract from the OLGA files (in addition to time, which is always included). | Limits which signals are stored; reduces size and keeps only what you need for downstream pipelines. | Names must exactly match the OLGA variable format (e.g. "PT 'POSITION:' 'PIPE@1378M' '(PA)' 'Pressure'"). |
| instrument_noise | List of {column, error_pct, repeatability_pct}. Each entry applies simulated sensor error to one column. | Simulates real sensor uncertainty so models are trained and evaluated under realistic measurement error. | Use [] to disable. column must match a name in selected_columns. |
| batch_size | Number of file pairs to process in one batch before writing or advancing. | Balances I/O and memory: larger = fewer passes, smaller = lower memory. | Typical: 5–20. |
| encoding | Text encoding of the .tpl/.genkey files (e.g. utf-8). | Ensures correct parsing of non-ASCII characters and OLGA-specific symbols. | Match the encoding used by your OLGA export. |
| show_progress | If true, prints progress (e.g. files processed, ETA). | Lets you monitor long runs and estimate completion time. | |
| max_workers | Number of parallel workers for file I/O and parsing. | Speeds up processing on multi-core machines. | Tune to avoid overloading disk or CPU. |
| io_workers | Workers dedicated to reading/writing files. | Separates I/O from parsing so both can run in parallel. | |
| parse_workers | Workers dedicated to parsing file content. | Speeds up the CPU-bound parsing step. | |
| initial_rows_to_drop | Number of rows to discard at the start of each case/simulation. | Removes transient or warm-up data that might bias features or labels. | Often 5–20 depending on simulation. |
| output_format | Output format; currently only parquet is supported. | Keeps a single, efficient columnar format for downstream pipelines. | |
| compression | Parquet compression codec (e.g. snappy, gzip, or null). | Reduces disk usage; null = uncompressed for maximum speed. | |
| quality_checks | If true, runs basic checks on extracted data (e.g. shapes, nulls). | Catches malformed or inconsistent outputs early. | |
| save_metadata | If true, writes per-file or run metadata (e.g. column list, row counts). | Helps traceability and debugging; required for some idempotency logic. | |
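
To make batch_size and initial_rows_to_drop more concrete, the sketch below shows one way these two settings could behave. Both helper names (iter_batches, drop_initial_rows) are hypothetical, and the pipeline's actual batching and trimming logic may differ.

```python
def iter_batches(items, batch_size=10):
    """Yield consecutive slices of at most batch_size items.

    With batch_size=10 and 25 file pairs this yields batches of 10, 10 and 5.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def drop_initial_rows(rows, initial_rows_to_drop=0):
    """Discard the first N rows of one case, e.g. a warm-up transient."""
    if initial_rows_to_drop < 0:
        raise ValueError("initial_rows_to_drop must be >= 0")
    return rows[initial_rows_to_drop:]
```

A larger batch_size means fewer passes over the file list at the cost of holding more parsed data in memory at once, which matches the trade-off described for this setting.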

Configuration template

Add this block to configs/pipelines_config.yml under the key tpl_genkey_pipeline:

tpl_genkey_pipeline:
  source_folder: "data/raw/SS"
  output_folder: "data/processed/SS"

  selected_columns:
    - "PT 'POSITION:' 'PIPE@1378M' '(PA)' 'Pressure'"
    - "GT 'POSITION:' 'PIPE@1378M' '(KG/S)' 'Total mass flow'"
    - "ROHL 'POSITION:' 'PIPE@1378M' '(KG/M3)' 'Oil density'"
    - "TM 'POSITION:' 'PIPE@1378M' '(C)' 'Fluid temperature'"
    # ... add all OLGA columns you need (time is always included)

  # Instrument noise: one entry per column. Each entry must have:
  #   - column:  exact string from selected_columns (used to apply noise to that signal)
  #   - error_pct:  sensor error as decimal (e.g. 0.05 = 5%)
  #   - repeatability_pct:  repeatability as decimal (e.g. 0.0125 = 1.25%)
  # Use [] to disable noise.
  instrument_noise:
    - column: "PT 'POSITION:' 'PIPE@1378M' '(PA)' 'Pressure'"
      error_pct: 0.05
      repeatability_pct: 0.0125
    - column: "GT 'POSITION:' 'PIPE@1378M' '(KG/S)' 'Total mass flow'"
      error_pct: 0.05
      repeatability_pct: 0.0125
    # Add one block per column that should have simulated sensor noise.
    # Column name must match exactly one of the names in selected_columns.

instrument_noise — Each item is an object with three keys:

| Key | Type | Description |
|---|---|---|
| column | string | Exact column name as in selected_columns. Used to identify which signal to apply noise to. |
| error_pct | number | Sensor error as a decimal (e.g. 0.05 = 5%). |
| repeatability_pct | number | Repeatability as a decimal (e.g. 0.0125 = 1.25%). |

Use instrument_noise: [] if you do not want simulated sensor noise.
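
The documentation does not spell out the noise model, so the following is only a sketch under stated assumptions: error_pct is treated as a single relative offset per column (drawn once, uniformly in ±error_pct), and repeatability_pct as independent Gaussian jitter per sample. The function name apply_instrument_noise and the dict-of-lists table layout are illustrative, not the pipeline's API.

```python
import random


def apply_instrument_noise(table, instrument_noise, seed=None):
    """Return a copy of table (dict: column name -> list of floats) with noise.

    Assumed model (NOT confirmed by the pipeline's source):
      - error_pct: one relative offset per column, uniform in
        [-error_pct, +error_pct], constant over the whole series.
      - repeatability_pct: Gaussian jitter per sample with standard
        deviation repeatability_pct * |value|.
    """
    rng = random.Random(seed)
    noisy = {name: list(values) for name, values in table.items()}
    for entry in instrument_noise:
        column = entry["column"]
        # The column must match a selected column name exactly.
        if column not in table:
            raise ValueError(f"instrument_noise column not selected: {column!r}")
        bias = rng.uniform(-entry["error_pct"], entry["error_pct"])
        sigma_rel = entry["repeatability_pct"]
        noisy[column] = [
            v * (1.0 + bias) + rng.gauss(0.0, sigma_rel * abs(v))
            for v in table[column]
        ]
    return noisy
```

Passing an empty list leaves every column unchanged, mirroring instrument_noise: [] in the config. Columns without an entry, including time, are copied as-is.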

Default values applied by the script

run_tpl_genkey_pipeline.py delegates to TPLGenkeyPipelineFactory().get_config_template("tpl_genkey"), so omitted optional keys are filled with these defaults:

| Key | Default |
|---|---|
| instrument_noise | [] |
| batch_size | 10 |
| encoding | utf-8 |
| show_progress | true |
| use_parallel | true |
| max_workers | null -> use cpu_count() |
| io_workers | 4 |
| parse_workers | 2 |
| initial_rows_to_drop | 0 |
| output_format | parquet |
| compression | null |
| quality_checks | true |
| create_subdirectories | true |
| save_metadata | true |

The factory template also defines:

| Key | Template default |
|---|---|
| source_folder | data/raw |
| output_folder | data/processed |
| selected_columns | ["time"] |

In practice, keep source_folder, output_folder, and selected_columns explicit in pipelines_config.yml, because they are core inputs of the run even if the factory has a base template.
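
The effect of these defaults can be pictured as a dictionary merge in which user-supplied keys win. The sketch below is not the factory's implementation: with_defaults, DEFAULTS and REQUIRED_KEYS are hypothetical names, the default values are copied from the table above, and treating the three core keys as required is a choice made for this example.

```python
import copy
import os

# Defaults as listed in this document for omitted optional keys.
DEFAULTS = {
    "instrument_noise": [],
    "batch_size": 10,
    "encoding": "utf-8",
    "show_progress": True,
    "use_parallel": True,
    "max_workers": None,  # resolved to the CPU count below
    "io_workers": 4,
    "parse_workers": 2,
    "initial_rows_to_drop": 0,
    "output_format": "parquet",
    "compression": None,
    "quality_checks": True,
    "create_subdirectories": True,
    "save_metadata": True,
}

# Assumption for this sketch: the core inputs must be given explicitly.
REQUIRED_KEYS = ("source_folder", "output_folder", "selected_columns")


def with_defaults(user_config):
    """Return user_config completed with DEFAULTS; user values take priority."""
    missing = [key for key in REQUIRED_KEYS if key not in user_config]
    if missing:
        raise ValueError(f"missing required keys: {missing}")
    merged = copy.deepcopy(DEFAULTS)
    merged.update(user_config)
    if merged["max_workers"] is None:
        merged["max_workers"] = os.cpu_count() or 1
    return merged
```

Deep-copying the defaults avoids sharing mutable values such as the empty instrument_noise list between runs.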

Advanced override example:

tpl_genkey_pipeline:
  source_folder: "data/raw/SS"
  output_folder: "data/processed/SS"
  selected_columns:
    - "PT 'POSITION:' 'PIPE@1378M' '(PA)' 'Pressure'"

  # Optional advanced overrides
  batch_size: 20
  use_parallel: true
  max_workers: null
  io_workers: 4
  parse_workers: 2
  initial_rows_to_drop: 10
  output_format: "parquet"
  compression: null
  quality_checks: true
  save_metadata: true

See the full pipelines_config.yml in the repository for a complete example, including the storage (S3) settings.


Storage and S3

  • Paths are resolved via resolve_config_paths (local or S3).
  • Full S3 read/write is not yet implemented in this ETL pipeline; if source_folder or output_folder resolves to an S3 location, the script raises an explicit error. Use local paths for now.
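
A guard of this kind could look like the sketch below. It is an assumption-laden illustration: ensure_local_path is a hypothetical helper, the s3:// prefix test is a simplification, and the real check happens after resolve_config_paths, whose behavior is not shown in this document.

```python
def ensure_local_path(path, key):
    """Raise a clear error if a configured folder points at S3.

    Assumption: S3 locations are recognizable by an 's3://' prefix.
    """
    if str(path).lower().startswith("s3://"):
        raise NotImplementedError(
            f"{key}={path!r} resolves to S3, which this pipeline does not "
            "support yet; use a local path."
        )
    return path
```

Failing early, before any file is read, keeps a misconfigured run from producing partial output.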

Idempotency

  • Files that already have a corresponding output in output_folder are skipped.
  • The extractor reports already_processed and skipped_files in the result.
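
The skip logic can be sketched as a check for an existing output file per input. The example assumes each input stem maps to <stem>.parquet directly inside output_folder, which may not match the real naming scheme (for instance when create_subdirectories is enabled); split_by_output and the to_process key are hypothetical names.

```python
from pathlib import Path


def split_by_output(input_paths, output_folder, suffix=".parquet"):
    """Partition inputs into already-processed and still-to-process.

    Assumption: 'case01.tpl' produces '<output_folder>/case01.parquet'.
    """
    out_dir = Path(output_folder)
    result = {"already_processed": [], "to_process": []}
    for path in map(Path, input_paths):
        target = out_dir / (path.stem + suffix)
        key = "already_processed" if target.exists() else "to_process"
        result[key].append(path)
    return result
```

Because the decision depends only on what already exists in output_folder, rerunning the pipeline with the same config repeats no finished work; deleting an output file is enough to force that input to be reprocessed.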

Scheduling

  • APScheduler: Use etl_scheduler.py with a config whose pipeline section matches this pipeline (same keys as tpl_genkey_pipeline).
  • Prefect: Use genkey_pipeline_flow and deploy with the desired schedule; the flow runs this pipeline with the same config.