Parquet to CSV Pipeline

Exports Parquet files to CSV with optional leak-based filtering and config-hash idempotency.


Script

python scripts/run_parquet_csv_pipeline.py --config configs/pipelines_config.yml

Config section: parquet_csv_pipeline (or the section pointed to by your config).


Purpose

  • Input: Parquet files under source_folder (a directory, or a single file).
  • Output: CSV files in output_folder, one per Parquet file by default. Optional filtering skips or removes outputs for files in which no leak is detected.
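
As a rough illustration of the one-Parquet-to-one-CSV mapping, the sketch below derives an output path from an input path. It assumes the output tree mirrors the input subfolders; the function name csv_path_for and its parameters input_root and output_root are hypothetical and are not taken from the script.

```python
from pathlib import Path


def csv_path_for(parquet_path: Path, input_root: Path, output_root: Path) -> Path:
    """Map an input Parquet path to an output CSV path (hypothetical helper)."""
    if parquet_path == input_root:
        # The input root is a single file: write the CSV directly under output_root.
        return output_root / (parquet_path.stem + ".csv")
    # Keep the location relative to the input root, e.g. "run_01/case_a.parquet".
    relative = parquet_path.relative_to(input_root)
    # Same relative location under the output root, with a .csv suffix.
    return (output_root / relative).with_suffix(".csv")


if __name__ == "__main__":
    print(csv_path_for(Path("data/in/run_01/case_a.parquet"),
                       Path("data/in"), Path("data/out")))
```

Mirroring the folder layout keeps output names unique when two subfolders contain Parquet files with the same name.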

Main configuration

Key | Description
source_folder | Directory (or single file) of input Parquet.
output_folder | Directory for output CSV files.
overwrite | If false (default), skip writing when the CSV already exists.
filter.remove_non_leak_outputs | If true, remove or skip the CSV when no leak is detected.
csv.sep, csv.encoding | CSV separator and encoding.

Configuration reference (each item explained)

Item | Meaning | What it solves | Notes
source_folder | Path to the folder (or file) containing Parquet files to convert. | Defines the input for the Parquet→CSV step; usually output of TPL/GENKEY or a processed dataset. | Can be a directory (then glob is used) or a single file.
output_folder | Directory where CSV files are written (one CSV per Parquet by default). | Destination for downstream tools that expect CSV; structure can mirror subfolders. |
glob | Pattern to find Parquet files under source_folder (e.g. **/*.parquet). | Lets you include only certain files or subdirectories. |
overwrite | If false, skips writing when the output CSV already exists. | Idempotency: re-runs do not overwrite existing CSVs unless you force it. | Set true to regenerate all.
csv | Options for the CSV writer: sep, encoding, index, float_format. | Controls delimiter, encoding, whether to write the index, and float formatting. | index: false is typical for ML.
columns | Map from Parquet column name to CSV header name. | Renames and selects columns so CSV has short or standard names (e.g. for idetectfugas or other consumers). | Only listed columns are exported.
max_workers | Number of parallel workers for conversion. | Speeds up conversion when many files are present. |
show_progress | If true, prints progress. | Lets you monitor long runs. |
extend | Optional: leak_column, points_before_leak_start, points_before_leak_end. | Inserts extra rows before leak start/end for offline testing; does not change the original leak segment. | Use 0 to disable.
filter | Options: only_leak_files, remove_non_leak_outputs, sample_fraction, leak_column, leak metric columns, reference_csv_name. | Restricts to files with leak, removes no-leak outputs, and/or samples a fraction per folder; can write a reference CSV with per-file metrics. | sample_fraction in (0, 1] for reproducible sampling.
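
To make the per-folder sampling in filter more concrete, here is a minimal sketch of seeded sampling. It is not the script's implementation: the function name sample_per_folder, the rounding rule, and the choice to keep at least one file per folder are all assumptions made for illustration.

```python
import random
from collections import defaultdict
from pathlib import Path


def sample_per_folder(files, fraction, seed=0):
    """Keep roughly `fraction` of the files in each folder (hypothetical helper)."""
    if not 0 < fraction <= 1:
        raise ValueError("sample_fraction must be in (0, 1]")

    # Group files by their parent folder.
    by_folder = defaultdict(list)
    for name in files:
        path = Path(name)
        by_folder[path.parent].append(path)

    # A dedicated, seeded generator makes repeated runs pick the same files.
    rng = random.Random(seed)
    kept = []
    for folder in sorted(by_folder):
        group = sorted(by_folder[folder])
        # Assumption: round to the nearest count, but keep at least one file.
        count = max(1, round(len(group) * fraction))
        kept.extend(sorted(rng.sample(group, count)))
    return kept
```

Sorting the folders and files before sampling matters here: without it, the result would depend on the order in which the file system returns entries, even with a fixed seed.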

Configuration template

Add this block to configs/pipelines_config.yml under the key parquet_csv_pipeline:

parquet_csv_pipeline:
  source_folder: "data/processed/supe_reducida/data"
  output_folder: "data/csv/supe_reducida"

  glob: "**/*.parquet"
  overwrite: false

  csv:
    sep: ","
    encoding: "utf-8"
    index: false
    float_format: null

  columns:
    # Map Parquet column names to CSV header names
    "PT 'POSITION:' 'POSITION_1378M' '(PA)' 'Pressure'": "PRESSURE_1378M"
    "GT 'POSITION:' 'POSITION_1378M' '(KG/S)' 'Total mass flow'": "MASS_FLOW_1378M"
    # ... add all columns to export
    "LEAK": "LEAK"
    "LEAK_SIZE_in": "LEAK_SIZE_in"
    "LEAK_LOCATION_m": "LEAK_LOCATION_m"
    "LEAK_FLOW_kg_s": "LEAK_FLOW_kg_s"

  max_workers: 8
  show_progress: true

  extend:
    leak_column: "LEAK"
    points_before_leak_start: 0
    points_before_leak_end: 0

  filter:
    only_leak_files: true
    remove_non_leak_outputs: true
    sample_fraction: 0.5
    leak_column: "LEAK"
    reference_csv_name: "reference.csv"

See configs/pipelines_config.yml in the repository for the full set of options (for example, the remaining filter and extend keys).
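
The effect of the columns map (select and rename, with unlisted columns dropped) can be sketched with the standard library alone. This is an illustration only: the actual script may well use a DataFrame library, and the helper export_rows and the sample row below are hypothetical.

```python
import csv
import io


def export_rows(rows, columns, sep=","):
    """Write only the mapped columns, using the mapped header names (hypothetical helper)."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter=sep, lineterminator="\n")
    # Header row: the CSV names, in the order given by the mapping.
    writer.writerow(columns.values())
    for row in rows:
        # Data row: values looked up by the original (Parquet) column names.
        writer.writerow([row[source] for source in columns])
    return buffer.getvalue()


if __name__ == "__main__":
    rows = [{"LEAK": 0, "LEAK_SIZE_in": 0.0, "UNLISTED": 20.5}]
    mapping = {"LEAK": "LEAK", "LEAK_SIZE_in": "SIZE"}
    print(export_rows(rows, mapping), end="")
    # LEAK,SIZE
    # 0,0.0
```

Because the mapping drives both the header and the value lookup, a column that is missing from the mapping never reaches the CSV, which matches the "only listed columns are exported" note.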


Default values applied by the script

run_parquet_csv_pipeline.py starts from an internal PARQUET_CSV_DEFAULTS block and fills missing optional keys with these values:

General defaults

Key | Default
glob | "**/*.parquet"
overwrite | false
max_workers | 8
show_progress | true

csv defaults

Key | Default
csv.sep | ","
csv.encoding | "utf-8"
csv.index | false
csv.float_format | null

extend defaults

Key | Default
extend.leak_column | "LEAK"
extend.points_before_leak_start | 0
extend.points_before_leak_end | 0
extend.context_points | 50
extend.noise_scale | 1.0
extend.random_seed | null

filter defaults

Key | Default
filter.only_leak_files | true
filter.remove_non_leak_outputs | true
filter.sample_fraction | 1.0
filter.sample_seed | 0
filter.leak_column | "LEAK"
filter.leak_flow_column | "LEAK_FLOW_kg_s"
filter.leak_size_column | "LEAK_SIZE_in"
filter.leak_location_column | "LEAK_LOCATION_m"
filter.reference_csv_name | "reference.csv"

The script also ships template defaults for source_folder, output_folder, and columns, but for real runs keep those keys explicit because the config contract validates them as required.
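
One plausible way to fill missing optional keys is a recursive merge of the user config over the defaults, with a check on the required keys first. The sketch below assumes that behavior; DOCUMENTED_DEFAULTS is an illustrative subset of the values listed above, and deep_merge and build_config are hypothetical names, not the script's own.

```python
import copy

# Illustrative subset of the documented defaults (not the script's actual block).
DOCUMENTED_DEFAULTS = {
    "glob": "**/*.parquet",
    "overwrite": False,
    "max_workers": 8,
    "show_progress": True,
    "csv": {"sep": ",", "encoding": "utf-8", "index": False, "float_format": None},
}

# Keys the text describes as required by the config contract.
REQUIRED_KEYS = ("source_folder", "output_folder", "columns")


def deep_merge(defaults, overrides):
    """Return defaults updated with overrides, merging nested dicts key by key."""
    merged = copy.deepcopy(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def build_config(user_config):
    """Validate required keys, then fill missing optional keys from the defaults."""
    missing = [key for key in REQUIRED_KEYS if key not in user_config]
    if missing:
        raise KeyError(f"missing required keys: {missing}")
    return deep_merge(DOCUMENTED_DEFAULTS, user_config)
```

With a nested merge, setting only csv.sep in the config keeps the default encoding, index, and float_format instead of discarding the whole csv block.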


Idempotency

  • Per file: If the output CSV exists and overwrite is false, the file is skipped.
  • Config hash: A hash of the pipeline config is stored (e.g. in metadata). If the stored hash differs from the hash of the current config, outputs can be regenerated even when overwrite is false (see overwrite_effective in the code).
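
The two rules above can be sketched as follows. This is one possible reading, not the script's code: the canonical-JSON SHA-256 hash, the helper names, and the decision to treat a missing stored hash as "unchanged" are assumptions.

```python
import hashlib
import json


def config_hash(config):
    """Hash a config dict in a way that ignores key order (hypothetical helper)."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def compute_overwrite_effective(overwrite, current_hash, stored_hash):
    """Force regeneration when the config changed since the stored hash was written."""
    # Assumption: with no stored hash there is nothing to compare against.
    config_changed = stored_hash is not None and stored_hash != current_hash
    return overwrite or config_changed


def should_write(csv_exists, overwrite_effective):
    """Per-file rule: skip existing CSVs unless regeneration is in effect."""
    return overwrite_effective or not csv_exists
```

Serializing with sorted keys means that reordering keys in the YAML file does not change the hash, so only real value changes trigger regeneration.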

Storage and S3

  • Paths are resolved via resolve_config_paths.
  • S3 support in this ETL pipeline may be partial. Check the implementation before relying on S3 paths, or use local paths.