rust_data_processing/ingestion/partition.rs
1//! Hive-style **partition path discovery** and helpers to resolve **glob patterns** or **explicit
2//! file lists** — single-process only (no distributed coordinator).
3//!
4//! # Hive-style layout rules
5//!
6//! A common batch layout (e.g. Apache Hive / Spark) stores files under directories whose names are
7//! `key=value` pairs, for example:
8//!
9//! ```text
10//! warehouse/my_table/dt=2024-01-01/region=us/part-00000.csv
11//! ```
12//!
13//! **Rules used here:**
14//!
15//! - Discovery starts at a **root directory** you provide.
16//! - For each **file** under that root, the **parent path relative to root** is split into path
17//! components. **Every** directory component must match `key=value` where both sides are
18//! non-empty (split on the **first** `=`). The filename itself is not a partition segment.
19//! - A file placed **directly** under `root` (no partition directories) has an empty partition
20//! prefix.
21//! - If **any** directory component is **not** of the form `key=value`, that file is **skipped**
22//! (not returned). This avoids mis-classifying folders like `staging/` or `_temporary/`.
23//! - This crate does **not** validate that partition keys match your schema; callers may attach
24//! [`PartitionSegment`]s as extra columns after ingest in a later pipeline step.
25//!
26//! # Glob and explicit lists
27//!
28//! - [`paths_from_glob`] expands a filesystem glob (e.g. `data/**/*.parquet`) to existing files.
29//! - [`paths_from_explicit_list`] checks that each path exists and is a file, then returns them in
30//! order (deduplicated while preserving first occurrence).
31//! - [`paths_from_directory_scan`] walks a directory tree and returns matching files in sorted path
32//! order (see **Deterministic ordering** below).
33//!
34//! ## Deterministic ordering (incremental batches)
35//!
36//! For repeatable pipelines, these helpers define a stable sequence:
37//!
38//! - [`paths_from_directory_scan`], [`paths_from_glob`], and [`discover_hive_partitioned_files`]
39//! sort results by [`PathBuf`] (lexicographic / component-wise per the standard library).
40//! - [`paths_from_explicit_list`] preserves caller order (deduplicates while keeping first occurrence).
41
42use std::collections::HashSet;
43use std::path::{Path, PathBuf};
44
45use glob::{Pattern, glob};
46use walkdir::WalkDir;
47
48use crate::error::{IngestionError, IngestionResult};
49
/// A single hive-style directory name of the form `key=value`, stored as its two halves.
///
/// Values of this type are produced by [`parse_partition_segment`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionSegment {
    /// Name of the partition column: the text to the left of the `=`.
    pub key: String,
    /// Value of the partition column: the text to the right of the `=`.
    pub value: String,
}
58
59/// A data file discovered under a hive-style tree, with parsed partition segments.
60#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct PartitionedFile {
62 /// Absolute or normalized path to the file.
63 pub path: PathBuf,
64 /// Partition segments from the root down to the file's parent directory (order preserved).
65 pub segments: Vec<PartitionSegment>,
66}
67
68/// Parse a single path component as `key=value`.
69///
70/// Returns `None` if there is no `=`, either side is empty, or the string is malformed.
71pub fn parse_partition_segment(component: &str) -> Option<PartitionSegment> {
72 let (k, v) = component.split_once('=')?;
73 if k.is_empty() || v.is_empty() {
74 return None;
75 }
76 Some(PartitionSegment {
77 key: k.to_string(),
78 value: v.to_string(),
79 })
80}
81
82/// Parse every directory component of `relative_parent` as hive segments.
83///
84/// `relative_parent` should be the path of the parent directory **relative to the partition
85/// root**, or empty if the file sits directly under the root.
86///
87/// Returns `None` if any component is not a valid `key=value` segment.
88pub fn hive_segments_for_relative_parent(relative_parent: &Path) -> Option<Vec<PartitionSegment>> {
89 let mut segments = Vec::new();
90 for c in relative_parent.components() {
91 let std::path::Component::Normal(part) = c else {
92 continue;
93 };
94 let s = part.to_str()?;
95 segments.push(parse_partition_segment(s)?);
96 }
97 Some(segments)
98}
99
100/// Discover files under `root` whose parent path (relative to `root`) consists only of hive-style
101/// `key=value` directory segments.
102///
103/// - `root` must exist and be a directory.
104/// - If `file_pattern` is `Some`, it is a [`glob::Pattern`] string matched against the path of each
105/// file **relative to `root`** (use forward slashes in the pattern for portability, e.g.
106/// `**/*.csv`).
107/// - Results are sorted by [`Path`] for deterministic ordering.
108pub fn discover_hive_partitioned_files(
109 root: impl AsRef<Path>,
110 file_pattern: Option<&str>,
111) -> IngestionResult<Vec<PartitionedFile>> {
112 let root = root.as_ref();
113 if !root.is_dir() {
114 return Err(IngestionError::SchemaMismatch {
115 message: format!(
116 "hive partition root must be an existing directory: {}",
117 root.display()
118 ),
119 });
120 }
121
122 let pattern = match file_pattern {
123 None => None,
124 Some(p) => Some(Pattern::new(p).map_err(|e| IngestionError::SchemaMismatch {
125 message: format!("invalid glob pattern '{p}': {e}"),
126 })?),
127 };
128
129 let root = root.to_path_buf();
130 let mut out = Vec::new();
131
132 for entry in WalkDir::new(&root)
133 .follow_links(false)
134 .into_iter()
135 .filter_map(|e| e.ok())
136 {
137 let path = entry.path();
138 if !path.is_file() {
139 continue;
140 }
141
142 let rel = match path.strip_prefix(&root) {
143 Ok(r) => r.to_path_buf(),
144 Err(_) => continue,
145 };
146
147 if let Some(ref pat) = pattern {
148 if !pat.matches_path_with(
149 &rel,
150 glob::MatchOptions {
151 case_sensitive: true,
152 require_literal_separator: true,
153 require_literal_leading_dot: false,
154 },
155 ) {
156 continue;
157 }
158 }
159
160 let parent = rel.parent().unwrap_or_else(|| Path::new(""));
161 if let Some(segments) = hive_segments_for_relative_parent(parent) {
162 out.push(PartitionedFile {
163 path: path.to_path_buf(),
164 segments,
165 });
166 }
167 }
168
169 out.sort_by(|a, b| a.path.cmp(&b.path));
170 Ok(out)
171}
172
173/// Expand a filesystem glob pattern and return existing regular files, sorted by path.
174///
175/// Uses the [`mod@glob`] crate (shell-style patterns). Patterns are platform-specific; prefer explicit
176/// paths in tests when possible.
177pub fn paths_from_glob(pattern: &str) -> IngestionResult<Vec<PathBuf>> {
178 let mut out: Vec<PathBuf> = Vec::new();
179 for entry in glob(pattern).map_err(|e| IngestionError::SchemaMismatch {
180 message: format!("invalid glob pattern '{pattern}': {e}"),
181 })? {
182 let p = entry.map_err(|e| IngestionError::SchemaMismatch {
183 message: format!("glob expansion error for '{pattern}': {e}"),
184 })?;
185 if p.is_file() {
186 out.push(p);
187 }
188 }
189
190 out.sort();
191 out.dedup();
192 Ok(out)
193}
194
195/// Recursively list files under `root`, optionally filtered by a glob on the path **relative to
196/// `root`**, then sort for deterministic ordering.
197///
198/// Intended for **incremental directory batches**: pair with
199/// [`ingest_from_ordered_paths`](super::unified::ingest_from_ordered_paths) (or your own ordering)
200/// when you need the same file sequence across machines and runs.
201///
202/// - `root` must exist and be a directory.
203/// - If `relative_pattern` is `None`, every regular file under `root` is included.
204/// - If `Some`, it is a [`glob::Pattern`] matched against each file path relative to `root` (use
205/// forward slashes in the pattern string for portability, e.g. `**/*.csv`).
206pub fn paths_from_directory_scan(
207 root: impl AsRef<Path>,
208 relative_pattern: Option<&str>,
209) -> IngestionResult<Vec<PathBuf>> {
210 let root = root.as_ref();
211 if !root.is_dir() {
212 return Err(IngestionError::SchemaMismatch {
213 message: format!(
214 "directory scan root must be an existing directory: {}",
215 root.display()
216 ),
217 });
218 }
219
220 let pattern = match relative_pattern {
221 None => None,
222 Some(p) => Some(Pattern::new(p).map_err(|e| IngestionError::SchemaMismatch {
223 message: format!("invalid glob pattern '{p}': {e}"),
224 })?),
225 };
226
227 let root = root.to_path_buf();
228 let mut out = Vec::new();
229
230 for entry in WalkDir::new(&root)
231 .follow_links(false)
232 .into_iter()
233 .filter_map(|e| e.ok())
234 {
235 let path = entry.path();
236 if !path.is_file() {
237 continue;
238 }
239
240 let rel = match path.strip_prefix(&root) {
241 Ok(r) => r.to_path_buf(),
242 Err(_) => continue,
243 };
244
245 if let Some(ref pat) = pattern {
246 if !pat.matches_path_with(
247 &rel,
248 glob::MatchOptions {
249 case_sensitive: true,
250 require_literal_separator: true,
251 require_literal_leading_dot: false,
252 },
253 ) {
254 continue;
255 }
256 }
257
258 out.push(path.to_path_buf());
259 }
260
261 out.sort();
262 out.dedup();
263 Ok(out)
264}
265
266/// Validate and return an explicit list of file paths (must each exist and be a file).
267///
268/// Duplicates are removed while preserving first occurrence order.
269pub fn paths_from_explicit_list(paths: &[PathBuf]) -> IngestionResult<Vec<PathBuf>> {
270 let mut seen = HashSet::new();
271 let mut out = Vec::new();
272 for p in paths {
273 if !p.is_file() {
274 return Err(IngestionError::SchemaMismatch {
275 message: format!("explicit path is not an existing file: {}", p.display()),
276 });
277 }
278 if seen.insert(p.clone()) {
279 out.push(p.clone());
280 }
281 }
282 Ok(out)
283}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed component splits into its key and value.
    #[test]
    fn parse_segment_happy() {
        let s = parse_partition_segment("dt=2024-01-01").unwrap();
        assert_eq!(s.key, "dt");
        assert_eq!(s.value, "2024-01-01");
    }

    /// Components without `=`, or with an empty key or value, are not partitions.
    #[test]
    fn parse_segment_rejects() {
        assert!(parse_partition_segment("nodash").is_none());
        assert!(parse_partition_segment("=v").is_none());
        assert!(parse_partition_segment("k=").is_none());
        assert!(parse_partition_segment("=").is_none());
        assert!(parse_partition_segment("").is_none());
    }

    /// Only the first `=` separates key from value; later ones belong to the value.
    #[test]
    fn parse_segment_splits_on_first_equals() {
        let s = parse_partition_segment("expr=a=b").unwrap();
        assert_eq!(s.key, "expr");
        assert_eq!(s.value, "a=b");
    }

    /// Nested partition directories come back in root-to-leaf order.
    #[test]
    fn hive_segments_nested() {
        let p = Path::new("dt=2024-01-01").join("region=us");
        let segs = hive_segments_for_relative_parent(&p).unwrap();
        assert_eq!(segs.len(), 2);
        assert_eq!(segs[0].key, "dt");
        assert_eq!(segs[0].value, "2024-01-01");
        assert_eq!(segs[1].key, "region");
        assert_eq!(segs[1].value, "us");
    }

    /// A file directly under the root has an empty (but valid) partition prefix.
    #[test]
    fn hive_segments_empty_parent_is_empty_prefix() {
        let segs = hive_segments_for_relative_parent(Path::new("")).unwrap();
        assert!(segs.is_empty());
    }

    /// One non-hive directory anywhere in the parent path rejects the whole path.
    #[test]
    fn hive_segments_rejects_non_hive_directory() {
        let trailing = Path::new("dt=2024-01-01").join("_temporary");
        assert!(hive_segments_for_relative_parent(&trailing).is_none());

        let leading = Path::new("staging").join("dt=2024-01-01");
        assert!(hive_segments_for_relative_parent(&leading).is_none());
    }
}
311}