Skip to main content

rust_data_processing/profiling/
mod.rs

1//! Profiling (Phase 1).
2//!
3//! A small, engine-delegated profiler that computes common column metrics using Polars under the hood,
4//! while keeping the public API in crate-owned types.
5//!
6//! ## Example
7//!
8//! ```rust
9//! use rust_data_processing::profiling::{profile_dataset, ProfileOptions, SamplingMode};
10//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
11//!
12//! # fn main() -> Result<(), rust_data_processing::IngestionError> {
13//! let ds = DataSet::new(
14//!     Schema::new(vec![Field::new("score", DataType::Float64)]),
15//!     vec![vec![Value::Float64(1.0)], vec![Value::Null], vec![Value::Float64(3.0)]],
16//! );
17//!
18//! let rep = profile_dataset(
19//!     &ds,
20//!     &ProfileOptions {
21//!         sampling: SamplingMode::Head(2),
22//!         quantiles: vec![0.5],
23//!     },
24//! )?;
25//!
26//! assert_eq!(rep.row_count, 2);
27//! assert_eq!(rep.columns[0].null_count, 1);
28//! # Ok(())
29//! # }
30//! ```
31
32use crate::error::{IngestionError, IngestionResult};
33use crate::pipeline::DataFrame;
34use crate::types::{DataSet, DataType};
35
36use polars::prelude::*;
37
/// Row-sampling strategy applied before any profile metric is computed.
///
/// Defaults to [`SamplingMode::Full`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SamplingMode {
    /// Use every row of the input.
    #[default]
    Full,
    /// Use at most the first `n` rows of the input.
    Head(usize),
}
47
48/// Options for profiling.
49#[derive(Debug, Clone, PartialEq)]
50pub struct ProfileOptions {
51    pub sampling: SamplingMode,
52    /// Quantiles to compute for numeric columns (values in [0.0, 1.0]).
53    pub quantiles: Vec<f64>,
54}
55
56impl Default for ProfileOptions {
57    fn default() -> Self {
58        Self {
59            sampling: SamplingMode::Full,
60            quantiles: vec![0.5, 0.95],
61        }
62    }
63}
64
/// Summary statistics reported only for numeric (integer or float) columns.
///
/// Each statistic is `None` when the engine produced a null for it.
#[derive(Clone, Debug, PartialEq)]
pub struct NumericProfile {
    /// Smallest value, widened to `f64`.
    pub min: Option<f64>,
    /// Largest value, widened to `f64`.
    pub max: Option<f64>,
    /// Arithmetic mean.
    pub mean: Option<f64>,
    /// `(requested quantile, value)` pairs, in the order of
    /// [`ProfileOptions::quantiles`].
    pub quantiles: Vec<(f64, Option<f64>)>,
}
72
73#[derive(Debug, Clone, PartialEq)]
74pub struct ColumnProfile {
75    pub name: String,
76    pub data_type: DataType,
77    pub null_count: usize,
78    pub distinct_count: usize,
79    pub numeric: Option<NumericProfile>,
80}
81
82#[derive(Debug, Clone, PartialEq)]
83pub struct ProfileReport {
84    pub sampling: SamplingMode,
85    /// Row count of the profiled (possibly sampled) data.
86    pub row_count: usize,
87    pub columns: Vec<ColumnProfile>,
88}
89
90/// Render a profile report to a stable JSON string.
91pub fn render_profile_report_json(report: &ProfileReport) -> IngestionResult<String> {
92    let sampling = match report.sampling {
93        SamplingMode::Full => "full",
94        SamplingMode::Head(_) => "head",
95    };
96
97    let cols: Vec<serde_json::Value> = report
98        .columns
99        .iter()
100        .map(|c| {
101            let dtype = match c.data_type {
102                DataType::Int64 => "int64",
103                DataType::Float64 => "float64",
104                DataType::Bool => "bool",
105                DataType::Utf8 => "utf8",
106            };
107            let numeric = c.numeric.as_ref().map(|n| {
108                serde_json::json!({
109                    "min": n.min,
110                    "max": n.max,
111                    "mean": n.mean,
112                    "quantiles": n.quantiles.iter().map(|(q, v)| serde_json::json!({"q": q, "value": v})).collect::<Vec<_>>(),
113                })
114            });
115
116            serde_json::json!({
117                "name": c.name,
118                "data_type": dtype,
119                "null_count": c.null_count,
120                "distinct_count": c.distinct_count,
121                "numeric": numeric,
122            })
123        })
124        .collect();
125
126    serde_json::to_string_pretty(&serde_json::json!({
127        "sampling": sampling,
128        "row_count": report.row_count,
129        "columns": cols,
130    }))
131    .map_err(|e| IngestionError::SchemaMismatch {
132        message: format!("failed to serialize profile report json: {e}"),
133    })
134}
135
136/// Render a profile report to a human-readable Markdown string.
137pub fn render_profile_report_markdown(report: &ProfileReport) -> String {
138    let sampling = match report.sampling {
139        SamplingMode::Full => "Full",
140        SamplingMode::Head(n) => {
141            return format!(
142                "## Profile report\n\n- Sampling: **Head({n})**\n- Rows profiled: **{}**\n\n{}",
143                report.row_count,
144                render_columns_markdown(&report.columns)
145            );
146        }
147    };
148
149    format!(
150        "## Profile report\n\n- Sampling: **{sampling}**\n- Rows profiled: **{}**\n\n{}",
151        report.row_count,
152        render_columns_markdown(&report.columns)
153    )
154}
155
156fn render_columns_markdown(cols: &[ColumnProfile]) -> String {
157    let mut out = String::new();
158    out.push_str("### Columns\n\n");
159    out.push_str("| column | type | nulls | distinct (non-null) | min | max | mean |\n");
160    out.push_str("|---|---:|---:|---:|---:|---:|---:|\n");
161    for c in cols {
162        let dtype = match c.data_type {
163            DataType::Int64 => "Int64",
164            DataType::Float64 => "Float64",
165            DataType::Bool => "Bool",
166            DataType::Utf8 => "Utf8",
167        };
168        let (min, max, mean) = match &c.numeric {
169            Some(n) => (
170                n.min
171                    .map(|v| format!("{v:.4}"))
172                    .unwrap_or_else(|| "—".to_string()),
173                n.max
174                    .map(|v| format!("{v:.4}"))
175                    .unwrap_or_else(|| "—".to_string()),
176                n.mean
177                    .map(|v| format!("{v:.4}"))
178                    .unwrap_or_else(|| "—".to_string()),
179            ),
180            None => ("—".to_string(), "—".to_string(), "—".to_string()),
181        };
182        out.push_str(&format!(
183            "| `{}` | {} | {} | {} | {} | {} | {} |\n",
184            c.name, dtype, c.null_count, c.distinct_count, min, max, mean
185        ));
186    }
187    out
188}
189
190/// Profile an in-memory dataset.
191pub fn profile_dataset(ds: &DataSet, options: &ProfileOptions) -> IngestionResult<ProfileReport> {
192    let df = DataFrame::from_dataset(ds)?;
193    profile_frame(&df, options)
194}
195
196/// Profile a pipeline frame (computed lazily).
197pub fn profile_frame(df: &DataFrame, options: &ProfileOptions) -> IngestionResult<ProfileReport> {
198    let mut lf = df.lazy_clone();
199
200    lf = match options.sampling {
201        SamplingMode::Full => lf,
202        SamplingMode::Head(n) => lf.limit(n as IdxSize),
203    };
204
205    let schema = lf.clone().collect_schema().map_err(|e| {
206        crate::ingestion::polars_bridge::polars_error_to_ingestion("failed to collect schema", e)
207    })?;
208
209    let cols: Vec<(String, DataType, bool)> = schema
210        .iter_fields()
211        .map(|f| {
212            let (dt, is_numeric) = polars_dtype_to_profile_dtype(f.dtype());
213            (f.name().to_string(), dt, is_numeric)
214        })
215        .collect();
216
217    if cols.is_empty() {
218        return Ok(ProfileReport {
219            sampling: options.sampling,
220            row_count: 0,
221            columns: Vec::new(),
222        });
223    }
224
225    // Build a single-row aggregation over the (optionally sampled) LazyFrame.
226    let mut exprs: Vec<Expr> = Vec::new();
227    exprs.push(len().alias("__rows"));
228
229    for (name, _dt, is_numeric) in &cols {
230        exprs.push(col(name).null_count().alias(format!("{name}__nulls")));
231        // Distinct count excluding nulls (common profiling expectation).
232        exprs.push(
233            col(name)
234                .drop_nulls()
235                .n_unique()
236                .alias(format!("{name}__distinct")),
237        );
238        if *is_numeric {
239            exprs.push(col(name).min().alias(format!("{name}__min")));
240            exprs.push(col(name).max().alias(format!("{name}__max")));
241            exprs.push(col(name).mean().alias(format!("{name}__mean")));
242            for q in &options.quantiles {
243                if !(0.0..=1.0).contains(q) {
244                    return Err(IngestionError::SchemaMismatch {
245                        message: format!("invalid quantile {q}; expected value in [0.0, 1.0]"),
246                    });
247                }
248                let pct = (q * 100.0).round() as i64;
249                exprs.push(
250                    col(name)
251                        .quantile(lit(*q), QuantileMethod::Nearest)
252                        .alias(format!("{name}__p{pct}")),
253                );
254            }
255        }
256    }
257
258    let agg = lf.select(exprs).collect().map_err(|e| {
259        crate::ingestion::polars_bridge::polars_error_to_ingestion("failed to compute profile", e)
260    })?;
261
262    let row_count_col = agg.column("__rows").map_err(|e| {
263        crate::ingestion::polars_bridge::polars_error_to_ingestion(
264            "profiling missing __rows column",
265            e,
266        )
267    })?;
268    let row_count = any_to_usize(row_count_col.as_materialized_series(), 0)?.unwrap_or(0);
269
270    let mut out_cols: Vec<ColumnProfile> = Vec::with_capacity(cols.len());
271    for (name, dt, is_numeric) in cols {
272        let nulls_col = agg.column(&format!("{name}__nulls")).map_err(|e| {
273            crate::ingestion::polars_bridge::polars_error_to_ingestion(
274                &format!("profiling missing null_count for '{name}'"),
275                e,
276            )
277        })?;
278        let null_count = any_to_usize(nulls_col.as_materialized_series(), 0)?.unwrap_or(0);
279
280        let distinct_col = agg.column(&format!("{name}__distinct")).map_err(|e| {
281            crate::ingestion::polars_bridge::polars_error_to_ingestion(
282                &format!("profiling missing distinct_count for '{name}'"),
283                e,
284            )
285        })?;
286        let distinct_count = any_to_usize(distinct_col.as_materialized_series(), 0)?.unwrap_or(0);
287
288        let numeric = if is_numeric {
289            let min = any_to_f64(
290                agg.column(&format!("{name}__min"))
291                    .map_err(|e| {
292                        crate::ingestion::polars_bridge::polars_error_to_ingestion(
293                            "profiling missing min",
294                            e,
295                        )
296                    })?
297                    .as_materialized_series(),
298                0,
299            )?;
300            let max = any_to_f64(
301                agg.column(&format!("{name}__max"))
302                    .map_err(|e| {
303                        crate::ingestion::polars_bridge::polars_error_to_ingestion(
304                            "profiling missing max",
305                            e,
306                        )
307                    })?
308                    .as_materialized_series(),
309                0,
310            )?;
311            let mean = any_to_f64(
312                agg.column(&format!("{name}__mean"))
313                    .map_err(|e| {
314                        crate::ingestion::polars_bridge::polars_error_to_ingestion(
315                            "profiling missing mean",
316                            e,
317                        )
318                    })?
319                    .as_materialized_series(),
320                0,
321            )?;
322            let mut qs = Vec::with_capacity(options.quantiles.len());
323            for q in &options.quantiles {
324                let pct = (q * 100.0).round() as i64;
325                let v = any_to_f64(
326                    agg.column(&format!("{name}__p{pct}"))
327                        .map_err(|e| {
328                            crate::ingestion::polars_bridge::polars_error_to_ingestion(
329                                "profiling missing quantile",
330                                e,
331                            )
332                        })?
333                        .as_materialized_series(),
334                    0,
335                )?;
336                qs.push((*q, v));
337            }
338            Some(NumericProfile {
339                min,
340                max,
341                mean,
342                quantiles: qs,
343            })
344        } else {
345            None
346        };
347
348        out_cols.push(ColumnProfile {
349            name,
350            data_type: dt,
351            null_count,
352            distinct_count,
353            numeric,
354        });
355    }
356
357    Ok(ProfileReport {
358        sampling: options.sampling,
359        row_count,
360        columns: out_cols,
361    })
362}
363
364fn polars_dtype_to_profile_dtype(dt: &polars::datatypes::DataType) -> (DataType, bool) {
365    use polars::datatypes::DataType as P;
366    match dt {
367        P::Boolean => (DataType::Bool, false),
368        P::String => (DataType::Utf8, false),
369        P::Int8 | P::Int16 | P::Int32 | P::Int64 | P::UInt8 | P::UInt16 | P::UInt32 | P::UInt64 => {
370            (DataType::Int64, true)
371        }
372        P::Float32 | P::Float64 => (DataType::Float64, true),
373        _ => (DataType::Utf8, false),
374    }
375}
376
377fn any_to_usize(s: &Series, idx: usize) -> IngestionResult<Option<usize>> {
378    let av = s.get(idx).map_err(|e| IngestionError::Engine {
379        message: "failed to read profile value".to_string(),
380        source: Box::new(e),
381    })?;
382    Ok(match av {
383        AnyValue::Null => None,
384        AnyValue::Int64(v) => Some(v.max(0) as usize),
385        AnyValue::UInt64(v) => Some(v as usize),
386        AnyValue::Int32(v) => Some((v as i64).max(0) as usize),
387        AnyValue::UInt32(v) => Some(v as usize),
388        AnyValue::Int16(v) => Some((v as i64).max(0) as usize),
389        AnyValue::UInt16(v) => Some(v as usize),
390        AnyValue::Int8(v) => Some((v as i64).max(0) as usize),
391        AnyValue::UInt8(v) => Some(v as usize),
392        other => {
393            return Err(IngestionError::SchemaMismatch {
394                message: format!("expected integer-like profile value, got {other}"),
395            });
396        }
397    })
398}
399
400fn any_to_f64(s: &Series, idx: usize) -> IngestionResult<Option<f64>> {
401    let av = s.get(idx).map_err(|e| IngestionError::Engine {
402        message: "failed to read profile value".to_string(),
403        source: Box::new(e),
404    })?;
405    Ok(match av {
406        AnyValue::Null => None,
407        AnyValue::Float64(v) => Some(v),
408        AnyValue::Float32(v) => Some(v as f64),
409        AnyValue::Int64(v) => Some(v as f64),
410        AnyValue::UInt64(v) => Some(v as f64),
411        AnyValue::Int32(v) => Some(v as f64),
412        AnyValue::UInt32(v) => Some(v as f64),
413        other => {
414            return Err(IngestionError::SchemaMismatch {
415                message: format!("expected numeric profile value, got {other}"),
416            });
417        }
418    })
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use crate::types::{Field, Schema};

    /// Three rows: `id` (Int64, no nulls), `score` (Float64, one null) and
    /// `name` (Utf8, values "A", "A", "B").
    fn tiny() -> DataSet {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64),
            Field::new("score", DataType::Float64),
            Field::new("name", DataType::Utf8),
        ]);
        DataSet::new(
            schema,
            vec![
                vec![
                    Value::Int64(1),
                    Value::Float64(10.0),
                    Value::Utf8("A".to_string()),
                ],
                vec![Value::Int64(2), Value::Null, Value::Utf8("A".to_string())],
                vec![
                    Value::Int64(3),
                    Value::Float64(30.0),
                    Value::Utf8("B".to_string()),
                ],
            ],
        )
    }

    #[test]
    fn profiling_counts_rows_nulls_and_distinct() {
        let ds = tiny();
        let rep = profile_dataset(&ds, &ProfileOptions::default()).unwrap();
        assert_eq!(rep.row_count, 3);

        let score = rep.columns.iter().find(|c| c.name == "score").unwrap();
        assert_eq!(score.null_count, 1);
        assert_eq!(score.distinct_count, 2);
        assert!(score.numeric.is_some());

        let name = rep.columns.iter().find(|c| c.name == "name").unwrap();
        assert_eq!(name.null_count, 0);
        assert_eq!(name.distinct_count, 2);
        assert!(name.numeric.is_none());
    }

    #[test]
    fn profiling_supports_head_sampling() {
        let ds = tiny();
        let rep = profile_dataset(
            &ds,
            &ProfileOptions {
                sampling: SamplingMode::Head(2),
                quantiles: vec![0.5],
            },
        )
        .unwrap();
        assert_eq!(rep.row_count, 2);
    }

    #[test]
    fn profiling_computes_numeric_summaries() {
        let rep = profile_dataset(&tiny(), &ProfileOptions::default()).unwrap();

        let id = rep.columns.iter().find(|c| c.name == "id").unwrap();
        let id_stats = id.numeric.as_ref().expect("integer column is numeric");
        assert_eq!(id_stats.min, Some(1.0));
        assert_eq!(id_stats.max, Some(3.0));
        assert_eq!(id_stats.mean, Some(2.0));

        let score = rep.columns.iter().find(|c| c.name == "score").unwrap();
        let score_stats = score.numeric.as_ref().expect("float column is numeric");
        assert_eq!(score_stats.min, Some(10.0));
        assert_eq!(score_stats.max, Some(30.0));
        // The null row is ignored by the mean.
        assert_eq!(score_stats.mean, Some(20.0));
        // One entry per requested quantile, in request order.
        let requested: Vec<f64> = score_stats.quantiles.iter().map(|(q, _)| *q).collect();
        assert_eq!(requested, vec![0.5, 0.95]);
        assert_eq!(score_stats.quantiles[1], (0.95, Some(30.0)));
    }

    #[test]
    fn profiling_rejects_out_of_range_quantiles() {
        let res = profile_dataset(
            &tiny(),
            &ProfileOptions {
                sampling: SamplingMode::Full,
                quantiles: vec![1.5],
            },
        );
        assert!(matches!(res, Err(IngestionError::SchemaMismatch { .. })));
    }

    #[test]
    fn profile_report_renders_json_and_markdown() {
        let ds = tiny();
        let rep = profile_dataset(&ds, &ProfileOptions::default()).unwrap();
        let json = render_profile_report_json(&rep).unwrap();
        assert!(json.contains("\"row_count\""));
        assert!(json.contains("\"columns\""));

        let md = render_profile_report_markdown(&rep);
        assert!(md.contains("## Profile report"));
        assert!(md.contains("### Columns"));
    }

    #[test]
    fn rendered_reports_describe_head_sampling() {
        let rep = profile_dataset(
            &tiny(),
            &ProfileOptions {
                sampling: SamplingMode::Head(2),
                quantiles: vec![0.5],
            },
        )
        .unwrap();

        let json = render_profile_report_json(&rep).unwrap();
        assert!(json.contains("\"sampling\": \"head\""));

        let md = render_profile_report_markdown(&rep);
        assert!(md.contains("- Sampling: **Head(2)**"));
        assert!(md.contains("- Rows profiled: **2**"));
    }
}