rust_data_processing/ingestion/unified.rs
1//! Unified ingestion entrypoint.
2//!
3//! Most callers should use [`ingest_from_path`], which ingests a file into an in-memory
4//! [`crate::types::DataSet`] using a provided [`crate::types::Schema`].
5//!
6//! - If [`IngestionOptions::format`] is `None`, the ingestion format is inferred from the file
7//! extension.
8//! - If an [`super::observability::IngestionObserver`] is provided, success/failure/alerts are
9//! reported to it.
10
11use std::error::Error as StdError;
12use std::fmt;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
16use crate::error::{IngestionError, IngestionResult};
17use crate::types::{DataSet, Schema, Value};
18
19use super::observability::{
20 IngestionContext, IngestionObserver, IngestionSeverity, IngestionStats,
21};
22use super::polars_bridge::{infer_schema_from_dataframe_lossy, polars_error_to_ingestion};
23use super::watermark::{
24 apply_watermark_after_ingest, max_value_in_column, validate_watermark_config,
25};
26use super::{csv, excel, json, parquet};
27use polars::prelude::*;
28
/// File formats understood by the unified ingestion entrypoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IngestionFormat {
    /// Comma-separated values (CSV) text.
    Csv,
    /// JSON, either an array of objects or newline-delimited JSON (NDJSON).
    Json,
    /// Apache Parquet files.
    Parquet,
    /// Spreadsheet/workbook files (feature-gated behind `excel`).
    Excel,
}
41
42impl IngestionFormat {
43 /// Parse an ingestion format from a file extension (case-insensitive).
44 pub fn from_extension(ext: &str) -> Option<Self> {
45 match ext.to_ascii_lowercase().as_str() {
46 "csv" => Some(Self::Csv),
47 "json" | "ndjson" => Some(Self::Json),
48 "parquet" | "pq" => Some(Self::Parquet),
49 "xlsx" | "xls" | "xlsm" | "xlsb" | "ods" => Some(Self::Excel),
50 _ => None,
51 }
52 }
53}
54
/// Which sheet(s) of an Excel workbook should be ingested.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ExcelSheetSelection {
    /// Only the first sheet of the workbook. This is the default.
    #[default]
    First,
    /// Exactly one sheet, identified by name.
    Sheet(String),
    /// Every sheet in the workbook, with rows concatenated.
    AllSheets,
    /// Only the named sheets, in the listed order, with rows concatenated.
    Sheets(Vec<String>),
}
68
69/// Options controlling unified ingestion behavior.
70///
71/// Use [`Default`] for common cases.
72#[derive(Clone)]
73pub struct IngestionOptions {
74 /// If `None`, auto-detect format from file extension.
75 pub format: Option<IngestionFormat>,
76 /// Excel-specific options.
77 pub excel_sheet_selection: ExcelSheetSelection,
78 /// Optional observer for logging/alerts.
79 pub observer: Option<Arc<dyn IngestionObserver>>,
80 /// Severity threshold at which `on_alert` is invoked.
81 pub alert_at_or_above: IngestionSeverity,
82 /// Column name for incremental / high-water filtering (must be used with
83 /// [`Self::watermark_exclusive_above`]).
84 pub watermark_column: Option<String>,
85 /// Keep only rows where `watermark_column` is **strictly greater than** this value (same
86 /// [`crate::types::DataType`] as the column). Applied after ingest for file and DB sources.
87 pub watermark_exclusive_above: Option<Value>,
88}
89
90impl fmt::Debug for IngestionOptions {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 f.debug_struct("IngestionOptions")
93 .field("format", &self.format)
94 .field("excel_sheet_selection", &self.excel_sheet_selection)
95 .field("observer_set", &self.observer.is_some())
96 .field("alert_at_or_above", &self.alert_at_or_above)
97 .field("watermark_column", &self.watermark_column)
98 .field("watermark_exclusive_above", &self.watermark_exclusive_above)
99 .finish()
100 }
101}
102
103impl Default for IngestionOptions {
104 fn default() -> Self {
105 Self {
106 format: None,
107 excel_sheet_selection: ExcelSheetSelection::default(),
108 observer: None,
109 alert_at_or_above: IngestionSeverity::Critical,
110 watermark_column: None,
111 watermark_exclusive_above: None,
112 }
113 }
114}
115
116/// Unified ingestion entry point for path-based sources.
117///
118/// - If `options.format` is `None`, format is inferred from the file extension.
119/// - Use `options.excel_sheet_selection` for Excel multi-tab behavior.
120///
121/// When an observer is configured, this function reports:
122///
123/// - `on_success` on success, with row count stats
124/// - `on_failure` on failure, with a computed severity
125/// - `on_alert` on failure when the computed severity is >= `options.alert_at_or_above`
126///
127/// # Examples
128///
129/// ## CSV (auto-detect by extension)
130///
131/// ```no_run
132/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
133/// use rust_data_processing::types::{DataType, Field, Schema};
134///
135/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
136/// let schema = Schema::new(vec![
137/// Field::new("id", DataType::Int64),
138/// Field::new("name", DataType::Utf8),
139/// ]);
140///
141/// // Uses `.csv` to select CSV ingestion.
142/// let ds = ingest_from_path("people.csv", &schema, &IngestionOptions::default())?;
143/// println!("rows={}", ds.row_count());
144/// # Ok(())
145/// # }
146/// ```
147///
148/// ## JSON (auto-detect by extension, with nested field paths)
149///
150/// ```no_run
151/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
152/// use rust_data_processing::types::{DataType, Field, Schema};
153///
154/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
155/// // JSON supports nested field access via dot paths.
156/// let schema = Schema::new(vec![
157/// Field::new("id", DataType::Int64),
158/// Field::new("user.name", DataType::Utf8),
159/// ]);
160///
161/// let ds = ingest_from_path("events.json", &schema, &IngestionOptions::default())?;
162/// println!("rows={}", ds.row_count());
163/// # Ok(())
164/// # }
165/// ```
166///
167/// ## Parquet (auto-detect by extension)
168///
169/// ```no_run
170/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
171/// use rust_data_processing::types::{DataType, Field, Schema};
172///
173/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
174/// let schema = Schema::new(vec![
175/// Field::new("id", DataType::Int64),
176/// Field::new("active", DataType::Bool),
177/// ]);
178///
179/// let ds = ingest_from_path("data.parquet", &schema, &IngestionOptions::default())?;
180/// println!("rows={}", ds.row_count());
181/// # Ok(())
182/// # }
183/// ```
184///
185/// ## Force a format explicitly (override extension inference)
186///
187/// ```no_run
188/// use rust_data_processing::ingestion::{ingest_from_path, IngestionFormat, IngestionOptions};
189/// use rust_data_processing::types::{DataType, Field, Schema};
190///
191/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
192/// let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
193///
194/// let opts = IngestionOptions {
195/// format: Some(IngestionFormat::Csv),
196/// ..Default::default()
197/// };
198///
199/// // Useful when a file has no extension or you want to override inference.
200/// let ds = ingest_from_path("input_without_extension", &schema, &opts)?;
201/// println!("rows={}", ds.row_count());
202/// # Ok(())
203/// # }
204/// ```
205///
206/// ## Observability (stderr logging + alert threshold)
207///
208/// ```no_run
209/// use std::sync::Arc;
210///
211/// use rust_data_processing::ingestion::{
212/// ingest_from_path, IngestionOptions, IngestionSeverity, StdErrObserver,
213/// };
214/// use rust_data_processing::types::{DataType, Field, Schema};
215///
216/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
217/// let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
218///
219/// let opts = IngestionOptions {
220/// observer: Some(Arc::new(StdErrObserver::default())),
221/// alert_at_or_above: IngestionSeverity::Critical,
222/// ..Default::default()
223/// };
224///
225/// // Missing files are treated as Critical and will trigger `on_alert` at this threshold.
226/// let _err = ingest_from_path("does_not_exist.csv", &schema, &opts).unwrap_err();
227/// # Ok(())
228/// # }
229/// ```
230///
231/// ## Incremental load (high-watermark on files)
232///
233/// Set [`IngestionOptions::watermark_column`] and [`IngestionOptions::watermark_exclusive_above`]
234/// to keep only rows where the watermark column is **strictly greater** than the floor value.
235/// Nulls in that column are dropped. The same options apply to
236/// [`ingest_from_db`](crate::ingestion::ingest_from_db) when the **`db_connectorx`** Cargo feature
237/// is enabled (filter runs after the query result is loaded).
238///
239/// ```no_run
240/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
241/// use rust_data_processing::types::{DataType, Field, Schema, Value};
242///
243/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
244/// let schema = Schema::new(vec![
245/// Field::new("id", DataType::Int64),
246/// Field::new("ts", DataType::Int64),
247/// ]);
248/// let opts = IngestionOptions {
249/// watermark_column: Some("ts".to_string()),
250/// watermark_exclusive_above: Some(Value::Int64(100)),
251/// ..Default::default()
252/// };
253/// let ds = ingest_from_path("tests/fixtures/watermark_events.csv", &schema, &opts)?;
254/// assert_eq!(ds.row_count(), 2);
255/// # Ok(())
256/// # }
257/// ```
258///
259/// ## Excel
260///
/// The example below is marked `no_run`: doctests still **compile** it (so there is no
/// "not tested" banner), but it is not executed because it expects a real `workbook.xlsx` file.
263///
264/// ```no_run
265/// use rust_data_processing::ingestion::{
266/// ingest_from_path, ExcelSheetSelection, IngestionFormat, IngestionOptions,
267/// };
268/// use rust_data_processing::types::{DataType, Field, Schema};
269///
270/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
271/// let schema = Schema::new(vec![
272/// Field::new("id", DataType::Int64),
273/// Field::new("name", DataType::Utf8),
274/// ]);
275///
276/// let opts = IngestionOptions {
277/// format: Some(IngestionFormat::Excel),
278/// excel_sheet_selection: ExcelSheetSelection::Sheet("Sheet1".to_string()),
279/// ..Default::default()
280/// };
281///
282/// let ds = ingest_from_path("workbook.xlsx", &schema, &opts)?;
283/// println!("rows={}", ds.row_count());
284/// # Ok(())
285/// # }
286/// ```
287pub fn ingest_from_path(
288 path: impl AsRef<Path>,
289 schema: &Schema,
290 options: &IngestionOptions,
291) -> IngestionResult<DataSet> {
292 validate_watermark_config(schema, options)?;
293
294 let path = path.as_ref();
295 let fmt = match options.format {
296 Some(f) => f,
297 None => infer_format_from_path(path)?,
298 };
299
300 let ctx = IngestionContext {
301 path: path.to_path_buf(),
302 format: fmt,
303 };
304
305 let result = match fmt {
306 IngestionFormat::Csv => csv::ingest_csv_from_path(path, schema),
307 IngestionFormat::Json => json::ingest_json_from_path(path, schema),
308 IngestionFormat::Parquet => parquet::ingest_parquet_from_path(path, schema),
309 IngestionFormat::Excel => {
310 ingest_excel_dispatch(path, schema, &options.excel_sheet_selection)
311 }
312 };
313
314 let result = result.and_then(|ds| apply_watermark_after_ingest(ds, schema, options));
315
316 if let Some(obs) = options.observer.as_ref() {
317 match &result {
318 Ok(ds) => obs.on_success(
319 &ctx,
320 IngestionStats {
321 rows: ds.row_count(),
322 },
323 ),
324 Err(e) => {
325 let sev = severity_for_error(e);
326 obs.on_failure(&ctx, sev, e);
327 if sev >= options.alert_at_or_above {
328 obs.on_alert(&ctx, sev, e);
329 }
330 }
331 }
332 }
333
334 result
335}
336
337/// Per-file [`IngestionOptions`] with watermark fields cleared so each file is loaded in full; the
338/// caller applies [`apply_watermark_after_ingest`] once over the concatenated batch.
339fn options_without_watermark(options: &IngestionOptions) -> IngestionOptions {
340 let mut o = options.clone();
341 o.watermark_column = None;
342 o.watermark_exclusive_above = None;
343 o
344}
345
346/// Metadata from [`ingest_from_ordered_paths`]: which paths were read, the last path in that
347/// sequence, and (when incremental watermark options are set) the maximum value in the watermark
348/// column **after** the batch filter — useful for checkpointing the next run.
349#[derive(Debug, Clone, PartialEq)]
350pub struct OrderedBatchIngestMetadata {
351 /// Files ingested, in the order given.
352 pub paths: Vec<PathBuf>,
353 /// Last entry in [`Self::paths`] (typical append-only batch cursor).
354 pub last_path: Option<PathBuf>,
355 /// Maximum of the watermark column over non-null cells in the output dataset, when watermark
356 /// options were set; otherwise `None`.
357 pub max_watermark_value: Option<Value>,
358}
359
360/// Ingest many files **in order**, concatenate rows into one [`DataSet`], then apply the watermark
361/// filter **once** (if configured).
362///
363/// Semantics: the high-water filter applies to the **combined** rows, not per file. For each path,
364/// this calls [`ingest_from_path`] with watermark options temporarily cleared so observers see
365/// full per-file loads; the batch watermark runs after concatenation.
366pub fn ingest_from_ordered_paths<P: AsRef<Path>>(
367 paths: &[P],
368 schema: &Schema,
369 options: &IngestionOptions,
370) -> IngestionResult<(DataSet, OrderedBatchIngestMetadata)> {
371 validate_watermark_config(schema, options)?;
372 if paths.is_empty() {
373 return Err(IngestionError::SchemaMismatch {
374 message: "ingest_from_ordered_paths: empty path list".to_string(),
375 });
376 }
377
378 let path_bufs: Vec<PathBuf> = paths.iter().map(|p| p.as_ref().to_path_buf()).collect();
379 let per_file_opts = options_without_watermark(options);
380
381 let mut all_rows: Vec<Vec<Value>> = Vec::new();
382 for p in &path_bufs {
383 let ds = ingest_from_path(p, schema, &per_file_opts)?;
384 all_rows.extend(ds.rows);
385 }
386
387 let mut ds = DataSet::new(schema.clone(), all_rows);
388 ds = apply_watermark_after_ingest(ds, schema, options)?;
389
390 let max_watermark_value = match &options.watermark_column {
391 Some(col) => max_value_in_column(&ds, schema, col),
392 None => None,
393 };
394
395 let last_path = path_bufs.last().cloned();
396 let meta = OrderedBatchIngestMetadata {
397 paths: path_bufs,
398 last_path,
399 max_watermark_value,
400 };
401
402 Ok((ds, meta))
403}
404
405/// Infer a [`Schema`] for an input file.
406///
407/// This is intended for quick exploration and benchmarking when callers don't have a schema yet.
408/// It uses a **best-effort** mapping into `DataType::{Int64, Float64, Bool, Utf8}`.
409///
410/// Notes:
411/// - For JSON, nested fields are inferred only at the **top level** (no dot-path expansion).
412/// - For Excel, inference uses a scan-based heuristic.
413pub fn infer_schema_from_path(
414 path: impl AsRef<Path>,
415 options: &IngestionOptions,
416) -> IngestionResult<Schema> {
417 let path = path.as_ref();
418 let fmt = match options.format {
419 Some(f) => f,
420 None => infer_format_from_path(path)?,
421 };
422
423 match fmt {
424 IngestionFormat::Csv => {
425 let df = LazyCsvReader::new(path.to_string_lossy().as_ref().into())
426 .with_has_header(true)
427 .finish()
428 .map_err(|e| polars_error_to_ingestion("failed to read csv with polars", e))?
429 .collect()
430 .map_err(|e| polars_error_to_ingestion("failed to collect csv with polars", e))?;
431 infer_schema_from_dataframe_lossy(&df)
432 }
433 IngestionFormat::Json => {
434 use std::fs::File;
435
436 let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
437 let json_format = if ext.eq_ignore_ascii_case("ndjson") {
438 JsonFormat::JsonLines
439 } else {
440 JsonFormat::Json
441 };
442
443 let file = File::open(path)?;
444 let df = JsonReader::new(file)
445 .with_json_format(json_format)
446 .finish()
447 .map_err(|e| polars_error_to_ingestion("failed to read json with polars", e))?;
448 infer_schema_from_dataframe_lossy(&df)
449 }
450 IngestionFormat::Parquet => {
451 let df = LazyFrame::scan_parquet(
452 path.to_string_lossy().as_ref().into(),
453 ScanArgsParquet::default(),
454 )
455 .map_err(|e| polars_error_to_ingestion("failed to read parquet with polars", e))?
456 .collect()
457 .map_err(|e| polars_error_to_ingestion("failed to collect parquet with polars", e))?;
458 infer_schema_from_dataframe_lossy(&df)
459 }
460 IngestionFormat::Excel => infer_excel_schema_dispatch(path, &options.excel_sheet_selection),
461 }
462}
463
464/// Convenience wrapper: infer schema and then ingest.
465pub fn ingest_from_path_infer(
466 path: impl AsRef<Path>,
467 options: &IngestionOptions,
468) -> IngestionResult<DataSet> {
469 let schema = infer_schema_from_path(path.as_ref(), options)?;
470 ingest_from_path(path, &schema, options)
471}
472
473fn severity_for_error(e: &IngestionError) -> IngestionSeverity {
474 match e {
475 IngestionError::Io(_) => IngestionSeverity::Critical,
476 IngestionError::Parquet(err) => {
477 // Best-effort: parquet errors often wrap IO, but not always in a structured way.
478 // If we can detect IO in the source chain, treat it as Critical.
479 if error_chain_contains_io(err) {
480 IngestionSeverity::Critical
481 } else {
482 IngestionSeverity::Error
483 }
484 }
485 IngestionError::Csv(err) => match err.kind() {
486 ::csv::ErrorKind::Io(_) => IngestionSeverity::Critical,
487 _ => IngestionSeverity::Error,
488 },
489 #[cfg(feature = "excel")]
490 IngestionError::Excel(_) => IngestionSeverity::Error,
491 IngestionError::Engine { source, .. } => {
492 if error_chain_contains_io(source.as_ref()) {
493 IngestionSeverity::Critical
494 } else {
495 IngestionSeverity::Error
496 }
497 }
498 IngestionError::SchemaMismatch { .. } => IngestionSeverity::Error,
499 IngestionError::ParseError { .. } => IngestionSeverity::Error,
500 }
501}
502
/// Report whether `e`, or any error reachable through its `source()` chain, is a
/// [`std::io::Error`].
fn error_chain_contains_io(e: &(dyn StdError + 'static)) -> bool {
    // Walk `e`, `e.source()`, `e.source().source()`, ... until the chain ends.
    std::iter::successors(Some(e), |&link| link.source())
        .any(|link| link.is::<std::io::Error>())
}
513
514fn infer_format_from_path(path: &Path) -> IngestionResult<IngestionFormat> {
515 let ext = path.extension().and_then(|s| s.to_str()).ok_or_else(|| {
516 IngestionError::SchemaMismatch {
517 message: format!(
518 "cannot infer format: path has no extension ({})",
519 path.display()
520 ),
521 }
522 })?;
523
524 IngestionFormat::from_extension(ext).ok_or_else(|| IngestionError::SchemaMismatch {
525 message: format!(
526 "cannot infer format from extension '{ext}' for path ({})",
527 path.display()
528 ),
529 })
530}
531
532fn ingest_excel_dispatch(
533 path: &Path,
534 schema: &Schema,
535 sel: &ExcelSheetSelection,
536) -> IngestionResult<DataSet> {
537 match sel {
538 ExcelSheetSelection::First => excel::ingest_excel_from_path(path, None, schema),
539 ExcelSheetSelection::Sheet(name) => {
540 excel::ingest_excel_from_path(path, Some(name.as_str()), schema)
541 }
542 ExcelSheetSelection::AllSheets => {
543 excel::ingest_excel_workbook_from_path(path, None, schema)
544 }
545 ExcelSheetSelection::Sheets(names) => {
546 let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
547 excel::ingest_excel_workbook_from_path(path, Some(refs.as_slice()), schema)
548 }
549 }
550}
551
552fn infer_excel_schema_dispatch(path: &Path, sel: &ExcelSheetSelection) -> IngestionResult<Schema> {
553 match sel {
554 ExcelSheetSelection::First => excel::infer_excel_schema_from_path(path, None),
555 ExcelSheetSelection::Sheet(name) => {
556 excel::infer_excel_schema_from_path(path, Some(name.as_str()))
557 }
558 ExcelSheetSelection::AllSheets => excel::infer_excel_schema_workbook_from_path(path, None),
559 ExcelSheetSelection::Sheets(names) => {
560 let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
561 excel::infer_excel_schema_workbook_from_path(path, Some(refs.as_slice()))
562 }
563 }
564}
565
566/// Convenience helper for callers that want an owned request object.
567///
568/// This can be useful if you want to enqueue ingestion work in a job system.
569#[derive(Clone)]
570pub struct IngestionRequest {
571 /// Path to the input file.
572 pub path: PathBuf,
573 /// Schema to validate/parse values into.
574 pub schema: Schema,
575 /// Options controlling ingestion.
576 pub options: IngestionOptions,
577}
578
579impl fmt::Debug for IngestionRequest {
580 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
581 f.debug_struct("IngestionRequest")
582 .field("path", &self.path)
583 .field("schema_fields", &self.schema.fields.len())
584 .field("options", &self.options)
585 .finish()
586 }
587}
588
589impl IngestionRequest {
590 /// Execute the request by calling [`ingest_from_path`].
591 pub fn run(&self) -> IngestionResult<DataSet> {
592 ingest_from_path(&self.path, &self.schema, &self.options)
593 }
594}