Skip to main content

rust_data_processing/ingestion/
mod.rs

1//! Ingestion entrypoints and implementations.
2//!
3//! Most callers should use [`ingest_from_path`] (from [`unified`]) which:
4//!
5//! - auto-detects format by file extension (or you can override via [`IngestionOptions`])
6//! - performs ingestion into an in-memory [`crate::types::DataSet`]
7//! - optionally reports success/failure/alerts to an [`IngestionObserver`]
8//!
9//! For **append-only ordered batches**, use [`ingest_from_ordered_paths`] (concatenate rows, then
10//! apply the watermark filter once). For stable directory listings, see [`paths_from_directory_scan`]
11//! and [`partition`] module docs on deterministic ordering.
12//!
13//! For ergonomic configuration, prefer [`IngestionOptionsBuilder`] over constructing
14//! [`IngestionOptions`] directly.
15//!
16//! Format-specific functions are also available under:
17//! - [`csv`]
18//! - [`excel`]
19//! - [`json`]
20//! - [`parquet`]
21//! - [`xml`]
22
23pub mod builder;
24pub mod csv;
25#[cfg(feature = "excel")]
26pub mod excel;
27#[cfg(not(feature = "excel"))]
28pub mod excel {
29    //! Excel ingestion stubs when the `excel` feature is disabled.
30    //!
31    //! This keeps the public module path stable (`rust_data_processing::ingestion::excel`)
32    //! while avoiding pulling Excel dependencies into the default build.
33
34    use std::path::Path;
35
36    use crate::error::{IngestionError, IngestionResult};
37    use crate::types::{DataSet, Schema};
38
39    fn disabled() -> IngestionError {
40        IngestionError::SchemaMismatch {
41            message: "excel ingestion is disabled; enable Cargo feature 'excel'".to_string(),
42        }
43    }
44
45    pub fn ingest_excel_from_path(
46        _path: impl AsRef<Path>,
47        _sheet_name: Option<&str>,
48        _schema: &Schema,
49    ) -> IngestionResult<DataSet> {
50        Err(disabled())
51    }
52
53    pub fn ingest_excel_workbook_from_path(
54        _path: impl AsRef<Path>,
55        _sheet_names: Option<&[&str]>,
56        _schema: &Schema,
57    ) -> IngestionResult<DataSet> {
58        Err(disabled())
59    }
60
61    pub fn infer_excel_schema_from_path(
62        _path: impl AsRef<Path>,
63        _sheet_name: Option<&str>,
64    ) -> IngestionResult<Schema> {
65        Err(disabled())
66    }
67
68    pub fn infer_excel_schema_workbook_from_path(
69        _path: impl AsRef<Path>,
70        _sheet_names: Option<&[&str]>,
71    ) -> IngestionResult<Schema> {
72        Err(disabled())
73    }
74}
75#[cfg(feature = "file_transfer")]
76pub mod file_transfer;
77#[cfg(not(feature = "file_transfer"))]
78pub mod file_transfer {
79    use super::IngestionOptions;
80    use crate::error::{IngestionError, IngestionResult};
81    use crate::types::{DataSet, Schema};
82
83    fn disabled() -> IngestionError {
84        IngestionError::SchemaMismatch {
85            message: "file_transfer support disabled; enable Cargo feature 'file_transfer' or 'cloud_connectors'"
86                .to_string(),
87        }
88    }
89
90    pub fn is_file_transfer_uri(_uri: &str) -> bool {
91        false
92    }
93
94    pub fn file_transfer_scheme(_uri: &str) -> Option<&'static str> {
95        None
96    }
97
98    pub fn ingest_from_file_transfer_uri(
99        _uri: &str,
100        _schema: &Schema,
101        _options: &IngestionOptions,
102    ) -> IngestionResult<DataSet> {
103        Err(disabled())
104    }
105}
106
107#[cfg(feature = "object_store")]
108pub mod object_store;
109#[cfg(not(feature = "object_store"))]
110pub mod object_store {
111    use super::IngestionOptions;
112    use crate::error::{IngestionError, IngestionResult};
113    use crate::types::{DataSet, Schema};
114
115    fn disabled() -> IngestionError {
116        IngestionError::SchemaMismatch {
117            message: "object_store support disabled; enable Cargo feature 'object_store'"
118                .to_string(),
119        }
120    }
121
122    pub fn ingest_from_object_store_uri(
123        _uri: &str,
124        _schema: &Schema,
125        _options: &IngestionOptions,
126    ) -> IngestionResult<DataSet> {
127        Err(disabled())
128    }
129
130    pub fn export_dataset_to_object_store_uri(_uri: &str, _ds: &DataSet) -> IngestionResult<()> {
131        Err(disabled())
132    }
133}
134
135#[cfg(feature = "delta_lake")]
136pub mod delta_lake;
137#[cfg(not(feature = "delta_lake"))]
138pub mod delta_lake {
139    use crate::error::{IngestionError, IngestionResult};
140    use crate::types::DataSet;
141
142    pub fn delta_table_uri(warehouse: &str, namespace: Option<&str>, table: &str) -> String {
143        let base = warehouse.trim_end_matches('/');
144        match namespace.filter(|n| !n.is_empty()) {
145            Some(ns) => format!("{base}/{ns}/{table}"),
146            None => format!("{base}/{table}"),
147        }
148    }
149
150    pub fn write_dataset_to_delta_table(_table_uri: &str, _ds: &DataSet) -> IngestionResult<usize> {
151        Err(IngestionError::SchemaMismatch {
152            message: "delta_lake support disabled; enable Cargo feature 'delta_lake'".to_string(),
153        })
154    }
155}
156
157#[cfg(feature = "db_connectorx")]
158pub mod db;
159pub mod json;
160pub mod parquet;
161pub mod partition;
162pub mod snowflake;
163pub mod xml;
164#[cfg(not(feature = "db_connectorx"))]
165pub mod db {
166    //! Direct DB ingestion stubs when `db_connectorx` is disabled.
167    //!
168    //! Enable with `--features db_connectorx` plus a source, e.g. `--features db_mysql`.
169
170    use crate::error::{IngestionError, IngestionResult};
171    use crate::types::{DataSet, Schema};
172
173    fn disabled() -> IngestionError {
174        IngestionError::SchemaMismatch {
175            message: "db ingestion is disabled; enable Cargo feature 'db_connectorx'".to_string(),
176        }
177    }
178
179    pub fn ingest_from_db(
180        _conn: &str,
181        _query: &str,
182        _schema: &Schema,
183        _options: &super::IngestionOptions,
184    ) -> IngestionResult<DataSet> {
185        Err(disabled())
186    }
187
188    pub fn ingest_from_db_infer(
189        _conn: &str,
190        _query: &str,
191        _options: &super::IngestionOptions,
192    ) -> IngestionResult<DataSet> {
193        Err(disabled())
194    }
195}
196pub mod observability;
197pub(crate) mod polars_bridge;
198pub mod unified;
199pub mod watermark;
200
201pub use builder::IngestionOptionsBuilder;
202pub use delta_lake::{delta_table_uri, write_dataset_to_delta_table};
203pub use file_transfer::{
204    file_transfer_scheme, ingest_from_file_transfer_uri, is_file_transfer_uri,
205};
206pub use object_store::{export_dataset_to_object_store_uri, ingest_from_object_store_uri};
207pub use observability::{
208    CompositeObserver, FileObserver, IngestionContext, IngestionObserver, IngestionSeverity,
209    IngestionStats, StdErrObserver,
210};
211pub use partition::{
212    PartitionSegment, PartitionedFile, discover_hive_partitioned_files,
213    hive_segments_for_relative_parent, parse_partition_segment, paths_from_directory_scan,
214    paths_from_explicit_list, paths_from_glob,
215};
216pub use snowflake::{copy_into_table_from_stage, write_dataset_to_snowflake_stage};
217pub use unified::{
218    ExcelSheetSelection, IngestionFormat, IngestionOptions, IngestionRequest,
219    OrderedBatchIngestMetadata, export_dataset_to_arrow_ipc, export_dataset_to_parquet,
220    export_dataset_to_xml, infer_schema_from_path, ingest_from_ordered_paths, ingest_from_path,
221    ingest_from_path_infer,
222};
223pub use watermark::{
224    apply_watermark_after_ingest, apply_watermark_filter, max_value_in_column,
225    validate_watermark_config,
226};
227
228pub use db::{ingest_from_db, ingest_from_db_infer};