Skip to main content

rust_data_processing/ingestion/
watermark.rs

1//! High-water / incremental row filter applied **after** ingest (file or DB).
2//!
3//! When [`super::IngestionOptions::watermark_column`] and
4//! [`super::IngestionOptions::watermark_exclusive_above`] are both set, only rows where the
5//! watermark column is **strictly greater than** the high-water value are kept. Rows with a null
6//! in that column are dropped.
7
8use std::cmp::Ordering;
9
10use crate::error::{IngestionError, IngestionResult};
11use crate::types::{DataSet, DataType, Schema, Value};
12
13use super::IngestionOptions;
14
15/// Ensure watermark options are consistent with `schema` and with each other.
16pub fn validate_watermark_config(
17    schema: &Schema,
18    options: &IngestionOptions,
19) -> IngestionResult<()> {
20    let col = &options.watermark_column;
21    let floor = &options.watermark_exclusive_above;
22    match (col.as_ref(), floor.as_ref()) {
23        (None, None) => Ok(()),
24        (Some(_), None) | (None, Some(_)) => Err(IngestionError::SchemaMismatch {
25            message:
26                "watermark_column and watermark_exclusive_above must both be set or both omitted"
27                    .to_string(),
28        }),
29        (Some(name), Some(floor_val)) => {
30            if matches!(floor_val, Value::Null) {
31                return Err(IngestionError::SchemaMismatch {
32                    message: "watermark_exclusive_above must not be Null".to_string(),
33                });
34            }
35            let idx = schema
36                .index_of(name)
37                .ok_or_else(|| IngestionError::SchemaMismatch {
38                    message: format!("watermark column '{name}' not found in schema"),
39                })?;
40            let field = &schema.fields[idx];
41            ensure_value_matches_type(floor_val, &field.data_type, "watermark_exclusive_above")?;
42            Ok(())
43        }
44    }
45}
46
47fn ensure_value_matches_type(v: &Value, dt: &DataType, ctx: &str) -> IngestionResult<()> {
48    let ok = match dt {
49        DataType::Int64 => matches!(v, Value::Int64(_)),
50        DataType::Float64 => matches!(v, Value::Float64(_)),
51        DataType::Bool => matches!(v, Value::Bool(_)),
52        DataType::Utf8 => matches!(v, Value::Utf8(_)),
53    };
54    if ok {
55        Ok(())
56    } else {
57        Err(IngestionError::SchemaMismatch {
58            message: format!(
59                "{ctx} does not match the watermark column type ({dt:?})",
60                dt = dt
61            ),
62        })
63    }
64}
65
66/// Keep only rows where `column` compares **strictly greater than** `floor` (per column [`DataType`]).
67pub fn apply_watermark_filter(
68    ds: DataSet,
69    schema: &Schema,
70    column: &str,
71    floor: &Value,
72) -> IngestionResult<DataSet> {
73    let idx = schema
74        .index_of(column)
75        .ok_or_else(|| IngestionError::SchemaMismatch {
76            message: format!("watermark column '{column}' not found in schema"),
77        })?;
78    let dt = &schema.fields[idx].data_type;
79
80    let mut kept = Vec::with_capacity(ds.rows.len());
81    for (row_i0, row) in ds.rows.iter().enumerate() {
82        let user_row = row_i0 + 1;
83        let cell = &row[idx];
84        if row_is_above_watermark(cell, floor, dt, user_row, column)? {
85            kept.push(row.clone());
86        }
87    }
88
89    Ok(DataSet::new(ds.schema, kept))
90}
91
92/// Apply watermark filtering when options request it (call after [`validate_watermark_config`]).
93pub fn apply_watermark_after_ingest(
94    ds: DataSet,
95    schema: &Schema,
96    options: &IngestionOptions,
97) -> IngestionResult<DataSet> {
98    match (
99        &options.watermark_column,
100        &options.watermark_exclusive_above,
101    ) {
102        (None, None) => Ok(ds),
103        (Some(col), Some(floor)) => apply_watermark_filter(ds, schema, col, floor),
104        _ => Err(IngestionError::SchemaMismatch {
105            message: "invalid watermark options state".to_string(),
106        }),
107    }
108}
109
110fn row_is_above_watermark(
111    cell: &Value,
112    floor: &Value,
113    dt: &DataType,
114    row: usize,
115    column: &str,
116) -> IngestionResult<bool> {
117    if matches!(cell, Value::Null) {
118        return Ok(false);
119    }
120    let ord = compare_cell_to_floor(cell, floor, dt, row, column)?;
121    Ok(ord == Ordering::Greater)
122}
123
124fn compare_cell_to_floor(
125    cell: &Value,
126    floor: &Value,
127    dt: &DataType,
128    row: usize,
129    column: &str,
130) -> IngestionResult<Ordering> {
131    match dt {
132        DataType::Int64 => {
133            let a = expect_int64(cell, row, column)?;
134            let b = match floor {
135                Value::Int64(v) => *v,
136                _ => {
137                    return Err(IngestionError::SchemaMismatch {
138                        message: "watermark value type mismatch (expected int64)".to_string(),
139                    });
140                }
141            };
142            Ok(a.cmp(&b))
143        }
144        DataType::Float64 => {
145            let a = expect_float64(cell, row, column)?;
146            let b = match floor {
147                Value::Float64(v) => *v,
148                _ => {
149                    return Err(IngestionError::SchemaMismatch {
150                        message: "watermark value type mismatch (expected float64)".to_string(),
151                    });
152                }
153            };
154            Ok(a.total_cmp(&b))
155        }
156        DataType::Bool => {
157            let a = expect_bool(cell, row, column)?;
158            let b = match floor {
159                Value::Bool(v) => *v,
160                _ => {
161                    return Err(IngestionError::SchemaMismatch {
162                        message: "watermark value type mismatch (expected bool)".to_string(),
163                    });
164                }
165            };
166            Ok(a.cmp(&b))
167        }
168        DataType::Utf8 => {
169            let a = match cell {
170                Value::Utf8(s) => s.as_str(),
171                _ => {
172                    return Err(IngestionError::ParseError {
173                        row,
174                        column: column.to_string(),
175                        raw: format!("{cell:?}"),
176                        message: "expected utf8 for watermark column".to_string(),
177                    });
178                }
179            };
180            let b = match floor {
181                Value::Utf8(s) => s.as_str(),
182                _ => {
183                    return Err(IngestionError::SchemaMismatch {
184                        message: "watermark value type mismatch (expected utf8)".to_string(),
185                    });
186                }
187            };
188            Ok(a.cmp(b))
189        }
190    }
191}
192
193fn expect_int64(v: &Value, row: usize, column: &str) -> IngestionResult<i64> {
194    match v {
195        Value::Int64(i) => Ok(*i),
196        _ => Err(IngestionError::ParseError {
197            row,
198            column: column.to_string(),
199            raw: format!("{v:?}"),
200            message: "expected int64 for watermark column".to_string(),
201        }),
202    }
203}
204
205fn expect_float64(v: &Value, row: usize, column: &str) -> IngestionResult<f64> {
206    match v {
207        Value::Float64(f) => Ok(*f),
208        _ => Err(IngestionError::ParseError {
209            row,
210            column: column.to_string(),
211            raw: format!("{v:?}"),
212            message: "expected float64 for watermark column".to_string(),
213        }),
214    }
215}
216
217fn expect_bool(v: &Value, row: usize, column: &str) -> IngestionResult<bool> {
218    match v {
219        Value::Bool(b) => Ok(*b),
220        _ => Err(IngestionError::ParseError {
221            row,
222            column: column.to_string(),
223            raw: format!("{v:?}"),
224            message: "expected bool for watermark column".to_string(),
225        }),
226    }
227}
228
229/// Maximum value in `column` over **non-null** cells (ordering matches the column [`DataType`]:
230/// `Int64` / `Bool` / `Utf8` use [`Ord`]; `Float64` uses IEEE total order via [`f64::total_cmp`]).
231///
232/// Returns `None` if the column is missing, there are no rows, or every value in that column is
233/// null. Non-finite floats are ignored.
234pub fn max_value_in_column(ds: &DataSet, schema: &Schema, column: &str) -> Option<Value> {
235    let idx = schema.index_of(column)?;
236    let dt = &schema.fields[idx].data_type;
237    let mut best: Option<Value> = None;
238    for row in &ds.rows {
239        let cell = &row[idx];
240        if matches!(cell, Value::Null) {
241            continue;
242        }
243        if matches!(dt, DataType::Float64) {
244            if let Value::Float64(f) = cell {
245                if !f.is_finite() {
246                    continue;
247                }
248            }
249        }
250        best = Some(match &best {
251            None => cell.clone(),
252            Some(cur) => max_of_typed(dt, cur, cell),
253        });
254    }
255    best
256}
257
258fn max_of_typed(dt: &DataType, a: &Value, b: &Value) -> Value {
259    match dt {
260        DataType::Int64 => {
261            let ai = match a {
262                Value::Int64(i) => *i,
263                _ => return b.clone(),
264            };
265            let bi = match b {
266                Value::Int64(i) => *i,
267                _ => return a.clone(),
268            };
269            Value::Int64(ai.max(bi))
270        }
271        DataType::Float64 => {
272            let af = match a {
273                Value::Float64(f) => *f,
274                _ => return b.clone(),
275            };
276            let bf = match b {
277                Value::Float64(f) => *f,
278                _ => return a.clone(),
279            };
280            if !af.is_finite() {
281                return b.clone();
282            }
283            if !bf.is_finite() {
284                return a.clone();
285            }
286            Value::Float64(if af.total_cmp(&bf) == Ordering::Less {
287                bf
288            } else {
289                af
290            })
291        }
292        DataType::Bool => {
293            let ab = match a {
294                Value::Bool(x) => *x,
295                _ => return b.clone(),
296            };
297            let bb = match b {
298                Value::Bool(x) => *x,
299                _ => return a.clone(),
300            };
301            Value::Bool(ab.max(bb))
302        }
303        DataType::Utf8 => match (a, b) {
304            (Value::Utf8(sa), Value::Utf8(sb)) => {
305                if sb.as_str() > sa.as_str() {
306                    b.clone()
307                } else {
308                    a.clone()
309                }
310            }
311            _ => b.clone(),
312        },
313    }
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Field;

    /// Two Int64 columns: `id` and the watermark column `ts`.
    fn ts_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int64),
            Field::new("ts", DataType::Int64),
        ])
    }

    #[test]
    fn filter_keeps_strictly_greater() {
        let schema = ts_schema();
        let ds = DataSet::new(
            schema.clone(),
            vec![
                vec![Value::Int64(1), Value::Int64(100)],
                vec![Value::Int64(2), Value::Int64(101)],
            ],
        );
        let out = apply_watermark_filter(ds, &schema, "ts", &Value::Int64(100)).unwrap();
        assert_eq!(out.row_count(), 1);
        assert_eq!(out.rows[0][0], Value::Int64(2));
    }

    #[test]
    fn filter_empty_when_none_above() {
        let schema = ts_schema();
        let ds = DataSet::new(
            schema.clone(),
            vec![vec![Value::Int64(1), Value::Int64(10)]],
        );
        let out = apply_watermark_filter(ds, &schema, "ts", &Value::Int64(99)).unwrap();
        assert_eq!(out.row_count(), 0);
    }

    /// The module contract: rows with a null watermark cell are dropped, not errors.
    #[test]
    fn filter_drops_null_watermark_cells() {
        let schema = ts_schema();
        let ds = DataSet::new(
            schema.clone(),
            vec![
                vec![Value::Int64(1), Value::Null],
                vec![Value::Int64(2), Value::Int64(5)],
            ],
        );
        let out = apply_watermark_filter(ds, &schema, "ts", &Value::Int64(0)).unwrap();
        assert_eq!(out.row_count(), 1);
        assert_eq!(out.rows[0][0], Value::Int64(2));
    }

    #[test]
    fn filter_unknown_column_is_schema_mismatch() {
        let schema = ts_schema();
        let ds = DataSet::new(schema.clone(), Vec::new());
        let result = apply_watermark_filter(ds, &schema, "missing", &Value::Int64(0));
        assert!(matches!(result, Err(IngestionError::SchemaMismatch { .. })));
    }

    #[test]
    fn max_value_in_column_int64_skips_null() {
        let schema = ts_schema();
        let ds = DataSet::new(
            schema.clone(),
            vec![
                vec![Value::Int64(1), Value::Int64(100)],
                vec![Value::Int64(2), Value::Null],
                vec![Value::Int64(3), Value::Int64(50)],
            ],
        );
        assert_eq!(
            max_value_in_column(&ds, &schema, "ts"),
            Some(Value::Int64(100))
        );
    }

    #[test]
    fn max_value_in_column_none_for_missing_column_or_all_null() {
        let schema = ts_schema();
        let ds = DataSet::new(
            schema.clone(),
            vec![vec![Value::Int64(1), Value::Null]],
        );
        assert_eq!(max_value_in_column(&ds, &schema, "nope"), None);
        assert_eq!(max_value_in_column(&ds, &schema, "ts"), None);
    }
}