Skip to main content

rust_data_processing/ingestion/
unified.rs

1//! Unified ingestion entrypoint.
2//!
3//! Most callers should use [`ingest_from_path`], which ingests a file into an in-memory
4//! [`crate::types::DataSet`] using a provided [`crate::types::Schema`].
5//!
6//! - If [`IngestionOptions::format`] is `None`, the ingestion format is inferred from the file
7//!   extension.
8//! - If an [`super::observability::IngestionObserver`] is provided, success/failure/alerts are
9//!   reported to it.
10
11use std::error::Error as StdError;
12use std::fmt;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
16use crate::error::{IngestionError, IngestionResult};
17use crate::types::{DataSet, Schema, Value};
18
19use super::observability::{
20    IngestionContext, IngestionObserver, IngestionSeverity, IngestionStats,
21};
22use super::polars_bridge::{infer_schema_from_dataframe_lossy, polars_error_to_ingestion};
23use super::watermark::{
24    apply_watermark_after_ingest, max_value_in_column, validate_watermark_config,
25};
26use super::{csv, excel, json, parquet, xml};
27use polars::prelude::*;
28
/// Supported ingestion formats.
///
/// Usually inferred from a path's extension via [`IngestionFormat::from_extension`], or set
/// explicitly through [`IngestionOptions::format`]. The type is a small `Copy` value that can
/// also be used as a hash-map / hash-set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IngestionFormat {
    /// Comma-separated values.
    Csv,
    /// JSON array-of-objects or NDJSON.
    Json,
    /// Apache Parquet.
    Parquet,
    /// Spreadsheet/workbook formats (feature-gated behind `excel`).
    Excel,
    /// Row-oriented `<rdp_records>` XML (see [`super::xml`]).
    Xml,
}
43
44impl IngestionFormat {
45    /// Parse an ingestion format from a file extension (case-insensitive).
46    pub fn from_extension(ext: &str) -> Option<Self> {
47        match ext.to_ascii_lowercase().as_str() {
48            "csv" => Some(Self::Csv),
49            "json" | "ndjson" => Some(Self::Json),
50            "parquet" | "pq" => Some(Self::Parquet),
51            "xlsx" | "xls" | "xlsm" | "xlsb" | "ods" => Some(Self::Excel),
52            "xml" => Some(Self::Xml),
53            _ => None,
54        }
55    }
56}
57
/// Sheet-selection policy used when ingesting an Excel workbook.
///
/// Only consulted when the resolved format is [`IngestionFormat::Excel`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum ExcelSheetSelection {
    /// Read only the workbook's first sheet. This is the default.
    #[default]
    First,
    /// Read exactly one sheet, looked up by name.
    Sheet(String),
    /// Read every sheet and append their rows into a single dataset.
    AllSheets,
    /// Read the named sheets, in the given order, and append their rows into a single dataset.
    Sheets(Vec<String>),
}
71
72/// Options controlling unified ingestion behavior.
73///
74/// Use [`Default`] for common cases.
75#[derive(Clone)]
76pub struct IngestionOptions {
77    /// If `None`, auto-detect format from file extension.
78    pub format: Option<IngestionFormat>,
79    /// Excel-specific options.
80    pub excel_sheet_selection: ExcelSheetSelection,
81    /// Optional observer for logging/alerts.
82    pub observer: Option<Arc<dyn IngestionObserver>>,
83    /// Severity threshold at which `on_alert` is invoked.
84    pub alert_at_or_above: IngestionSeverity,
85    /// Column name for incremental / high-water filtering (must be used with
86    /// [`Self::watermark_exclusive_above`]).
87    pub watermark_column: Option<String>,
88    /// Keep only rows where `watermark_column` is **strictly greater than** this value (same
89    /// [`crate::types::DataType`] as the column). Applied after ingest for file and DB sources.
90    pub watermark_exclusive_above: Option<Value>,
91    /// After ingest (and watermark filter), keep at most this many rows from the start of the
92    /// dataset. **`None`** = no limit. Pipeline `sources.options.max_rows` maps here.
93    pub max_rows: Option<usize>,
94}
95
96impl fmt::Debug for IngestionOptions {
97    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98        f.debug_struct("IngestionOptions")
99            .field("format", &self.format)
100            .field("excel_sheet_selection", &self.excel_sheet_selection)
101            .field("observer_set", &self.observer.is_some())
102            .field("alert_at_or_above", &self.alert_at_or_above)
103            .field("watermark_column", &self.watermark_column)
104            .field("watermark_exclusive_above", &self.watermark_exclusive_above)
105            .field("max_rows", &self.max_rows)
106            .finish()
107    }
108}
109
110impl Default for IngestionOptions {
111    fn default() -> Self {
112        Self {
113            format: None,
114            excel_sheet_selection: ExcelSheetSelection::default(),
115            observer: None,
116            alert_at_or_above: IngestionSeverity::Critical,
117            watermark_column: None,
118            watermark_exclusive_above: None,
119            max_rows: None,
120        }
121    }
122}
123
124/// Truncate a dataset to at most `max` rows (no-op when `max` is zero).
125fn truncate_dataset_rows(mut ds: DataSet, max: usize) -> DataSet {
126    if max == 0 {
127        return ds;
128    }
129    if ds.row_count() > max {
130        ds.rows.truncate(max);
131    }
132    ds
133}
134
135fn apply_max_rows(ds: DataSet, max_rows: Option<usize>) -> DataSet {
136    match max_rows {
137        Some(max) if max > 0 => truncate_dataset_rows(ds, max),
138        _ => ds,
139    }
140}
141
142/// Unified ingestion entry point for path-based sources.
143///
144/// - If `options.format` is `None`, format is inferred from the file extension.
145/// - Use `options.excel_sheet_selection` for Excel multi-tab behavior.
146///
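/// When an observer is configured, this function reports:
///
/// - `on_success` on success, with row-count stats (counted after the watermark filter and
///   `max_rows` cap)
/// - `on_failure` on failure, with a computed severity
/// - `on_alert` on failure when the computed severity is >= `options.alert_at_or_above`
///
/// Errors raised before any reader runs — an invalid watermark configuration, or a format that
/// cannot be inferred from the path — are returned without notifying the observer.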
152///
153/// # Examples
154///
155/// ## CSV (auto-detect by extension)
156///
157/// ```no_run
158/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
159/// use rust_data_processing::types::{DataType, Field, Schema};
160///
161/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
162/// let schema = Schema::new(vec![
163///     Field::new("id", DataType::Int64),
164///     Field::new("name", DataType::Utf8),
165/// ]);
166///
167/// // Uses `.csv` to select CSV ingestion.
168/// let ds = ingest_from_path("people.csv", &schema, &IngestionOptions::default())?;
169/// println!("rows={}", ds.row_count());
170/// # Ok(())
171/// # }
172/// ```
173///
174/// ## JSON (auto-detect by extension, with nested field paths)
175///
176/// ```no_run
177/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
178/// use rust_data_processing::types::{DataType, Field, Schema};
179///
180/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
181/// // JSON supports nested field access via dot paths.
182/// let schema = Schema::new(vec![
183///     Field::new("id", DataType::Int64),
184///     Field::new("user.name", DataType::Utf8),
185/// ]);
186///
187/// let ds = ingest_from_path("events.json", &schema, &IngestionOptions::default())?;
188/// println!("rows={}", ds.row_count());
189/// # Ok(())
190/// # }
191/// ```
192///
193/// ## Parquet (auto-detect by extension)
194///
195/// ```no_run
196/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
197/// use rust_data_processing::types::{DataType, Field, Schema};
198///
199/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
200/// let schema = Schema::new(vec![
201///     Field::new("id", DataType::Int64),
202///     Field::new("active", DataType::Bool),
203/// ]);
204///
205/// let ds = ingest_from_path("data.parquet", &schema, &IngestionOptions::default())?;
206/// println!("rows={}", ds.row_count());
207/// # Ok(())
208/// # }
209/// ```
210///
211/// ## Force a format explicitly (override extension inference)
212///
213/// ```no_run
214/// use rust_data_processing::ingestion::{ingest_from_path, IngestionFormat, IngestionOptions};
215/// use rust_data_processing::types::{DataType, Field, Schema};
216///
217/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
218/// let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
219///
220/// let opts = IngestionOptions {
221///     format: Some(IngestionFormat::Csv),
222///     ..Default::default()
223/// };
224///
225/// // Useful when a file has no extension or you want to override inference.
226/// let ds = ingest_from_path("input_without_extension", &schema, &opts)?;
227/// println!("rows={}", ds.row_count());
228/// # Ok(())
229/// # }
230/// ```
231///
232/// ## Observability (stderr logging + alert threshold)
233///
234/// ```no_run
235/// use std::sync::Arc;
236///
237/// use rust_data_processing::ingestion::{
238///     ingest_from_path, IngestionOptions, IngestionSeverity, StdErrObserver,
239/// };
240/// use rust_data_processing::types::{DataType, Field, Schema};
241///
242/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
243/// let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
244///
245/// let opts = IngestionOptions {
246///     observer: Some(Arc::new(StdErrObserver::default())),
247///     alert_at_or_above: IngestionSeverity::Critical,
248///     ..Default::default()
249/// };
250///
251/// // Missing files are treated as Critical and will trigger `on_alert` at this threshold.
252/// let _err = ingest_from_path("does_not_exist.csv", &schema, &opts).unwrap_err();
253/// # Ok(())
254/// # }
255/// ```
256///
257/// ## Incremental load (high-watermark on files)
258///
259/// Set [`IngestionOptions::watermark_column`] and [`IngestionOptions::watermark_exclusive_above`]
260/// to keep only rows where the watermark column is **strictly greater** than the floor value.
261/// Nulls in that column are dropped. The same options apply to
262/// [`ingest_from_db`](crate::ingestion::ingest_from_db) when the **`db_connectorx`** Cargo feature
263/// is enabled (filter runs after the query result is loaded).
264///
265/// ```no_run
266/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
267/// use rust_data_processing::types::{DataType, Field, Schema, Value};
268///
269/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
270/// let schema = Schema::new(vec![
271///     Field::new("id", DataType::Int64),
272///     Field::new("ts", DataType::Int64),
273/// ]);
274/// let opts = IngestionOptions {
275///     watermark_column: Some("ts".to_string()),
276///     watermark_exclusive_above: Some(Value::Int64(100)),
277///     ..Default::default()
278/// };
279/// let ds = ingest_from_path("tests/fixtures/watermark_events.csv", &schema, &opts)?;
280/// assert_eq!(ds.row_count(), 2);
281/// # Ok(())
282/// # }
283/// ```
284///
285/// ## Excel
286///
/// Marked `no_run`: the example is **compiled** by doctests (so rustdoc shows no "not tested"
/// banner) but not executed, because it expects a real `workbook.xlsx` file.
289///
290/// ```no_run
291/// use rust_data_processing::ingestion::{
292///     ingest_from_path, ExcelSheetSelection, IngestionFormat, IngestionOptions,
293/// };
294/// use rust_data_processing::types::{DataType, Field, Schema};
295///
296/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
297/// let schema = Schema::new(vec![
298///     Field::new("id", DataType::Int64),
299///     Field::new("name", DataType::Utf8),
300/// ]);
301///
302/// let opts = IngestionOptions {
303///     format: Some(IngestionFormat::Excel),
304///     excel_sheet_selection: ExcelSheetSelection::Sheet("Sheet1".to_string()),
305///     ..Default::default()
306/// };
307///
308/// let ds = ingest_from_path("workbook.xlsx", &schema, &opts)?;
309/// println!("rows={}", ds.row_count());
310/// # Ok(())
311/// # }
312/// ```
313pub fn ingest_from_path(
314    path: impl AsRef<Path>,
315    schema: &Schema,
316    options: &IngestionOptions,
317) -> IngestionResult<DataSet> {
318    validate_watermark_config(schema, options)?;
319
320    let path = path.as_ref();
321    let fmt = match options.format {
322        Some(f) => f,
323        None => infer_format_from_path(path)?,
324    };
325
326    let ctx = IngestionContext {
327        path: path.to_path_buf(),
328        format: fmt,
329    };
330
331    let result = match fmt {
332        IngestionFormat::Csv => csv::ingest_csv_from_path(path, schema),
333        IngestionFormat::Json => json::ingest_json_from_path(path, schema),
334        IngestionFormat::Parquet => parquet::ingest_parquet_from_path(path, schema),
335        IngestionFormat::Excel => {
336            ingest_excel_dispatch(path, schema, &options.excel_sheet_selection)
337        }
338        IngestionFormat::Xml => xml::ingest_xml_from_path(path, schema),
339    };
340
341    let result = result
342        .and_then(|ds| apply_watermark_after_ingest(ds, schema, options))
343        .map(|ds| apply_max_rows(ds, options.max_rows));
344
345    if let Some(obs) = options.observer.as_ref() {
346        match &result {
347            Ok(ds) => obs.on_success(
348                &ctx,
349                IngestionStats {
350                    rows: ds.row_count(),
351                },
352            ),
353            Err(e) => {
354                let sev = severity_for_error(e);
355                obs.on_failure(&ctx, sev, e);
356                if sev >= options.alert_at_or_above {
357                    obs.on_alert(&ctx, sev, e);
358                }
359            }
360        }
361    }
362
363    result
364}
365
366/// Per-file [`IngestionOptions`] with watermark fields cleared so each file is loaded in full; the
367/// caller applies [`apply_watermark_after_ingest`] once over the concatenated batch.
368fn options_without_watermark(options: &IngestionOptions) -> IngestionOptions {
369    let mut o = options.clone();
370    o.watermark_column = None;
371    o.watermark_exclusive_above = None;
372    o
373}
374
375/// Metadata from [`ingest_from_ordered_paths`]: which paths were read, the last path in that
376/// sequence, and (when incremental watermark options are set) the maximum value in the watermark
377/// column **after** the batch filter — useful for checkpointing the next run.
378#[derive(Debug, Clone, PartialEq)]
379pub struct OrderedBatchIngestMetadata {
380    /// Files ingested, in the order given.
381    pub paths: Vec<PathBuf>,
382    /// Last entry in [`Self::paths`] (typical append-only batch cursor).
383    pub last_path: Option<PathBuf>,
384    /// Maximum of the watermark column over non-null cells in the output dataset, when watermark
385    /// options were set; otherwise `None`.
386    pub max_watermark_value: Option<Value>,
387}
388
389/// Ingest many files **in order**, concatenate rows into one [`DataSet`], then apply the watermark
390/// filter **once** (if configured).
391///
392/// Semantics: the high-water filter applies to the **combined** rows, not per file. For each path,
393/// this calls [`ingest_from_path`] with watermark options temporarily cleared so observers see
394/// full per-file loads; the batch watermark runs after concatenation.
395pub fn ingest_from_ordered_paths<P: AsRef<Path>>(
396    paths: &[P],
397    schema: &Schema,
398    options: &IngestionOptions,
399) -> IngestionResult<(DataSet, OrderedBatchIngestMetadata)> {
400    validate_watermark_config(schema, options)?;
401    if paths.is_empty() {
402        return Err(IngestionError::SchemaMismatch {
403            message: "ingest_from_ordered_paths: empty path list".to_string(),
404        });
405    }
406
407    let path_bufs: Vec<PathBuf> = paths.iter().map(|p| p.as_ref().to_path_buf()).collect();
408    let per_file_opts = options_without_watermark(options);
409
410    let mut all_rows: Vec<Vec<Value>> = Vec::new();
411    for p in &path_bufs {
412        let ds = ingest_from_path(p, schema, &per_file_opts)?;
413        all_rows.extend(ds.rows);
414    }
415
416    let mut ds = DataSet::new(schema.clone(), all_rows);
417    ds = apply_watermark_after_ingest(ds, schema, options)?;
418    ds = apply_max_rows(ds, options.max_rows);
419
420    let max_watermark_value = match &options.watermark_column {
421        Some(col) => max_value_in_column(&ds, schema, col),
422        None => None,
423    };
424
425    let last_path = path_bufs.last().cloned();
426    let meta = OrderedBatchIngestMetadata {
427        paths: path_bufs,
428        last_path,
429        max_watermark_value,
430    };
431
432    Ok((ds, meta))
433}
434
/// Infer a [`Schema`] for an input file.
///
/// This is intended for quick exploration and benchmarking when callers don't have a schema yet.
/// It uses a **best-effort** mapping into `DataType::{Int64, Float64, Bool, Utf8}`.
///
/// The format comes from `options.format`, or is inferred from the file extension when that is
/// `None`. Only `options.format` and `options.excel_sheet_selection` are consulted; watermark,
/// `max_rows` and observer settings are ignored here.
///
/// Notes:
/// - CSV input is read with a header row and collected in full through Polars before the
///   schema is derived; Parquet is likewise scanned and collected in full. Inference cost
///   therefore grows with file size.
/// - For JSON, nested fields are inferred only at the **top level** (no dot-path expansion).
///   A `.ndjson` extension (any ASCII case) selects JSON Lines parsing; every other extension is
///   parsed as a single JSON document.
/// - For Excel, inference uses a scan-based heuristic.
///
/// # Errors
///
/// - [`IngestionError::SchemaMismatch`] when the format cannot be inferred, and always for XML
///   (no schema inference is available for it).
/// - I/O errors opening a JSON file, and Polars read/collect errors (converted with
///   `polars_error_to_ingestion`).
pub fn infer_schema_from_path(
    path: impl AsRef<Path>,
    options: &IngestionOptions,
) -> IngestionResult<Schema> {
    let path = path.as_ref();
    // Same resolution rule as `ingest_from_path`: explicit format first, then extension.
    let fmt = match options.format {
        Some(f) => f,
        None => infer_format_from_path(path)?,
    };

    match fmt {
        IngestionFormat::Csv => {
            // Assumes the first row is a header; the whole file is collected, not a sample.
            let df = LazyCsvReader::new(path.to_string_lossy().as_ref().into())
                .with_has_header(true)
                .finish()
                .map_err(|e| polars_error_to_ingestion("failed to read csv with polars", e))?
                .collect()
                .map_err(|e| polars_error_to_ingestion("failed to collect csv with polars", e))?;
            infer_schema_from_dataframe_lossy(&df)
        }
        IngestionFormat::Json => {
            use std::fs::File;

            // Only the extension decides line-delimited vs. document parsing, even when
            // `options.format` forced JSON.
            let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
            let json_format = if ext.eq_ignore_ascii_case("ndjson") {
                JsonFormat::JsonLines
            } else {
                JsonFormat::Json
            };

            let file = File::open(path)?;
            let df = JsonReader::new(file)
                .with_json_format(json_format)
                .finish()
                .map_err(|e| polars_error_to_ingestion("failed to read json with polars", e))?;
            infer_schema_from_dataframe_lossy(&df)
        }
        IngestionFormat::Parquet => {
            // Lazy scan, then a full collect before inferring.
            let df = LazyFrame::scan_parquet(
                path.to_string_lossy().as_ref().into(),
                ScanArgsParquet::default(),
            )
            .map_err(|e| polars_error_to_ingestion("failed to read parquet with polars", e))?
            .collect()
            .map_err(|e| polars_error_to_ingestion("failed to collect parquet with polars", e))?;
            infer_schema_from_dataframe_lossy(&df)
        }
        IngestionFormat::Excel => infer_excel_schema_dispatch(path, &options.excel_sheet_selection),
        // XML has no inference path; callers must supply a schema.
        IngestionFormat::Xml => Err(IngestionError::SchemaMismatch {
            message: "infer_schema_from_path is not supported for XML; provide an explicit Schema"
                .to_string(),
        }),
    }
}
497
498/// Convenience wrapper: infer schema and then ingest.
499pub fn ingest_from_path_infer(
500    path: impl AsRef<Path>,
501    options: &IngestionOptions,
502) -> IngestionResult<DataSet> {
503    let schema = infer_schema_from_path(path.as_ref(), options)?;
504    ingest_from_path(path, &schema, options)
505}
506
507fn severity_for_error(e: &IngestionError) -> IngestionSeverity {
508    match e {
509        IngestionError::Io(_) => IngestionSeverity::Critical,
510        IngestionError::Parquet(err) => {
511            // Best-effort: parquet errors often wrap IO, but not always in a structured way.
512            // If we can detect IO in the source chain, treat it as Critical.
513            if error_chain_contains_io(err) {
514                IngestionSeverity::Critical
515            } else {
516                IngestionSeverity::Error
517            }
518        }
519        IngestionError::Csv(err) => match err.kind() {
520            ::csv::ErrorKind::Io(_) => IngestionSeverity::Critical,
521            _ => IngestionSeverity::Error,
522        },
523        #[cfg(feature = "excel")]
524        IngestionError::Excel(_) => IngestionSeverity::Error,
525        IngestionError::Engine { source, .. } => {
526            if error_chain_contains_io(source.as_ref()) {
527                IngestionSeverity::Critical
528            } else {
529                IngestionSeverity::Error
530            }
531        }
532        IngestionError::SchemaMismatch { .. } => IngestionSeverity::Error,
533        IngestionError::ParseError { .. } => IngestionSeverity::Error,
534    }
535}
536
/// Return `true` if `e` itself, or any error reachable through its `source()` chain, is a
/// [`std::io::Error`].
fn error_chain_contains_io(e: &(dyn StdError + 'static)) -> bool {
    std::iter::successors(Some(e), |&err| err.source()).any(|err| err.is::<std::io::Error>())
}
547
548fn infer_format_from_path(path: &Path) -> IngestionResult<IngestionFormat> {
549    let ext = path.extension().and_then(|s| s.to_str()).ok_or_else(|| {
550        IngestionError::SchemaMismatch {
551            message: format!(
552                "cannot infer format: path has no extension ({})",
553                path.display()
554            ),
555        }
556    })?;
557
558    IngestionFormat::from_extension(ext).ok_or_else(|| IngestionError::SchemaMismatch {
559        message: format!(
560            "cannot infer format from extension '{ext}' for path ({})",
561            path.display()
562        ),
563    })
564}
565
566fn ingest_excel_dispatch(
567    path: &Path,
568    schema: &Schema,
569    sel: &ExcelSheetSelection,
570) -> IngestionResult<DataSet> {
571    match sel {
572        ExcelSheetSelection::First => excel::ingest_excel_from_path(path, None, schema),
573        ExcelSheetSelection::Sheet(name) => {
574            excel::ingest_excel_from_path(path, Some(name.as_str()), schema)
575        }
576        ExcelSheetSelection::AllSheets => {
577            excel::ingest_excel_workbook_from_path(path, None, schema)
578        }
579        ExcelSheetSelection::Sheets(names) => {
580            let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
581            excel::ingest_excel_workbook_from_path(path, Some(refs.as_slice()), schema)
582        }
583    }
584}
585
586fn infer_excel_schema_dispatch(path: &Path, sel: &ExcelSheetSelection) -> IngestionResult<Schema> {
587    match sel {
588        ExcelSheetSelection::First => excel::infer_excel_schema_from_path(path, None),
589        ExcelSheetSelection::Sheet(name) => {
590            excel::infer_excel_schema_from_path(path, Some(name.as_str()))
591        }
592        ExcelSheetSelection::AllSheets => excel::infer_excel_schema_workbook_from_path(path, None),
593        ExcelSheetSelection::Sheets(names) => {
594            let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
595            excel::infer_excel_schema_workbook_from_path(path, Some(refs.as_slice()))
596        }
597    }
598}
599
600/// Convenience helper for callers that want an owned request object.
601///
602/// This can be useful if you want to enqueue ingestion work in a job system.
603#[derive(Clone)]
604pub struct IngestionRequest {
605    /// Path to the input file.
606    pub path: PathBuf,
607    /// Schema to validate/parse values into.
608    pub schema: Schema,
609    /// Options controlling ingestion.
610    pub options: IngestionOptions,
611}
612
613impl fmt::Debug for IngestionRequest {
614    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
615        f.debug_struct("IngestionRequest")
616            .field("path", &self.path)
617            .field("schema_fields", &self.schema.fields.len())
618            .field("options", &self.options)
619            .finish()
620    }
621}
622
623impl IngestionRequest {
624    /// Execute the request by calling [`ingest_from_path`].
625    pub fn run(&self) -> IngestionResult<DataSet> {
626        ingest_from_path(&self.path, &self.schema, &self.options)
627    }
628}
629
630/// Write an in-memory [`DataSet`] to a single Parquet file using Polars.
631///
632/// Intended for JVM / Spark bridges that prefer reading Parquet from disk over large JSON payloads.
633pub fn export_dataset_to_xml(path: &Path, ds: &DataSet) -> IngestionResult<()> {
634    xml::export_dataset_to_xml(path, ds)
635}
636
637/// Write an in-memory [`DataSet`] to a single Parquet file using Polars.
638pub fn export_dataset_to_parquet(path: &Path, ds: &DataSet) -> IngestionResult<()> {
639    use super::polars_bridge::{dataset_to_dataframe, polars_error_to_ingestion};
640    use std::fs::File;
641
642    let mut df = dataset_to_dataframe(ds)?;
643    let mut file = File::create(path)?;
644    ParquetWriter::new(&mut file)
645        .finish(&mut df)
646        .map_err(|e| polars_error_to_ingestion("write parquet from DataSet", e))?;
647    Ok(())
648}
649
650/// Write an in-memory [`DataSet`] to a single Arrow IPC **file** using Polars (for JVM / Arrow
651/// bridges that prefer IPC over large JSON `DataSet` payloads).
652pub fn export_dataset_to_arrow_ipc(path: &Path, ds: &DataSet) -> IngestionResult<()> {
653    use super::polars_bridge::{dataset_to_dataframe, polars_error_to_ingestion};
654    use std::fs::File;
655
656    let mut df = dataset_to_dataframe(ds)?;
657    let mut file = File::create(path)?;
658    IpcWriter::new(&mut file)
659        .finish(&mut df)
660        .map_err(|e| polars_error_to_ingestion("write arrow ipc from DataSet", e))?;
661    Ok(())
662}