1use std::cmp::Ordering;
9
10use crate::error::{IngestionError, IngestionResult};
11use crate::types::{DataSet, DataType, Schema, Value};
12
13use super::IngestionOptions;
14
15pub fn validate_watermark_config(
17 schema: &Schema,
18 options: &IngestionOptions,
19) -> IngestionResult<()> {
20 let col = &options.watermark_column;
21 let floor = &options.watermark_exclusive_above;
22 match (col.as_ref(), floor.as_ref()) {
23 (None, None) => Ok(()),
24 (Some(_), None) | (None, Some(_)) => Err(IngestionError::SchemaMismatch {
25 message:
26 "watermark_column and watermark_exclusive_above must both be set or both omitted"
27 .to_string(),
28 }),
29 (Some(name), Some(floor_val)) => {
30 if matches!(floor_val, Value::Null) {
31 return Err(IngestionError::SchemaMismatch {
32 message: "watermark_exclusive_above must not be Null".to_string(),
33 });
34 }
35 let idx = schema
36 .index_of(name)
37 .ok_or_else(|| IngestionError::SchemaMismatch {
38 message: format!("watermark column '{name}' not found in schema"),
39 })?;
40 let field = &schema.fields[idx];
41 ensure_value_matches_type(floor_val, &field.data_type, "watermark_exclusive_above")?;
42 Ok(())
43 }
44 }
45}
46
47fn ensure_value_matches_type(v: &Value, dt: &DataType, ctx: &str) -> IngestionResult<()> {
48 let ok = match dt {
49 DataType::Int64 => matches!(v, Value::Int64(_)),
50 DataType::Float64 => matches!(v, Value::Float64(_)),
51 DataType::Bool => matches!(v, Value::Bool(_)),
52 DataType::Utf8 => matches!(v, Value::Utf8(_)),
53 };
54 if ok {
55 Ok(())
56 } else {
57 Err(IngestionError::SchemaMismatch {
58 message: format!(
59 "{ctx} does not match the watermark column type ({dt:?})",
60 dt = dt
61 ),
62 })
63 }
64}
65
66pub fn apply_watermark_filter(
68 ds: DataSet,
69 schema: &Schema,
70 column: &str,
71 floor: &Value,
72) -> IngestionResult<DataSet> {
73 let idx = schema
74 .index_of(column)
75 .ok_or_else(|| IngestionError::SchemaMismatch {
76 message: format!("watermark column '{column}' not found in schema"),
77 })?;
78 let dt = &schema.fields[idx].data_type;
79
80 let mut kept = Vec::with_capacity(ds.rows.len());
81 for (row_i0, row) in ds.rows.iter().enumerate() {
82 let user_row = row_i0 + 1;
83 let cell = &row[idx];
84 if row_is_above_watermark(cell, floor, dt, user_row, column)? {
85 kept.push(row.clone());
86 }
87 }
88
89 Ok(DataSet::new(ds.schema, kept))
90}
91
92pub fn apply_watermark_after_ingest(
94 ds: DataSet,
95 schema: &Schema,
96 options: &IngestionOptions,
97) -> IngestionResult<DataSet> {
98 match (
99 &options.watermark_column,
100 &options.watermark_exclusive_above,
101 ) {
102 (None, None) => Ok(ds),
103 (Some(col), Some(floor)) => apply_watermark_filter(ds, schema, col, floor),
104 _ => Err(IngestionError::SchemaMismatch {
105 message: "invalid watermark options state".to_string(),
106 }),
107 }
108}
109
110fn row_is_above_watermark(
111 cell: &Value,
112 floor: &Value,
113 dt: &DataType,
114 row: usize,
115 column: &str,
116) -> IngestionResult<bool> {
117 if matches!(cell, Value::Null) {
118 return Ok(false);
119 }
120 let ord = compare_cell_to_floor(cell, floor, dt, row, column)?;
121 Ok(ord == Ordering::Greater)
122}
123
124fn compare_cell_to_floor(
125 cell: &Value,
126 floor: &Value,
127 dt: &DataType,
128 row: usize,
129 column: &str,
130) -> IngestionResult<Ordering> {
131 match dt {
132 DataType::Int64 => {
133 let a = expect_int64(cell, row, column)?;
134 let b = match floor {
135 Value::Int64(v) => *v,
136 _ => {
137 return Err(IngestionError::SchemaMismatch {
138 message: "watermark value type mismatch (expected int64)".to_string(),
139 });
140 }
141 };
142 Ok(a.cmp(&b))
143 }
144 DataType::Float64 => {
145 let a = expect_float64(cell, row, column)?;
146 let b = match floor {
147 Value::Float64(v) => *v,
148 _ => {
149 return Err(IngestionError::SchemaMismatch {
150 message: "watermark value type mismatch (expected float64)".to_string(),
151 });
152 }
153 };
154 Ok(a.total_cmp(&b))
155 }
156 DataType::Bool => {
157 let a = expect_bool(cell, row, column)?;
158 let b = match floor {
159 Value::Bool(v) => *v,
160 _ => {
161 return Err(IngestionError::SchemaMismatch {
162 message: "watermark value type mismatch (expected bool)".to_string(),
163 });
164 }
165 };
166 Ok(a.cmp(&b))
167 }
168 DataType::Utf8 => {
169 let a = match cell {
170 Value::Utf8(s) => s.as_str(),
171 _ => {
172 return Err(IngestionError::ParseError {
173 row,
174 column: column.to_string(),
175 raw: format!("{cell:?}"),
176 message: "expected utf8 for watermark column".to_string(),
177 });
178 }
179 };
180 let b = match floor {
181 Value::Utf8(s) => s.as_str(),
182 _ => {
183 return Err(IngestionError::SchemaMismatch {
184 message: "watermark value type mismatch (expected utf8)".to_string(),
185 });
186 }
187 };
188 Ok(a.cmp(b))
189 }
190 }
191}
192
193fn expect_int64(v: &Value, row: usize, column: &str) -> IngestionResult<i64> {
194 match v {
195 Value::Int64(i) => Ok(*i),
196 _ => Err(IngestionError::ParseError {
197 row,
198 column: column.to_string(),
199 raw: format!("{v:?}"),
200 message: "expected int64 for watermark column".to_string(),
201 }),
202 }
203}
204
205fn expect_float64(v: &Value, row: usize, column: &str) -> IngestionResult<f64> {
206 match v {
207 Value::Float64(f) => Ok(*f),
208 _ => Err(IngestionError::ParseError {
209 row,
210 column: column.to_string(),
211 raw: format!("{v:?}"),
212 message: "expected float64 for watermark column".to_string(),
213 }),
214 }
215}
216
217fn expect_bool(v: &Value, row: usize, column: &str) -> IngestionResult<bool> {
218 match v {
219 Value::Bool(b) => Ok(*b),
220 _ => Err(IngestionError::ParseError {
221 row,
222 column: column.to_string(),
223 raw: format!("{v:?}"),
224 message: "expected bool for watermark column".to_string(),
225 }),
226 }
227}
228
229pub fn max_value_in_column(ds: &DataSet, schema: &Schema, column: &str) -> Option<Value> {
235 let idx = schema.index_of(column)?;
236 let dt = &schema.fields[idx].data_type;
237 let mut best: Option<Value> = None;
238 for row in &ds.rows {
239 let cell = &row[idx];
240 if matches!(cell, Value::Null) {
241 continue;
242 }
243 if matches!(dt, DataType::Float64) {
244 if let Value::Float64(f) = cell {
245 if !f.is_finite() {
246 continue;
247 }
248 }
249 }
250 best = Some(match &best {
251 None => cell.clone(),
252 Some(cur) => max_of_typed(dt, cur, cell),
253 });
254 }
255 best
256}
257
258fn max_of_typed(dt: &DataType, a: &Value, b: &Value) -> Value {
259 match dt {
260 DataType::Int64 => {
261 let ai = match a {
262 Value::Int64(i) => *i,
263 _ => return b.clone(),
264 };
265 let bi = match b {
266 Value::Int64(i) => *i,
267 _ => return a.clone(),
268 };
269 Value::Int64(ai.max(bi))
270 }
271 DataType::Float64 => {
272 let af = match a {
273 Value::Float64(f) => *f,
274 _ => return b.clone(),
275 };
276 let bf = match b {
277 Value::Float64(f) => *f,
278 _ => return a.clone(),
279 };
280 if !af.is_finite() {
281 return b.clone();
282 }
283 if !bf.is_finite() {
284 return a.clone();
285 }
286 Value::Float64(if af.total_cmp(&bf) == Ordering::Less {
287 bf
288 } else {
289 af
290 })
291 }
292 DataType::Bool => {
293 let ab = match a {
294 Value::Bool(x) => *x,
295 _ => return b.clone(),
296 };
297 let bb = match b {
298 Value::Bool(x) => *x,
299 _ => return a.clone(),
300 };
301 Value::Bool(ab.max(bb))
302 }
303 DataType::Utf8 => match (a, b) {
304 (Value::Utf8(sa), Value::Utf8(sb)) => {
305 if sb.as_str() > sa.as_str() {
306 b.clone()
307 } else {
308 a.clone()
309 }
310 }
311 _ => b.clone(),
312 },
313 }
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Field;

    /// Two-column schema used by every test: `id` plus an Int64 `ts` watermark column.
    fn ts_schema() -> Schema {
        let fields = vec![
            Field::new("id", DataType::Int64),
            Field::new("ts", DataType::Int64),
        ];
        Schema::new(fields)
    }

    /// Builds one `(id, ts)` row; `ts` is passed as a `Value` so it can be `Null`.
    fn row(id: i64, ts: Value) -> Vec<Value> {
        vec![Value::Int64(id), ts]
    }

    #[test]
    fn filter_keeps_strictly_greater() {
        let schema = ts_schema();
        let rows = vec![row(1, Value::Int64(100)), row(2, Value::Int64(101))];
        let input = DataSet::new(schema.clone(), rows);

        // The floor equals the first row's `ts`, so only the second row survives.
        let filtered = apply_watermark_filter(input, &schema, "ts", &Value::Int64(100)).unwrap();

        assert_eq!(filtered.row_count(), 1);
        assert_eq!(filtered.rows[0][0], Value::Int64(2));
    }

    #[test]
    fn filter_empty_when_none_above() {
        let schema = ts_schema();
        let input = DataSet::new(schema.clone(), vec![row(1, Value::Int64(10))]);

        let filtered = apply_watermark_filter(input, &schema, "ts", &Value::Int64(99)).unwrap();

        assert_eq!(filtered.row_count(), 0);
    }

    #[test]
    fn max_value_in_column_int64_skips_null() {
        let schema = ts_schema();
        let rows = vec![
            row(1, Value::Int64(100)),
            row(2, Value::Null),
            row(3, Value::Int64(50)),
        ];
        let input = DataSet::new(schema.clone(), rows);

        let max = max_value_in_column(&input, &schema, "ts");

        assert_eq!(max, Some(Value::Int64(100)));
    }
}