Skip to main content

rust_data_processing/
lib.rs

1//! `rust-data-processing` is a small library for ingesting common file formats into an in-memory
2//! [`types::DataSet`], using a user-provided [`types::Schema`].
3//!
4//! The primary entrypoint is [`ingestion::ingest_from_path`], which can auto-detect the ingestion
5//! format from the file extension (or you can force a format via [`ingestion::IngestionOptions`]).
6//!
7//! ## What you can ingest (Epic 1 / Story 1.1)
8//!
9//! **File formats (auto-detected by extension):**
10//!
11//! - **CSV**: `.csv`
12//! - **JSON**: `.json` (array-of-objects) and `.ndjson` (newline-delimited objects)
13//! - **Parquet**: `.parquet`, `.pq`
14//! - **Excel/workbooks**: `.xlsx`, `.xls`, `.xlsm`, `.xlsb`, `.ods`
15//!
16//! **Schema + value types:**
17//!
18//! Ingestion produces a [`types::DataSet`] whose cells are typed [`types::Value`]s matching a
19//! user-provided [`types::Schema`]. Supported logical types are:
20//!
21//! - [`types::DataType::Int64`]
22//! - [`types::DataType::Float64`]
23//! - [`types::DataType::Bool`]
24//! - [`types::DataType::Utf8`]
25//!
26//! Across formats, empty cells / empty strings / explicit JSON `null` map to [`types::Value::Null`].
27//!
28//! ## Quick examples: ingest data
29//!
30//! ```no_run
31//! use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
32//! use rust_data_processing::types::{DataType, Field, Schema};
33//!
34//! # fn main() -> Result<(), rust_data_processing::IngestionError> {
35//! let schema = Schema::new(vec![
36//!     Field::new("id", DataType::Int64),
37//!     Field::new("name", DataType::Utf8),
38//! ]);
39//! // Auto-detects by extension (.csv/.json/.parquet/.xlsx/...).
40//! let ds = ingest_from_path("data.csv", &schema, &IngestionOptions::default())?;
41//! println!("rows={}", ds.row_count());
42//! # Ok(())
43//! # }
44//! ```
45//!
46//! JSON supports nested field paths using dot notation in the schema (e.g. `user.name`):
47//!
48//! ```no_run
49//! use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
50//! use rust_data_processing::types::{DataType, Field, Schema};
51//!
52//! # fn main() -> Result<(), rust_data_processing::IngestionError> {
53//! let schema = Schema::new(vec![
54//!     Field::new("id", DataType::Int64),
55//!     Field::new("user.name", DataType::Utf8),
56//! ]);
57//! let ds = ingest_from_path("events.ndjson", &schema, &IngestionOptions::default())?;
58//! println!("rows={}", ds.row_count());
59//! # Ok(())
60//! # }
61//! ```
62//!
63//! ## Modules
64//!
65//! - [`ingestion`]: unified ingestion entrypoints and format-specific implementations
66//! - [`types`]: schema + in-memory dataset types
67//! - [`export`]: JSON Lines export + deterministic train/test row indices
68//! - [`privacy`]: UTF-8 change summaries after masking transforms (reports only)
69//! - [`reports`]: deterministic truncation helpers for large JSON/text blobs
70//! - [`processing`]: in-memory dataset transformations (filter/map/reduce, feature-wise stats, arg max/min, top‑k frequency)
71//! - [`execution`]: execution engine for parallel pipelines + throttling + metrics
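//! - [`pipeline`]: Polars-backed [`pipeline::DataFrame`] (built from a [`types::DataSet`], whole-frame reductions) and [`pipeline::CastMode`]
72//! - `sql`: SQL support over [`pipeline::DataFrame`] (Polars-backed; behind the `sql` feature, enabled by default)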
73//! - [`transform`]: serde-friendly transformation spec compiled to pipeline wrappers
74//! - [`profiling`]: Polars-backed profiling metrics + sampling modes
75//! - [`validation`]: validation DSL + built-in checks + reporting
76//! - [`outliers`]: outlier detection primitives + explainable outputs
77//! - [`cdc`]: CDC boundary types (Phase 1 spike)
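78//! - [`error`]: error types ([`IngestionError`], [`IngestionResult`]) shared across the crate, not only ingestion (e.g. SQL and `DataFrame` operations)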
79//! - [`pipeline_spec`]: shared `tests/fixtures/<bundle>/` JSON for pipelines, payloads, and schemas
80//!
81//! ## Processing example (1.2 pipeline)
82//!
83//! ```rust
84//! use rust_data_processing::processing::{filter, map, reduce, ReduceOp};
85//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
86//!
87//! let schema = Schema::new(vec![
88//!     Field::new("id", DataType::Int64),
89//!     Field::new("active", DataType::Bool),
90//!     Field::new("score", DataType::Float64),
91//! ]);
92//! let ds = DataSet::new(
93//!     schema,
94//!     vec![
95//!         vec![Value::Int64(1), Value::Bool(true), Value::Float64(10.0)],
96//!         vec![Value::Int64(2), Value::Bool(false), Value::Float64(20.0)],
97//!         vec![Value::Int64(3), Value::Bool(true), Value::Null],
98//!     ],
99//! );
100//!
101//! let active_idx = ds.schema.index_of("active").unwrap();
102//! let filtered = filter(&ds, |row| matches!(row.get(active_idx), Some(Value::Bool(true))));
103//! let mapped = map(&filtered, |row| {
104//!     let mut out = row.to_vec();
105//!     // score *= 2.0
106//!     if let Some(Value::Float64(v)) = out.get(2) {
107//!         out[2] = Value::Float64(v * 2.0);
108//!     }
109//!     out
110//! });
111//!
112//! let sum = reduce(&mapped, "score", ReduceOp::Sum).unwrap();
113//! assert_eq!(sum, Value::Float64(20.0));
114//! ```
115//!
116//! ## Execution engine example (1.3 parallel pipeline)
117//!
118//! ```no_run
119//! use rust_data_processing::execution::{ExecutionEngine, ExecutionOptions};
120//! use rust_data_processing::processing::ReduceOp;
121//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
122//!
123//! # fn main() {
124//! let schema = Schema::new(vec![
125//!     Field::new("id", DataType::Int64),
126//!     Field::new("active", DataType::Bool),
127//!     Field::new("score", DataType::Float64),
128//! ]);
129//! let ds = DataSet::new(
130//!     schema,
131//!     vec![
132//!         vec![Value::Int64(1), Value::Bool(true), Value::Float64(10.0)],
133//!         vec![Value::Int64(2), Value::Bool(false), Value::Float64(20.0)],
134//!         vec![Value::Int64(3), Value::Bool(true), Value::Null],
135//!     ],
136//! );
137//!
138//! let engine = ExecutionEngine::new(ExecutionOptions {
139//!     num_threads: Some(4),
140//!     chunk_size: 1_024,
141//!     max_in_flight_chunks: 4,
142//! });
143//!
144//! let active_idx = ds.schema.index_of("active").unwrap();
145//! let filtered = engine.filter_parallel(&ds, |row| matches!(row.get(active_idx), Some(Value::Bool(true))));
146//! let mapped = engine.map_parallel(&filtered, |row| row.to_vec());
147//! let sum = engine.reduce(&mapped, "score", ReduceOp::Sum).unwrap();
148//! assert_eq!(sum, Value::Float64(10.0)); // row 2 is inactive; row 3's null score is skipped by Sum
149//!
150//! let snapshot = engine.metrics().snapshot();
151//! println!("rows_processed={}", snapshot.rows_processed);
152//! # }
153//! ```
154//!
155//! ## Quick examples: Phase 1 modules
156//!
157//! ### TransformSpec (declarative ETL)
158//!
159//! ```rust
160//! use rust_data_processing::pipeline::CastMode;
161//! use rust_data_processing::transform::{TransformSpec, TransformStep};
162//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
163//!
164//! let ds = DataSet::new(
165//!     Schema::new(vec![
166//!         Field::new("id", DataType::Int64),
167//!         Field::new("score", DataType::Int64),
168//!     ]),
169//!     vec![vec![Value::Int64(1), Value::Int64(10)], vec![Value::Int64(2), Value::Null]],
170//! );
171//!
172//! let out_schema = Schema::new(vec![
173//!     Field::new("id", DataType::Int64),
174//!     Field::new("score_f", DataType::Float64),
175//! ]);
176//!
177//! let spec = TransformSpec::new(out_schema.clone())
178//!     .with_step(TransformStep::Rename { pairs: vec![("score".to_string(), "score_f".to_string())] })
179//!     .with_step(TransformStep::Cast { column: "score_f".to_string(), to: DataType::Float64, mode: CastMode::Lossy })
180//!     .with_step(TransformStep::FillNull { column: "score_f".to_string(), value: Value::Float64(0.0) });
181//!
182//! let out = spec.apply(&ds).unwrap();
183//! assert_eq!(out.schema, out_schema);
184//! ```
185//!
186//! ### Profiling (metrics + deterministic sampling)
187//!
188//! ```rust
189//! use rust_data_processing::profiling::{profile_dataset, ProfileOptions, SamplingMode};
190//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
191//!
192//! let ds = DataSet::new(
193//!     Schema::new(vec![Field::new("x", DataType::Float64)]),
194//!     vec![vec![Value::Float64(1.0)], vec![Value::Null], vec![Value::Float64(3.0)]],
195//! );
196//!
197//! let rep = profile_dataset(
198//!     &ds,
199//!     &ProfileOptions { sampling: SamplingMode::Head(2), quantiles: vec![0.5] },
200//! )
201//! .unwrap();
202//! assert_eq!(rep.row_count, 2);
203//! ```
204//!
205//! ### Validation (DSL + reporting)
206//!
207//! ```rust
208//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
209//! use rust_data_processing::validation::{validate_dataset, Check, Severity, ValidationSpec};
210//!
211//! let ds = DataSet::new(
212//!     Schema::new(vec![Field::new("email", DataType::Utf8)]),
213//!     vec![vec![Value::Utf8("ada@example.com".to_string())], vec![Value::Null]],
214//! );
215//!
216//! let spec = ValidationSpec::new(vec![
217//!     Check::NotNull { column: "email".to_string(), severity: Severity::Error },
218//! ]);
219//!
220//! let rep = validate_dataset(&ds, &spec).unwrap();
221//! assert_eq!(rep.summary.total_checks, 1);
222//! ```
223//!
224//! ### Outliers (IQR / z-score / MAD)
225//!
226//! ```rust
227//! use rust_data_processing::outliers::{detect_outliers_dataset, OutlierMethod, OutlierOptions};
228//! use rust_data_processing::profiling::SamplingMode;
229//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
230//!
231//! let ds = DataSet::new(
232//!     Schema::new(vec![Field::new("x", DataType::Float64)]),
233//!     vec![
234//!         vec![Value::Float64(1.0)],
235//!         vec![Value::Float64(1.0)],
236//!         vec![Value::Float64(1.0)],
237//!         vec![Value::Float64(1000.0)],
238//!     ],
239//! );
240//!
241//! let rep = detect_outliers_dataset(
242//!     &ds,
243//!     "x",
244//!     OutlierMethod::Iqr { k: 1.5 },
245//!     &OutlierOptions { sampling: SamplingMode::Full, max_examples: 3 },
246//! )
247//! .unwrap();
248//! assert!(rep.outlier_count >= 1);
249//! ```
250//!
251//! ### SQL (Polars-backed)
252//!
253//! ```no_run
254//! # #[cfg(feature = "sql")]
255//! # fn main() -> Result<(), rust_data_processing::IngestionError> {
256//! use rust_data_processing::pipeline::DataFrame;
257//! use rust_data_processing::sql;
258//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
259//!
260//! let ds = DataSet::new(
261//!     Schema::new(vec![Field::new("id", DataType::Int64), Field::new("active", DataType::Bool)]),
262//!     vec![vec![Value::Int64(1), Value::Bool(true)]],
263//! );
264//! let out = sql::query(&DataFrame::from_dataset(&ds)?, "SELECT id FROM df WHERE active = TRUE")?
265//!     .collect()?;
266//! assert_eq!(out.row_count(), 1);
267//! # Ok(())
268//! # }
269//! # #[cfg(not(feature = "sql"))]
270//! # fn main() {}
271//! ```
272//!
273//! For more end-to-end examples, see the repository `README.md` and `API.md` (processing / aggregates).
274//! Aggregate semantics: `docs/REDUCE_AGG_SEMANTICS.md`.
275//!
276//! ### Reduce operations
277//!
278//! - [`processing::ReduceOp::Count`]: counts rows (including nulls)
279//! - [`processing::ReduceOp::Sum`], [`processing::ReduceOp::Min`], [`processing::ReduceOp::Max`]:
280//!   operate on numeric columns and ignore nulls. If all values are null, these return
281//!   `Some(Value::Null)`.
282//! - [`processing::ReduceOp::Mean`], [`processing::ReduceOp::Variance`], [`processing::ReduceOp::StdDev`]:
283//!   use a numerically stable one-pass (Welford) accumulation; mean / sum-of-squares / L2 norm are
284//!   returned as [`types::Value::Float64`]. Sample variance / std dev require at least two values.
285//! - [`processing::ReduceOp::CountDistinctNonNull`]: distinct non-null values (also for UTF-8 and bool).
286//! - [`processing::ReduceOp::Median`]: median of numeric values as [`types::Value::Float64`] (even length: mean of the two middle values).
287//! - [`pipeline::DataFrame::reduce`] provides the Polars-backed equivalent for whole-frame scalars.
288//! - [`processing::feature_wise_mean_std`]: one scan, mean + std for several numeric columns; [`pipeline::DataFrame::feature_wise_mean_std`] for Polars.
289//! - [`processing::arg_max_row`], [`processing::arg_min_row`], [`processing::top_k_by_frequency`]: row extrema, and the top-k most frequent values of a column (e.g. labels).
290
291pub mod cdc;
292pub mod error;
293pub mod execution;
294pub mod export;
295pub mod ingestion;
296pub mod kafka;
297pub mod outliers;
298pub mod pipeline; // Polars-backed `DataFrame` + `CastMode` (see the SQL / TransformSpec examples above)
299pub mod pipeline_spec;
300pub mod privacy;
301pub mod processing;
302pub mod profiling;
303pub mod reports;
304#[cfg(feature = "sql")] // `sql` is compiled only when the `sql` feature is enabled
305pub mod sql;
306pub mod transform;
307pub mod types;
308pub mod validation;
309
310pub use error::{IngestionError, IngestionResult}; // crate-root paths, e.g. `rust_data_processing::IngestionError`
311
312// Keep pinned postgres crates in the `db_connectorx` dependency graph (RUSTSEC-2026-0178/0179/0180).
313#[cfg(feature = "db_connectorx")] // only built with the `db_connectorx` feature
314use {postgres_protocol as _, tokio_postgres as _}; // `as _`: reference the crates without binding a name
310pub use error::{IngestionError, IngestionResult};
311
312// Keep pinned postgres crates in the `db_connectorx` dependency graph (RUSTSEC-2026-0178/0179/0180).
313#[cfg(feature = "db_connectorx")]
314use {postgres_protocol as _, tokio_postgres as _};