rust_data_processing
Python bindings for the rust-data-processing Rust crate.
The native extension is built with PyO3 and maturin. Prefer the APIs exported here rather than
importing rust_data_processing._rust_data_processing directly.
"""Python bindings for the `rust-data-processing` Rust crate.

The native extension is built with PyO3 and maturin. Prefer the APIs exported here rather than
importing ``rust_data_processing._rust_data_processing`` directly.
"""

from __future__ import annotations

import json
from importlib.metadata import PackageNotFoundError, version
from typing import Any, Mapping

from . import cdc
from ._rust_data_processing import (
    DataFrame,
    DataSet,
    ExecutionEngine,
    SqlContext,
    detect_outliers_json,
    detect_outliers_markdown,
    discover_hive_partitioned_files,
    export_dataset_jsonl,
    export_filter_rows_max_utf8_chars,
    export_train_test_row_indices,
    extension_version,
    infer_schema_from_path,
    ingest_from_db,
    ingest_from_db_infer,
    ingest_from_ordered_paths,
    ingest_from_path,
    ingest_from_path_infer,
    parse_partition_segment,
    paths_from_directory_scan,
    paths_from_explicit_list,
    paths_from_glob,
    privacy_summarize_utf8_changes_json,
    privacy_summarize_utf8_changes_markdown,
    processing_arg_max_row,
    processing_arg_min_row,
    processing_feature_wise_mean_std,
    processing_filter,
    processing_map,
    processing_reduce,
    processing_top_k_by_frequency,
    profile_dataset_json,
    profile_dataset_markdown,
    reports_truncate_utf8_bytes,
    sql_query_dataset,
    transform_apply_json,
    validate_dataset_json,
    validate_dataset_markdown,
)

try:
    __version__ = version("rust-data-processing")
except PackageNotFoundError:
    # No installed distribution metadata is available; fall back to the version
    # reported by the native extension itself.
    __version__ = extension_version()


def ingest_with_inferred_schema(path: str, options: dict[str, Any] | None = None):
    """Infer schema once, then ingest (two passes over the file; same as the Rust helper).

    Args:
        path: File to read.
        options: Optional settings forwarded unchanged to both
            :func:`infer_schema_from_path` and :func:`ingest_from_path`.

    Returns:
        A 2-tuple ``(result of ingest_from_path, inferred schema)``.
    """
    schema = infer_schema_from_path(path, options)
    return ingest_from_path(path, schema, options), schema


def transform_apply(dataset: DataSet, spec: Mapping[str, Any] | str) -> DataSet:
    """Apply a :class:`TransformSpec` given as a JSON string or a dict (serde shape).

    Args:
        dataset: Dataset to transform.
        spec: Either a JSON string, which is passed through unchanged, or a mapping,
            which is serialized to JSON before being handed to :func:`transform_apply_json`.

    Returns:
        The dataset returned by :func:`transform_apply_json`.
    """
    if isinstance(spec, str):
        payload = spec
    else:
        # ``json.dumps`` only encodes ``dict`` (and subclasses) as a JSON object; copy any
        # other Mapping (e.g. ``types.MappingProxyType``) so the advertised type works.
        payload = json.dumps(spec if isinstance(spec, dict) else dict(spec))
    return transform_apply_json(dataset, payload)


def profile_dataset(dataset: DataSet, options: dict[str, Any] | None = None) -> dict[str, Any]:
    """Return profiling report as a dict (parsed JSON from :func:`profile_dataset_json`)."""
    return json.loads(profile_dataset_json(dataset, options))


def validate_dataset(dataset: DataSet, spec: Mapping[str, Any]) -> dict[str, Any]:
    """Run validation checks; return report dict (parsed JSON from :func:`validate_dataset_json`)."""
    return json.loads(validate_dataset_json(dataset, spec))


def detect_outliers(
    dataset: DataSet,
    column: str,
    method: Mapping[str, Any],
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Outlier report as dict (parsed JSON from :func:`detect_outliers_json`)."""
    return json.loads(detect_outliers_json(dataset, column, method, options))


def export_jsonl_records(dataset: DataSet, columns: list[str]) -> list[dict[str, Any]]:
    """Parse :func:`export_dataset_jsonl` output into a list of row dicts.

    Records are separated by newline characters only; blank lines are skipped.
    """
    text = export_dataset_jsonl(dataset, columns)
    # JSON Lines delimits records with "\n" only. ``str.splitlines`` would also split on
    # U+0085, U+2028 and U+2029, which JSON permits unescaped inside string values, and
    # would hand ``json.loads`` a partial record. A trailing "\r" from CRLF output is
    # harmless because ``json.loads`` ignores surrounding whitespace.
    return [json.loads(line) for line in text.split("\n") if line.strip()]


def privacy_summarize_utf8_changes(
    before: DataSet,
    after: DataSet,
    columns: list[str],
    *,
    as_markdown: bool = False,
) -> list[dict[str, Any]] | str:
    """UTF-8 diff summary per column: parsed JSON list (default) or Markdown string."""
    if as_markdown:
        return privacy_summarize_utf8_changes_markdown(before, after, columns)
    return json.loads(privacy_summarize_utf8_changes_json(before, after, columns))


__all__ = [
    "DataFrame",
    "DataSet",
    "ExecutionEngine",
    "SqlContext",
    "__version__",
    "cdc",
    "detect_outliers",
    "detect_outliers_json",
    "detect_outliers_markdown",
    "discover_hive_partitioned_files",
    "export_dataset_jsonl",
    "export_filter_rows_max_utf8_chars",
    "export_jsonl_records",
    "export_train_test_row_indices",
    "extension_version",
    "ingest_from_db",
    "ingest_from_db_infer",
    "ingest_from_ordered_paths",
    "ingest_from_path",
    "ingest_from_path_infer",
    "ingest_with_inferred_schema",
    "infer_schema_from_path",
    "parse_partition_segment",
    "paths_from_directory_scan",
    "paths_from_explicit_list",
    "paths_from_glob",
    "privacy_summarize_utf8_changes",
    "privacy_summarize_utf8_changes_json",
    "privacy_summarize_utf8_changes_markdown",
    "processing_arg_max_row",
    "processing_arg_min_row",
    "processing_feature_wise_mean_std",
    "processing_filter",
    "processing_map",
    "processing_reduce",
    "processing_top_k_by_frequency",
    "profile_dataset",
    "profile_dataset_json",
    "profile_dataset_markdown",
    "reports_truncate_utf8_bytes",
    "sql_query_dataset",
    "transform_apply",
    "transform_apply_json",
    "validate_dataset",
    "validate_dataset_json",
    "validate_dataset_markdown",
]

# The Kafka bindings are not present in every build of the native extension;
# export them only when the import succeeds.
try:
    from ._rust_data_processing import (  # noqa: F401
        elt_load_kafka_records_json,
        export_dataset_to_kafka,
        poll_kafka_window,
        poll_kafka_window_loaded,
    )
except ImportError:
    pass
else:
    __all__ += [
        "elt_load_kafka_records_json",
        "export_dataset_to_kafka",
        "poll_kafka_window",
        "poll_kafka_window_loaded",
    ]

# Likewise, object-store / file-transfer bindings are exported only if the
# native extension provides them.
try:
    from ._rust_data_processing import (  # noqa: F401
        export_dataset_to_object_store_uri,
        ingest_from_file_transfer_uri,
        ingest_from_object_store_uri,
    )
except ImportError:
    pass
else:
    __all__ += [
        "export_dataset_to_object_store_uri",
        "ingest_from_file_transfer_uri",
        "ingest_from_object_store_uri",
    ]
Polars-backed lazy pipeline; collect it into a DataSet when ready.
In-memory tabular dataset (mirrors rust_data_processing::types::DataSet).
Configurable Rayon-backed engine: parallel filter/map (Python row callbacks acquire the GIL per row),
sequential reduce, and an optional on_execution_event hook.
Multi-table SQL context (register several pipeline frames, then execute).
def detect_outliers(
    dataset: DataSet,
    column: str,
    method: Mapping[str, Any],
    options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Return the outlier report for ``column`` as a dict.

    Delegates to :func:`detect_outliers_json` with the same arguments and decodes
    the JSON text it returns.
    """
    report_text = detect_outliers_json(dataset, column, method, options)
    report = json.loads(report_text)
    return report
Return the outlier report as a dict (parsed JSON).
Discover files under a Hive-style key=value directory tree (see Rust ingestion::partition rustdoc).
Returns a list of dicts: {"path": str, "segments": [{"key": str, "value": str}, ...]}.
Drop rows where the Utf8 column exceeds max_chars Unicode scalar values; rows with null values are kept.
def export_jsonl_records(dataset: DataSet, columns: list[str]) -> list[dict[str, Any]]:
    """Parse :func:`export_dataset_jsonl` output into a list of row dicts.

    Records are separated by newline characters only; blank lines are skipped.
    """
    text = export_dataset_jsonl(dataset, columns)
    # JSON Lines delimits records with "\n" only. ``str.splitlines`` would also split on
    # U+0085, U+2028 and U+2029, which JSON permits unescaped inside string values, and
    # would hand ``json.loads`` a partial record. A trailing "\r" from CRLF output is
    # harmless because ``json.loads`` ignores surrounding whitespace.
    return [json.loads(line) for line in text.split("\n") if line.strip()]
Parse export_dataset_jsonl() output into a list of row dicts.
Deterministic train/test row index split: (train_indices, test_indices) as two lists of int.
Ingest an ordered list of files, concatenate rows, apply watermark once; returns (dataset, metadata_dict).
def ingest_with_inferred_schema(path: str, options: dict[str, Any] | None = None):
    """Infer the schema of ``path``, then ingest the file with that schema.

    The file is read twice: once by :func:`infer_schema_from_path` and once by
    :func:`ingest_from_path` (the same approach as the Rust helper). ``options``
    is forwarded unchanged to both calls.

    Returns:
        A 2-tuple of the ingest result and the inferred schema.
    """
    inferred = infer_schema_from_path(path, options)
    ingested = ingest_from_path(path, inferred, options)
    return ingested, inferred
Infer the schema once, then ingest with it (two passes over the file; same as the Rust helper). Returns a tuple of the ingest result and the inferred schema.
Parse a single path component as key=value, or return None if invalid.
List files under root recursively, optionally filtered by a glob matched against each path relative to root; results are sorted for stable ordering.
Validate paths exist as files; return them in order with duplicates removed (first wins).
Expand a filesystem glob to existing file paths (sorted).
def privacy_summarize_utf8_changes(
    before: DataSet,
    after: DataSet,
    columns: list[str],
    *,
    as_markdown: bool = False,
) -> list[dict[str, Any]] | str:
    """Summarize per-column UTF-8 differences between ``before`` and ``after``.

    By default the JSON summary is decoded into a list of dicts. When
    ``as_markdown`` is true, the Markdown rendering is returned as a string instead.
    """
    if not as_markdown:
        summary_json = privacy_summarize_utf8_changes_json(before, after, columns)
        return json.loads(summary_json)
    return privacy_summarize_utf8_changes_markdown(before, after, columns)
UTF-8 diff summary per column: parsed JSON list (default) or Markdown string.
def profile_dataset(dataset: DataSet, options: dict[str, Any] | None = None) -> dict[str, Any]:
    """Profile ``dataset`` and return the report decoded from JSON into a dict.

    ``options`` is forwarded unchanged to :func:`profile_dataset_json`.
    """
    raw_report = profile_dataset_json(dataset, options)
    return json.loads(raw_report)
Return profiling report as a dict (parsed JSON).
def transform_apply(dataset: DataSet, spec: Mapping[str, Any] | str) -> DataSet:
    """Apply a :class:`TransformSpec` given as a JSON string or a dict (serde shape).

    Args:
        dataset: Dataset to transform.
        spec: Either a JSON string, which is passed through unchanged, or a mapping,
            which is serialized to JSON before being handed to :func:`transform_apply_json`.

    Returns:
        The dataset returned by :func:`transform_apply_json`.
    """
    if isinstance(spec, str):
        payload = spec
    else:
        # ``json.dumps`` only encodes ``dict`` (and subclasses) as a JSON object; copy any
        # other Mapping (e.g. ``types.MappingProxyType``) so the advertised type works.
        payload = json.dumps(spec if isinstance(spec, dict) else dict(spec))
    return transform_apply_json(dataset, payload)
Apply a TransformSpec given as a JSON string or a dict (serde shape).
def validate_dataset(dataset: DataSet, spec: Mapping[str, Any]) -> dict[str, Any]:
    """Run the validation checks described by ``spec`` against ``dataset``.

    Returns the report produced by :func:`validate_dataset_json`, decoded into a dict.
    """
    report_json = validate_dataset_json(dataset, spec)
    return json.loads(report_json)
Run validation checks; return report dict (parsed JSON).