rust_data_processing/ingestion/unified.rs
1//! Unified ingestion entrypoint.
2//!
3//! Most callers should use [`ingest_from_path`], which ingests a file into an in-memory
4//! [`crate::types::DataSet`] using a provided [`crate::types::Schema`].
5//!
6//! - If [`IngestionOptions::format`] is `None`, the ingestion format is inferred from the file
7//! extension.
8//! - If an [`super::observability::IngestionObserver`] is provided, success/failure/alerts are
9//! reported to it.
10
11use std::error::Error as StdError;
12use std::fmt;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
16use crate::error::{IngestionError, IngestionResult};
17use crate::types::{DataSet, Schema, Value};
18
19use super::observability::{
20 IngestionContext, IngestionObserver, IngestionSeverity, IngestionStats,
21};
22use super::polars_bridge::{infer_schema_from_dataframe_lossy, polars_error_to_ingestion};
23use super::watermark::{
24 apply_watermark_after_ingest, max_value_in_column, validate_watermark_config,
25};
26use super::{csv, excel, json, parquet, xml};
27use polars::prelude::*;
28
/// File formats understood by the unified ingestion entry points.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IngestionFormat {
    /// Comma-separated values.
    Csv,
    /// JSON array-of-objects or NDJSON.
    Json,
    /// Apache Parquet.
    Parquet,
    /// Spreadsheet/workbook formats (feature-gated behind `excel`).
    Excel,
    /// Row-oriented `<rdp_records>` XML (see [`super::xml`]).
    Xml,
}

impl IngestionFormat {
    /// Map a file extension (without the leading dot) to a format.
    ///
    /// The comparison ignores ASCII case. Returns `None` for unknown extensions.
    pub fn from_extension(ext: &str) -> Option<Self> {
        // Every recognised extension paired with the format it selects.
        const KNOWN_EXTENSIONS: &[(&str, IngestionFormat)] = &[
            ("csv", IngestionFormat::Csv),
            ("json", IngestionFormat::Json),
            ("ndjson", IngestionFormat::Json),
            ("parquet", IngestionFormat::Parquet),
            ("pq", IngestionFormat::Parquet),
            ("xlsx", IngestionFormat::Excel),
            ("xls", IngestionFormat::Excel),
            ("xlsm", IngestionFormat::Excel),
            ("xlsb", IngestionFormat::Excel),
            ("ods", IngestionFormat::Excel),
            ("xml", IngestionFormat::Xml),
        ];

        KNOWN_EXTENSIONS
            .iter()
            .find(|(known, _)| ext.eq_ignore_ascii_case(known))
            .map(|&(_, format)| format)
    }
}
57
/// How to choose sheet(s) when ingesting an Excel workbook.
///
/// The derived [`Default`] is [`ExcelSheetSelection::First`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ExcelSheetSelection {
    /// Ingest the first sheet (default).
    #[default]
    First,
    /// Ingest a single named sheet.
    Sheet(String),
    /// Ingest all sheets and concatenate rows.
    AllSheets,
    /// Ingest only the listed sheets (in order) and concatenate rows.
    Sheets(Vec<String>),
}
71
/// Options controlling unified ingestion behavior.
///
/// Use [`Default`] for common cases.
///
/// `Debug` is implemented manually for this type and reports only whether an observer is set,
/// not the observer itself.
#[derive(Clone)]
pub struct IngestionOptions {
    /// If `None`, auto-detect format from file extension.
    pub format: Option<IngestionFormat>,
    /// Excel-specific options.
    pub excel_sheet_selection: ExcelSheetSelection,
    /// Optional observer for logging/alerts.
    pub observer: Option<Arc<dyn IngestionObserver>>,
    /// Severity threshold at which `on_alert` is invoked: a failure alerts when its computed
    /// severity is greater than or equal to this value.
    pub alert_at_or_above: IngestionSeverity,
    /// Column name for incremental / high-water filtering (must be used with
    /// [`Self::watermark_exclusive_above`]).
    pub watermark_column: Option<String>,
    /// Keep only rows where `watermark_column` is **strictly greater than** this value (same
    /// [`crate::types::DataType`] as the column). Applied after ingest for file and DB sources.
    pub watermark_exclusive_above: Option<Value>,
    /// After ingest (and watermark filter), keep at most this many rows from the start of the
    /// dataset. **`None`** = no limit; `Some(0)` is also treated as no limit. Pipeline
    /// `sources.options.max_rows` maps here.
    pub max_rows: Option<usize>,
}
95
96impl fmt::Debug for IngestionOptions {
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 f.debug_struct("IngestionOptions")
99 .field("format", &self.format)
100 .field("excel_sheet_selection", &self.excel_sheet_selection)
101 .field("observer_set", &self.observer.is_some())
102 .field("alert_at_or_above", &self.alert_at_or_above)
103 .field("watermark_column", &self.watermark_column)
104 .field("watermark_exclusive_above", &self.watermark_exclusive_above)
105 .field("max_rows", &self.max_rows)
106 .finish()
107 }
108}
109
impl Default for IngestionOptions {
    /// Defaults: format inferred from the extension (`None`), first Excel sheet, no observer,
    /// alerts at `Critical`, no watermark filter, and no row limit.
    fn default() -> Self {
        Self {
            format: None,
            excel_sheet_selection: ExcelSheetSelection::default(),
            observer: None,
            alert_at_or_above: IngestionSeverity::Critical,
            watermark_column: None,
            watermark_exclusive_above: None,
            max_rows: None,
        }
    }
}
123
124/// Truncate a dataset to at most `max` rows (no-op when `max` is zero).
125fn truncate_dataset_rows(mut ds: DataSet, max: usize) -> DataSet {
126 if max == 0 {
127 return ds;
128 }
129 if ds.row_count() > max {
130 ds.rows.truncate(max);
131 }
132 ds
133}
134
135fn apply_max_rows(ds: DataSet, max_rows: Option<usize>) -> DataSet {
136 match max_rows {
137 Some(max) if max > 0 => truncate_dataset_rows(ds, max),
138 _ => ds,
139 }
140}
141
142/// Unified ingestion entry point for path-based sources.
143///
144/// - If `options.format` is `None`, format is inferred from the file extension.
145/// - Use `options.excel_sheet_selection` for Excel multi-tab behavior.
146///
147/// When an observer is configured, this function reports:
148///
149/// - `on_success` on success, with row count stats
150/// - `on_failure` on failure, with a computed severity
151/// - `on_alert` on failure when the computed severity is >= `options.alert_at_or_above`
152///
153/// # Examples
154///
155/// ## CSV (auto-detect by extension)
156///
157/// ```no_run
158/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
159/// use rust_data_processing::types::{DataType, Field, Schema};
160///
161/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
162/// let schema = Schema::new(vec![
163/// Field::new("id", DataType::Int64),
164/// Field::new("name", DataType::Utf8),
165/// ]);
166///
167/// // Uses `.csv` to select CSV ingestion.
168/// let ds = ingest_from_path("people.csv", &schema, &IngestionOptions::default())?;
169/// println!("rows={}", ds.row_count());
170/// # Ok(())
171/// # }
172/// ```
173///
174/// ## JSON (auto-detect by extension, with nested field paths)
175///
176/// ```no_run
177/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
178/// use rust_data_processing::types::{DataType, Field, Schema};
179///
180/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
181/// // JSON supports nested field access via dot paths.
182/// let schema = Schema::new(vec![
183/// Field::new("id", DataType::Int64),
184/// Field::new("user.name", DataType::Utf8),
185/// ]);
186///
187/// let ds = ingest_from_path("events.json", &schema, &IngestionOptions::default())?;
188/// println!("rows={}", ds.row_count());
189/// # Ok(())
190/// # }
191/// ```
192///
193/// ## Parquet (auto-detect by extension)
194///
195/// ```no_run
196/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
197/// use rust_data_processing::types::{DataType, Field, Schema};
198///
199/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
200/// let schema = Schema::new(vec![
201/// Field::new("id", DataType::Int64),
202/// Field::new("active", DataType::Bool),
203/// ]);
204///
205/// let ds = ingest_from_path("data.parquet", &schema, &IngestionOptions::default())?;
206/// println!("rows={}", ds.row_count());
207/// # Ok(())
208/// # }
209/// ```
210///
211/// ## Force a format explicitly (override extension inference)
212///
213/// ```no_run
214/// use rust_data_processing::ingestion::{ingest_from_path, IngestionFormat, IngestionOptions};
215/// use rust_data_processing::types::{DataType, Field, Schema};
216///
217/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
218/// let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
219///
220/// let opts = IngestionOptions {
221/// format: Some(IngestionFormat::Csv),
222/// ..Default::default()
223/// };
224///
225/// // Useful when a file has no extension or you want to override inference.
226/// let ds = ingest_from_path("input_without_extension", &schema, &opts)?;
227/// println!("rows={}", ds.row_count());
228/// # Ok(())
229/// # }
230/// ```
231///
232/// ## Observability (stderr logging + alert threshold)
233///
234/// ```no_run
235/// use std::sync::Arc;
236///
237/// use rust_data_processing::ingestion::{
238/// ingest_from_path, IngestionOptions, IngestionSeverity, StdErrObserver,
239/// };
240/// use rust_data_processing::types::{DataType, Field, Schema};
241///
242/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
243/// let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
244///
245/// let opts = IngestionOptions {
246/// observer: Some(Arc::new(StdErrObserver::default())),
247/// alert_at_or_above: IngestionSeverity::Critical,
248/// ..Default::default()
249/// };
250///
251/// // Missing files are treated as Critical and will trigger `on_alert` at this threshold.
252/// let _err = ingest_from_path("does_not_exist.csv", &schema, &opts).unwrap_err();
253/// # Ok(())
254/// # }
255/// ```
256///
257/// ## Incremental load (high-watermark on files)
258///
259/// Set [`IngestionOptions::watermark_column`] and [`IngestionOptions::watermark_exclusive_above`]
260/// to keep only rows where the watermark column is **strictly greater** than the floor value.
261/// Nulls in that column are dropped. The same options apply to
262/// [`ingest_from_db`](crate::ingestion::ingest_from_db) when the **`db_connectorx`** Cargo feature
263/// is enabled (filter runs after the query result is loaded).
264///
265/// ```no_run
266/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
267/// use rust_data_processing::types::{DataType, Field, Schema, Value};
268///
269/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
270/// let schema = Schema::new(vec![
271/// Field::new("id", DataType::Int64),
272/// Field::new("ts", DataType::Int64),
273/// ]);
274/// let opts = IngestionOptions {
275/// watermark_column: Some("ts".to_string()),
276/// watermark_exclusive_above: Some(Value::Int64(100)),
277/// ..Default::default()
278/// };
279/// let ds = ingest_from_path("tests/fixtures/watermark_events.csv", &schema, &opts)?;
280/// assert_eq!(ds.row_count(), 2);
281/// # Ok(())
282/// # }
283/// ```
284///
285/// ## Excel
286///
287/// Example. Marked `no_run` so it is **compiled** by doctests
288/// (no "not tested" banner), but not executed (it expects a real `workbook.xlsx` file).
289///
290/// ```no_run
291/// use rust_data_processing::ingestion::{
292/// ingest_from_path, ExcelSheetSelection, IngestionFormat, IngestionOptions,
293/// };
294/// use rust_data_processing::types::{DataType, Field, Schema};
295///
296/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
297/// let schema = Schema::new(vec![
298/// Field::new("id", DataType::Int64),
299/// Field::new("name", DataType::Utf8),
300/// ]);
301///
302/// let opts = IngestionOptions {
303/// format: Some(IngestionFormat::Excel),
304/// excel_sheet_selection: ExcelSheetSelection::Sheet("Sheet1".to_string()),
305/// ..Default::default()
306/// };
307///
308/// let ds = ingest_from_path("workbook.xlsx", &schema, &opts)?;
309/// println!("rows={}", ds.row_count());
310/// # Ok(())
311/// # }
312/// ```
313pub fn ingest_from_path(
314 path: impl AsRef<Path>,
315 schema: &Schema,
316 options: &IngestionOptions,
317) -> IngestionResult<DataSet> {
318 validate_watermark_config(schema, options)?;
319
320 let path = path.as_ref();
321 let fmt = match options.format {
322 Some(f) => f,
323 None => infer_format_from_path(path)?,
324 };
325
326 let ctx = IngestionContext {
327 path: path.to_path_buf(),
328 format: fmt,
329 };
330
331 let result = match fmt {
332 IngestionFormat::Csv => csv::ingest_csv_from_path(path, schema),
333 IngestionFormat::Json => json::ingest_json_from_path(path, schema),
334 IngestionFormat::Parquet => parquet::ingest_parquet_from_path(path, schema),
335 IngestionFormat::Excel => {
336 ingest_excel_dispatch(path, schema, &options.excel_sheet_selection)
337 }
338 IngestionFormat::Xml => xml::ingest_xml_from_path(path, schema),
339 };
340
341 let result = result
342 .and_then(|ds| apply_watermark_after_ingest(ds, schema, options))
343 .map(|ds| apply_max_rows(ds, options.max_rows));
344
345 if let Some(obs) = options.observer.as_ref() {
346 match &result {
347 Ok(ds) => obs.on_success(
348 &ctx,
349 IngestionStats {
350 rows: ds.row_count(),
351 },
352 ),
353 Err(e) => {
354 let sev = severity_for_error(e);
355 obs.on_failure(&ctx, sev, e);
356 if sev >= options.alert_at_or_above {
357 obs.on_alert(&ctx, sev, e);
358 }
359 }
360 }
361 }
362
363 result
364}
365
366/// Per-file [`IngestionOptions`] with watermark fields cleared so each file is loaded in full; the
367/// caller applies [`apply_watermark_after_ingest`] once over the concatenated batch.
368fn options_without_watermark(options: &IngestionOptions) -> IngestionOptions {
369 let mut o = options.clone();
370 o.watermark_column = None;
371 o.watermark_exclusive_above = None;
372 o
373}
374
/// Metadata from [`ingest_from_ordered_paths`]: which paths were read, the last path in that
/// sequence, and (when incremental watermark options are set) the maximum value in the watermark
/// column **after** the batch filter and row cap — useful for checkpointing the next run.
#[derive(Debug, Clone, PartialEq)]
pub struct OrderedBatchIngestMetadata {
    /// Files ingested, in the order given.
    pub paths: Vec<PathBuf>,
    /// Last entry in [`Self::paths`] (typical append-only batch cursor).
    pub last_path: Option<PathBuf>,
    /// Maximum of the watermark column over non-null cells in the output dataset, when watermark
    /// options were set; otherwise `None`.
    pub max_watermark_value: Option<Value>,
}
388
389/// Ingest many files **in order**, concatenate rows into one [`DataSet`], then apply the watermark
390/// filter **once** (if configured).
391///
392/// Semantics: the high-water filter applies to the **combined** rows, not per file. For each path,
393/// this calls [`ingest_from_path`] with watermark options temporarily cleared so observers see
394/// full per-file loads; the batch watermark runs after concatenation.
395pub fn ingest_from_ordered_paths<P: AsRef<Path>>(
396 paths: &[P],
397 schema: &Schema,
398 options: &IngestionOptions,
399) -> IngestionResult<(DataSet, OrderedBatchIngestMetadata)> {
400 validate_watermark_config(schema, options)?;
401 if paths.is_empty() {
402 return Err(IngestionError::SchemaMismatch {
403 message: "ingest_from_ordered_paths: empty path list".to_string(),
404 });
405 }
406
407 let path_bufs: Vec<PathBuf> = paths.iter().map(|p| p.as_ref().to_path_buf()).collect();
408 let per_file_opts = options_without_watermark(options);
409
410 let mut all_rows: Vec<Vec<Value>> = Vec::new();
411 for p in &path_bufs {
412 let ds = ingest_from_path(p, schema, &per_file_opts)?;
413 all_rows.extend(ds.rows);
414 }
415
416 let mut ds = DataSet::new(schema.clone(), all_rows);
417 ds = apply_watermark_after_ingest(ds, schema, options)?;
418 ds = apply_max_rows(ds, options.max_rows);
419
420 let max_watermark_value = match &options.watermark_column {
421 Some(col) => max_value_in_column(&ds, schema, col),
422 None => None,
423 };
424
425 let last_path = path_bufs.last().cloned();
426 let meta = OrderedBatchIngestMetadata {
427 paths: path_bufs,
428 last_path,
429 max_watermark_value,
430 };
431
432 Ok((ds, meta))
433}
434
435/// Infer a [`Schema`] for an input file.
436///
437/// This is intended for quick exploration and benchmarking when callers don't have a schema yet.
438/// It uses a **best-effort** mapping into `DataType::{Int64, Float64, Bool, Utf8}`.
439///
440/// Notes:
441/// - For JSON, nested fields are inferred only at the **top level** (no dot-path expansion).
442/// - For Excel, inference uses a scan-based heuristic.
443pub fn infer_schema_from_path(
444 path: impl AsRef<Path>,
445 options: &IngestionOptions,
446) -> IngestionResult<Schema> {
447 let path = path.as_ref();
448 let fmt = match options.format {
449 Some(f) => f,
450 None => infer_format_from_path(path)?,
451 };
452
453 match fmt {
454 IngestionFormat::Csv => {
455 let df = LazyCsvReader::new(path.to_string_lossy().as_ref().into())
456 .with_has_header(true)
457 .finish()
458 .map_err(|e| polars_error_to_ingestion("failed to read csv with polars", e))?
459 .collect()
460 .map_err(|e| polars_error_to_ingestion("failed to collect csv with polars", e))?;
461 infer_schema_from_dataframe_lossy(&df)
462 }
463 IngestionFormat::Json => {
464 use std::fs::File;
465
466 let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
467 let json_format = if ext.eq_ignore_ascii_case("ndjson") {
468 JsonFormat::JsonLines
469 } else {
470 JsonFormat::Json
471 };
472
473 let file = File::open(path)?;
474 let df = JsonReader::new(file)
475 .with_json_format(json_format)
476 .finish()
477 .map_err(|e| polars_error_to_ingestion("failed to read json with polars", e))?;
478 infer_schema_from_dataframe_lossy(&df)
479 }
480 IngestionFormat::Parquet => {
481 let df = LazyFrame::scan_parquet(
482 path.to_string_lossy().as_ref().into(),
483 ScanArgsParquet::default(),
484 )
485 .map_err(|e| polars_error_to_ingestion("failed to read parquet with polars", e))?
486 .collect()
487 .map_err(|e| polars_error_to_ingestion("failed to collect parquet with polars", e))?;
488 infer_schema_from_dataframe_lossy(&df)
489 }
490 IngestionFormat::Excel => infer_excel_schema_dispatch(path, &options.excel_sheet_selection),
491 IngestionFormat::Xml => Err(IngestionError::SchemaMismatch {
492 message: "infer_schema_from_path is not supported for XML; provide an explicit Schema"
493 .to_string(),
494 }),
495 }
496}
497
498/// Convenience wrapper: infer schema and then ingest.
499pub fn ingest_from_path_infer(
500 path: impl AsRef<Path>,
501 options: &IngestionOptions,
502) -> IngestionResult<DataSet> {
503 let schema = infer_schema_from_path(path.as_ref(), options)?;
504 ingest_from_path(path, &schema, options)
505}
506
507fn severity_for_error(e: &IngestionError) -> IngestionSeverity {
508 match e {
509 IngestionError::Io(_) => IngestionSeverity::Critical,
510 IngestionError::Parquet(err) => {
511 // Best-effort: parquet errors often wrap IO, but not always in a structured way.
512 // If we can detect IO in the source chain, treat it as Critical.
513 if error_chain_contains_io(err) {
514 IngestionSeverity::Critical
515 } else {
516 IngestionSeverity::Error
517 }
518 }
519 IngestionError::Csv(err) => match err.kind() {
520 ::csv::ErrorKind::Io(_) => IngestionSeverity::Critical,
521 _ => IngestionSeverity::Error,
522 },
523 #[cfg(feature = "excel")]
524 IngestionError::Excel(_) => IngestionSeverity::Error,
525 IngestionError::Engine { source, .. } => {
526 if error_chain_contains_io(source.as_ref()) {
527 IngestionSeverity::Critical
528 } else {
529 IngestionSeverity::Error
530 }
531 }
532 IngestionError::SchemaMismatch { .. } => IngestionSeverity::Error,
533 IngestionError::ParseError { .. } => IngestionSeverity::Error,
534 }
535}
536
/// Report whether `e` itself, or any error reachable through its `source()` chain, is a
/// [`std::io::Error`].
fn error_chain_contains_io(e: &(dyn StdError + 'static)) -> bool {
    let mut link = e;
    loop {
        if link.is::<std::io::Error>() {
            return true;
        }
        match link.source() {
            Some(next) => link = next,
            // End of the chain without meeting an I/O error.
            None => return false,
        }
    }
}
547
548fn infer_format_from_path(path: &Path) -> IngestionResult<IngestionFormat> {
549 let ext = path.extension().and_then(|s| s.to_str()).ok_or_else(|| {
550 IngestionError::SchemaMismatch {
551 message: format!(
552 "cannot infer format: path has no extension ({})",
553 path.display()
554 ),
555 }
556 })?;
557
558 IngestionFormat::from_extension(ext).ok_or_else(|| IngestionError::SchemaMismatch {
559 message: format!(
560 "cannot infer format from extension '{ext}' for path ({})",
561 path.display()
562 ),
563 })
564}
565
566fn ingest_excel_dispatch(
567 path: &Path,
568 schema: &Schema,
569 sel: &ExcelSheetSelection,
570) -> IngestionResult<DataSet> {
571 match sel {
572 ExcelSheetSelection::First => excel::ingest_excel_from_path(path, None, schema),
573 ExcelSheetSelection::Sheet(name) => {
574 excel::ingest_excel_from_path(path, Some(name.as_str()), schema)
575 }
576 ExcelSheetSelection::AllSheets => {
577 excel::ingest_excel_workbook_from_path(path, None, schema)
578 }
579 ExcelSheetSelection::Sheets(names) => {
580 let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
581 excel::ingest_excel_workbook_from_path(path, Some(refs.as_slice()), schema)
582 }
583 }
584}
585
586fn infer_excel_schema_dispatch(path: &Path, sel: &ExcelSheetSelection) -> IngestionResult<Schema> {
587 match sel {
588 ExcelSheetSelection::First => excel::infer_excel_schema_from_path(path, None),
589 ExcelSheetSelection::Sheet(name) => {
590 excel::infer_excel_schema_from_path(path, Some(name.as_str()))
591 }
592 ExcelSheetSelection::AllSheets => excel::infer_excel_schema_workbook_from_path(path, None),
593 ExcelSheetSelection::Sheets(names) => {
594 let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
595 excel::infer_excel_schema_workbook_from_path(path, Some(refs.as_slice()))
596 }
597 }
598}
599
/// Convenience helper for callers that want an owned request object.
///
/// This can be useful if you want to enqueue ingestion work in a job system. The request owns
/// its path, schema, and options, and is executed with [`IngestionRequest::run`].
#[derive(Clone)]
pub struct IngestionRequest {
    /// Path to the input file.
    pub path: PathBuf,
    /// Schema to validate/parse values into.
    pub schema: Schema,
    /// Options controlling ingestion.
    pub options: IngestionOptions,
}
612
613impl fmt::Debug for IngestionRequest {
614 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
615 f.debug_struct("IngestionRequest")
616 .field("path", &self.path)
617 .field("schema_fields", &self.schema.fields.len())
618 .field("options", &self.options)
619 .finish()
620 }
621}
622
623impl IngestionRequest {
624 /// Execute the request by calling [`ingest_from_path`].
625 pub fn run(&self) -> IngestionResult<DataSet> {
626 ingest_from_path(&self.path, &self.schema, &self.options)
627 }
628}
629
/// Write an in-memory [`DataSet`] to an XML file at `path`.
///
/// Thin wrapper that delegates to [`super::xml::export_dataset_to_xml`]; see that function for
/// the exact document layout.
pub fn export_dataset_to_xml(path: &Path, ds: &DataSet) -> IngestionResult<()> {
    xml::export_dataset_to_xml(path, ds)
}
636
637/// Write an in-memory [`DataSet`] to a single Parquet file using Polars.
638pub fn export_dataset_to_parquet(path: &Path, ds: &DataSet) -> IngestionResult<()> {
639 use super::polars_bridge::{dataset_to_dataframe, polars_error_to_ingestion};
640 use std::fs::File;
641
642 let mut df = dataset_to_dataframe(ds)?;
643 let mut file = File::create(path)?;
644 ParquetWriter::new(&mut file)
645 .finish(&mut df)
646 .map_err(|e| polars_error_to_ingestion("write parquet from DataSet", e))?;
647 Ok(())
648}
649
650/// Write an in-memory [`DataSet`] to a single Arrow IPC **file** using Polars (for JVM / Arrow
651/// bridges that prefer IPC over large JSON `DataSet` payloads).
652pub fn export_dataset_to_arrow_ipc(path: &Path, ds: &DataSet) -> IngestionResult<()> {
653 use super::polars_bridge::{dataset_to_dataframe, polars_error_to_ingestion};
654 use std::fs::File;
655
656 let mut df = dataset_to_dataframe(ds)?;
657 let mut file = File::create(path)?;
658 IpcWriter::new(&mut file)
659 .finish(&mut df)
660 .map_err(|e| polars_error_to_ingestion("write arrow ipc from DataSet", e))?;
661 Ok(())
662}