1use crate::error::{IngestionError, IngestionResult};
34use crate::pipeline::DataFrame;
35use crate::types::{DataSet, Value};
36
37use polars::prelude::*;
38
/// How serious a failed [`Check`] is.
///
/// Variants are declared from least to most severe. The derived `Ord` follows
/// declaration order (`Info < Warn < Error`), and `validate_frame` relies on
/// that ordering to compute the maximum severity among failing checks, so the
/// variants must not be reordered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Severity {
    /// Lowest severity.
    Info,
    /// Middle severity.
    Warn,
    /// Highest severity.
    Error,
}
46
/// A single validation rule applied to one column.
///
/// Every variant names the `column` it inspects and the [`Severity`] reported
/// when at least one value fails. The derived `Debug` output of a check is
/// embedded verbatim in the JSON and Markdown reports.
#[derive(Debug, Clone, PartialEq)]
pub enum Check {
    /// Fails for every null value in `column`.
    NotNull {
        /// Column to inspect.
        column: String,
        /// Severity reported when the check fails.
        severity: Severity,
    },
    /// Fails for every value strictly below `min` or strictly above `max`
    /// (both bounds are inclusive).
    RangeF64 {
        /// Column to inspect.
        column: String,
        /// Smallest accepted value.
        min: f64,
        /// Largest accepted value.
        max: f64,
        /// Severity reported when the check fails.
        severity: Severity,
    },
    /// Fails for every value (cast to string) for which Polars'
    /// `str().contains(pattern, strict)` is false.
    RegexMatch {
        /// Column to inspect.
        column: String,
        /// Pattern handed to `str().contains`.
        pattern: String,
        /// Severity reported when the check fails.
        severity: Severity,
        /// Forwarded unchanged as the second argument of `str().contains`.
        // NOTE(review): presumably decides whether an invalid pattern is an
        // error rather than a null result - confirm against the Polars version
        // in use.
        strict: bool,
    },
    /// Fails for every value for which `is_in(values)` is false.
    InSet {
        /// Column to inspect.
        column: String,
        /// Accepted values.
        values: Vec<Value>,
        /// Severity reported when the check fails.
        severity: Severity,
    },
    /// Fails when non-null values repeat; the failure count is the number of
    /// non-null values minus the number of distinct non-null values.
    Unique {
        /// Column to inspect.
        column: String,
        /// Severity reported when the check fails.
        severity: Severity,
    },
    /// Fails for every value (cast to string) whose `len_chars()` is below
    /// `min_chars` or above `max_chars`. Null lengths are replaced by `0`
    /// before the comparison.
    Utf8LenCharsBetween {
        /// Column to inspect.
        column: String,
        /// Smallest accepted length (inclusive).
        min_chars: u32,
        /// Largest accepted length (inclusive).
        max_chars: u32,
        /// Severity reported when the check fails.
        severity: Severity,
    },
}
84
/// The set of checks to run, plus reporting options.
#[derive(Debug, Clone, PartialEq)]
pub struct ValidationSpec {
    /// Checks to evaluate; results are reported in this order.
    pub checks: Vec<Check>,
    /// Upper bound on the number of example values collected per failing
    /// check. `0` disables example collection.
    pub max_examples: usize,
}
92
93impl ValidationSpec {
94 pub fn new(checks: Vec<Check>) -> Self {
95 Self {
96 checks,
97 max_examples: 5,
98 }
99 }
100}
101
/// Aggregate outcome of a validation run.
#[derive(Debug, Clone, PartialEq)]
pub struct ValidationSummary {
    /// Number of checks in the spec.
    pub total_checks: usize,
    /// Number of checks with at least one failing value.
    pub failed_checks: usize,
    /// Highest severity among the failing checks; `None` when nothing failed.
    pub max_severity: Option<Severity>,
}
108
/// Outcome of a single [`Check`].
#[derive(Debug, Clone, PartialEq)]
pub struct CheckResult {
    /// The check that was evaluated.
    pub check: Check,
    /// Number of failures counted for the check (`0` means it passed).
    pub failed_count: usize,
    /// Sample of offending values; empty when the check passed, when example
    /// collection is disabled or failed, and always empty for `Check::Unique`.
    pub examples: Vec<Value>,
    /// Human-readable description produced by `default_message`.
    pub message: String,
}
116
/// Full result of a validation run: one entry per check plus a summary.
#[derive(Debug, Clone, PartialEq)]
pub struct ValidationReport {
    /// Per-check results, in the same order as the spec's checks.
    pub results: Vec<CheckResult>,
    /// Totals computed over `results`.
    pub summary: ValidationSummary,
}
122
123pub fn validate_dataset(ds: &DataSet, spec: &ValidationSpec) -> IngestionResult<ValidationReport> {
124 let df = DataFrame::from_dataset(ds)?;
125 validate_frame(&df, spec)
126}
127
128pub fn validate_frame(df: &DataFrame, spec: &ValidationSpec) -> IngestionResult<ValidationReport> {
129 if spec.checks.is_empty() {
130 return Ok(ValidationReport {
131 results: Vec::new(),
132 summary: ValidationSummary {
133 total_checks: 0,
134 failed_checks: 0,
135 max_severity: None,
136 },
137 });
138 }
139
140 let lf = df.lazy_clone();
142 let mut exprs: Vec<Expr> = Vec::with_capacity(spec.checks.len());
143
144 for (i, chk) in spec.checks.iter().enumerate() {
145 exprs.push(fail_count_expr(chk).alias(fail_count_col_name(i)));
146 }
147
148 let agg = lf.select(exprs).collect().map_err(|e| {
149 crate::ingestion::polars_bridge::polars_error_to_ingestion(
150 "failed to compute validation counts",
151 e,
152 )
153 })?;
154
155 let mut results: Vec<CheckResult> = Vec::with_capacity(spec.checks.len());
156 let mut failed_checks = 0usize;
157 let mut max_sev: Option<Severity> = None;
158
159 for (i, chk) in spec.checks.iter().cloned().enumerate() {
160 let col = agg.column(&fail_count_col_name(i)).map_err(|e| {
161 crate::ingestion::polars_bridge::polars_error_to_ingestion(
162 "validation missing agg column",
163 e,
164 )
165 })?;
166 let failed_count = series_to_usize(col.as_materialized_series())?.unwrap_or(0);
167
168 if failed_count > 0 {
169 failed_checks += 1;
170 let sev = severity_of(&chk);
171 max_sev = Some(max_sev.map(|s| s.max(sev)).unwrap_or(sev));
172 }
173
174 let examples = if failed_count > 0 && spec.max_examples > 0 {
175 collect_examples(df, &chk, spec.max_examples).unwrap_or_default()
176 } else {
177 Vec::new()
178 };
179
180 results.push(CheckResult {
181 message: default_message(&chk, failed_count),
182 check: chk,
183 failed_count,
184 examples,
185 });
186 }
187
188 Ok(ValidationReport {
189 summary: ValidationSummary {
190 total_checks: spec.checks.len(),
191 failed_checks,
192 max_severity: max_sev,
193 },
194 results,
195 })
196}
197
198pub fn render_validation_report_json(rep: &ValidationReport) -> IngestionResult<String> {
199 let results: Vec<serde_json::Value> = rep
200 .results
201 .iter()
202 .map(|r| {
203 serde_json::json!({
204 "check": format!("{:?}", r.check),
205 "failed_count": r.failed_count,
206 "examples": r.examples.iter().map(value_to_json).collect::<Vec<_>>(),
207 "message": r.message,
208 })
209 })
210 .collect();
211
212 serde_json::to_string_pretty(&serde_json::json!({
213 "summary": {
214 "total_checks": rep.summary.total_checks,
215 "failed_checks": rep.summary.failed_checks,
216 "max_severity": rep.summary.max_severity.map(|s| format!("{s:?}")),
217 },
218 "results": results,
219 }))
220 .map_err(|e| IngestionError::SchemaMismatch {
221 message: format!("failed to serialize validation report json: {e}"),
222 })
223}
224
225pub fn render_validation_report_markdown(rep: &ValidationReport) -> String {
226 let mut out = String::new();
227 out.push_str("## Validation report\n\n");
228 out.push_str(&format!(
229 "- Total checks: **{}**\n- Failed checks: **{}**\n\n",
230 rep.summary.total_checks, rep.summary.failed_checks
231 ));
232
233 out.push_str("### Results\n\n");
234 for r in &rep.results {
235 let status = if r.failed_count == 0 { "PASS" } else { "FAIL" };
236 out.push_str(&format!("- **{status}**: `{:?}`\n", r.check));
237 out.push_str(&format!(" - Failed: **{}**\n", r.failed_count));
238 out.push_str(&format!(" - Message: {}\n", r.message));
239 if !r.examples.is_empty() {
240 out.push_str(" - Examples:\n");
241 for ex in &r.examples {
242 out.push_str(&format!(" - `{ex:?}`\n"));
243 }
244 }
245 }
246 out
247}
248
/// Name of the temporary aggregate column holding the failure count of the
/// check at position `i` in the spec (`__fail_0`, `__fail_1`, ...).
fn fail_count_col_name(i: usize) -> String {
    let mut name = String::from("__fail_");
    name.push_str(&i.to_string());
    name
}
252
253fn severity_of(chk: &Check) -> Severity {
254 match chk {
255 Check::NotNull { severity, .. }
256 | Check::RangeF64 { severity, .. }
257 | Check::RegexMatch { severity, .. }
258 | Check::InSet { severity, .. }
259 | Check::Unique { severity, .. }
260 | Check::Utf8LenCharsBetween { severity, .. } => *severity,
261 }
262}
263
264fn default_message(chk: &Check, failed: usize) -> String {
265 match chk {
266 Check::NotNull { column, .. } => format!("column '{column}' has {failed} null(s)"),
267 Check::RangeF64 {
268 column, min, max, ..
269 } => {
270 format!("column '{column}' has {failed} value(s) outside [{min}, {max}]")
271 }
272 Check::RegexMatch {
273 column, pattern, ..
274 } => {
275 format!("column '{column}' has {failed} value(s) not matching /{pattern}/")
276 }
277 Check::InSet { column, .. } => {
278 format!("column '{column}' has {failed} value(s) not in set")
279 }
280 Check::Unique { column, .. } => {
281 format!("column '{column}' has {failed} duplicate(s) among non-null values")
282 }
283 Check::Utf8LenCharsBetween {
284 column,
285 min_chars,
286 max_chars,
287 ..
288 } => {
289 format!(
290 "column '{column}' has {failed} value(s) whose UTF-8 length is outside [{min_chars}, {max_chars}] Unicode scalars"
291 )
292 }
293 }
294}
295
296fn fail_count_expr(chk: &Check) -> Expr {
297 match chk {
298 Check::NotNull { column, .. } => col(column).is_null().sum(),
299 Check::RangeF64 {
300 column, min, max, ..
301 } => (col(column).lt(lit(*min)).or(col(column).gt(lit(*max)))).sum(),
302 Check::RegexMatch {
303 column,
304 pattern,
305 strict,
306 ..
307 } => col(column)
308 .cast(DataType::String)
309 .str()
310 .contains(lit(pattern.clone()), *strict)
311 .not()
312 .sum(),
313 Check::InSet { column, values, .. } => {
314 let set_expr = lit(values_to_series(values));
315 col(column).is_in(set_expr, false).not().sum()
316 }
317 Check::Unique { column, .. } => {
318 let non_null = col(column).is_not_null().sum();
320 let unique = col(column).drop_nulls().n_unique();
321 (non_null - unique).alias("__dup")
322 }
323 Check::Utf8LenCharsBetween {
324 column,
325 min_chars,
326 max_chars,
327 ..
328 } => {
329 let len = col(column)
330 .cast(DataType::String)
331 .str()
332 .len_chars()
333 .fill_null(lit(0u32));
334 (len.clone().lt(lit(*min_chars)).or(len.gt(lit(*max_chars)))).sum()
335 }
336 }
337}
338
339fn values_to_series(values: &[Value]) -> Series {
340 if values.is_empty() {
342 return Series::new("set".into(), Vec::<i64>::new());
343 }
344 match &values[0] {
345 Value::Int64(_) => {
346 let mut v: Vec<i64> = Vec::with_capacity(values.len());
347 for x in values {
348 if let Value::Int64(i) = x {
349 v.push(*i);
350 }
351 }
352 Series::new("set".into(), v)
353 }
354 Value::Bool(_) => {
355 let mut v: Vec<bool> = Vec::with_capacity(values.len());
356 for x in values {
357 if let Value::Bool(b) = x {
358 v.push(*b);
359 }
360 }
361 Series::new("set".into(), v)
362 }
363 Value::Utf8(_) => {
364 let mut v: Vec<String> = Vec::with_capacity(values.len());
365 for x in values {
366 if let Value::Utf8(s) = x {
367 v.push(s.clone());
368 }
369 }
370 Series::new("set".into(), v)
371 }
372 Value::Float64(_) | Value::Null => Series::new("set".into(), Vec::<String>::new()),
373 }
374}
375
376fn series_to_usize(s: &Series) -> IngestionResult<Option<usize>> {
377 let av = s.get(0).map_err(|e| IngestionError::Engine {
378 message: "failed to read validation value".to_string(),
379 source: Box::new(e),
380 })?;
381 Ok(match av {
382 AnyValue::Null => None,
383 AnyValue::Int64(v) => Some(v.max(0) as usize),
384 AnyValue::UInt64(v) => Some(v as usize),
385 AnyValue::Int32(v) => Some((v as i64).max(0) as usize),
386 AnyValue::UInt32(v) => Some(v as usize),
387 other => {
388 return Err(IngestionError::SchemaMismatch {
389 message: format!("expected integer-like validation value, got {other}"),
390 });
391 }
392 })
393}
394
395fn collect_examples(
396 df: &DataFrame,
397 chk: &Check,
398 max_examples: usize,
399) -> IngestionResult<Vec<Value>> {
400 let mut lf = df.lazy_clone();
401 let (col_name, predicate) = match chk {
402 Check::NotNull { column, .. } => (column.as_str(), col(column).is_null()),
403 Check::RangeF64 {
404 column, min, max, ..
405 } => (
406 column.as_str(),
407 col(column).lt(lit(*min)).or(col(column).gt(lit(*max))),
408 ),
409 Check::RegexMatch {
410 column,
411 pattern,
412 strict,
413 ..
414 } => (
415 column.as_str(),
416 col(column)
417 .cast(DataType::String)
418 .str()
419 .contains(lit(pattern.clone()), *strict)
420 .not(),
421 ),
422 Check::InSet { column, values, .. } => (
423 column.as_str(),
424 col(column)
425 .is_in(lit(values_to_series(values)), false)
426 .not(),
427 ),
428 Check::Unique { .. } => return Ok(Vec::new()), Check::Utf8LenCharsBetween {
430 column,
431 min_chars,
432 max_chars,
433 ..
434 } => {
435 let len = col(column)
436 .cast(DataType::String)
437 .str()
438 .len_chars()
439 .fill_null(lit(0u32));
440 (
441 column.as_str(),
442 len.clone().lt(lit(*min_chars)).or(len.gt(lit(*max_chars))),
443 )
444 }
445 };
446
447 lf = lf
448 .filter(predicate)
449 .select([col(col_name)])
450 .limit(max_examples as IdxSize);
451 let out = lf.collect().map_err(|e| {
452 crate::ingestion::polars_bridge::polars_error_to_ingestion(
453 "failed to collect validation examples",
454 e,
455 )
456 })?;
457
458 let s = out
459 .column(col_name)
460 .map_err(|e| {
461 crate::ingestion::polars_bridge::polars_error_to_ingestion(
462 "missing validation example column",
463 e,
464 )
465 })?
466 .as_materialized_series()
467 .clone();
468
469 let mut ex = Vec::new();
470 for i in 0..usize::min(max_examples, s.len()) {
471 let v = s.get(i).map_err(|e| IngestionError::Engine {
472 message: "failed to read validation example".to_string(),
473 source: Box::new(e),
474 })?;
475 ex.push(any_to_value(v));
476 }
477 Ok(ex)
478}
479
480fn any_to_value(v: AnyValue) -> Value {
481 match v {
482 AnyValue::Null => Value::Null,
483 AnyValue::Boolean(b) => Value::Bool(b),
484 AnyValue::Int64(i) => Value::Int64(i),
485 AnyValue::Float64(x) => Value::Float64(x),
486 AnyValue::String(s) => Value::Utf8(s.to_string()),
487 AnyValue::StringOwned(s) => Value::Utf8(s.to_string()),
488 other => Value::Utf8(other.to_string()),
489 }
490}
491
492fn value_to_json(v: &Value) -> serde_json::Value {
493 match v {
494 Value::Null => serde_json::Value::Null,
495 Value::Int64(i) => serde_json::json!(i),
496 Value::Float64(x) => serde_json::json!(x),
497 Value::Bool(b) => serde_json::json!(b),
498 Value::Utf8(s) => serde_json::json!(s),
499 }
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{DataType, Field, Schema};

    /// Three rows with one null name, one out-of-range score and one
    /// duplicated id, so each check in the first test fails exactly once.
    fn sample() -> DataSet {
        DataSet::new(
            Schema::new(vec![
                Field::new("id", DataType::Int64),
                Field::new("name", DataType::Utf8),
                Field::new("score", DataType::Float64),
            ]),
            vec![
                vec![
                    Value::Int64(1),
                    Value::Utf8("Ada".to_string()),
                    Value::Float64(10.0),
                ],
                vec![Value::Int64(2), Value::Null, Value::Float64(200.0)],
                vec![
                    Value::Int64(2),
                    Value::Utf8("Bob".to_string()),
                    Value::Float64(5.0),
                ],
            ],
        )
    }

    #[test]
    fn validation_counts_failures_and_renders_reports() {
        let ds = sample();
        let spec = ValidationSpec::new(vec![
            Check::NotNull {
                column: "name".to_string(),
                severity: Severity::Error,
            },
            Check::RangeF64 {
                column: "score".to_string(),
                min: 0.0,
                max: 100.0,
                severity: Severity::Warn,
            },
            Check::Unique {
                column: "id".to_string(),
                severity: Severity::Error,
            },
        ]);

        let rep = validate_dataset(&ds, &spec).unwrap();
        assert_eq!(rep.summary.total_checks, 3);
        // Exact expectations: a loose `>= 1` would not notice a miscount.
        assert_eq!(rep.summary.failed_checks, 3);
        assert_eq!(rep.summary.max_severity, Some(Severity::Error));

        assert_eq!(rep.results.len(), 3);
        assert_eq!(rep.results[0].failed_count, 1);
        assert_eq!(rep.results[1].failed_count, 1);
        assert_eq!(rep.results[1].examples, vec![Value::Float64(200.0)]);
        assert_eq!(rep.results[2].failed_count, 1);
        assert!(rep.results[2].examples.is_empty());

        let json = render_validation_report_json(&rep).unwrap();
        assert!(json.contains("\"results\""));

        let md = render_validation_report_markdown(&rep);
        assert!(md.contains("## Validation report"));
    }

    #[test]
    fn utf8_len_chars_between_flags_too_short_and_too_long() {
        let ds = DataSet::new(
            Schema::new(vec![Field::new("code", DataType::Utf8)]),
            vec![
                vec![Value::Utf8("ab".into())],
                vec![Value::Utf8("abcd".into())],
                vec![Value::Utf8("abcdef".into())],
            ],
        );
        let spec = ValidationSpec::new(vec![Check::Utf8LenCharsBetween {
            column: "code".into(),
            min_chars: 3,
            max_chars: 5,
            severity: Severity::Error,
        }]);
        let rep = validate_dataset(&ds, &spec).unwrap();
        assert_eq!(rep.summary.failed_checks, 1);
        assert_eq!(rep.summary.max_severity, Some(Severity::Error));
        // "ab" is too short and "abcdef" is too long; "abcd" is in range.
        assert_eq!(rep.results[0].failed_count, 2);
    }
}