Parquet to CSV Pipeline
Exports Parquet files to CSV with optional leak-based filtering and config-hash idempotency.
Script
Config section: parquet_csv_pipeline (or the section pointed to by your config).
Purpose
- Input: Parquet files under `parquet_root` (or a single file).
- Output: CSV files in `output_folder`. Each Parquet file can produce one CSV; optional logic removes or marks "no leak" outputs.
Main configuration
| Key | Description |
|---|---|
| `parquet_root` | Directory (or file) of input Parquet. |
| `output_folder` | Directory for output CSV. |
| `overwrite` | If false (default), skip writing when the CSV already exists. |
| `remove_non_leak_outputs` | If true, remove or skip the CSV when no leak is detected (configurable). |
| Encoding/separator | Configurable via config (e.g. csv_sep, encoding). |
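The skip-if-exists behavior of `overwrite` can be sketched with the standard library alone. The helper name `plan_conversions`, its parameter names, and the choice to mirror subfolders under the output directory are assumptions made for illustration, not the script's actual code; the pattern `**/*.parquet` is the default documented for this pipeline.

```python
from pathlib import Path


def plan_conversions(input_path, output_folder, pattern="**/*.parquet", overwrite=False):
    """Return (parquet_path, csv_path) pairs that still need to be written."""
    source = Path(input_path)
    output = Path(output_folder)

    # A single file is converted as-is; a directory is searched with the glob pattern.
    if source.is_file():
        parquet_files = [source]
        base = source.parent
    else:
        parquet_files = sorted(source.glob(pattern))
        base = source

    planned = []
    for parquet_path in parquet_files:
        # Assumption: mirror the relative subfolder layout under output_folder.
        csv_path = output / parquet_path.relative_to(base).with_suffix(".csv")
        if csv_path.exists() and not overwrite:
            continue  # re-run without overwrite: keep the existing CSV
        planned.append((parquet_path, csv_path))
    return planned
```

Calling `plan_conversions(src, out)` twice in a row, with the CSVs written in between, would return an empty list the second time unless `overwrite=True` is passed.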
Configuration reference (each item explained)
| Item | Meaning | What it solves | Notes |
|---|---|---|---|
| `source_folder` | Path to the folder (or file) containing Parquet files to convert. | Defines the input for the Parquet→CSV step; usually output of TPL/GENKEY or a processed dataset. | Can be a directory (then `glob` is used) or a single file. |
| `output_folder` | Directory where CSV files are written (one CSV per Parquet by default). | Destination for downstream tools that expect CSV; structure can mirror subfolders. | |
| `glob` | Pattern to find Parquet files under `source_folder` (e.g. `**/*.parquet`). | Lets you include only certain files or subdirectories. | |
| `overwrite` | If false, skips writing when the output CSV already exists. | Idempotency: re-runs do not overwrite existing CSVs unless you force it. | Set true to regenerate all. |
| `csv` | Options for the CSV writer: `sep`, `encoding`, `index`, `float_format`. | Controls delimiter, encoding, whether to write the index, and float formatting. | `index: false` is typical for ML. |
| `columns` | Map from Parquet column name to CSV header name. | Renames and selects columns so CSV has short or standard names (e.g. for idetectfugas or other consumers). | Only listed columns are exported. |
| `max_workers` | Number of parallel workers for conversion. | Speeds up conversion when many files are present. | |
| `show_progress` | If true, prints progress. | Lets you monitor long runs. | |
| `extend` | Optional: `leak_column`, `points_before_leak_start`, `points_before_leak_end`. | Inserts extra rows before leak start/end for offline testing; does not change the original leak segment. | Use 0 to disable. |
| `filter` | Options: `only_leak_files`, `remove_non_leak_outputs`, `sample_fraction`, `leak_column`, leak metric columns, `reference_csv_name`. | Restricts to files with leak, removes no-leak outputs, and/or samples a fraction per folder; can write a reference CSV with per-file metrics. | `sample_fraction` in (0, 1] for reproducible sampling. |
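Reproducible per-folder sampling, as described for `filter.sample_fraction`, could look roughly like the sketch below. The function name, the round-up rule, and the per-folder seeding are assumptions for illustration; only the (0, 1] range and the existence of a `sample_seed` option come from this documentation.

```python
import math
import random
from collections import defaultdict
from pathlib import PurePosixPath


def sample_per_folder(paths, sample_fraction=1.0, sample_seed=0):
    """Keep a reproducible fraction of the files found in each folder."""
    if not 0 < sample_fraction <= 1:
        raise ValueError("sample_fraction must be in (0, 1]")

    # Group files by parent folder; sorting makes the result independent of input order.
    by_folder = defaultdict(list)
    for path in sorted(paths):
        by_folder[PurePosixPath(path).parent].append(path)

    kept = []
    for folder in sorted(by_folder):
        files = by_folder[folder]
        # Assumption: round up so every non-empty folder keeps at least one file.
        count = math.ceil(len(files) * sample_fraction)
        # Assumption: seed per folder so one folder's draw does not affect another's.
        rng = random.Random(f"{sample_seed}:{folder}")
        kept.extend(sorted(rng.sample(files, count)))
    return kept
```

With the same seed and the same set of files, the function returns the same selection on every run, which is what makes re-runs of a sampled export comparable.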
Configuration template
Add this block to configs/pipelines_config.yml under the key parquet_csv_pipeline:
parquet_csv_pipeline:
  source_folder: "data/processed/supe_reducida/data"
  output_folder: "data/csv/supe_reducida"
  glob: "**/*.parquet"
  overwrite: false
  csv:
    sep: ","
    encoding: "utf-8"
    index: false
    float_format: null
  columns:
    # Map Parquet column names to CSV header names
    "PT 'POSITION:' 'POSITION_1378M' '(PA)' 'Pressure'": "PRESSURE_1378M"
    "GT 'POSITION:' 'POSITION_1378M' '(KG/S)' 'Total mass flow'": "MASS_FLOW_1378M"
    # ... add all columns to export
    "LEAK": "LEAK"
    "LEAK_SIZE_in": "LEAK_SIZE_in"
    "LEAK_LOCATION_m": "LEAK_LOCATION_m"
    "LEAK_FLOW_kg_s": "LEAK_FLOW_kg_s"
  max_workers: 8
  show_progress: true
  extend:
    leak_column: "LEAK"
    points_before_leak_start: 0
    points_before_leak_end: 0
  filter:
    only_leak_files: true
    remove_non_leak_outputs: true
    sample_fraction: 0.5
    leak_column: "LEAK"
    reference_csv_name: "reference.csv"
See the full file pipelines_config.yml in the repository for all options (e.g. filter, extend).
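The effect of the `columns` mapping (select and rename in one step) can be illustrated with a small standard-library sketch. It assumes the rows have already been read from Parquet into dictionaries; the helper name `write_mapped_csv` and the sample values are made up for the example and do not come from the script.

```python
import csv
import io


def write_mapped_csv(rows, columns, handle, sep=","):
    """Write only the mapped columns, using the CSV header names from `columns`."""
    writer = csv.writer(handle, delimiter=sep, lineterminator="\n")
    writer.writerow(columns.values())  # header row uses the target names
    for row in rows:
        # Columns that are not listed in the mapping are dropped.
        writer.writerow([row[source_name] for source_name in columns])


columns = {
    "PT 'POSITION:' 'POSITION_1378M' '(PA)' 'Pressure'": "PRESSURE_1378M",
    "LEAK": "LEAK",
}
# Illustrative rows only; real data would come from the Parquet file.
rows = [
    {"PT 'POSITION:' 'POSITION_1378M' '(PA)' 'Pressure'": 101325.0, "LEAK": 0, "unused": "x"},
    {"PT 'POSITION:' 'POSITION_1378M' '(PA)' 'Pressure'": 101300.5, "LEAK": 1, "unused": "y"},
]
buffer = io.StringIO()
write_mapped_csv(rows, columns, buffer)
print(buffer.getvalue())
```

The `unused` field never reaches the output because it has no entry in the mapping, which matches the note that only listed columns are exported.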
Default values applied by the script
run_parquet_csv_pipeline.py starts from an internal PARQUET_CSV_DEFAULTS block and fills
missing optional keys with these values:
General defaults
| Key | Default |
|---|---|
| `glob` | `**/*.parquet` |
| `overwrite` | `false` |
| `max_workers` | `8` |
| `show_progress` | `true` |
csv defaults
| Key | Default |
|---|---|
| `csv.sep` | `","` |
| `csv.encoding` | `utf-8` |
| `csv.index` | `false` |
| `csv.float_format` | `null` |
extend defaults
| Key | Default |
|---|---|
| `extend.leak_column` | `"LEAK"` |
| `extend.points_before_leak_start` | `0` |
| `extend.points_before_leak_end` | `0` |
| `extend.context_points` | `50` |
| `extend.noise_scale` | `1.0` |
| `extend.random_seed` | `null` |
filter defaults
| Key | Default |
|---|---|
| `filter.only_leak_files` | `true` |
| `filter.remove_non_leak_outputs` | `true` |
| `filter.sample_fraction` | `1.0` |
| `filter.sample_seed` | `0` |
| `filter.leak_column` | `"LEAK"` |
| `filter.leak_flow_column` | `"LEAK_FLOW_kg_s"` |
| `filter.leak_size_column` | `"LEAK_SIZE_in"` |
| `filter.leak_location_column` | `"LEAK_LOCATION_m"` |
| `filter.reference_csv_name` | `"reference.csv"` |
The script also ships template defaults for source_folder, output_folder, and columns,
but for real runs keep those keys explicit because the config contract validates them as required.
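Filling missing optional keys from a defaults block while insisting on the required keys might be sketched as follows. The dictionary here is a partial reconstruction from the default tables in this section, not a copy of the script's `PARQUET_CSV_DEFAULTS`, and the helper names `merge_defaults` and `build_config` are hypothetical.

```python
import copy

# Partial reconstruction from the documented defaults (assumption, not the script's block).
PARQUET_CSV_DEFAULTS = {
    "glob": "**/*.parquet",
    "overwrite": False,
    "max_workers": 8,
    "show_progress": True,
    "csv": {"sep": ",", "encoding": "utf-8", "index": False, "float_format": None},
}
REQUIRED_KEYS = ("source_folder", "output_folder", "columns")


def merge_defaults(defaults, user_config):
    """Recursively fill keys missing from user_config with default values."""
    merged = copy.deepcopy(defaults)
    for key, value in user_config.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_defaults(merged[key], value)  # merge nested sections
        else:
            merged[key] = copy.deepcopy(value)  # user value wins
    return merged


def build_config(user_config):
    """Reject configs that omit required keys, then apply the defaults."""
    missing = [key for key in REQUIRED_KEYS if key not in user_config]
    if missing:
        raise KeyError(f"missing required keys: {missing}")
    return merge_defaults(PARQUET_CSV_DEFAULTS, user_config)
```

With this approach a config that sets only `csv.sep` would keep the default `csv.encoding`, because nested sections are merged key by key instead of being replaced wholesale.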
Idempotency
- Per file: If the output CSV exists and `overwrite` is `false`, the file is skipped.
- Config hash: A hash of the pipeline config is stored (e.g. in metadata). If the config changes, the effective behavior can force regeneration even when `overwrite` is `false` (see `overwrite_effective` in code).
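A minimal sketch of how a config hash could drive this decision is shown below. The name `overwrite_effective` is taken from the text above, but the logic, the SHA-256 choice, the exclusion of the `overwrite` key from the hash, and the idea that the previous hash is passed in as `stored_hash` are all assumptions; check the script for the real behavior.

```python
import hashlib
import json


def config_hash(config):
    """Stable hash of a JSON-serialisable config dict."""
    # Assumption: ignore the overwrite flag so toggling it does not count as a config change.
    hashed = {key: value for key, value in config.items() if key != "overwrite"}
    # sort_keys gives the same text regardless of key order in the dict.
    canonical = json.dumps(hashed, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def overwrite_effective(config, stored_hash):
    """Regenerate when asked explicitly or when the config differs from the last run."""
    return bool(config.get("overwrite", False)) or stored_hash != config_hash(config)
```

On a first run there is no stored hash, so passing `None` makes `overwrite_effective` return `True` and every CSV is generated.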
Storage and S3
- Paths are resolved via `resolve_config_paths`.
- S3 support in this ETL pipeline may be partial; for full S3, check the implementation or use local paths.