1use crate::error::{IngestionError, IngestionResult};
33use crate::pipeline::DataFrame;
34use crate::types::{DataSet, DataType};
35
36use polars::prelude::*;
37
/// Selects which rows of the input are profiled.
///
/// The default is [`SamplingMode::Full`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SamplingMode {
    /// Profile every row of the input.
    #[default]
    Full,
    /// Profile only the first `n` rows (applied as a lazy `limit`).
    Head(usize),
}
47
48#[derive(Debug, Clone, PartialEq)]
50pub struct ProfileOptions {
51 pub sampling: SamplingMode,
52 pub quantiles: Vec<f64>,
54}
55
56impl Default for ProfileOptions {
57 fn default() -> Self {
58 Self {
59 sampling: SamplingMode::Full,
60 quantiles: vec![0.5, 0.95],
61 }
62 }
63}
64
/// Summary statistics for a numeric column.
///
/// Each scalar is `None` when the underlying aggregation produced a null
/// (presumably e.g. when every profiled value is null — confirm against polars).
#[derive(Debug, Clone, PartialEq)]
pub struct NumericProfile {
    /// Minimum value, converted to `f64`.
    pub min: Option<f64>,
    /// Maximum value, converted to `f64`.
    pub max: Option<f64>,
    /// Arithmetic mean.
    pub mean: Option<f64>,
    /// `(q, value)` pairs, one per requested quantile, in request order.
    pub quantiles: Vec<(f64, Option<f64>)>,
}
72
/// Per-column statistics produced by the profiler.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnProfile {
    /// Column name as it appears in the profiled frame's schema.
    pub name: String,
    /// Coarse profile type; polars dtypes without a dedicated mapping are
    /// reported as `Utf8` (see `polars_dtype_to_profile_dtype`).
    pub data_type: DataType,
    /// Number of null values among the profiled rows.
    pub null_count: usize,
    /// Number of distinct values, ignoring nulls.
    pub distinct_count: usize,
    /// Numeric statistics; `None` for columns not classified as numeric.
    pub numeric: Option<NumericProfile>,
}
81
/// Result of profiling a dataset or frame.
#[derive(Debug, Clone, PartialEq)]
pub struct ProfileReport {
    /// Sampling mode the report was produced with.
    pub sampling: SamplingMode,
    /// Number of rows actually profiled (after sampling); `0` when the input
    /// has no columns.
    pub row_count: usize,
    /// One entry per column, in schema order.
    pub columns: Vec<ColumnProfile>,
}
89
90pub fn render_profile_report_json(report: &ProfileReport) -> IngestionResult<String> {
92 let sampling = match report.sampling {
93 SamplingMode::Full => "full",
94 SamplingMode::Head(_) => "head",
95 };
96
97 let cols: Vec<serde_json::Value> = report
98 .columns
99 .iter()
100 .map(|c| {
101 let dtype = match c.data_type {
102 DataType::Int64 => "int64",
103 DataType::Float64 => "float64",
104 DataType::Bool => "bool",
105 DataType::Utf8 => "utf8",
106 };
107 let numeric = c.numeric.as_ref().map(|n| {
108 serde_json::json!({
109 "min": n.min,
110 "max": n.max,
111 "mean": n.mean,
112 "quantiles": n.quantiles.iter().map(|(q, v)| serde_json::json!({"q": q, "value": v})).collect::<Vec<_>>(),
113 })
114 });
115
116 serde_json::json!({
117 "name": c.name,
118 "data_type": dtype,
119 "null_count": c.null_count,
120 "distinct_count": c.distinct_count,
121 "numeric": numeric,
122 })
123 })
124 .collect();
125
126 serde_json::to_string_pretty(&serde_json::json!({
127 "sampling": sampling,
128 "row_count": report.row_count,
129 "columns": cols,
130 }))
131 .map_err(|e| IngestionError::SchemaMismatch {
132 message: format!("failed to serialize profile report json: {e}"),
133 })
134}
135
136pub fn render_profile_report_markdown(report: &ProfileReport) -> String {
138 let sampling = match report.sampling {
139 SamplingMode::Full => "Full",
140 SamplingMode::Head(n) => {
141 return format!(
142 "## Profile report\n\n- Sampling: **Head({n})**\n- Rows profiled: **{}**\n\n{}",
143 report.row_count,
144 render_columns_markdown(&report.columns)
145 );
146 }
147 };
148
149 format!(
150 "## Profile report\n\n- Sampling: **{sampling}**\n- Rows profiled: **{}**\n\n{}",
151 report.row_count,
152 render_columns_markdown(&report.columns)
153 )
154}
155
156fn render_columns_markdown(cols: &[ColumnProfile]) -> String {
157 let mut out = String::new();
158 out.push_str("### Columns\n\n");
159 out.push_str("| column | type | nulls | distinct (non-null) | min | max | mean |\n");
160 out.push_str("|---|---:|---:|---:|---:|---:|---:|\n");
161 for c in cols {
162 let dtype = match c.data_type {
163 DataType::Int64 => "Int64",
164 DataType::Float64 => "Float64",
165 DataType::Bool => "Bool",
166 DataType::Utf8 => "Utf8",
167 };
168 let (min, max, mean) = match &c.numeric {
169 Some(n) => (
170 n.min
171 .map(|v| format!("{v:.4}"))
172 .unwrap_or_else(|| "—".to_string()),
173 n.max
174 .map(|v| format!("{v:.4}"))
175 .unwrap_or_else(|| "—".to_string()),
176 n.mean
177 .map(|v| format!("{v:.4}"))
178 .unwrap_or_else(|| "—".to_string()),
179 ),
180 None => ("—".to_string(), "—".to_string(), "—".to_string()),
181 };
182 out.push_str(&format!(
183 "| `{}` | {} | {} | {} | {} | {} | {} |\n",
184 c.name, dtype, c.null_count, c.distinct_count, min, max, mean
185 ));
186 }
187 out
188}
189
190pub fn profile_dataset(ds: &DataSet, options: &ProfileOptions) -> IngestionResult<ProfileReport> {
192 let df = DataFrame::from_dataset(ds)?;
193 profile_frame(&df, options)
194}
195
196pub fn profile_frame(df: &DataFrame, options: &ProfileOptions) -> IngestionResult<ProfileReport> {
198 let mut lf = df.lazy_clone();
199
200 lf = match options.sampling {
201 SamplingMode::Full => lf,
202 SamplingMode::Head(n) => lf.limit(n as IdxSize),
203 };
204
205 let schema = lf.clone().collect_schema().map_err(|e| {
206 crate::ingestion::polars_bridge::polars_error_to_ingestion("failed to collect schema", e)
207 })?;
208
209 let cols: Vec<(String, DataType, bool)> = schema
210 .iter_fields()
211 .map(|f| {
212 let (dt, is_numeric) = polars_dtype_to_profile_dtype(f.dtype());
213 (f.name().to_string(), dt, is_numeric)
214 })
215 .collect();
216
217 if cols.is_empty() {
218 return Ok(ProfileReport {
219 sampling: options.sampling,
220 row_count: 0,
221 columns: Vec::new(),
222 });
223 }
224
225 let mut exprs: Vec<Expr> = Vec::new();
227 exprs.push(len().alias("__rows"));
228
229 for (name, _dt, is_numeric) in &cols {
230 exprs.push(col(name).null_count().alias(format!("{name}__nulls")));
231 exprs.push(
233 col(name)
234 .drop_nulls()
235 .n_unique()
236 .alias(format!("{name}__distinct")),
237 );
238 if *is_numeric {
239 exprs.push(col(name).min().alias(format!("{name}__min")));
240 exprs.push(col(name).max().alias(format!("{name}__max")));
241 exprs.push(col(name).mean().alias(format!("{name}__mean")));
242 for q in &options.quantiles {
243 if !(0.0..=1.0).contains(q) {
244 return Err(IngestionError::SchemaMismatch {
245 message: format!("invalid quantile {q}; expected value in [0.0, 1.0]"),
246 });
247 }
248 let pct = (q * 100.0).round() as i64;
249 exprs.push(
250 col(name)
251 .quantile(lit(*q), QuantileMethod::Nearest)
252 .alias(format!("{name}__p{pct}")),
253 );
254 }
255 }
256 }
257
258 let agg = lf.select(exprs).collect().map_err(|e| {
259 crate::ingestion::polars_bridge::polars_error_to_ingestion("failed to compute profile", e)
260 })?;
261
262 let row_count_col = agg.column("__rows").map_err(|e| {
263 crate::ingestion::polars_bridge::polars_error_to_ingestion(
264 "profiling missing __rows column",
265 e,
266 )
267 })?;
268 let row_count = any_to_usize(row_count_col.as_materialized_series(), 0)?.unwrap_or(0);
269
270 let mut out_cols: Vec<ColumnProfile> = Vec::with_capacity(cols.len());
271 for (name, dt, is_numeric) in cols {
272 let nulls_col = agg.column(&format!("{name}__nulls")).map_err(|e| {
273 crate::ingestion::polars_bridge::polars_error_to_ingestion(
274 &format!("profiling missing null_count for '{name}'"),
275 e,
276 )
277 })?;
278 let null_count = any_to_usize(nulls_col.as_materialized_series(), 0)?.unwrap_or(0);
279
280 let distinct_col = agg.column(&format!("{name}__distinct")).map_err(|e| {
281 crate::ingestion::polars_bridge::polars_error_to_ingestion(
282 &format!("profiling missing distinct_count for '{name}'"),
283 e,
284 )
285 })?;
286 let distinct_count = any_to_usize(distinct_col.as_materialized_series(), 0)?.unwrap_or(0);
287
288 let numeric = if is_numeric {
289 let min = any_to_f64(
290 agg.column(&format!("{name}__min"))
291 .map_err(|e| {
292 crate::ingestion::polars_bridge::polars_error_to_ingestion(
293 "profiling missing min",
294 e,
295 )
296 })?
297 .as_materialized_series(),
298 0,
299 )?;
300 let max = any_to_f64(
301 agg.column(&format!("{name}__max"))
302 .map_err(|e| {
303 crate::ingestion::polars_bridge::polars_error_to_ingestion(
304 "profiling missing max",
305 e,
306 )
307 })?
308 .as_materialized_series(),
309 0,
310 )?;
311 let mean = any_to_f64(
312 agg.column(&format!("{name}__mean"))
313 .map_err(|e| {
314 crate::ingestion::polars_bridge::polars_error_to_ingestion(
315 "profiling missing mean",
316 e,
317 )
318 })?
319 .as_materialized_series(),
320 0,
321 )?;
322 let mut qs = Vec::with_capacity(options.quantiles.len());
323 for q in &options.quantiles {
324 let pct = (q * 100.0).round() as i64;
325 let v = any_to_f64(
326 agg.column(&format!("{name}__p{pct}"))
327 .map_err(|e| {
328 crate::ingestion::polars_bridge::polars_error_to_ingestion(
329 "profiling missing quantile",
330 e,
331 )
332 })?
333 .as_materialized_series(),
334 0,
335 )?;
336 qs.push((*q, v));
337 }
338 Some(NumericProfile {
339 min,
340 max,
341 mean,
342 quantiles: qs,
343 })
344 } else {
345 None
346 };
347
348 out_cols.push(ColumnProfile {
349 name,
350 data_type: dt,
351 null_count,
352 distinct_count,
353 numeric,
354 });
355 }
356
357 Ok(ProfileReport {
358 sampling: options.sampling,
359 row_count,
360 columns: out_cols,
361 })
362}
363
364fn polars_dtype_to_profile_dtype(dt: &polars::datatypes::DataType) -> (DataType, bool) {
365 use polars::datatypes::DataType as P;
366 match dt {
367 P::Boolean => (DataType::Bool, false),
368 P::String => (DataType::Utf8, false),
369 P::Int8 | P::Int16 | P::Int32 | P::Int64 | P::UInt8 | P::UInt16 | P::UInt32 | P::UInt64 => {
370 (DataType::Int64, true)
371 }
372 P::Float32 | P::Float64 => (DataType::Float64, true),
373 _ => (DataType::Utf8, false),
374 }
375}
376
377fn any_to_usize(s: &Series, idx: usize) -> IngestionResult<Option<usize>> {
378 let av = s.get(idx).map_err(|e| IngestionError::Engine {
379 message: "failed to read profile value".to_string(),
380 source: Box::new(e),
381 })?;
382 Ok(match av {
383 AnyValue::Null => None,
384 AnyValue::Int64(v) => Some(v.max(0) as usize),
385 AnyValue::UInt64(v) => Some(v as usize),
386 AnyValue::Int32(v) => Some((v as i64).max(0) as usize),
387 AnyValue::UInt32(v) => Some(v as usize),
388 AnyValue::Int16(v) => Some((v as i64).max(0) as usize),
389 AnyValue::UInt16(v) => Some(v as usize),
390 AnyValue::Int8(v) => Some((v as i64).max(0) as usize),
391 AnyValue::UInt8(v) => Some(v as usize),
392 other => {
393 return Err(IngestionError::SchemaMismatch {
394 message: format!("expected integer-like profile value, got {other}"),
395 });
396 }
397 })
398}
399
400fn any_to_f64(s: &Series, idx: usize) -> IngestionResult<Option<f64>> {
401 let av = s.get(idx).map_err(|e| IngestionError::Engine {
402 message: "failed to read profile value".to_string(),
403 source: Box::new(e),
404 })?;
405 Ok(match av {
406 AnyValue::Null => None,
407 AnyValue::Float64(v) => Some(v),
408 AnyValue::Float32(v) => Some(v as f64),
409 AnyValue::Int64(v) => Some(v as f64),
410 AnyValue::UInt64(v) => Some(v as f64),
411 AnyValue::Int32(v) => Some(v as f64),
412 AnyValue::UInt32(v) => Some(v as f64),
413 other => {
414 return Err(IngestionError::SchemaMismatch {
415 message: format!("expected numeric profile value, got {other}"),
416 });
417 }
418 })
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{Field, Schema, Value};

    /// Three-row fixture with an `id` column, a `score` column containing one
    /// null, and a `name` column with two distinct values.
    fn tiny() -> DataSet {
        let text = |s: &str| Value::Utf8(s.to_string());
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64),
            Field::new("score", DataType::Float64),
            Field::new("name", DataType::Utf8),
        ]);
        let rows = vec![
            vec![Value::Int64(1), Value::Float64(10.0), text("A")],
            vec![Value::Int64(2), Value::Null, text("A")],
            vec![Value::Int64(3), Value::Float64(30.0), text("B")],
        ];
        DataSet::new(schema, rows)
    }

    /// Looks up a profiled column by name, panicking if it is absent.
    fn column<'a>(report: &'a ProfileReport, name: &str) -> &'a ColumnProfile {
        report
            .columns
            .iter()
            .find(|c| c.name == name)
            .unwrap_or_else(|| panic!("column `{name}` missing from report"))
    }

    #[test]
    fn profiling_counts_rows_nulls_and_distinct() {
        let report = profile_dataset(&tiny(), &ProfileOptions::default()).unwrap();
        assert_eq!(report.row_count, 3);

        let score = column(&report, "score");
        assert_eq!(score.null_count, 1);
        assert_eq!(score.distinct_count, 2);
        assert!(score.numeric.is_some());

        let name = column(&report, "name");
        assert_eq!(name.null_count, 0);
        assert_eq!(name.distinct_count, 2);
        assert!(name.numeric.is_none());
    }

    #[test]
    fn profiling_supports_head_sampling() {
        let options = ProfileOptions {
            sampling: SamplingMode::Head(2),
            quantiles: vec![0.5],
        };
        let report = profile_dataset(&tiny(), &options).unwrap();
        assert_eq!(report.row_count, 2);
    }

    #[test]
    fn profile_report_renders_json_and_markdown() {
        let report = profile_dataset(&tiny(), &ProfileOptions::default()).unwrap();

        let json = render_profile_report_json(&report).unwrap();
        for key in ["\"row_count\"", "\"columns\""] {
            assert!(json.contains(key));
        }

        let markdown = render_profile_report_markdown(&report);
        for heading in ["## Profile report", "### Columns"] {
            assert!(markdown.contains(heading));
        }
    }
}