TPL/GENKEY Pipeline (OLGA)
Converts OLGA simulator outputs (.tpl and .genkey file pairs) into Parquet with selected columns, optional instrument noise, and metadata.
Script
- Script: `run_tpl_genkey_pipeline.py`.
- Config section: `tpl_genkey_pipeline`.
Purpose
- Input: Pairs of `.tpl` and `.genkey` files in `source_folder`.
- Output: Parquet files (and optional metadata) in `output_folder`. Already-processed files are skipped (idempotent).
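
The pairing step described above can be sketched as follows. This is a minimal illustration, not the pipeline's actual discovery code: the helper name `find_tpl_genkey_pairs` is hypothetical, and it assumes that a pair means two files in the same folder with the same stem and the extensions `.tpl` and `.genkey`.

```python
from pathlib import Path
import tempfile


def find_tpl_genkey_pairs(source_folder):
    """Return sorted (tpl_path, genkey_path) tuples that share a file stem."""
    folder = Path(source_folder)
    pairs = []
    for tpl_path in sorted(folder.glob("*.tpl")):
        # Assumption: the partner file has the same stem and a .genkey suffix.
        genkey_path = tpl_path.with_suffix(".genkey")
        if genkey_path.exists():  # keep only complete pairs
            pairs.append((tpl_path, genkey_path))
    return pairs


# Usage with throwaway files: case_b has no .genkey partner, so it is ignored.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("case_a.tpl", "case_a.genkey", "case_b.tpl"):
        (Path(tmp) / name).touch()
    pair_names = [(t.name, g.name) for t, g in find_tpl_genkey_pairs(tmp)]
    print(pair_names)  # [('case_a.tpl', 'case_a.genkey')]
```
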
Main configuration

| Key | Description |
|---|---|
| `source_folder` | Directory containing `.tpl`/`.genkey` pairs. |
| `output_folder` | Directory for output Parquet and metadata. |
| `selected_columns` | List of column names to extract (e.g. pressure, flow, temperature). |
| `instrument_noise` | Optional list of objects. Each object has: `column` (exact name from `selected_columns`), `error_pct` (sensor error as decimal, e.g. 0.05 = 5%), `repeatability_pct` (repeatability as decimal, e.g. 0.0125 = 1.25%). Use `[]` to disable. |
| `batch_size` | Optional advanced parameter. Default: `10`. |
| `max_workers` | Optional advanced parameter. Default: `null` -> CPU count. |
| `encoding` | Optional advanced parameter. Default: `utf-8`. |
| `output_format` | Optional advanced parameter. Default: `parquet`. |
| `compression` | Optional advanced parameter. Default: `null`. |
| `quality_checks` | Optional advanced parameter. Default: `true`. |
| `save_metadata` | Optional advanced parameter. Default: `true`. |

Configuration reference (each item explained)

| Item | Meaning | What it solves | Notes |
|---|---|---|---|
| `source_folder` | Path to the directory that contains pairs of `.tpl` and `.genkey` files (OLGA simulator output). | Defines where the pipeline reads raw data. Can be local or, with storage, an S3 prefix. | Must exist; the script discovers file pairs by name. |
| `output_folder` | Path where output Parquet files and optional metadata are written. | Defines the destination for processed time series. Idempotency skips inputs that already have a corresponding output here. | Created if missing. |
| `selected_columns` | List of column name strings to extract from the OLGA files (in addition to time, which is always included). | Limits which signals are stored; reduces size and keeps only what you need for downstream pipelines. | Names must match exactly the OLGA variable format (e.g. `"PT 'POSITION:' 'PIPE@1378M' '(PA)' 'Pressure'"`). |
| `instrument_noise` | List of `{column, error_pct, repeatability_pct}`. Each entry applies simulated sensor error to one column. | Simulates real sensor uncertainty so models are trained and evaluated under realistic measurement error. | Use `[]` to disable. `column` must match a name in `selected_columns`. |
| `batch_size` | Number of file pairs to process in one batch before writing or advancing. | Balances I/O and memory: larger = fewer passes, smaller = lower memory. | Typical: 5–20. |
| `encoding` | Text encoding of the `.tpl`/`.genkey` files (e.g. `utf-8`). | Ensures correct parsing of non-ASCII characters and OLGA-specific symbols. | Match the encoding used by your OLGA export. |
| `show_progress` | If true, prints progress (e.g. files processed, ETA). | Lets you monitor long runs and estimate completion time. | |
| `max_workers` | Number of parallel workers for file I/O and parsing. | Speeds up processing on multi-core machines. | Tune to avoid overloading disk or CPU. |
| `io_workers` | Workers dedicated to reading/writing files. | Separates I/O from parsing so both can run in parallel. | |
| `parse_workers` | Workers dedicated to parsing file content. | Speeds up the CPU-bound parsing step. | |
| `initial_rows_to_drop` | Number of rows to discard at the start of each case/simulation. | Removes transient or warm-up data that might bias features or labels. | Often 5–20 depending on simulation. |
| `output_format` | Output format; currently only `parquet` is supported. | Keeps a single, efficient columnar format for downstream pipelines. | |
| `compression` | Parquet compression codec (e.g. `snappy`, `gzip`, or `null`). | Reduces disk usage; `null` = uncompressed for maximum speed. | |
| `quality_checks` | If true, runs basic checks on extracted data (e.g. shapes, nulls). | Catches malformed or inconsistent outputs early. | |
| `save_metadata` | If true, writes per-file or run metadata (e.g. column list, row counts). | Helps traceability and debugging; required for some idempotency logic. | |

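
Several of the constraints in the reference table can be checked before a run starts. The sketch below is one possible pre-flight check, not the pipeline's built-in validation: the function name `validate_tpl_genkey_config` is hypothetical, and treating the three core keys as required is an assumption made for illustration.

```python
def validate_tpl_genkey_config(config):
    """Return a list of human-readable problems; an empty list means no issue was found."""
    problems = []
    # Assumption: the three core inputs are treated as required here.
    for key in ("source_folder", "output_folder", "selected_columns"):
        if not config.get(key):
            problems.append(f"missing required key: {key}")
    selected = set(config.get("selected_columns") or [])
    for entry in config.get("instrument_noise") or []:
        column = entry.get("column")
        # Each noise entry must name a column listed in selected_columns.
        if column not in selected:
            problems.append(f"instrument_noise column not in selected_columns: {column!r}")
        for field in ("error_pct", "repeatability_pct"):
            value = entry.get(field)
            if not isinstance(value, (int, float)) or value < 0:
                problems.append(f"{field} must be a non-negative decimal for {column!r}")
    if config.get("output_format", "parquet") != "parquet":
        problems.append("output_format: only 'parquet' is supported")
    return problems


pressure = "PT 'POSITION:' 'PIPE@1378M' '(PA)' 'Pressure'"
bad_config = {
    "source_folder": "data/raw/SS",
    "output_folder": "data/processed/SS",
    "selected_columns": [pressure],
    "instrument_noise": [{"column": "PT (typo)", "error_pct": 0.05, "repeatability_pct": 0.0125}],
}
problems = validate_tpl_genkey_config(bad_config)
print(problems)
```

Running a check like this first means a misspelled OLGA column name is reported as a configuration error instead of surfacing partway through a long batch.
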
Configuration template
Add this block to configs/pipelines_config.yml under the key tpl_genkey_pipeline:

    tpl_genkey_pipeline:
      source_folder: "data/raw/SS"
      output_folder: "data/processed/SS"
      selected_columns:
        - "PT 'POSITION:' 'PIPE@1378M' '(PA)' 'Pressure'"
        - "GT 'POSITION:' 'PIPE@1378M' '(KG/S)' 'Total mass flow'"
        - "ROHL 'POSITION:' 'PIPE@1378M' '(KG/M3)' 'Oil density'"
        - "TM 'POSITION:' 'PIPE@1378M' '(C)' 'Fluid temperature'"
        # ... add all OLGA columns you need (time is always included)
      # Instrument noise: one entry per column. Each entry must have:
      # - column: exact string from selected_columns (used to apply noise to that signal)
      # - error_pct: sensor error as decimal (e.g. 0.05 = 5%)
      # - repeatability_pct: repeatability as decimal (e.g. 0.0125 = 1.25%)
      # Use [] to disable noise.
      instrument_noise:
        - column: "PT 'POSITION:' 'PIPE@1378M' '(PA)' 'Pressure'"
          error_pct: 0.05
          repeatability_pct: 0.0125
        - column: "GT 'POSITION:' 'PIPE@1378M' '(KG/S)' 'Total mass flow'"
          error_pct: 0.05
          repeatability_pct: 0.0125
      # Add one block per column that should have simulated sensor noise.
      # Column name must match exactly one of the names in selected_columns.

`instrument_noise`: each item is an object with three keys:

| Key | Type | Description |
|---|---|---|
| `column` | string | Exact column name as in `selected_columns`. Used to identify which signal to apply noise to. |
| `error_pct` | number | Sensor error as a decimal (e.g. 0.05 = 5%). |
| `repeatability_pct` | number | Repeatability as a decimal (e.g. 0.0125 = 1.25%). |

Use `instrument_noise: []` if you do not want simulated sensor noise.
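
This page does not state how `error_pct` and `repeatability_pct` are turned into noise, so the following is only one plausible reading, labeled as an assumption: the sensor error is modeled as a single relative bias drawn once per series, and repeatability as independent Gaussian scatter on each sample. The function name `apply_instrument_noise` and the `seed` parameter are hypothetical.

```python
import random


def apply_instrument_noise(values, error_pct, repeatability_pct, seed=None):
    """Assumed model: one systematic relative offset per series plus per-sample scatter."""
    rng = random.Random(seed)
    # Systematic sensor error: one relative bias, fixed for the whole series.
    bias = rng.uniform(-error_pct, error_pct)
    noisy = []
    for value in values:
        # Repeatability: independent relative scatter on every sample.
        scatter = rng.gauss(0.0, repeatability_pct)
        noisy.append(value * (1.0 + bias + scatter))
    return noisy


clean_pressure = [100000.0, 100500.0, 101000.0]
noisy_pressure = apply_instrument_noise(clean_pressure, error_pct=0.05, repeatability_pct=0.0125, seed=42)
print(noisy_pressure)
```

With both parameters set to 0 the series is returned unchanged, which is the per-column equivalent of leaving that column out of `instrument_noise`.
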
Default values applied by the script
`run_tpl_genkey_pipeline.py` delegates to `TPLGenkeyPipelineFactory().get_config_template("tpl_genkey")`,
so omitted optional keys are filled with these defaults:

| Key | Default |
|---|---|
| `instrument_noise` | `[]` |
| `batch_size` | `10` |
| `encoding` | `utf-8` |
| `show_progress` | `true` |
| `use_parallel` | `true` |
| `max_workers` | `null` -> use `cpu_count()` |
| `io_workers` | `4` |
| `parse_workers` | `2` |
| `initial_rows_to_drop` | `0` |
| `output_format` | `parquet` |
| `compression` | `null` |
| `quality_checks` | `true` |
| `create_subdirectories` | `true` |
| `save_metadata` | `true` |

The factory template also defines:

| Key | Template default |
|---|---|
| `source_folder` | `data/raw` |
| `output_folder` | `data/processed` |
| `selected_columns` | `["time"]` |

In practice, keep source_folder, output_folder, and selected_columns explicit in
pipelines_config.yml, because they are core inputs of the run even if the factory has a base template.
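
The effect of the defaults can be pictured as a dictionary merge in which user-supplied keys win. The sketch below mirrors the defaults listed in this section; it is not the code of `TPLGenkeyPipelineFactory`, whose internals are not shown on this page, and the names `DEFAULTS` and `with_defaults` are hypothetical.

```python
import copy

# Defaults as listed in this section (YAML null/true mapped to None/True).
DEFAULTS = {
    "instrument_noise": [],
    "batch_size": 10,
    "encoding": "utf-8",
    "show_progress": True,
    "use_parallel": True,
    "max_workers": None,
    "io_workers": 4,
    "parse_workers": 2,
    "initial_rows_to_drop": 0,
    "output_format": "parquet",
    "compression": None,
    "quality_checks": True,
    "create_subdirectories": True,
    "save_metadata": True,
}


def with_defaults(user_config):
    """Return a new dict: defaults first, then user values override them."""
    merged = copy.deepcopy(DEFAULTS)  # deep copy so list defaults are never shared
    merged.update(user_config)
    return merged


merged = with_defaults({
    "source_folder": "data/raw/SS",
    "output_folder": "data/processed/SS",
    "selected_columns": ["PT 'POSITION:' 'PIPE@1378M' '(PA)' 'Pressure'"],
    "batch_size": 20,
})
print(merged["batch_size"], merged["io_workers"])  # 20 4
```
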
Advanced override example:

    tpl_genkey_pipeline:
      source_folder: "data/raw/SS"
      output_folder: "data/processed/SS"
      selected_columns:
        - "PT 'POSITION:' 'PIPE@1378M' '(PA)' 'Pressure'"
      # Optional advanced overrides
      batch_size: 20
      use_parallel: true
      max_workers: null
      io_workers: 4
      parse_workers: 2
      initial_rows_to_drop: 10
      output_format: "parquet"
      compression: null
      quality_checks: true
      save_metadata: true

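
To make `batch_size` and `max_workers` more concrete, here is a rough sketch of batching combined with a worker pool. It is an illustration under assumptions: the page does not say whether the script uses threads or processes (threads are used here), the separate `io_workers`/`parse_workers` pools are not modeled, and all function names are hypothetical.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def resolve_max_workers(max_workers):
    """Map a null/None setting to the CPU count; keep explicit values as-is."""
    return max_workers if max_workers is not None else (os.cpu_count() or 1)


def iter_batches(items, batch_size):
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def process_in_batches(items, worker, batch_size=10, max_workers=None):
    """Run worker over items batch by batch, preserving input order."""
    results = []
    with ThreadPoolExecutor(max_workers=resolve_max_workers(max_workers)) as pool:
        for batch in iter_batches(items, batch_size):
            # pool.map returns results in the same order as the batch.
            results.extend(pool.map(worker, batch))
    return results


batch_sizes = [len(batch) for batch in iter_batches(list(range(25)), 10)]
print(batch_sizes)  # [10, 10, 5]
doubled = process_in_batches(list(range(5)), lambda x: x * 2, batch_size=2, max_workers=2)
print(doubled)  # [0, 2, 4, 6, 8]
```
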
See the full `pipelines_config.yml` in the repository for a complete example, including storage (S3) settings.
Storage and S3
- Paths are resolved via `resolve_config_paths` (local or S3).
- Full S3 read/write in this ETL pipeline is not yet implemented; if `source_folder` or `output_folder` resolve to S3, the script raises a clear error. Use local paths for now.
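
A guard of the kind described above could look like the following sketch. It is not the script's actual check: the function name `ensure_local_paths` is hypothetical, and it assumes that an S3 location appears as a string starting with `s3://`, which may differ from what `resolve_config_paths` returns.

```python
def ensure_local_paths(config):
    """Raise if either folder looks like an S3 URI (assumed 's3://' prefix)."""
    for key in ("source_folder", "output_folder"):
        path = str(config.get(key) or "")
        if path.lower().startswith("s3://"):
            raise NotImplementedError(
                f"{key}={path!r} resolves to S3, which this pipeline cannot read or write yet; "
                "use a local path instead."
            )
    return config


try:
    ensure_local_paths({"source_folder": "s3://bucket/raw", "output_folder": "data/processed"})
except NotImplementedError as exc:
    s3_error = str(exc)
    print(s3_error)
```
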
Idempotency
- Files that already have a corresponding output in `output_folder` are skipped.
- The extractor reports `already_processed` and `skipped_files` in the result.
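
The skip logic can be sketched as a filter over the input files. This is a simplified illustration: the helper name `split_pending` is hypothetical, and the rule that `case.tpl` maps to `case.parquet` directly inside `output_folder` is an assumption (the real output naming, subdirectories, and metadata checks are not documented here).

```python
from pathlib import Path
import tempfile


def split_pending(tpl_paths, output_folder):
    """Split inputs into (pending, skipped) by checking for an existing output file."""
    out_dir = Path(output_folder)
    pending, skipped = [], []
    for tpl_path in map(Path, tpl_paths):
        # Assumed naming rule: <stem>.tpl -> <output_folder>/<stem>.parquet
        expected = out_dir / f"{tpl_path.stem}.parquet"
        (skipped if expected.exists() else pending).append(tpl_path)
    return pending, skipped


with tempfile.TemporaryDirectory() as out:
    (Path(out) / "case_a.parquet").touch()  # pretend case_a was processed earlier
    pending, skipped = split_pending(["case_a.tpl", "case_b.tpl"], out)
    pending_names = [p.name for p in pending]
    skipped_names = [p.name for p in skipped]
    print(pending_names, skipped_names)  # ['case_b.tpl'] ['case_a.tpl']
```

Because the decision depends only on what already exists in `output_folder`, rerunning the pipeline after an interruption picks up the remaining files without redoing finished ones.
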
Scheduling
- APScheduler: Use `etl_scheduler.py` with a config whose `pipeline` section matches this pipeline (same keys as `tpl_genkey_pipeline`).
- Prefect: Use `genkey_pipeline_flow` and deploy with the desired schedule; the flow runs this pipeline with the same config.