Skip to main content

rust_data_processing/ingestion/
builder.rs

1use std::sync::Arc;
2
3use crate::error::IngestionResult;
4use crate::types::{DataSet, Schema, Value};
5
6use super::observability::IngestionObserver;
7use super::observability::IngestionSeverity;
8use super::unified::{
9    self, ExcelSheetSelection, IngestionFormat, IngestionOptions, OrderedBatchIngestMetadata,
10    ingest_from_path,
11};
12
13/// Builder for [`IngestionOptions`].
14///
15/// Prefer this over constructing [`IngestionOptions`] directly when you want to:
16/// - avoid long struct literals in user code
17/// - keep configuration engine-agnostic (no Polars/DataFusion types leak into signatures)
18/// - lean on sensible defaults and override only what you need
19#[derive(Debug, Clone)]
20pub struct IngestionOptionsBuilder {
21    options: IngestionOptions,
22}
23
24impl Default for IngestionOptionsBuilder {
25    fn default() -> Self {
26        Self::new()
27    }
28}
29
30impl IngestionOptionsBuilder {
31    /// Create a builder with [`IngestionOptions::default`] values.
32    pub fn new() -> Self {
33        Self {
34            options: IngestionOptions::default(),
35        }
36    }
37
38    /// Force a specific ingestion format (otherwise inferred from path extension).
39    pub fn format(mut self, format: IngestionFormat) -> Self {
40        self.options.format = Some(format);
41        self
42    }
43
44    /// Configure Excel sheet selection.
45    pub fn excel_sheet_selection(mut self, sel: ExcelSheetSelection) -> Self {
46        self.options.excel_sheet_selection = sel;
47        self
48    }
49
50    /// Configure an observer for success/failure/alerts.
51    pub fn observer(mut self, observer: Arc<dyn IngestionObserver>) -> Self {
52        self.options.observer = Some(observer);
53        self
54    }
55
56    /// Configure the severity threshold at which `on_alert` is invoked.
57    pub fn alert_at_or_above(mut self, sev: IngestionSeverity) -> Self {
58        self.options.alert_at_or_above = sev;
59        self
60    }
61
62    /// High-water column for incremental loads (use with [`Self::watermark_exclusive_above`]).
63    pub fn watermark_column(mut self, column: impl Into<String>) -> Self {
64        self.options.watermark_column = Some(column.into());
65        self
66    }
67
68    /// Keep rows strictly greater than this value on `watermark_column` (after ingest).
69    pub fn watermark_exclusive_above(mut self, v: Value) -> Self {
70        self.options.watermark_exclusive_above = Some(v);
71        self
72    }
73
74    /// Build the configured [`IngestionOptions`].
75    pub fn build(self) -> IngestionOptions {
76        self.options
77    }
78
79    /// Convenience: ingest using the configured options.
80    pub fn ingest_from_path(
81        self,
82        path: impl AsRef<std::path::Path>,
83        schema: &Schema,
84    ) -> IngestionResult<DataSet> {
85        let opts = self.build();
86        ingest_from_path(path, schema, &opts)
87    }
88
89    /// Convenience: ordered multi-file ingest (see [`unified::ingest_from_ordered_paths`]).
90    pub fn ingest_from_ordered_paths(
91        self,
92        paths: &[std::path::PathBuf],
93        schema: &Schema,
94    ) -> IngestionResult<(DataSet, OrderedBatchIngestMetadata)> {
95        let opts = self.build();
96        unified::ingest_from_ordered_paths(paths, schema, &opts)
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::IngestionOptionsBuilder;
    use crate::ingestion::{
        ExcelSheetSelection, IngestionFormat, IngestionOptions, IngestionSeverity,
    };
    use crate::types::Value;

    /// Assert that two option sets carry the same configuration.
    ///
    /// `observer` holds an `Arc<dyn IngestionObserver>`, so only its presence is compared.
    fn assert_same_settings(a: &IngestionOptions, b: &IngestionOptions) {
        assert_eq!(a.format, b.format);
        assert_eq!(a.excel_sheet_selection, b.excel_sheet_selection);
        assert_eq!(a.alert_at_or_above, b.alert_at_or_above);
        assert_eq!(a.observer.is_some(), b.observer.is_some());
        assert_eq!(a.watermark_column, b.watermark_column);
        assert_eq!(a.watermark_exclusive_above, b.watermark_exclusive_above);
    }

    #[test]
    fn builder_defaults_match_ingestion_options_default() {
        let built = IngestionOptionsBuilder::new().build();
        let direct = IngestionOptions::default();

        assert_same_settings(&built, &direct);
    }

    #[test]
    fn default_impl_matches_new() {
        let via_default = IngestionOptionsBuilder::default().build();
        let via_new = IngestionOptionsBuilder::new().build();

        assert_same_settings(&via_default, &via_new);
    }

    #[test]
    fn builder_sets_fields() {
        let built = IngestionOptionsBuilder::new()
            .format(IngestionFormat::Csv)
            .excel_sheet_selection(ExcelSheetSelection::AllSheets)
            .alert_at_or_above(IngestionSeverity::Error)
            .watermark_column("ts")
            .watermark_exclusive_above(Value::Int64(0))
            .build();

        assert_eq!(built.format, Some(IngestionFormat::Csv));
        assert_eq!(built.excel_sheet_selection, ExcelSheetSelection::AllSheets);
        assert_eq!(built.alert_at_or_above, IngestionSeverity::Error);
        assert_eq!(built.watermark_column.as_deref(), Some("ts"));
        assert_eq!(built.watermark_exclusive_above, Some(Value::Int64(0)));
    }

    #[test]
    fn later_setter_calls_override_earlier_ones() {
        let built = IngestionOptionsBuilder::new()
            .watermark_column("first")
            .watermark_column("second")
            .watermark_exclusive_above(Value::Int64(1))
            .watermark_exclusive_above(Value::Int64(2))
            .build();

        assert_eq!(built.watermark_column.as_deref(), Some("second"));
        assert_eq!(built.watermark_exclusive_above, Some(Value::Int64(2)));
    }
}