Skip to main content

rust_data_processing/
pipeline_spec.rs

1//! Load shared pipeline + schema JSON from a fixture bundle (e.g. `tests/fixtures/ghcn/`).
2//!
3//! Bundles are language-neutral: Java, Python, and Rust tests read the same files. Use
4//! `{{PLACEHOLDER}}` in pipeline templates for paths resolved at runtime.
5//!
6//! # Layout
7//!
8//! ```text
9//! tests/fixtures/<bundle>/
10//!   schemas/*.schema.json          — serde [`crate::types::Schema`] JSON
11//!   pipelines/*.pipeline.json      — `rdp_run_pipeline_json` control plane
12//!   payloads/*.payload.json        — `rdp_ingest_ordered_paths_json` bodies
13//! ```
14//!
15//! Bundles: `ghcn`, `jvm_contract`, `student_etl`, `people`, `watermark`, `deep`, `file_transfer`, …
16//!
17//! Pipelines may use `"schema_ref": "schemas/foo.schema.json"` (relative to the bundle root)
18//! instead of an inline `"schema"` object.
19
20use std::collections::HashMap;
21use std::path::{Path, PathBuf};
22
23use crate::error::{IngestionError, IngestionResult};
24use crate::types::Schema;
25
26/// Root directory for a fixture bundle (`tests/fixtures/ghcn`, etc.).
27#[derive(Debug, Clone)]
28pub struct PipelineBundle {
29    root: PathBuf,
30}
31
32impl PipelineBundle {
33    /// Bundle rooted at `tests/fixtures/<name>` under the repository (via `CARGO_MANIFEST_DIR`).
34    pub fn from_repo_fixture(name: &str) -> Self {
35        let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
36            .join("tests")
37            .join("fixtures")
38            .join(name);
39        Self { root }
40    }
41
42    /// Bundle with an explicit filesystem root (JVM tests pass `tests/fixtures/ghcn`).
43    pub fn from_root(root: impl AsRef<Path>) -> Self {
44        Self {
45            root: root.as_ref().to_path_buf(),
46        }
47    }
48
49    pub fn root(&self) -> &Path {
50        &self.root
51    }
52
53    /// Load a serde [`Schema`] from `rel` (e.g. `schemas/json_source.schema.json`).
54    pub fn load_schema(&self, rel: &str) -> IngestionResult<Schema> {
55        let path = self.root.join(rel);
56        let text = std::fs::read_to_string(&path).map_err(IngestionError::Io)?;
57        serde_json::from_str(&text).map_err(|e| IngestionError::SchemaMismatch {
58            message: format!("schema JSON {}: {e}", path.display()),
59        })
60    }
61
62    /// Like [`Self::load_schema`], but panics with bundle path context (integration tests).
63    pub fn expect_schema(&self, rel: &str) -> Schema {
64        self.load_schema(rel)
65            .unwrap_or_else(|e| panic!("fixture schema {}: {e}", self.root.join(rel).display()))
66    }
67
68    /// Load a pipeline template, expand `*_ref` schema pointers, substitute `{{KEY}}` placeholders.
69    pub fn resolve_pipeline_json(
70        &self,
71        pipeline_rel: &str,
72        bindings: &HashMap<String, String>,
73    ) -> IngestionResult<String> {
74        let mut doc = self.load_json_value(pipeline_rel)?;
75        resolve_schema_refs(&mut doc, self)?;
76        bind_placeholders(&mut doc, bindings)?;
77        serde_json::to_string(&doc).map_err(|e| IngestionError::SchemaMismatch {
78            message: format!("serialize resolved pipeline: {e}"),
79        })
80    }
81
82    /// Load an `rdp_ingest_ordered_paths_json` (or similar) payload template.
83    pub fn resolve_payload_json(
84        &self,
85        payload_rel: &str,
86        bindings: &HashMap<String, String>,
87    ) -> IngestionResult<String> {
88        let mut doc = self.load_json_value(payload_rel)?;
89        resolve_schema_refs(&mut doc, self)?;
90        bind_placeholders(&mut doc, bindings)?;
91        serde_json::to_string(&doc).map_err(|e| IngestionError::SchemaMismatch {
92            message: format!("serialize resolved payload: {e}"),
93        })
94    }
95
96    fn load_json_value(&self, rel: &str) -> IngestionResult<serde_json::Value> {
97        let path = self.root.join(rel);
98        let text = std::fs::read_to_string(&path).map_err(IngestionError::Io)?;
99        serde_json::from_str(&text).map_err(|e| IngestionError::SchemaMismatch {
100            message: format!("JSON {}: {e}", path.display()),
101        })
102    }
103
104    /// Schema JSON string for path ingest FFI (`rdp_ingest_xml_path`, etc.).
105    pub fn schema_json(&self, schema_rel: &str) -> IngestionResult<String> {
106        let schema = self.load_schema(schema_rel)?;
107        serde_json::to_string(&schema).map_err(|e| IngestionError::SchemaMismatch {
108            message: format!("serialize schema: {e}"),
109        })
110    }
111
112    /// Polars SQL from a pipeline template (`transform.sql`), e.g. `SQLQueries.java` / pytest parity.
113    pub fn pipeline_transform_sql(&self, pipeline_rel: &str) -> IngestionResult<String> {
114        let doc = self.load_json_value(pipeline_rel)?;
115        doc.get("transform")
116            .and_then(|t| t.get("sql"))
117            .and_then(|s| s.as_str())
118            .map(str::to_string)
119            .ok_or_else(|| IngestionError::SchemaMismatch {
120                message: format!(
121                    "pipeline {} missing transform.sql string",
122                    self.root.join(pipeline_rel).display()
123                ),
124            })
125    }
126
127    /// SQL text from `queries/*.sql.json` (`{ "sql": "..." }`), e.g. `sql_parity` JOIN fixtures.
128    pub fn load_query_sql(&self, query_rel: &str) -> IngestionResult<String> {
129        let doc = self.load_json_value(query_rel)?;
130        doc.get("sql")
131            .and_then(|s| s.as_str())
132            .map(str::to_string)
133            .ok_or_else(|| IngestionError::SchemaMismatch {
134                message: format!(
135                    "query {} missing sql string",
136                    self.root.join(query_rel).display()
137                ),
138            })
139    }
140}
141
142fn resolve_schema_refs(
143    value: &mut serde_json::Value,
144    bundle: &PipelineBundle,
145) -> IngestionResult<()> {
146    match value {
147        serde_json::Value::Object(map) => {
148            let ref_keys: Vec<String> = map
149                .keys()
150                .filter(|k| k.ends_with("_ref"))
151                .cloned()
152                .collect();
153            for old_key in ref_keys {
154                let rel = map
155                    .remove(&old_key)
156                    .ok_or_else(|| IngestionError::SchemaMismatch {
157                        message: format!("missing key {old_key} while expanding schema refs"),
158                    })?;
159                let rel_str = rel.as_str().ok_or_else(|| IngestionError::SchemaMismatch {
160                    message: format!("{old_key} must be a string path relative to the bundle root"),
161                })?;
162                let new_key = old_key
163                    .strip_suffix("_ref")
164                    .expect("keys end with _ref")
165                    .to_string();
166                let schema = bundle.load_schema(rel_str)?;
167                map.insert(
168                    new_key,
169                    serde_json::to_value(&schema).map_err(|e| IngestionError::SchemaMismatch {
170                        message: format!("embed schema from {rel_str}: {e}"),
171                    })?,
172                );
173            }
174            for child in map.values_mut() {
175                resolve_schema_refs(child, bundle)?;
176            }
177        }
178        serde_json::Value::Array(items) => {
179            for child in items {
180                resolve_schema_refs(child, bundle)?;
181            }
182        }
183        _ => {}
184    }
185    Ok(())
186}
187
188/// Replace `{{NAME}}` in every string leaf of `value`.
189pub fn bind_placeholders(
190    value: &mut serde_json::Value,
191    bindings: &HashMap<String, String>,
192) -> IngestionResult<()> {
193    match value {
194        serde_json::Value::String(s) => {
195            for (key, replacement) in bindings {
196                let needle = format!("{{{{{key}}}}}");
197                if s.contains(&needle) {
198                    *s = s.replace(&needle, replacement);
199                }
200            }
201        }
202        serde_json::Value::Array(items) => {
203            for child in items {
204                bind_placeholders(child, bindings)?;
205            }
206        }
207        serde_json::Value::Object(map) => {
208            for child in map.values_mut() {
209                bind_placeholders(child, bindings)?;
210            }
211        }
212        _ => {}
213    }
214    Ok(())
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Owned placeholder bindings built from borrowed `(name, value)` pairs.
    fn bindings(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(name, value)| (name.to_string(), value.to_string()))
            .collect()
    }

    /// Resolve a pipeline template from the named repo bundle and parse the output JSON.
    fn resolved_pipeline(bundle: &str, rel: &str, pairs: &[(&str, &str)]) -> serde_json::Value {
        let text = PipelineBundle::from_repo_fixture(bundle)
            .resolve_pipeline_json(rel, &bindings(pairs))
            .unwrap();
        serde_json::from_str(&text).unwrap()
    }

    /// Resolve a payload template from the named repo bundle and parse the output JSON.
    fn resolved_payload(bundle: &str, rel: &str, pairs: &[(&str, &str)]) -> serde_json::Value {
        let text = PipelineBundle::from_repo_fixture(bundle)
            .resolve_payload_json(rel, &bindings(pairs))
            .unwrap();
        serde_json::from_str(&text).unwrap()
    }

    #[test]
    fn ghcn_bundle_resolves_json_to_xml_pipeline() {
        let doc = resolved_pipeline(
            "ghcn",
            "pipelines/json_to_xml.pipeline.json",
            &[("SOURCE_PATH", "/data/in.json"), ("SINK_PATH", "/tmp/out.xml")],
        );
        assert_eq!(doc["sources"]["paths"][0], "/data/in.json");
        assert_eq!(doc["sinks"][0]["path"], "/tmp/out.xml");
        assert!(doc["sources"]["schema"]["fields"].is_array());
        assert!(doc["sources"].get("schema_ref").is_none());
    }

    #[test]
    fn ghcn_schemas_load() {
        let schema = PipelineBundle::from_repo_fixture("ghcn")
            .load_schema("schemas/parquet_lake.schema.json")
            .unwrap();
        assert_eq!(schema.fields.len(), 6);
        assert_eq!(schema.fields[0].name, "station_id");
    }

    #[test]
    fn ghcn_xml_to_parquet_pipeline_expands_schema_ref() {
        let doc = resolved_pipeline(
            "ghcn",
            "pipelines/xml_to_parquet.pipeline.json",
            &[("SOURCE_PATH", "/in.xml"), ("SINK_PATH", "/out.parquet")],
        );
        assert_eq!(doc["sinks"][0]["kind"], "parquet_file");
        assert!(doc["sources"]["schema"]["fields"].is_array());
        assert!(doc["sources"].get("schema_ref").is_none());
    }

    #[test]
    fn jvm_contract_resolves_ordered_paths_payload() {
        let doc = resolved_payload(
            "jvm_contract",
            "payloads/ordered_paths_dataset.payload.json",
            &[("PATH_A", "/a.csv"), ("PATH_B", "/b.csv")],
        );
        assert_eq!(doc["paths"][0], "/a.csv");
        assert!(doc["schema"]["fields"].is_array());
    }

    #[test]
    fn jvm_contract_sql_query_dataset_pipeline_has_transform_sql() {
        let sql = PipelineBundle::from_repo_fixture("jvm_contract")
            .pipeline_transform_sql("pipelines/sql_query_dataset.pipeline.json")
            .unwrap();
        assert!(sql.contains("active = TRUE"));
        assert!(sql.contains("ORDER BY id DESC"));
    }

    #[test]
    fn jvm_contract_dataframe_centric_pipeline_expands_schema_ref() {
        let doc = resolved_pipeline(
            "jvm_contract",
            "pipelines/dataframe_centric_sql.pipeline.json",
            &[("SOURCE_PATH", "/in.json"), ("SINK_PATH", "/out.parquet")],
        );
        assert_eq!(doc["sinks"][0]["kind"], "parquet_file");
        assert!(doc["sources"]["schema"]["fields"].is_array());
        assert!(doc["sources"].get("schema_ref").is_none());
        let sql = doc["transform"]["sql"].as_str().unwrap();
        assert!(sql.contains("score * 2.0"));
    }

    #[test]
    fn sql_parity_join_query_json_loads() {
        let sql = PipelineBundle::from_repo_fixture("sql_parity")
            .load_query_sql("queries/join_people_scores.sql.json")
            .unwrap();
        assert!(sql.contains("JOIN scores"));
        assert!(sql.contains("people p"));
    }

    #[test]
    fn student_etl_legacy_pipeline_expands_schema_refs() {
        let doc = resolved_pipeline(
            "student_etl",
            "pipelines/legacy_student_etl.pipeline.json",
            &[("SOURCE_PATH", "/part-0.json")],
        );
        assert!(doc["schema_student_json"]["fields"].is_array());
        assert!(doc.get("schema_student_json_ref").is_none());
    }

    #[test]
    fn people_json_path_dataset_payload_expands_schema_ref() {
        let doc = resolved_payload(
            "people",
            "payloads/json_path_dataset.payload.json",
            &[("SOURCE_PATH", "/in.json")],
        );
        assert_eq!(doc["paths"][0], "/in.json");
        assert!(doc["schema"]["fields"].is_array());
        assert_eq!(doc["options"]["format"], "json");
    }

    #[test]
    fn people_excel_sheet_dataset_payload_expands_schema_ref() {
        let doc = resolved_payload(
            "people",
            "payloads/excel_sheet_dataset.payload.json",
            &[("SOURCE_PATH", "/in.xlsx"), ("SHEET_NAME", "Sheet1")],
        );
        assert_eq!(doc["paths"][0], "/in.xlsx");
        assert_eq!(doc["options"]["format"], "excel");
        assert_eq!(doc["options"]["sheet_name"], "Sheet1");
        assert!(doc["schema"]["fields"].is_array());
    }

    #[test]
    fn people_csv_path_dataset_payload_expands_schema_ref() {
        let doc = resolved_payload(
            "people",
            "payloads/csv_path_dataset.payload.json",
            &[("SOURCE_PATH", "/in.csv")],
        );
        assert_eq!(doc["paths"][0], "/in.csv");
        assert!(doc["schema"]["fields"].is_array());
        assert_eq!(doc["options"]["format"], "csv");
    }

    #[test]
    fn people_csv_to_parquet_pipeline_expands_schema_ref() {
        let doc = resolved_pipeline(
            "people",
            "pipelines/csv_to_parquet.pipeline.json",
            &[("SOURCE_PATH", "/in.csv"), ("SINK_PATH", "/out.parquet")],
        );
        assert_eq!(doc["sinks"][0]["kind"], "parquet_file");
        assert!(doc["sources"]["schema"]["fields"].is_array());
    }

    #[test]
    fn watermark_csv_watermark_ingest_body_expands_schema_ref() {
        let doc = resolved_payload("watermark", "payloads/csv_watermark_ingest.body.json", &[]);
        assert!(doc["schema"]["fields"].is_array());
        assert_eq!(doc["options"]["watermark_exclusive_above"], 100);
    }

    #[test]
    fn student_etl_legacy_three_paths_binds_committed_parts() {
        let bundle = PipelineBundle::from_repo_fixture("student_etl");
        let absolute = |rel: &str| bundle.root().join(rel).to_string_lossy().into_owned();
        let part_a = absolute("data/part-00000.json");
        let part_b = absolute("data/part-00001.json");
        let part_c = absolute("data/part-00002.json");
        let vars = bindings(&[
            ("PATH_A", part_a.as_str()),
            ("PATH_B", part_b.as_str()),
            ("PATH_C", part_c.as_str()),
        ]);
        let text = bundle
            .resolve_pipeline_json("pipelines/legacy_student_etl_three_paths.pipeline.json", &vars)
            .unwrap();
        let doc: serde_json::Value = serde_json::from_str(&text).unwrap();
        assert_eq!(doc["json_source_paths"].as_array().unwrap().len(), 3);
    }
}