Skip to main content

rust_data_processing/validation/
mod.rs

//! Validation (Phase 1).
//!
//! A small validation DSL that compiles checks to Polars expressions (via our pipeline) while keeping
//! the public API in crate-owned types.
//!
//! ## Example
//!
//! ```rust
//! use rust_data_processing::validation::{validate_dataset, Check, Severity, ValidationSpec};
//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
//!
//! # fn main() -> Result<(), rust_data_processing::IngestionError> {
//! let ds = DataSet::new(
//!     Schema::new(vec![
//!         Field::new("id", DataType::Int64),
//!         Field::new("name", DataType::Utf8),
//!     ]),
//!     vec![
//!         vec![Value::Int64(1), Value::Utf8("Ada".to_string())],
//!         vec![Value::Int64(2), Value::Null],
//!     ],
//! );
//!
//! let spec = ValidationSpec::new(vec![
//!     Check::NotNull { column: "name".to_string(), severity: Severity::Error },
//! ]);
//! let rep = validate_dataset(&ds, &spec)?;
//! assert_eq!(rep.summary.failed_checks, 1);
//! # Ok(())
//! # }
//! ```
32
33use crate::error::{IngestionError, IngestionResult};
34use crate::pipeline::DataFrame;
35use crate::types::{DataSet, Value};
36
37use polars::prelude::*;
38
/// How serious a failing [`Check`] is.
///
/// Variants are declared from least to most severe, so the derived `Ord`
/// yields `Info < Warn < Error`. The report summary uses that ordering to pick
/// the highest severity among failing checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Severity {
    /// Lowest severity.
    Info,
    /// Middle severity.
    Warn,
    /// Highest severity.
    Error,
}
46
47/// A single validation check.
48#[derive(Debug, Clone, PartialEq)]
49pub enum Check {
50    NotNull {
51        column: String,
52        severity: Severity,
53    },
54    RangeF64 {
55        column: String,
56        min: f64,
57        max: f64,
58        severity: Severity,
59    },
60    RegexMatch {
61        column: String,
62        pattern: String,
63        severity: Severity,
64        /// If true, invalid regex patterns become errors; if false, invalid regex evaluates to false.
65        strict: bool,
66    },
67    InSet {
68        column: String,
69        values: Vec<Value>,
70        severity: Severity,
71    },
72    Unique {
73        column: String,
74        severity: Severity,
75    },
76    /// UTF-8 string length in **Unicode scalar values** (Rust `char` count), nulls ignored for the length expr (null rows fail as “not in range” if you need strictness, combine with `NotNull`).
77    Utf8LenCharsBetween {
78        column: String,
79        min_chars: u32,
80        max_chars: u32,
81        severity: Severity,
82    },
83}
84
85/// A collection of checks.
86#[derive(Debug, Clone, PartialEq)]
87pub struct ValidationSpec {
88    pub checks: Vec<Check>,
89    /// Maximum number of example values to include for failing checks.
90    pub max_examples: usize,
91}
92
93impl ValidationSpec {
94    pub fn new(checks: Vec<Check>) -> Self {
95        Self {
96            checks,
97            max_examples: 5,
98        }
99    }
100}
101
102#[derive(Debug, Clone, PartialEq)]
103pub struct ValidationSummary {
104    pub total_checks: usize,
105    pub failed_checks: usize,
106    pub max_severity: Option<Severity>,
107}
108
109#[derive(Debug, Clone, PartialEq)]
110pub struct CheckResult {
111    pub check: Check,
112    pub failed_count: usize,
113    pub examples: Vec<Value>,
114    pub message: String,
115}
116
117#[derive(Debug, Clone, PartialEq)]
118pub struct ValidationReport {
119    pub results: Vec<CheckResult>,
120    pub summary: ValidationSummary,
121}
122
123pub fn validate_dataset(ds: &DataSet, spec: &ValidationSpec) -> IngestionResult<ValidationReport> {
124    let df = DataFrame::from_dataset(ds)?;
125    validate_frame(&df, spec)
126}
127
128pub fn validate_frame(df: &DataFrame, spec: &ValidationSpec) -> IngestionResult<ValidationReport> {
129    if spec.checks.is_empty() {
130        return Ok(ValidationReport {
131            results: Vec::new(),
132            summary: ValidationSummary {
133                total_checks: 0,
134                failed_checks: 0,
135                max_severity: None,
136            },
137        });
138    }
139
140    // One-shot aggregation to compute failed counts.
141    let lf = df.lazy_clone();
142    let mut exprs: Vec<Expr> = Vec::with_capacity(spec.checks.len());
143
144    for (i, chk) in spec.checks.iter().enumerate() {
145        exprs.push(fail_count_expr(chk).alias(fail_count_col_name(i)));
146    }
147
148    let agg = lf.select(exprs).collect().map_err(|e| {
149        crate::ingestion::polars_bridge::polars_error_to_ingestion(
150            "failed to compute validation counts",
151            e,
152        )
153    })?;
154
155    let mut results: Vec<CheckResult> = Vec::with_capacity(spec.checks.len());
156    let mut failed_checks = 0usize;
157    let mut max_sev: Option<Severity> = None;
158
159    for (i, chk) in spec.checks.iter().cloned().enumerate() {
160        let col = agg.column(&fail_count_col_name(i)).map_err(|e| {
161            crate::ingestion::polars_bridge::polars_error_to_ingestion(
162                "validation missing agg column",
163                e,
164            )
165        })?;
166        let failed_count = series_to_usize(col.as_materialized_series())?.unwrap_or(0);
167
168        if failed_count > 0 {
169            failed_checks += 1;
170            let sev = severity_of(&chk);
171            max_sev = Some(max_sev.map(|s| s.max(sev)).unwrap_or(sev));
172        }
173
174        let examples = if failed_count > 0 && spec.max_examples > 0 {
175            collect_examples(df, &chk, spec.max_examples).unwrap_or_default()
176        } else {
177            Vec::new()
178        };
179
180        results.push(CheckResult {
181            message: default_message(&chk, failed_count),
182            check: chk,
183            failed_count,
184            examples,
185        });
186    }
187
188    Ok(ValidationReport {
189        summary: ValidationSummary {
190            total_checks: spec.checks.len(),
191            failed_checks,
192            max_severity: max_sev,
193        },
194        results,
195    })
196}
197
198pub fn render_validation_report_json(rep: &ValidationReport) -> IngestionResult<String> {
199    let results: Vec<serde_json::Value> = rep
200        .results
201        .iter()
202        .map(|r| {
203            serde_json::json!({
204                "check": format!("{:?}", r.check),
205                "failed_count": r.failed_count,
206                "examples": r.examples.iter().map(value_to_json).collect::<Vec<_>>(),
207                "message": r.message,
208            })
209        })
210        .collect();
211
212    serde_json::to_string_pretty(&serde_json::json!({
213        "summary": {
214            "total_checks": rep.summary.total_checks,
215            "failed_checks": rep.summary.failed_checks,
216            "max_severity": rep.summary.max_severity.map(|s| format!("{s:?}")),
217        },
218        "results": results,
219    }))
220    .map_err(|e| IngestionError::SchemaMismatch {
221        message: format!("failed to serialize validation report json: {e}"),
222    })
223}
224
225pub fn render_validation_report_markdown(rep: &ValidationReport) -> String {
226    let mut out = String::new();
227    out.push_str("## Validation report\n\n");
228    out.push_str(&format!(
229        "- Total checks: **{}**\n- Failed checks: **{}**\n\n",
230        rep.summary.total_checks, rep.summary.failed_checks
231    ));
232
233    out.push_str("### Results\n\n");
234    for r in &rep.results {
235        let status = if r.failed_count == 0 { "PASS" } else { "FAIL" };
236        out.push_str(&format!("- **{status}**: `{:?}`\n", r.check));
237        out.push_str(&format!("  - Failed: **{}**\n", r.failed_count));
238        out.push_str(&format!("  - Message: {}\n", r.message));
239        if !r.examples.is_empty() {
240            out.push_str("  - Examples:\n");
241            for ex in &r.examples {
242                out.push_str(&format!("    - `{ex:?}`\n"));
243            }
244        }
245    }
246    out
247}
248
/// Name of the aggregation column holding the failure count for the check at `index`.
fn fail_count_col_name(index: usize) -> String {
    let mut name = String::from("__fail_");
    name.push_str(&index.to_string());
    name
}
252
253fn severity_of(chk: &Check) -> Severity {
254    match chk {
255        Check::NotNull { severity, .. }
256        | Check::RangeF64 { severity, .. }
257        | Check::RegexMatch { severity, .. }
258        | Check::InSet { severity, .. }
259        | Check::Unique { severity, .. }
260        | Check::Utf8LenCharsBetween { severity, .. } => *severity,
261    }
262}
263
264fn default_message(chk: &Check, failed: usize) -> String {
265    match chk {
266        Check::NotNull { column, .. } => format!("column '{column}' has {failed} null(s)"),
267        Check::RangeF64 {
268            column, min, max, ..
269        } => {
270            format!("column '{column}' has {failed} value(s) outside [{min}, {max}]")
271        }
272        Check::RegexMatch {
273            column, pattern, ..
274        } => {
275            format!("column '{column}' has {failed} value(s) not matching /{pattern}/")
276        }
277        Check::InSet { column, .. } => {
278            format!("column '{column}' has {failed} value(s) not in set")
279        }
280        Check::Unique { column, .. } => {
281            format!("column '{column}' has {failed} duplicate(s) among non-null values")
282        }
283        Check::Utf8LenCharsBetween {
284            column,
285            min_chars,
286            max_chars,
287            ..
288        } => {
289            format!(
290                "column '{column}' has {failed} value(s) whose UTF-8 length is outside [{min_chars}, {max_chars}] Unicode scalars"
291            )
292        }
293    }
294}
295
296fn fail_count_expr(chk: &Check) -> Expr {
297    match chk {
298        Check::NotNull { column, .. } => col(column).is_null().sum(),
299        Check::RangeF64 {
300            column, min, max, ..
301        } => (col(column).lt(lit(*min)).or(col(column).gt(lit(*max)))).sum(),
302        Check::RegexMatch {
303            column,
304            pattern,
305            strict,
306            ..
307        } => col(column)
308            .cast(DataType::String)
309            .str()
310            .contains(lit(pattern.clone()), *strict)
311            .not()
312            .sum(),
313        Check::InSet { column, values, .. } => {
314            let set_expr = lit(values_to_series(values));
315            col(column).is_in(set_expr, false).not().sum()
316        }
317        Check::Unique { column, .. } => {
318            // duplicates among non-null: non_null_count - unique_count
319            let non_null = col(column).is_not_null().sum();
320            let unique = col(column).drop_nulls().n_unique();
321            (non_null - unique).alias("__dup")
322        }
323        Check::Utf8LenCharsBetween {
324            column,
325            min_chars,
326            max_chars,
327            ..
328        } => {
329            let len = col(column)
330                .cast(DataType::String)
331                .str()
332                .len_chars()
333                .fill_null(lit(0u32));
334            (len.clone().lt(lit(*min_chars)).or(len.gt(lit(*max_chars)))).sum()
335        }
336    }
337}
338
339fn values_to_series(values: &[Value]) -> Series {
340    // We deliberately keep this minimal: enforce all values are same primitive type.
341    if values.is_empty() {
342        return Series::new("set".into(), Vec::<i64>::new());
343    }
344    match &values[0] {
345        Value::Int64(_) => {
346            let mut v: Vec<i64> = Vec::with_capacity(values.len());
347            for x in values {
348                if let Value::Int64(i) = x {
349                    v.push(*i);
350                }
351            }
352            Series::new("set".into(), v)
353        }
354        Value::Bool(_) => {
355            let mut v: Vec<bool> = Vec::with_capacity(values.len());
356            for x in values {
357                if let Value::Bool(b) = x {
358                    v.push(*b);
359                }
360            }
361            Series::new("set".into(), v)
362        }
363        Value::Utf8(_) => {
364            let mut v: Vec<String> = Vec::with_capacity(values.len());
365            for x in values {
366                if let Value::Utf8(s) = x {
367                    v.push(s.clone());
368                }
369            }
370            Series::new("set".into(), v)
371        }
372        Value::Float64(_) | Value::Null => Series::new("set".into(), Vec::<String>::new()),
373    }
374}
375
376fn series_to_usize(s: &Series) -> IngestionResult<Option<usize>> {
377    let av = s.get(0).map_err(|e| IngestionError::Engine {
378        message: "failed to read validation value".to_string(),
379        source: Box::new(e),
380    })?;
381    Ok(match av {
382        AnyValue::Null => None,
383        AnyValue::Int64(v) => Some(v.max(0) as usize),
384        AnyValue::UInt64(v) => Some(v as usize),
385        AnyValue::Int32(v) => Some((v as i64).max(0) as usize),
386        AnyValue::UInt32(v) => Some(v as usize),
387        other => {
388            return Err(IngestionError::SchemaMismatch {
389                message: format!("expected integer-like validation value, got {other}"),
390            });
391        }
392    })
393}
394
395fn collect_examples(
396    df: &DataFrame,
397    chk: &Check,
398    max_examples: usize,
399) -> IngestionResult<Vec<Value>> {
400    let mut lf = df.lazy_clone();
401    let (col_name, predicate) = match chk {
402        Check::NotNull { column, .. } => (column.as_str(), col(column).is_null()),
403        Check::RangeF64 {
404            column, min, max, ..
405        } => (
406            column.as_str(),
407            col(column).lt(lit(*min)).or(col(column).gt(lit(*max))),
408        ),
409        Check::RegexMatch {
410            column,
411            pattern,
412            strict,
413            ..
414        } => (
415            column.as_str(),
416            col(column)
417                .cast(DataType::String)
418                .str()
419                .contains(lit(pattern.clone()), *strict)
420                .not(),
421        ),
422        Check::InSet { column, values, .. } => (
423            column.as_str(),
424            col(column)
425                .is_in(lit(values_to_series(values)), false)
426                .not(),
427        ),
428        Check::Unique { .. } => return Ok(Vec::new()), // examples for duplicates would require group-by; skip in Phase 1
429        Check::Utf8LenCharsBetween {
430            column,
431            min_chars,
432            max_chars,
433            ..
434        } => {
435            let len = col(column)
436                .cast(DataType::String)
437                .str()
438                .len_chars()
439                .fill_null(lit(0u32));
440            (
441                column.as_str(),
442                len.clone().lt(lit(*min_chars)).or(len.gt(lit(*max_chars))),
443            )
444        }
445    };
446
447    lf = lf
448        .filter(predicate)
449        .select([col(col_name)])
450        .limit(max_examples as IdxSize);
451    let out = lf.collect().map_err(|e| {
452        crate::ingestion::polars_bridge::polars_error_to_ingestion(
453            "failed to collect validation examples",
454            e,
455        )
456    })?;
457
458    let s = out
459        .column(col_name)
460        .map_err(|e| {
461            crate::ingestion::polars_bridge::polars_error_to_ingestion(
462                "missing validation example column",
463                e,
464            )
465        })?
466        .as_materialized_series()
467        .clone();
468
469    let mut ex = Vec::new();
470    for i in 0..usize::min(max_examples, s.len()) {
471        let v = s.get(i).map_err(|e| IngestionError::Engine {
472            message: "failed to read validation example".to_string(),
473            source: Box::new(e),
474        })?;
475        ex.push(any_to_value(v));
476    }
477    Ok(ex)
478}
479
480fn any_to_value(v: AnyValue) -> Value {
481    match v {
482        AnyValue::Null => Value::Null,
483        AnyValue::Boolean(b) => Value::Bool(b),
484        AnyValue::Int64(i) => Value::Int64(i),
485        AnyValue::Float64(x) => Value::Float64(x),
486        AnyValue::String(s) => Value::Utf8(s.to_string()),
487        AnyValue::StringOwned(s) => Value::Utf8(s.to_string()),
488        other => Value::Utf8(other.to_string()),
489    }
490}
491
492fn value_to_json(v: &Value) -> serde_json::Value {
493    match v {
494        Value::Null => serde_json::Value::Null,
495        Value::Int64(i) => serde_json::json!(i),
496        Value::Float64(x) => serde_json::json!(x),
497        Value::Bool(b) => serde_json::json!(b),
498        Value::Utf8(s) => serde_json::json!(s),
499    }
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{DataType, Field, Schema};

    /// Three rows with exactly one problem per column:
    /// - a null `name`;
    /// - a `score` above 100;
    /// - a duplicated `id`.
    fn sample() -> DataSet {
        DataSet::new(
            Schema::new(vec![
                Field::new("id", DataType::Int64),
                Field::new("name", DataType::Utf8),
                Field::new("score", DataType::Float64),
            ]),
            vec![
                vec![
                    Value::Int64(1),
                    Value::Utf8("Ada".to_string()),
                    Value::Float64(10.0),
                ],
                vec![Value::Int64(2), Value::Null, Value::Float64(200.0)],
                vec![
                    Value::Int64(2),
                    Value::Utf8("Bob".to_string()),
                    Value::Float64(5.0),
                ],
            ],
        )
    }

    #[test]
    fn validation_counts_failures_and_renders_reports() {
        let ds = sample();
        let spec = ValidationSpec::new(vec![
            Check::NotNull {
                column: "name".to_string(),
                severity: Severity::Error,
            },
            Check::RangeF64 {
                column: "score".to_string(),
                min: 0.0,
                max: 100.0,
                severity: Severity::Warn,
            },
            Check::Unique {
                column: "id".to_string(),
                severity: Severity::Error,
            },
        ]);

        let rep = validate_dataset(&ds, &spec).unwrap();
        assert_eq!(rep.summary.total_checks, 3);
        assert_eq!(rep.summary.failed_checks, 3);
        assert_eq!(rep.summary.max_severity, Some(Severity::Error));

        let counts: Vec<usize> = rep.results.iter().map(|r| r.failed_count).collect();
        assert_eq!(counts, vec![1, 1, 1]);
        assert_eq!(rep.results[0].examples, vec![Value::Null]);
        assert_eq!(rep.results[1].examples, vec![Value::Float64(200.0)]);
        // Duplicate examples are not collected in Phase 1.
        assert!(rep.results[2].examples.is_empty());

        let json = render_validation_report_json(&rep).unwrap();
        assert!(json.contains("\"results\""));
        assert!(json.contains("\"failed_checks\": 3"));

        let md = render_validation_report_markdown(&rep);
        assert!(md.contains("## Validation report"));
        assert!(md.contains("- Failed checks: **3**"));
    }

    #[test]
    fn empty_spec_yields_empty_report() {
        let rep = validate_dataset(&sample(), &ValidationSpec::new(Vec::new())).unwrap();
        assert!(rep.results.is_empty());
        assert_eq!(rep.summary.total_checks, 0);
        assert_eq!(rep.summary.failed_checks, 0);
        assert_eq!(rep.summary.max_severity, None);
    }

    #[test]
    fn regex_match_does_not_count_null_cells() {
        let spec = ValidationSpec::new(vec![Check::RegexMatch {
            column: "name".to_string(),
            pattern: "^[A-Z][a-z]+$".to_string(),
            severity: Severity::Warn,
            strict: true,
        }]);
        let rep = validate_dataset(&sample(), &spec).unwrap();
        assert_eq!(rep.results.len(), 1);
        assert_eq!(rep.results[0].failed_count, 0);
        assert!(rep.results[0].examples.is_empty());
        assert_eq!(rep.summary.failed_checks, 0);
        assert_eq!(rep.summary.max_severity, None);
    }

    #[test]
    fn utf8_len_chars_between_flags_too_short_and_too_long() {
        let ds = DataSet::new(
            Schema::new(vec![Field::new("code", DataType::Utf8)]),
            vec![
                vec![Value::Utf8("ab".into())],
                vec![Value::Utf8("abcd".into())],
                vec![Value::Utf8("abcdef".into())],
            ],
        );
        let spec = ValidationSpec::new(vec![Check::Utf8LenCharsBetween {
            column: "code".into(),
            min_chars: 3,
            max_chars: 5,
            severity: Severity::Error,
        }]);
        let rep = validate_dataset(&ds, &spec).unwrap();
        assert_eq!(rep.summary.failed_checks, 1);
        // "ab" is too short and "abcdef" too long; "abcd" is within [3, 5].
        assert_eq!(rep.results[0].failed_count, 2);
        assert_eq!(
            rep.results[0].examples,
            vec![Value::Utf8("ab".into()), Value::Utf8("abcdef".into())]
        );
    }
}