Skip to main content

rust_data_processing/ingestion/
unified.rs

1//! Unified ingestion entrypoint.
2//!
3//! Most callers should use [`ingest_from_path`], which ingests a file into an in-memory
4//! [`crate::types::DataSet`] using a provided [`crate::types::Schema`].
5//!
6//! - If [`IngestionOptions::format`] is `None`, the ingestion format is inferred from the file
7//!   extension.
8//! - If an [`super::observability::IngestionObserver`] is provided, success/failure/alerts are
9//!   reported to it.
10
11use std::error::Error as StdError;
12use std::fmt;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
16use crate::error::{IngestionError, IngestionResult};
17use crate::types::{DataSet, Schema, Value};
18
19use super::observability::{
20    IngestionContext, IngestionObserver, IngestionSeverity, IngestionStats,
21};
22use super::polars_bridge::{infer_schema_from_dataframe_lossy, polars_error_to_ingestion};
23use super::watermark::{
24    apply_watermark_after_ingest, max_value_in_column, validate_watermark_config,
25};
26use super::{csv, excel, json, parquet};
27use polars::prelude::*;
28
/// File formats that the unified ingestion entry points can dispatch to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IngestionFormat {
    /// Comma-separated values.
    Csv,
    /// JSON, either an array of objects or newline-delimited (NDJSON).
    Json,
    /// Apache Parquet.
    Parquet,
    /// Spreadsheet/workbook formats (feature-gated behind `excel`).
    Excel,
}

impl IngestionFormat {
    /// Look up the format associated with a file extension.
    ///
    /// `ext` is the bare extension without a leading dot (e.g. `"csv"`). The comparison ignores
    /// ASCII case. Returns `None` when the extension is not one of the known spellings.
    pub fn from_extension(ext: &str) -> Option<Self> {
        // Every recognized spelling, paired with the format it selects.
        const KNOWN_EXTENSIONS: &[(&str, IngestionFormat)] = &[
            ("csv", IngestionFormat::Csv),
            ("json", IngestionFormat::Json),
            ("ndjson", IngestionFormat::Json),
            ("parquet", IngestionFormat::Parquet),
            ("pq", IngestionFormat::Parquet),
            ("xlsx", IngestionFormat::Excel),
            ("xls", IngestionFormat::Excel),
            ("xlsm", IngestionFormat::Excel),
            ("xlsb", IngestionFormat::Excel),
            ("ods", IngestionFormat::Excel),
        ];

        KNOWN_EXTENSIONS
            .iter()
            .find(|(known, _)| known.eq_ignore_ascii_case(ext))
            .map(|&(_, format)| format)
    }
}
54
/// Which sheet(s) of an Excel workbook to read.
///
/// The default is [`ExcelSheetSelection::First`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum ExcelSheetSelection {
    /// Read only the first sheet of the workbook.
    #[default]
    First,
    /// Read exactly one sheet, identified by name.
    Sheet(String),
    /// Read every sheet and concatenate the rows.
    AllSheets,
    /// Read only the named sheets, in the listed order, and concatenate the rows.
    Sheets(Vec<String>),
}
68
69/// Options controlling unified ingestion behavior.
70///
71/// Use [`Default`] for common cases.
72#[derive(Clone)]
73pub struct IngestionOptions {
74    /// If `None`, auto-detect format from file extension.
75    pub format: Option<IngestionFormat>,
76    /// Excel-specific options.
77    pub excel_sheet_selection: ExcelSheetSelection,
78    /// Optional observer for logging/alerts.
79    pub observer: Option<Arc<dyn IngestionObserver>>,
80    /// Severity threshold at which `on_alert` is invoked.
81    pub alert_at_or_above: IngestionSeverity,
82    /// Column name for incremental / high-water filtering (must be used with
83    /// [`Self::watermark_exclusive_above`]).
84    pub watermark_column: Option<String>,
85    /// Keep only rows where `watermark_column` is **strictly greater than** this value (same
86    /// [`crate::types::DataType`] as the column). Applied after ingest for file and DB sources.
87    pub watermark_exclusive_above: Option<Value>,
88}
89
90impl fmt::Debug for IngestionOptions {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        f.debug_struct("IngestionOptions")
93            .field("format", &self.format)
94            .field("excel_sheet_selection", &self.excel_sheet_selection)
95            .field("observer_set", &self.observer.is_some())
96            .field("alert_at_or_above", &self.alert_at_or_above)
97            .field("watermark_column", &self.watermark_column)
98            .field("watermark_exclusive_above", &self.watermark_exclusive_above)
99            .finish()
100    }
101}
102
103impl Default for IngestionOptions {
104    fn default() -> Self {
105        Self {
106            format: None,
107            excel_sheet_selection: ExcelSheetSelection::default(),
108            observer: None,
109            alert_at_or_above: IngestionSeverity::Critical,
110            watermark_column: None,
111            watermark_exclusive_above: None,
112        }
113    }
114}
115
116/// Unified ingestion entry point for path-based sources.
117///
118/// - If `options.format` is `None`, format is inferred from the file extension.
119/// - Use `options.excel_sheet_selection` for Excel multi-tab behavior.
120///
121/// When an observer is configured, this function reports:
122///
123/// - `on_success` on success, with row count stats
124/// - `on_failure` on failure, with a computed severity
125/// - `on_alert` on failure when the computed severity is >= `options.alert_at_or_above`
126///
127/// # Examples
128///
129/// ## CSV (auto-detect by extension)
130///
131/// ```no_run
132/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
133/// use rust_data_processing::types::{DataType, Field, Schema};
134///
135/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
136/// let schema = Schema::new(vec![
137///     Field::new("id", DataType::Int64),
138///     Field::new("name", DataType::Utf8),
139/// ]);
140///
141/// // Uses `.csv` to select CSV ingestion.
142/// let ds = ingest_from_path("people.csv", &schema, &IngestionOptions::default())?;
143/// println!("rows={}", ds.row_count());
144/// # Ok(())
145/// # }
146/// ```
147///
148/// ## JSON (auto-detect by extension, with nested field paths)
149///
150/// ```no_run
151/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
152/// use rust_data_processing::types::{DataType, Field, Schema};
153///
154/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
155/// // JSON supports nested field access via dot paths.
156/// let schema = Schema::new(vec![
157///     Field::new("id", DataType::Int64),
158///     Field::new("user.name", DataType::Utf8),
159/// ]);
160///
161/// let ds = ingest_from_path("events.json", &schema, &IngestionOptions::default())?;
162/// println!("rows={}", ds.row_count());
163/// # Ok(())
164/// # }
165/// ```
166///
167/// ## Parquet (auto-detect by extension)
168///
169/// ```no_run
170/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
171/// use rust_data_processing::types::{DataType, Field, Schema};
172///
173/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
174/// let schema = Schema::new(vec![
175///     Field::new("id", DataType::Int64),
176///     Field::new("active", DataType::Bool),
177/// ]);
178///
179/// let ds = ingest_from_path("data.parquet", &schema, &IngestionOptions::default())?;
180/// println!("rows={}", ds.row_count());
181/// # Ok(())
182/// # }
183/// ```
184///
185/// ## Force a format explicitly (override extension inference)
186///
187/// ```no_run
188/// use rust_data_processing::ingestion::{ingest_from_path, IngestionFormat, IngestionOptions};
189/// use rust_data_processing::types::{DataType, Field, Schema};
190///
191/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
192/// let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
193///
194/// let opts = IngestionOptions {
195///     format: Some(IngestionFormat::Csv),
196///     ..Default::default()
197/// };
198///
199/// // Useful when a file has no extension or you want to override inference.
200/// let ds = ingest_from_path("input_without_extension", &schema, &opts)?;
201/// println!("rows={}", ds.row_count());
202/// # Ok(())
203/// # }
204/// ```
205///
206/// ## Observability (stderr logging + alert threshold)
207///
208/// ```no_run
209/// use std::sync::Arc;
210///
211/// use rust_data_processing::ingestion::{
212///     ingest_from_path, IngestionOptions, IngestionSeverity, StdErrObserver,
213/// };
214/// use rust_data_processing::types::{DataType, Field, Schema};
215///
216/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
217/// let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
218///
219/// let opts = IngestionOptions {
220///     observer: Some(Arc::new(StdErrObserver::default())),
221///     alert_at_or_above: IngestionSeverity::Critical,
222///     ..Default::default()
223/// };
224///
225/// // Missing files are treated as Critical and will trigger `on_alert` at this threshold.
226/// let _err = ingest_from_path("does_not_exist.csv", &schema, &opts).unwrap_err();
227/// # Ok(())
228/// # }
229/// ```
230///
231/// ## Incremental load (high-watermark on files)
232///
233/// Set [`IngestionOptions::watermark_column`] and [`IngestionOptions::watermark_exclusive_above`]
234/// to keep only rows where the watermark column is **strictly greater** than the floor value.
235/// Nulls in that column are dropped. The same options apply to
236/// [`ingest_from_db`](crate::ingestion::ingest_from_db) when the **`db_connectorx`** Cargo feature
237/// is enabled (filter runs after the query result is loaded).
238///
239/// ```no_run
240/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
241/// use rust_data_processing::types::{DataType, Field, Schema, Value};
242///
243/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
244/// let schema = Schema::new(vec![
245///     Field::new("id", DataType::Int64),
246///     Field::new("ts", DataType::Int64),
247/// ]);
248/// let opts = IngestionOptions {
249///     watermark_column: Some("ts".to_string()),
250///     watermark_exclusive_above: Some(Value::Int64(100)),
251///     ..Default::default()
252/// };
253/// let ds = ingest_from_path("tests/fixtures/watermark_events.csv", &schema, &opts)?;
254/// assert_eq!(ds.row_count(), 2);
255/// # Ok(())
256/// # }
257/// ```
258///
259/// ## Excel
260///
261/// Example. Marked `no_run` so it is **compiled** by doctests
262/// (no "not tested" banner), but not executed (it expects a real `workbook.xlsx` file).
263///
264/// ```no_run
265/// use rust_data_processing::ingestion::{
266///     ingest_from_path, ExcelSheetSelection, IngestionFormat, IngestionOptions,
267/// };
268/// use rust_data_processing::types::{DataType, Field, Schema};
269///
270/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
271/// let schema = Schema::new(vec![
272///     Field::new("id", DataType::Int64),
273///     Field::new("name", DataType::Utf8),
274/// ]);
275///
276/// let opts = IngestionOptions {
277///     format: Some(IngestionFormat::Excel),
278///     excel_sheet_selection: ExcelSheetSelection::Sheet("Sheet1".to_string()),
279///     ..Default::default()
280/// };
281///
282/// let ds = ingest_from_path("workbook.xlsx", &schema, &opts)?;
283/// println!("rows={}", ds.row_count());
284/// # Ok(())
285/// # }
286/// ```
287pub fn ingest_from_path(
288    path: impl AsRef<Path>,
289    schema: &Schema,
290    options: &IngestionOptions,
291) -> IngestionResult<DataSet> {
292    validate_watermark_config(schema, options)?;
293
294    let path = path.as_ref();
295    let fmt = match options.format {
296        Some(f) => f,
297        None => infer_format_from_path(path)?,
298    };
299
300    let ctx = IngestionContext {
301        path: path.to_path_buf(),
302        format: fmt,
303    };
304
305    let result = match fmt {
306        IngestionFormat::Csv => csv::ingest_csv_from_path(path, schema),
307        IngestionFormat::Json => json::ingest_json_from_path(path, schema),
308        IngestionFormat::Parquet => parquet::ingest_parquet_from_path(path, schema),
309        IngestionFormat::Excel => {
310            ingest_excel_dispatch(path, schema, &options.excel_sheet_selection)
311        }
312    };
313
314    let result = result.and_then(|ds| apply_watermark_after_ingest(ds, schema, options));
315
316    if let Some(obs) = options.observer.as_ref() {
317        match &result {
318            Ok(ds) => obs.on_success(
319                &ctx,
320                IngestionStats {
321                    rows: ds.row_count(),
322                },
323            ),
324            Err(e) => {
325                let sev = severity_for_error(e);
326                obs.on_failure(&ctx, sev, e);
327                if sev >= options.alert_at_or_above {
328                    obs.on_alert(&ctx, sev, e);
329                }
330            }
331        }
332    }
333
334    result
335}
336
337/// Per-file [`IngestionOptions`] with watermark fields cleared so each file is loaded in full; the
338/// caller applies [`apply_watermark_after_ingest`] once over the concatenated batch.
339fn options_without_watermark(options: &IngestionOptions) -> IngestionOptions {
340    let mut o = options.clone();
341    o.watermark_column = None;
342    o.watermark_exclusive_above = None;
343    o
344}
345
346/// Metadata from [`ingest_from_ordered_paths`]: which paths were read, the last path in that
347/// sequence, and (when incremental watermark options are set) the maximum value in the watermark
348/// column **after** the batch filter — useful for checkpointing the next run.
349#[derive(Debug, Clone, PartialEq)]
350pub struct OrderedBatchIngestMetadata {
351    /// Files ingested, in the order given.
352    pub paths: Vec<PathBuf>,
353    /// Last entry in [`Self::paths`] (typical append-only batch cursor).
354    pub last_path: Option<PathBuf>,
355    /// Maximum of the watermark column over non-null cells in the output dataset, when watermark
356    /// options were set; otherwise `None`.
357    pub max_watermark_value: Option<Value>,
358}
359
360/// Ingest many files **in order**, concatenate rows into one [`DataSet`], then apply the watermark
361/// filter **once** (if configured).
362///
363/// Semantics: the high-water filter applies to the **combined** rows, not per file. For each path,
364/// this calls [`ingest_from_path`] with watermark options temporarily cleared so observers see
365/// full per-file loads; the batch watermark runs after concatenation.
366pub fn ingest_from_ordered_paths<P: AsRef<Path>>(
367    paths: &[P],
368    schema: &Schema,
369    options: &IngestionOptions,
370) -> IngestionResult<(DataSet, OrderedBatchIngestMetadata)> {
371    validate_watermark_config(schema, options)?;
372    if paths.is_empty() {
373        return Err(IngestionError::SchemaMismatch {
374            message: "ingest_from_ordered_paths: empty path list".to_string(),
375        });
376    }
377
378    let path_bufs: Vec<PathBuf> = paths.iter().map(|p| p.as_ref().to_path_buf()).collect();
379    let per_file_opts = options_without_watermark(options);
380
381    let mut all_rows: Vec<Vec<Value>> = Vec::new();
382    for p in &path_bufs {
383        let ds = ingest_from_path(p, schema, &per_file_opts)?;
384        all_rows.extend(ds.rows);
385    }
386
387    let mut ds = DataSet::new(schema.clone(), all_rows);
388    ds = apply_watermark_after_ingest(ds, schema, options)?;
389
390    let max_watermark_value = match &options.watermark_column {
391        Some(col) => max_value_in_column(&ds, schema, col),
392        None => None,
393    };
394
395    let last_path = path_bufs.last().cloned();
396    let meta = OrderedBatchIngestMetadata {
397        paths: path_bufs,
398        last_path,
399        max_watermark_value,
400    };
401
402    Ok((ds, meta))
403}
404
405/// Infer a [`Schema`] for an input file.
406///
407/// This is intended for quick exploration and benchmarking when callers don't have a schema yet.
408/// It uses a **best-effort** mapping into `DataType::{Int64, Float64, Bool, Utf8}`.
409///
410/// Notes:
411/// - For JSON, nested fields are inferred only at the **top level** (no dot-path expansion).
412/// - For Excel, inference uses a scan-based heuristic.
413pub fn infer_schema_from_path(
414    path: impl AsRef<Path>,
415    options: &IngestionOptions,
416) -> IngestionResult<Schema> {
417    let path = path.as_ref();
418    let fmt = match options.format {
419        Some(f) => f,
420        None => infer_format_from_path(path)?,
421    };
422
423    match fmt {
424        IngestionFormat::Csv => {
425            let df = LazyCsvReader::new(path.to_string_lossy().as_ref().into())
426                .with_has_header(true)
427                .finish()
428                .map_err(|e| polars_error_to_ingestion("failed to read csv with polars", e))?
429                .collect()
430                .map_err(|e| polars_error_to_ingestion("failed to collect csv with polars", e))?;
431            infer_schema_from_dataframe_lossy(&df)
432        }
433        IngestionFormat::Json => {
434            use std::fs::File;
435
436            let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
437            let json_format = if ext.eq_ignore_ascii_case("ndjson") {
438                JsonFormat::JsonLines
439            } else {
440                JsonFormat::Json
441            };
442
443            let file = File::open(path)?;
444            let df = JsonReader::new(file)
445                .with_json_format(json_format)
446                .finish()
447                .map_err(|e| polars_error_to_ingestion("failed to read json with polars", e))?;
448            infer_schema_from_dataframe_lossy(&df)
449        }
450        IngestionFormat::Parquet => {
451            let df = LazyFrame::scan_parquet(
452                path.to_string_lossy().as_ref().into(),
453                ScanArgsParquet::default(),
454            )
455            .map_err(|e| polars_error_to_ingestion("failed to read parquet with polars", e))?
456            .collect()
457            .map_err(|e| polars_error_to_ingestion("failed to collect parquet with polars", e))?;
458            infer_schema_from_dataframe_lossy(&df)
459        }
460        IngestionFormat::Excel => infer_excel_schema_dispatch(path, &options.excel_sheet_selection),
461    }
462}
463
464/// Convenience wrapper: infer schema and then ingest.
465pub fn ingest_from_path_infer(
466    path: impl AsRef<Path>,
467    options: &IngestionOptions,
468) -> IngestionResult<DataSet> {
469    let schema = infer_schema_from_path(path.as_ref(), options)?;
470    ingest_from_path(path, &schema, options)
471}
472
473fn severity_for_error(e: &IngestionError) -> IngestionSeverity {
474    match e {
475        IngestionError::Io(_) => IngestionSeverity::Critical,
476        IngestionError::Parquet(err) => {
477            // Best-effort: parquet errors often wrap IO, but not always in a structured way.
478            // If we can detect IO in the source chain, treat it as Critical.
479            if error_chain_contains_io(err) {
480                IngestionSeverity::Critical
481            } else {
482                IngestionSeverity::Error
483            }
484        }
485        IngestionError::Csv(err) => match err.kind() {
486            ::csv::ErrorKind::Io(_) => IngestionSeverity::Critical,
487            _ => IngestionSeverity::Error,
488        },
489        #[cfg(feature = "excel")]
490        IngestionError::Excel(_) => IngestionSeverity::Error,
491        IngestionError::Engine { source, .. } => {
492            if error_chain_contains_io(source.as_ref()) {
493                IngestionSeverity::Critical
494            } else {
495                IngestionSeverity::Error
496            }
497        }
498        IngestionError::SchemaMismatch { .. } => IngestionSeverity::Error,
499        IngestionError::ParseError { .. } => IngestionSeverity::Error,
500    }
501}
502
503fn error_chain_contains_io(e: &(dyn StdError + 'static)) -> bool {
504    let mut cur: Option<&(dyn StdError + 'static)> = Some(e);
505    while let Some(err) = cur {
506        if err.is::<std::io::Error>() {
507            return true;
508        }
509        cur = err.source();
510    }
511    false
512}
513
514fn infer_format_from_path(path: &Path) -> IngestionResult<IngestionFormat> {
515    let ext = path.extension().and_then(|s| s.to_str()).ok_or_else(|| {
516        IngestionError::SchemaMismatch {
517            message: format!(
518                "cannot infer format: path has no extension ({})",
519                path.display()
520            ),
521        }
522    })?;
523
524    IngestionFormat::from_extension(ext).ok_or_else(|| IngestionError::SchemaMismatch {
525        message: format!(
526            "cannot infer format from extension '{ext}' for path ({})",
527            path.display()
528        ),
529    })
530}
531
532fn ingest_excel_dispatch(
533    path: &Path,
534    schema: &Schema,
535    sel: &ExcelSheetSelection,
536) -> IngestionResult<DataSet> {
537    match sel {
538        ExcelSheetSelection::First => excel::ingest_excel_from_path(path, None, schema),
539        ExcelSheetSelection::Sheet(name) => {
540            excel::ingest_excel_from_path(path, Some(name.as_str()), schema)
541        }
542        ExcelSheetSelection::AllSheets => {
543            excel::ingest_excel_workbook_from_path(path, None, schema)
544        }
545        ExcelSheetSelection::Sheets(names) => {
546            let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
547            excel::ingest_excel_workbook_from_path(path, Some(refs.as_slice()), schema)
548        }
549    }
550}
551
552fn infer_excel_schema_dispatch(path: &Path, sel: &ExcelSheetSelection) -> IngestionResult<Schema> {
553    match sel {
554        ExcelSheetSelection::First => excel::infer_excel_schema_from_path(path, None),
555        ExcelSheetSelection::Sheet(name) => {
556            excel::infer_excel_schema_from_path(path, Some(name.as_str()))
557        }
558        ExcelSheetSelection::AllSheets => excel::infer_excel_schema_workbook_from_path(path, None),
559        ExcelSheetSelection::Sheets(names) => {
560            let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
561            excel::infer_excel_schema_workbook_from_path(path, Some(refs.as_slice()))
562        }
563    }
564}
565
566/// Convenience helper for callers that want an owned request object.
567///
568/// This can be useful if you want to enqueue ingestion work in a job system.
569#[derive(Clone)]
570pub struct IngestionRequest {
571    /// Path to the input file.
572    pub path: PathBuf,
573    /// Schema to validate/parse values into.
574    pub schema: Schema,
575    /// Options controlling ingestion.
576    pub options: IngestionOptions,
577}
578
579impl fmt::Debug for IngestionRequest {
580    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
581        f.debug_struct("IngestionRequest")
582            .field("path", &self.path)
583            .field("schema_fields", &self.schema.fields.len())
584            .field("options", &self.options)
585            .finish()
586    }
587}
588
589impl IngestionRequest {
590    /// Execute the request by calling [`ingest_from_path`].
591    pub fn run(&self) -> IngestionResult<DataSet> {
592        ingest_from_path(&self.path, &self.schema, &self.options)
593    }
594}