Skip to main content

rust_data_processing/ingestion/
mod.rs

1//! Ingestion entrypoints and implementations.
2//!
3//! Most callers should use [`ingest_from_path`] (from [`unified`]) which:
4//!
5//! - auto-detects format by file extension (or you can override via [`IngestionOptions`])
6//! - performs ingestion into an in-memory [`crate::types::DataSet`]
7//! - optionally reports success/failure/alerts to an [`IngestionObserver`]
8//!
9//! For **append-only ordered batches**, use [`ingest_from_ordered_paths`] (concatenate rows, then
10//! apply the watermark filter once). For stable directory listings, see [`paths_from_directory_scan`]
11//! and [`partition`] module docs on deterministic ordering.
12//!
13//! For ergonomic configuration, prefer [`IngestionOptionsBuilder`] over constructing
14//! [`IngestionOptions`] directly.
15//!
16//! Format-specific functions are also available under:
17//! - [`csv`]
18//! - [`excel`]
19//! - [`json`]
20//! - [`parquet`]
21
22pub mod builder;
23pub mod csv;
24#[cfg(feature = "excel")]
25pub mod excel;
26#[cfg(not(feature = "excel"))]
27pub mod excel {
28    //! Excel ingestion stubs when the `excel` feature is disabled.
29    //!
30    //! This keeps the public module path stable (`rust_data_processing::ingestion::excel`)
31    //! while avoiding pulling Excel dependencies into the default build.
32
33    use std::path::Path;
34
35    use crate::error::{IngestionError, IngestionResult};
36    use crate::types::{DataSet, Schema};
37
38    fn disabled() -> IngestionError {
39        IngestionError::SchemaMismatch {
40            message: "excel ingestion is disabled; enable Cargo feature 'excel'".to_string(),
41        }
42    }
43
44    pub fn ingest_excel_from_path(
45        _path: impl AsRef<Path>,
46        _sheet_name: Option<&str>,
47        _schema: &Schema,
48    ) -> IngestionResult<DataSet> {
49        Err(disabled())
50    }
51
52    pub fn ingest_excel_workbook_from_path(
53        _path: impl AsRef<Path>,
54        _sheet_names: Option<&[&str]>,
55        _schema: &Schema,
56    ) -> IngestionResult<DataSet> {
57        Err(disabled())
58    }
59
60    pub fn infer_excel_schema_from_path(
61        _path: impl AsRef<Path>,
62        _sheet_name: Option<&str>,
63    ) -> IngestionResult<Schema> {
64        Err(disabled())
65    }
66
67    pub fn infer_excel_schema_workbook_from_path(
68        _path: impl AsRef<Path>,
69        _sheet_names: Option<&[&str]>,
70    ) -> IngestionResult<Schema> {
71        Err(disabled())
72    }
73}
74#[cfg(feature = "db_connectorx")]
75pub mod db;
76pub mod json;
77pub mod parquet;
78pub mod partition;
79#[cfg(not(feature = "db_connectorx"))]
80pub mod db {
81    //! Direct DB ingestion stubs when `db_connectorx` is disabled.
82    //!
83    //! Enable with `--features db_connectorx` plus a source, e.g. `--features db_mysql`.
84
85    use crate::error::{IngestionError, IngestionResult};
86    use crate::types::{DataSet, Schema};
87
88    fn disabled() -> IngestionError {
89        IngestionError::SchemaMismatch {
90            message: "db ingestion is disabled; enable Cargo feature 'db_connectorx'".to_string(),
91        }
92    }
93
94    pub fn ingest_from_db(
95        _conn: &str,
96        _query: &str,
97        _schema: &Schema,
98        _options: &super::IngestionOptions,
99    ) -> IngestionResult<DataSet> {
100        Err(disabled())
101    }
102
103    pub fn ingest_from_db_infer(
104        _conn: &str,
105        _query: &str,
106        _options: &super::IngestionOptions,
107    ) -> IngestionResult<DataSet> {
108        Err(disabled())
109    }
110}
111pub mod observability;
112pub(crate) mod polars_bridge;
113pub mod unified;
114pub mod watermark;
115
116pub use builder::IngestionOptionsBuilder;
117pub use observability::{
118    CompositeObserver, FileObserver, IngestionContext, IngestionObserver, IngestionSeverity,
119    IngestionStats, StdErrObserver,
120};
121pub use partition::{
122    PartitionSegment, PartitionedFile, discover_hive_partitioned_files,
123    hive_segments_for_relative_parent, parse_partition_segment, paths_from_directory_scan,
124    paths_from_explicit_list, paths_from_glob,
125};
126pub use unified::{
127    ExcelSheetSelection, IngestionFormat, IngestionOptions, IngestionRequest,
128    OrderedBatchIngestMetadata, infer_schema_from_path, ingest_from_ordered_paths,
129    ingest_from_path, ingest_from_path_infer,
130};
131pub use watermark::{
132    apply_watermark_after_ingest, apply_watermark_filter, max_value_in_column,
133    validate_watermark_config,
134};
135
136pub use db::{ingest_from_db, ingest_from_db_infer};