Skip to main content

rust_data_processing/ingestion/
partition.rs

1//! Hive-style **partition path discovery** and helpers to resolve **glob patterns** or **explicit
2//! file lists** — single-process only (no distributed coordinator).
3//!
4//! # Hive-style layout rules
5//!
6//! A common batch layout (e.g. Apache Hive / Spark) stores files under directories whose names are
7//! `key=value` pairs, for example:
8//!
9//! ```text
10//! warehouse/my_table/dt=2024-01-01/region=us/part-00000.csv
11//! ```
12//!
13//! **Rules used here:**
14//!
15//! - Discovery starts at a **root directory** you provide.
16//! - For each **file** under that root, the **parent path relative to root** is split into path
17//!   components. **Every** directory component must match `key=value` where both sides are
18//!   non-empty (split on the **first** `=`). The filename itself is not a partition segment.
19//! - A file placed **directly** under `root` (no partition directories) has an empty partition
20//!   prefix.
21//! - If **any** directory component is **not** of the form `key=value`, that file is **skipped**
22//!   (not returned). This avoids mis-classifying folders like `staging/` or `_temporary/`.
23//! - This crate does **not** validate that partition keys match your schema; callers may attach
24//!   [`PartitionSegment`]s as extra columns after ingest in a later pipeline step.
25//!
26//! # Glob and explicit lists
27//!
28//! - [`paths_from_glob`] expands a filesystem glob (e.g. `data/**/*.parquet`) to existing files.
29//! - [`paths_from_explicit_list`] checks that each path exists and is a file, then returns them in
30//!   order (deduplicated while preserving first occurrence).
31//! - [`paths_from_directory_scan`] walks a directory tree and returns matching files in sorted path
32//!   order (see **Deterministic ordering** below).
33//!
34//! ## Deterministic ordering (incremental batches)
35//!
36//! For repeatable pipelines, these helpers define a stable sequence:
37//!
38//! - [`paths_from_directory_scan`], [`paths_from_glob`], and [`discover_hive_partitioned_files`]
39//!   sort results by [`PathBuf`] (lexicographic / component-wise per the standard library).
40//! - [`paths_from_explicit_list`] preserves caller order (deduplicates while keeping first occurrence).
41
42use std::collections::HashSet;
43use std::path::{Path, PathBuf};
44
45use glob::{Pattern, glob};
46use walkdir::WalkDir;
47
48use crate::error::{IngestionError, IngestionResult};
49
/// A single hive-style directory name split into its `key=value` halves.
///
/// Produced by [`parse_partition_segment`]: for the directory `dt=2024-01-01` the key is `dt`
/// and the value is `2024-01-01`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionSegment {
    /// Partition column name — the text before the first `=`.
    pub key: String,
    /// Partition value — everything after the first `=`.
    pub value: String,
}
58
59/// A data file discovered under a hive-style tree, with parsed partition segments.
60#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct PartitionedFile {
62    /// Absolute or normalized path to the file.
63    pub path: PathBuf,
64    /// Partition segments from the root down to the file's parent directory (order preserved).
65    pub segments: Vec<PartitionSegment>,
66}
67
68/// Parse a single path component as `key=value`.
69///
70/// Returns `None` if there is no `=`, either side is empty, or the string is malformed.
71pub fn parse_partition_segment(component: &str) -> Option<PartitionSegment> {
72    let (k, v) = component.split_once('=')?;
73    if k.is_empty() || v.is_empty() {
74        return None;
75    }
76    Some(PartitionSegment {
77        key: k.to_string(),
78        value: v.to_string(),
79    })
80}
81
82/// Parse every directory component of `relative_parent` as hive segments.
83///
84/// `relative_parent` should be the path of the parent directory **relative to the partition
85/// root**, or empty if the file sits directly under the root.
86///
87/// Returns `None` if any component is not a valid `key=value` segment.
88pub fn hive_segments_for_relative_parent(relative_parent: &Path) -> Option<Vec<PartitionSegment>> {
89    let mut segments = Vec::new();
90    for c in relative_parent.components() {
91        let std::path::Component::Normal(part) = c else {
92            continue;
93        };
94        let s = part.to_str()?;
95        segments.push(parse_partition_segment(s)?);
96    }
97    Some(segments)
98}
99
100/// Discover files under `root` whose parent path (relative to `root`) consists only of hive-style
101/// `key=value` directory segments.
102///
103/// - `root` must exist and be a directory.
104/// - If `file_pattern` is `Some`, it is a [`glob::Pattern`] string matched against the path of each
105///   file **relative to `root`** (use forward slashes in the pattern for portability, e.g.
106///   `**/*.csv`).
107/// - Results are sorted by [`Path`] for deterministic ordering.
108pub fn discover_hive_partitioned_files(
109    root: impl AsRef<Path>,
110    file_pattern: Option<&str>,
111) -> IngestionResult<Vec<PartitionedFile>> {
112    let root = root.as_ref();
113    if !root.is_dir() {
114        return Err(IngestionError::SchemaMismatch {
115            message: format!(
116                "hive partition root must be an existing directory: {}",
117                root.display()
118            ),
119        });
120    }
121
122    let pattern = match file_pattern {
123        None => None,
124        Some(p) => Some(Pattern::new(p).map_err(|e| IngestionError::SchemaMismatch {
125            message: format!("invalid glob pattern '{p}': {e}"),
126        })?),
127    };
128
129    let root = root.to_path_buf();
130    let mut out = Vec::new();
131
132    for entry in WalkDir::new(&root)
133        .follow_links(false)
134        .into_iter()
135        .filter_map(|e| e.ok())
136    {
137        let path = entry.path();
138        if !path.is_file() {
139            continue;
140        }
141
142        let rel = match path.strip_prefix(&root) {
143            Ok(r) => r.to_path_buf(),
144            Err(_) => continue,
145        };
146
147        if let Some(ref pat) = pattern {
148            if !pat.matches_path_with(
149                &rel,
150                glob::MatchOptions {
151                    case_sensitive: true,
152                    require_literal_separator: true,
153                    require_literal_leading_dot: false,
154                },
155            ) {
156                continue;
157            }
158        }
159
160        let parent = rel.parent().unwrap_or_else(|| Path::new(""));
161        if let Some(segments) = hive_segments_for_relative_parent(parent) {
162            out.push(PartitionedFile {
163                path: path.to_path_buf(),
164                segments,
165            });
166        }
167    }
168
169    out.sort_by(|a, b| a.path.cmp(&b.path));
170    Ok(out)
171}
172
173/// Expand a filesystem glob pattern and return existing regular files, sorted by path.
174///
175/// Uses the [`mod@glob`] crate (shell-style patterns). Patterns are platform-specific; prefer explicit
176/// paths in tests when possible.
177pub fn paths_from_glob(pattern: &str) -> IngestionResult<Vec<PathBuf>> {
178    let mut out: Vec<PathBuf> = Vec::new();
179    for entry in glob(pattern).map_err(|e| IngestionError::SchemaMismatch {
180        message: format!("invalid glob pattern '{pattern}': {e}"),
181    })? {
182        let p = entry.map_err(|e| IngestionError::SchemaMismatch {
183            message: format!("glob expansion error for '{pattern}': {e}"),
184        })?;
185        if p.is_file() {
186            out.push(p);
187        }
188    }
189
190    out.sort();
191    out.dedup();
192    Ok(out)
193}
194
195/// Recursively list files under `root`, optionally filtered by a glob on the path **relative to
196/// `root`**, then sort for deterministic ordering.
197///
198/// Intended for **incremental directory batches**: pair with
199/// [`ingest_from_ordered_paths`](super::unified::ingest_from_ordered_paths) (or your own ordering)
200/// when you need the same file sequence across machines and runs.
201///
202/// - `root` must exist and be a directory.
203/// - If `relative_pattern` is `None`, every regular file under `root` is included.
204/// - If `Some`, it is a [`glob::Pattern`] matched against each file path relative to `root` (use
205///   forward slashes in the pattern string for portability, e.g. `**/*.csv`).
206pub fn paths_from_directory_scan(
207    root: impl AsRef<Path>,
208    relative_pattern: Option<&str>,
209) -> IngestionResult<Vec<PathBuf>> {
210    let root = root.as_ref();
211    if !root.is_dir() {
212        return Err(IngestionError::SchemaMismatch {
213            message: format!(
214                "directory scan root must be an existing directory: {}",
215                root.display()
216            ),
217        });
218    }
219
220    let pattern = match relative_pattern {
221        None => None,
222        Some(p) => Some(Pattern::new(p).map_err(|e| IngestionError::SchemaMismatch {
223            message: format!("invalid glob pattern '{p}': {e}"),
224        })?),
225    };
226
227    let root = root.to_path_buf();
228    let mut out = Vec::new();
229
230    for entry in WalkDir::new(&root)
231        .follow_links(false)
232        .into_iter()
233        .filter_map(|e| e.ok())
234    {
235        let path = entry.path();
236        if !path.is_file() {
237            continue;
238        }
239
240        let rel = match path.strip_prefix(&root) {
241            Ok(r) => r.to_path_buf(),
242            Err(_) => continue,
243        };
244
245        if let Some(ref pat) = pattern {
246            if !pat.matches_path_with(
247                &rel,
248                glob::MatchOptions {
249                    case_sensitive: true,
250                    require_literal_separator: true,
251                    require_literal_leading_dot: false,
252                },
253            ) {
254                continue;
255            }
256        }
257
258        out.push(path.to_path_buf());
259    }
260
261    out.sort();
262    out.dedup();
263    Ok(out)
264}
265
266/// Validate and return an explicit list of file paths (must each exist and be a file).
267///
268/// Duplicates are removed while preserving first occurrence order.
269pub fn paths_from_explicit_list(paths: &[PathBuf]) -> IngestionResult<Vec<PathBuf>> {
270    let mut seen = HashSet::new();
271    let mut out = Vec::new();
272    for p in paths {
273        if !p.is_file() {
274            return Err(IngestionError::SchemaMismatch {
275                message: format!("explicit path is not an existing file: {}", p.display()),
276            });
277        }
278        if seen.insert(p.clone()) {
279            out.push(p.clone());
280        }
281    }
282    Ok(out)
283}
284
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_segment_happy() {
        let s = parse_partition_segment("dt=2024-01-01").unwrap();
        assert_eq!(s.key, "dt");
        assert_eq!(s.value, "2024-01-01");
    }

    /// The module docs promise a split on the *first* `=`; pin that down.
    #[test]
    fn parse_segment_splits_on_first_equals() {
        let s = parse_partition_segment("k=a=b").unwrap();
        assert_eq!(s.key, "k");
        assert_eq!(s.value, "a=b");
    }

    #[test]
    fn parse_segment_rejects() {
        assert!(parse_partition_segment("nodash").is_none());
        assert!(parse_partition_segment("=v").is_none());
        assert!(parse_partition_segment("k=").is_none());
        assert!(parse_partition_segment("=").is_none());
        assert!(parse_partition_segment("").is_none());
    }

    #[test]
    fn hive_segments_nested() {
        let p = Path::new("dt=2024-01-01").join("region=us");
        let segs = hive_segments_for_relative_parent(&p).unwrap();
        assert_eq!(segs.len(), 2);
        assert_eq!(segs[0].key, "dt");
        assert_eq!(segs[1].key, "region");
    }

    /// A file directly under the root has an empty partition prefix.
    #[test]
    fn hive_segments_empty_parent_means_no_partitions() {
        let segs = hive_segments_for_relative_parent(Path::new("")).unwrap();
        assert!(segs.is_empty());
    }

    /// Directories such as `_temporary/` must cause the file to be skipped.
    #[test]
    fn hive_segments_reject_non_partition_directory() {
        let p = Path::new("dt=2024-01-01").join("_temporary");
        assert!(hive_segments_for_relative_parent(&p).is_none());
    }

    #[test]
    fn explicit_list_rejects_missing_file() {
        let missing = PathBuf::from("this/path/should/not/exist/part-00000.csv");
        assert!(paths_from_explicit_list(&[missing]).is_err());
    }

    #[test]
    fn directory_scan_rejects_missing_root() {
        assert!(paths_from_directory_scan("this/dir/should/not/exist", None).is_err());
    }

    #[test]
    fn hive_discovery_rejects_missing_root() {
        assert!(discover_hive_partitioned_files("this/dir/should/not/exist", None).is_err());
    }
}