rust_data_processing/ingestion/
builder.rs1use std::sync::Arc;
2
3use crate::error::IngestionResult;
4use crate::types::{DataSet, Schema, Value};
5
6use super::observability::IngestionObserver;
7use super::observability::IngestionSeverity;
8use super::unified::{
9 self, ExcelSheetSelection, IngestionFormat, IngestionOptions, OrderedBatchIngestMetadata,
10 ingest_from_path,
11};
12
13#[derive(Debug, Clone)]
20pub struct IngestionOptionsBuilder {
21 options: IngestionOptions,
22}
23
24impl Default for IngestionOptionsBuilder {
25 fn default() -> Self {
26 Self::new()
27 }
28}
29
30impl IngestionOptionsBuilder {
31 pub fn new() -> Self {
33 Self {
34 options: IngestionOptions::default(),
35 }
36 }
37
38 pub fn format(mut self, format: IngestionFormat) -> Self {
40 self.options.format = Some(format);
41 self
42 }
43
44 pub fn excel_sheet_selection(mut self, sel: ExcelSheetSelection) -> Self {
46 self.options.excel_sheet_selection = sel;
47 self
48 }
49
50 pub fn observer(mut self, observer: Arc<dyn IngestionObserver>) -> Self {
52 self.options.observer = Some(observer);
53 self
54 }
55
56 pub fn alert_at_or_above(mut self, sev: IngestionSeverity) -> Self {
58 self.options.alert_at_or_above = sev;
59 self
60 }
61
62 pub fn watermark_column(mut self, column: impl Into<String>) -> Self {
64 self.options.watermark_column = Some(column.into());
65 self
66 }
67
68 pub fn watermark_exclusive_above(mut self, v: Value) -> Self {
70 self.options.watermark_exclusive_above = Some(v);
71 self
72 }
73
74 pub fn build(self) -> IngestionOptions {
76 self.options
77 }
78
79 pub fn ingest_from_path(
81 self,
82 path: impl AsRef<std::path::Path>,
83 schema: &Schema,
84 ) -> IngestionResult<DataSet> {
85 let opts = self.build();
86 ingest_from_path(path, schema, &opts)
87 }
88
89 pub fn ingest_from_ordered_paths(
91 self,
92 paths: &[std::path::PathBuf],
93 schema: &Schema,
94 ) -> IngestionResult<(DataSet, OrderedBatchIngestMetadata)> {
95 let opts = self.build();
96 unified::ingest_from_ordered_paths(paths, schema, &opts)
97 }
98}
99
#[cfg(test)]
mod tests {
    use super::IngestionOptionsBuilder;
    use crate::ingestion::{
        ExcelSheetSelection, IngestionFormat, IngestionOptions, IngestionSeverity,
    };
    use crate::types::Value;

    /// A builder with no overrides must yield the same field values as
    /// `IngestionOptions::default()`.
    #[test]
    fn builder_defaults_match_ingestion_options_default() {
        let expected = IngestionOptions::default();
        let actual = IngestionOptionsBuilder::new().build();

        assert_eq!(actual.format, expected.format);
        assert_eq!(actual.excel_sheet_selection, expected.excel_sheet_selection);
        assert_eq!(actual.alert_at_or_above, expected.alert_at_or_above);
        // Observers are compared by presence only.
        assert_eq!(actual.observer.is_some(), expected.observer.is_some());
        assert_eq!(actual.watermark_column, expected.watermark_column);
        assert_eq!(
            actual.watermark_exclusive_above,
            expected.watermark_exclusive_above
        );
    }

    /// Each setter must land its argument in the matching field of the built options.
    #[test]
    fn builder_sets_fields() {
        let configured = IngestionOptionsBuilder::new()
            .format(IngestionFormat::Csv)
            .excel_sheet_selection(ExcelSheetSelection::AllSheets)
            .alert_at_or_above(IngestionSeverity::Error)
            .watermark_column("ts")
            .watermark_exclusive_above(Value::Int64(0));
        let opts = configured.build();

        assert_eq!(opts.format, Some(IngestionFormat::Csv));
        assert_eq!(opts.excel_sheet_selection, ExcelSheetSelection::AllSheets);
        assert_eq!(opts.alert_at_or_above, IngestionSeverity::Error);
        assert_eq!(opts.watermark_column.as_deref(), Some("ts"));
        assert_eq!(opts.watermark_exclusive_above, Some(Value::Int64(0)));
    }
}