rust_data_processing/ingestion/
mod.rs1pub mod builder;
24pub mod csv;
25#[cfg(feature = "excel")]
26pub mod excel;
27#[cfg(not(feature = "excel"))]
28pub mod excel {
29 use std::path::Path;
35
36 use crate::error::{IngestionError, IngestionResult};
37 use crate::types::{DataSet, Schema};
38
39 fn disabled() -> IngestionError {
40 IngestionError::SchemaMismatch {
41 message: "excel ingestion is disabled; enable Cargo feature 'excel'".to_string(),
42 }
43 }
44
45 pub fn ingest_excel_from_path(
46 _path: impl AsRef<Path>,
47 _sheet_name: Option<&str>,
48 _schema: &Schema,
49 ) -> IngestionResult<DataSet> {
50 Err(disabled())
51 }
52
53 pub fn ingest_excel_workbook_from_path(
54 _path: impl AsRef<Path>,
55 _sheet_names: Option<&[&str]>,
56 _schema: &Schema,
57 ) -> IngestionResult<DataSet> {
58 Err(disabled())
59 }
60
61 pub fn infer_excel_schema_from_path(
62 _path: impl AsRef<Path>,
63 _sheet_name: Option<&str>,
64 ) -> IngestionResult<Schema> {
65 Err(disabled())
66 }
67
68 pub fn infer_excel_schema_workbook_from_path(
69 _path: impl AsRef<Path>,
70 _sheet_names: Option<&[&str]>,
71 ) -> IngestionResult<Schema> {
72 Err(disabled())
73 }
74}
75#[cfg(feature = "file_transfer")]
76pub mod file_transfer;
77#[cfg(not(feature = "file_transfer"))]
78pub mod file_transfer {
79 use super::IngestionOptions;
80 use crate::error::{IngestionError, IngestionResult};
81 use crate::types::{DataSet, Schema};
82
83 fn disabled() -> IngestionError {
84 IngestionError::SchemaMismatch {
85 message: "file_transfer support disabled; enable Cargo feature 'file_transfer' or 'cloud_connectors'"
86 .to_string(),
87 }
88 }
89
90 pub fn is_file_transfer_uri(_uri: &str) -> bool {
91 false
92 }
93
94 pub fn file_transfer_scheme(_uri: &str) -> Option<&'static str> {
95 None
96 }
97
98 pub fn ingest_from_file_transfer_uri(
99 _uri: &str,
100 _schema: &Schema,
101 _options: &IngestionOptions,
102 ) -> IngestionResult<DataSet> {
103 Err(disabled())
104 }
105}
106
107#[cfg(feature = "object_store")]
108pub mod object_store;
109#[cfg(not(feature = "object_store"))]
110pub mod object_store {
111 use super::IngestionOptions;
112 use crate::error::{IngestionError, IngestionResult};
113 use crate::types::{DataSet, Schema};
114
115 fn disabled() -> IngestionError {
116 IngestionError::SchemaMismatch {
117 message: "object_store support disabled; enable Cargo feature 'object_store'"
118 .to_string(),
119 }
120 }
121
122 pub fn ingest_from_object_store_uri(
123 _uri: &str,
124 _schema: &Schema,
125 _options: &IngestionOptions,
126 ) -> IngestionResult<DataSet> {
127 Err(disabled())
128 }
129
130 pub fn export_dataset_to_object_store_uri(_uri: &str, _ds: &DataSet) -> IngestionResult<()> {
131 Err(disabled())
132 }
133}
134
135#[cfg(feature = "delta_lake")]
136pub mod delta_lake;
137#[cfg(not(feature = "delta_lake"))]
138pub mod delta_lake {
139 use crate::error::{IngestionError, IngestionResult};
140 use crate::types::DataSet;
141
142 pub fn delta_table_uri(warehouse: &str, namespace: Option<&str>, table: &str) -> String {
143 let base = warehouse.trim_end_matches('/');
144 match namespace.filter(|n| !n.is_empty()) {
145 Some(ns) => format!("{base}/{ns}/{table}"),
146 None => format!("{base}/{table}"),
147 }
148 }
149
150 pub fn write_dataset_to_delta_table(_table_uri: &str, _ds: &DataSet) -> IngestionResult<usize> {
151 Err(IngestionError::SchemaMismatch {
152 message: "delta_lake support disabled; enable Cargo feature 'delta_lake'".to_string(),
153 })
154 }
155}
156
157#[cfg(feature = "db_connectorx")]
158pub mod db;
159pub mod json;
160pub mod parquet;
161pub mod partition;
162pub mod snowflake;
163pub mod xml;
164#[cfg(not(feature = "db_connectorx"))]
165pub mod db {
166 use crate::error::{IngestionError, IngestionResult};
171 use crate::types::{DataSet, Schema};
172
173 fn disabled() -> IngestionError {
174 IngestionError::SchemaMismatch {
175 message: "db ingestion is disabled; enable Cargo feature 'db_connectorx'".to_string(),
176 }
177 }
178
179 pub fn ingest_from_db(
180 _conn: &str,
181 _query: &str,
182 _schema: &Schema,
183 _options: &super::IngestionOptions,
184 ) -> IngestionResult<DataSet> {
185 Err(disabled())
186 }
187
188 pub fn ingest_from_db_infer(
189 _conn: &str,
190 _query: &str,
191 _options: &super::IngestionOptions,
192 ) -> IngestionResult<DataSet> {
193 Err(disabled())
194 }
195}
196pub mod observability;
197pub(crate) mod polars_bridge;
198pub mod unified;
199pub mod watermark;
200
201pub use builder::IngestionOptionsBuilder;
202pub use delta_lake::{delta_table_uri, write_dataset_to_delta_table};
203pub use file_transfer::{
204 file_transfer_scheme, ingest_from_file_transfer_uri, is_file_transfer_uri,
205};
206pub use object_store::{export_dataset_to_object_store_uri, ingest_from_object_store_uri};
207pub use observability::{
208 CompositeObserver, FileObserver, IngestionContext, IngestionObserver, IngestionSeverity,
209 IngestionStats, StdErrObserver,
210};
211pub use partition::{
212 PartitionSegment, PartitionedFile, discover_hive_partitioned_files,
213 hive_segments_for_relative_parent, parse_partition_segment, paths_from_directory_scan,
214 paths_from_explicit_list, paths_from_glob,
215};
216pub use snowflake::{copy_into_table_from_stage, write_dataset_to_snowflake_stage};
217pub use unified::{
218 ExcelSheetSelection, IngestionFormat, IngestionOptions, IngestionRequest,
219 OrderedBatchIngestMetadata, export_dataset_to_arrow_ipc, export_dataset_to_parquet,
220 export_dataset_to_xml, infer_schema_from_path, ingest_from_ordered_paths, ingest_from_path,
221 ingest_from_path_infer,
222};
223pub use watermark::{
224 apply_watermark_after_ingest, apply_watermark_filter, max_value_in_column,
225 validate_watermark_config,
226};
227
228pub use db::{ingest_from_db, ingest_from_db_infer};