rust_data_processing

Python bindings for the rust-data-processing Rust crate.

The native extension is built with PyO3 and maturin. Prefer the APIs exported here rather than importing rust_data_processing._rust_data_processing directly.

  1"""Python bindings for the `rust-data-processing` Rust crate.
  2
  3The native extension is built with PyO3 and maturin. Prefer the APIs exported here rather than
  4importing ``rust_data_processing._rust_data_processing`` directly.
  5"""
  6
  7from __future__ import annotations
  8
  9import json
 10from importlib.metadata import PackageNotFoundError, version
 11from typing import Any, Mapping
 12
 13from . import cdc
 14from ._rust_data_processing import (
 15    DataFrame,
 16    DataSet,
 17    ExecutionEngine,
 18    SqlContext,
 19    detect_outliers_json,
 20    detect_outliers_markdown,
 21    discover_hive_partitioned_files,
 22    export_dataset_jsonl,
 23    export_filter_rows_max_utf8_chars,
 24    export_train_test_row_indices,
 25    extension_version,
 26    infer_schema_from_path,
 27    ingest_from_db,
 28    ingest_from_db_infer,
 29    ingest_from_ordered_paths,
 30    ingest_from_path,
 31    ingest_from_path_infer,
 32    parse_partition_segment,
 33    paths_from_directory_scan,
 34    paths_from_explicit_list,
 35    paths_from_glob,
 36    privacy_summarize_utf8_changes_json,
 37    privacy_summarize_utf8_changes_markdown,
 38    processing_arg_max_row,
 39    processing_arg_min_row,
 40    processing_feature_wise_mean_std,
 41    processing_filter,
 42    processing_map,
 43    processing_reduce,
 44    processing_top_k_by_frequency,
 45    profile_dataset_json,
 46    profile_dataset_markdown,
 47    reports_truncate_utf8_bytes,
 48    sql_query_dataset,
 49    transform_apply_json,
 50    validate_dataset_json,
 51    validate_dataset_markdown,
 52)
 53
# Prefer the installed distribution's metadata. If no distribution named
# "rust-data-processing" is installed (PackageNotFoundError), fall back to the version
# reported by the native extension itself.
try:
    __version__ = version("rust-data-processing")
except PackageNotFoundError:
    __version__ = extension_version()
 58
 59
 60def ingest_with_inferred_schema(path: str, options: dict[str, Any] | None = None):
 61    """Infer schema once, then ingest (two passes over the file; same as the Rust helper)."""
 62    schema = infer_schema_from_path(path, options)
 63    return ingest_from_path(path, schema, options), schema
 64
 65
 66def transform_apply(dataset: DataSet, spec: Mapping[str, Any] | str) -> DataSet:
 67    """Apply a :class:`TransformSpec` given as JSON string or dict (serde shape)."""
 68    if isinstance(spec, str):
 69        payload = spec
 70    else:
 71        payload = json.dumps(spec)
 72    return transform_apply_json(dataset, payload)
 73
 74
 75def profile_dataset(dataset: DataSet, options: dict[str, Any] | None = None) -> dict[str, Any]:
 76    """Return profiling report as a dict (parsed JSON)."""
 77    return json.loads(profile_dataset_json(dataset, options))
 78
 79
 80def validate_dataset(dataset: DataSet, spec: Mapping[str, Any]) -> dict[str, Any]:
 81    """Run validation checks; return report dict (parsed JSON)."""
 82    return json.loads(validate_dataset_json(dataset, spec))
 83
 84
 85def detect_outliers(
 86    dataset: DataSet,
 87    column: str,
 88    method: Mapping[str, Any],
 89    options: dict[str, Any] | None = None,
 90) -> dict[str, Any]:
 91    """Outlier report as dict (parsed JSON)."""
 92    return json.loads(detect_outliers_json(dataset, column, method, options))
 93
 94
 95def export_jsonl_records(dataset: DataSet, columns: list[str]) -> list[dict[str, Any]]:
 96    """Parse :func:`export_dataset_jsonl` output into a list of row dicts."""
 97    text = export_dataset_jsonl(dataset, columns)
 98    return [json.loads(line) for line in text.splitlines() if line.strip()]
 99
100
def privacy_summarize_utf8_changes(
    before: DataSet,
    after: DataSet,
    columns: list[str],
    *,
    as_markdown: bool = False,
) -> list[dict[str, Any]] | str:
    """Summarize per-column UTF-8 differences between two datasets.

    Args:
        before: Dataset before the change.
        after: Dataset after the change.
        columns: Names of the columns to compare.
        as_markdown: When true, return the Markdown rendering instead of parsed JSON.

    Returns:
        When ``as_markdown`` is false, the JSON from
        :func:`privacy_summarize_utf8_changes_json`, decoded with :func:`json.loads`.
        Otherwise, the Markdown string from
        :func:`privacy_summarize_utf8_changes_markdown`.
    """
    if not as_markdown:
        summary_text = privacy_summarize_utf8_changes_json(before, after, columns)
        return json.loads(summary_text)
    return privacy_summarize_utf8_changes_markdown(before, after, columns)
112
113
# Public API re-exported by this package, kept in sorted (ASCII) order to match the
# import list above. Optional, build-dependent bindings are appended further down, only
# when their guarded import succeeds.
__all__ = [
    "DataFrame",
    "DataSet",
    "ExecutionEngine",
    "SqlContext",
    "__version__",
    "cdc",
    "detect_outliers",
    "detect_outliers_json",
    "detect_outliers_markdown",
    "discover_hive_partitioned_files",
    "export_dataset_jsonl",
    "export_filter_rows_max_utf8_chars",
    "export_jsonl_records",
    "export_train_test_row_indices",
    "extension_version",
    "infer_schema_from_path",
    "ingest_from_db",
    "ingest_from_db_infer",
    "ingest_from_ordered_paths",
    "ingest_from_path",
    "ingest_from_path_infer",
    "ingest_with_inferred_schema",
    "parse_partition_segment",
    "paths_from_directory_scan",
    "paths_from_explicit_list",
    "paths_from_glob",
    "privacy_summarize_utf8_changes",
    "privacy_summarize_utf8_changes_json",
    "privacy_summarize_utf8_changes_markdown",
    "processing_arg_max_row",
    "processing_arg_min_row",
    "processing_feature_wise_mean_std",
    "processing_filter",
    "processing_map",
    "processing_reduce",
    "processing_top_k_by_frequency",
    "profile_dataset",
    "profile_dataset_json",
    "profile_dataset_markdown",
    "reports_truncate_utf8_bytes",
    "sql_query_dataset",
    "transform_apply",
    "transform_apply_json",
    "validate_dataset",
    "validate_dataset_json",
    "validate_dataset_markdown",
]
162
# Optional Kafka bindings. The guarded import succeeds only when the native extension
# exposes these symbols (NOTE(review): presumably a build-time feature of the extension
# - confirm). If they are present, they join the public API; otherwise the base API is
# left unchanged.
try:
    from ._rust_data_processing import (  # noqa: F401
        elt_load_kafka_records_json,
        export_dataset_to_kafka,
        poll_kafka_window,
        poll_kafka_window_loaded,
    )
except ImportError:
    pass
else:
    __all__.extend(
        (
            "elt_load_kafka_records_json",
            "export_dataset_to_kafka",
            "poll_kafka_window",
            "poll_kafka_window_loaded",
        )
    )
179
# Optional object-store / file-transfer URI bindings. The guarded import succeeds only
# when the native extension exposes these symbols (NOTE(review): presumably a build-time
# feature of the extension - confirm). If they are present, they join the public API;
# otherwise the base API is left unchanged.
try:
    from ._rust_data_processing import (  # noqa: F401
        export_dataset_to_object_store_uri,
        ingest_from_file_transfer_uri,
        ingest_from_object_store_uri,
    )
except ImportError:
    pass
else:
    __all__.extend(
        (
            "export_dataset_to_object_store_uri",
            "ingest_from_file_transfer_uri",
            "ingest_from_object_store_uri",
        )
    )
class DataFrame:

Polars-backed lazy pipeline; call collect() to materialize a DataSet when ready.

def from_dataset(ds):
def filter_eq(self, /, column, value):
def filter_not_null(self, /, column):
def filter_mod_eq_int64(self, /, column, modulus, equals):
def select(self, /, columns):
def rename(self, /, pairs):
def drop(self, /, columns):
def cast(self, /, column, to):
def cast_with_mode(self, /, column, to, mode):
def fill_null(self, /, column, value):
def with_literal(self, /, name, value):
def multiply_f64(self, /, column, factor):
def add_f64(self, /, column, delta):
def with_mul_f64(self, /, name, source, factor):
def with_add_f64(self, /, name, source, delta):
def group_by(self, /, keys, aggs):
def join(self, /, other, left_on, right_on, how):
def collect(self, /):
def collect_with_schema(self, /, schema):
def reduce(self, /, column, op):
def sum(self, /, column):
def feature_wise_mean_std(self, /, columns, std_kind=None):
class DataSet:

In-memory tabular dataset (mirrors rust_data_processing::types::DataSet).

def row_count(self, /):
def column_names(self, /):
def schema(self, /):
def to_rows(self, /):
class ExecutionEngine:

Configurable Rayon-backed engine: parallel filter/map (Python row callbacks acquire the GIL per row), sequential reduce, and optional on_execution_event hook.

def filter_parallel(self, /, ds, predicate):
def map_parallel(self, /, ds, mapper):
def reduce(self, /, ds, column, op):
def metrics_snapshot(self, /):
class SqlContext:

Multi-table SQL context (register several pipeline frames, then execute).

def register(self, /, name, df):
def execute(self, /, sql):
__version__ = '0.3.6'
def detect_outliers( dataset: DataSet, column: str, method: Mapping[str, Any], options: dict[str, typing.Any] | None = None) -> dict[str, typing.Any]:
def detect_outliers(
    dataset: DataSet,
    column: str,
    method: Mapping[str, Any],
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Run outlier detection on one column and return the report as a dict.

    Args:
        dataset: Dataset to inspect.
        column: Name of the column to check.
        method: Outlier-detection method description, forwarded unchanged to
            :func:`detect_outliers_json`.
        options: Optional settings, forwarded unchanged.

    Returns:
        The JSON report from :func:`detect_outliers_json`, decoded with
        :func:`json.loads`.
    """
    report_text = detect_outliers_json(dataset, column, method, options)
    return json.loads(report_text)

Outlier report as dict (parsed JSON).

def detect_outliers_json(ds, column, method, options=None):
def detect_outliers_markdown(ds, column, method, options=None):
def discover_hive_partitioned_files(root, file_pattern=None):

Discover files under a Hive-style key=value directory tree (see Rust ingestion::partition rustdoc).

Returns a list of dicts: {"path": str, "segments": [{"key": str, "value": str}, ...]}.

def export_dataset_jsonl(ds, columns):
def export_filter_rows_max_utf8_chars(ds, column, max_chars):

Drop rows where the Utf8 column exceeds max_chars Unicode scalar values; rows with null values are kept.

def export_jsonl_records(dataset: DataSet, columns: list[str]) -> list[dict[str, typing.Any]]:
def export_jsonl_records(dataset: DataSet, columns: list[str]) -> list[dict[str, Any]]:
    """Parse :func:`export_dataset_jsonl` output into a list of row dicts.

    Args:
        dataset: Dataset to export.
        columns: Columns to include, forwarded unchanged to :func:`export_dataset_jsonl`.

    Returns:
        One decoded object per non-blank JSONL line, in output order.
    """
    text = export_dataset_jsonl(dataset, columns)
    # Split on "\n" only. str.splitlines() also breaks on characters such as U+0085,
    # U+2028 and U+2029, which JSON permits unescaped inside string values. Splitting on
    # them could cut one record into two unparseable halves. A trailing "\r" (CRLF
    # output) is JSON whitespace and json.loads accepts it.
    return [json.loads(line) for line in text.split("\n") if line.strip()]

Parse export_dataset_jsonl() output into a list of row dicts.

def export_train_test_row_indices(row_count, test_fraction):

Deterministic train/test row index split: (train_indices, test_indices) as two lists of int.

def extension_version():
def ingest_from_db(conn, query, schema, options=None):
def ingest_from_db_infer(conn, query, options=None):
def ingest_from_ordered_paths(paths, schema, options=None):

Ingest an ordered list of files, concatenate rows, apply watermark once; returns (dataset, metadata_dict).

def ingest_from_path(path, schema, options=None):
def ingest_from_path_infer(path, options=None):
def ingest_with_inferred_schema(path: str, options: dict[str, typing.Any] | None = None):
def ingest_with_inferred_schema(path: str, options: dict[str, Any] | None = None):
    """Infer a schema from ``path``, then ingest the file using that schema.

    The file is read twice: once by :func:`infer_schema_from_path` and once by
    :func:`ingest_from_path`. This matches the Rust helper.

    Args:
        path: Path of the file to read.
        options: Optional ingestion options, forwarded unchanged to both calls.

    Returns:
        A ``(dataset, schema)`` tuple: the ingested dataset and the inferred schema.
    """
    inferred_schema = infer_schema_from_path(path, options)
    dataset = ingest_from_path(path, inferred_schema, options)
    return dataset, inferred_schema

Infer schema once, then ingest (two passes over the file; same as the Rust helper).

def infer_schema_from_path(path, options=None):
def parse_partition_segment(component):

Parse a single path component as key=value, or return None if invalid.

def paths_from_directory_scan(root, relative_pattern=None):

List files under root (recursive), optional glob on path relative to root; sorted for stable ordering.

def paths_from_explicit_list(paths):

Validate paths exist as files; return them in order with duplicates removed (first wins).

def paths_from_glob(pattern):

Expand a filesystem glob to existing file paths (sorted).

def privacy_summarize_utf8_changes( before: DataSet, after: DataSet, columns: list[str], *, as_markdown: bool = False) -> list[dict[str, typing.Any]] | str:
def privacy_summarize_utf8_changes(
    before: DataSet,
    after: DataSet,
    columns: list[str],
    *,
    as_markdown: bool = False,
) -> list[dict[str, Any]] | str:
    """Summarize per-column UTF-8 differences between two datasets.

    Args:
        before: Dataset before the change.
        after: Dataset after the change.
        columns: Names of the columns to compare.
        as_markdown: When true, return the Markdown rendering instead of parsed JSON.

    Returns:
        When ``as_markdown`` is false, the JSON from
        :func:`privacy_summarize_utf8_changes_json`, decoded with :func:`json.loads`.
        Otherwise, the Markdown string from
        :func:`privacy_summarize_utf8_changes_markdown`.
    """
    if not as_markdown:
        summary_text = privacy_summarize_utf8_changes_json(before, after, columns)
        return json.loads(summary_text)
    return privacy_summarize_utf8_changes_markdown(before, after, columns)

UTF-8 diff summary per column: parsed JSON list (default) or Markdown string.

def privacy_summarize_utf8_changes_json(before, after, columns):
def privacy_summarize_utf8_changes_markdown(before, after, columns):
def processing_arg_max_row(ds, column):
def processing_arg_min_row(ds, column):
def processing_feature_wise_mean_std(ds, columns, std_kind=None):
def processing_filter(ds, predicate):
def processing_map(ds, mapper):
def processing_reduce(ds, column, op):
def processing_top_k_by_frequency(ds, column, k):
def profile_dataset( dataset: DataSet, options: dict[str, typing.Any] | None = None) -> dict[str, typing.Any]:
def profile_dataset(dataset: DataSet, options: dict[str, Any] | None = None) -> dict[str, Any]:
    """Profile ``dataset`` and return the report as a dict.

    Args:
        dataset: Dataset to profile.
        options: Optional profiling options, forwarded unchanged to
            :func:`profile_dataset_json`.

    Returns:
        The JSON report from :func:`profile_dataset_json`, decoded with :func:`json.loads`.
    """
    report_text = profile_dataset_json(dataset, options)
    return json.loads(report_text)

Return profiling report as a dict (parsed JSON).

def profile_dataset_json(ds, options=None):
def profile_dataset_markdown(ds, options=None):
def reports_truncate_utf8_bytes(text, max_bytes):
def sql_query_dataset(ds, sql):
def transform_apply(dataset: DataSet, spec: Union[Mapping[str, Any], str]) -> DataSet:
def transform_apply(dataset: DataSet, spec: Mapping[str, Any] | str) -> DataSet:
    """Apply a :class:`TransformSpec` given as a JSON string or a mapping (serde shape).

    Args:
        dataset: Dataset to transform.
        spec: Either a JSON string, forwarded unchanged, or a mapping that is serialized
            with :func:`json.dumps`. Any :class:`~collections.abc.Mapping` is accepted,
            including nested ones, not only ``dict``. ``json`` itself encodes only
            ``dict`` natively.

    Returns:
        The dataset returned by :func:`transform_apply_json`.

    Raises:
        TypeError: If ``spec`` contains a value that cannot be encoded as JSON.
    """

    def _mapping_to_dict(obj: object) -> dict[Any, Any]:
        # json.dumps calls this hook only for objects it cannot encode natively.
        # Convert non-dict mappings (e.g. types.MappingProxyType) to dict, and reject
        # everything else with the same error message json would raise.
        if isinstance(obj, Mapping):
            return dict(obj)
        raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")

    if isinstance(spec, str):
        payload = spec
    else:
        payload = json.dumps(spec, default=_mapping_to_dict)
    return transform_apply_json(dataset, payload)

Apply a TransformSpec given as JSON string or dict (serde shape).

def transform_apply_json(ds, spec_json):
def validate_dataset(dataset: DataSet, spec: Mapping[str, Any]) -> dict[str, typing.Any]:
def validate_dataset(dataset: DataSet, spec: Mapping[str, Any]) -> dict[str, Any]:
    """Run the validation checks described by ``spec`` and return the report as a dict.

    Args:
        dataset: Dataset to validate.
        spec: Validation spec, forwarded unchanged to :func:`validate_dataset_json`.

    Returns:
        The JSON report from :func:`validate_dataset_json`, decoded with
        :func:`json.loads`.
    """
    report_text = validate_dataset_json(dataset, spec)
    return json.loads(report_text)

Run validation checks; return report dict (parsed JSON).

def validate_dataset_json(ds, spec):
def validate_dataset_markdown(ds, spec):