1use std::collections::HashMap;
21use std::path::{Path, PathBuf};
22
23use crate::error::{IngestionError, IngestionResult};
24use crate::types::Schema;
25
/// Handle to a directory of JSON pipeline assets (a "bundle").
///
/// Every loader on this type takes a path relative to the bundle root and
/// joins it onto [`PipelineBundle::root`] before touching the filesystem.
#[derive(Clone, Debug)]
pub struct PipelineBundle {
    /// Directory that all relative asset paths are resolved against.
    root: PathBuf,
}
31
32impl PipelineBundle {
33 pub fn from_repo_fixture(name: &str) -> Self {
35 let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
36 .join("tests")
37 .join("fixtures")
38 .join(name);
39 Self { root }
40 }
41
42 pub fn from_root(root: impl AsRef<Path>) -> Self {
44 Self {
45 root: root.as_ref().to_path_buf(),
46 }
47 }
48
49 pub fn root(&self) -> &Path {
50 &self.root
51 }
52
53 pub fn load_schema(&self, rel: &str) -> IngestionResult<Schema> {
55 let path = self.root.join(rel);
56 let text = std::fs::read_to_string(&path).map_err(IngestionError::Io)?;
57 serde_json::from_str(&text).map_err(|e| IngestionError::SchemaMismatch {
58 message: format!("schema JSON {}: {e}", path.display()),
59 })
60 }
61
62 pub fn expect_schema(&self, rel: &str) -> Schema {
64 self.load_schema(rel)
65 .unwrap_or_else(|e| panic!("fixture schema {}: {e}", self.root.join(rel).display()))
66 }
67
68 pub fn resolve_pipeline_json(
70 &self,
71 pipeline_rel: &str,
72 bindings: &HashMap<String, String>,
73 ) -> IngestionResult<String> {
74 let mut doc = self.load_json_value(pipeline_rel)?;
75 resolve_schema_refs(&mut doc, self)?;
76 bind_placeholders(&mut doc, bindings)?;
77 serde_json::to_string(&doc).map_err(|e| IngestionError::SchemaMismatch {
78 message: format!("serialize resolved pipeline: {e}"),
79 })
80 }
81
82 pub fn resolve_payload_json(
84 &self,
85 payload_rel: &str,
86 bindings: &HashMap<String, String>,
87 ) -> IngestionResult<String> {
88 let mut doc = self.load_json_value(payload_rel)?;
89 resolve_schema_refs(&mut doc, self)?;
90 bind_placeholders(&mut doc, bindings)?;
91 serde_json::to_string(&doc).map_err(|e| IngestionError::SchemaMismatch {
92 message: format!("serialize resolved payload: {e}"),
93 })
94 }
95
96 fn load_json_value(&self, rel: &str) -> IngestionResult<serde_json::Value> {
97 let path = self.root.join(rel);
98 let text = std::fs::read_to_string(&path).map_err(IngestionError::Io)?;
99 serde_json::from_str(&text).map_err(|e| IngestionError::SchemaMismatch {
100 message: format!("JSON {}: {e}", path.display()),
101 })
102 }
103
104 pub fn schema_json(&self, schema_rel: &str) -> IngestionResult<String> {
106 let schema = self.load_schema(schema_rel)?;
107 serde_json::to_string(&schema).map_err(|e| IngestionError::SchemaMismatch {
108 message: format!("serialize schema: {e}"),
109 })
110 }
111
112 pub fn pipeline_transform_sql(&self, pipeline_rel: &str) -> IngestionResult<String> {
114 let doc = self.load_json_value(pipeline_rel)?;
115 doc.get("transform")
116 .and_then(|t| t.get("sql"))
117 .and_then(|s| s.as_str())
118 .map(str::to_string)
119 .ok_or_else(|| IngestionError::SchemaMismatch {
120 message: format!(
121 "pipeline {} missing transform.sql string",
122 self.root.join(pipeline_rel).display()
123 ),
124 })
125 }
126
127 pub fn load_query_sql(&self, query_rel: &str) -> IngestionResult<String> {
129 let doc = self.load_json_value(query_rel)?;
130 doc.get("sql")
131 .and_then(|s| s.as_str())
132 .map(str::to_string)
133 .ok_or_else(|| IngestionError::SchemaMismatch {
134 message: format!(
135 "query {} missing sql string",
136 self.root.join(query_rel).display()
137 ),
138 })
139 }
140}
141
142fn resolve_schema_refs(
143 value: &mut serde_json::Value,
144 bundle: &PipelineBundle,
145) -> IngestionResult<()> {
146 match value {
147 serde_json::Value::Object(map) => {
148 let ref_keys: Vec<String> = map
149 .keys()
150 .filter(|k| k.ends_with("_ref"))
151 .cloned()
152 .collect();
153 for old_key in ref_keys {
154 let rel = map
155 .remove(&old_key)
156 .ok_or_else(|| IngestionError::SchemaMismatch {
157 message: format!("missing key {old_key} while expanding schema refs"),
158 })?;
159 let rel_str = rel.as_str().ok_or_else(|| IngestionError::SchemaMismatch {
160 message: format!("{old_key} must be a string path relative to the bundle root"),
161 })?;
162 let new_key = old_key
163 .strip_suffix("_ref")
164 .expect("keys end with _ref")
165 .to_string();
166 let schema = bundle.load_schema(rel_str)?;
167 map.insert(
168 new_key,
169 serde_json::to_value(&schema).map_err(|e| IngestionError::SchemaMismatch {
170 message: format!("embed schema from {rel_str}: {e}"),
171 })?,
172 );
173 }
174 for child in map.values_mut() {
175 resolve_schema_refs(child, bundle)?;
176 }
177 }
178 serde_json::Value::Array(items) => {
179 for child in items {
180 resolve_schema_refs(child, bundle)?;
181 }
182 }
183 _ => {}
184 }
185 Ok(())
186}
187
188pub fn bind_placeholders(
190 value: &mut serde_json::Value,
191 bindings: &HashMap<String, String>,
192) -> IngestionResult<()> {
193 match value {
194 serde_json::Value::String(s) => {
195 for (key, replacement) in bindings {
196 let needle = format!("{{{{{key}}}}}");
197 if s.contains(&needle) {
198 *s = s.replace(&needle, replacement);
199 }
200 }
201 }
202 serde_json::Value::Array(items) => {
203 for child in items {
204 bind_placeholders(child, bindings)?;
205 }
206 }
207 serde_json::Value::Object(map) => {
208 for child in map.values_mut() {
209 bind_placeholders(child, bindings)?;
210 }
211 }
212 _ => {}
213 }
214 Ok(())
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned bindings map from borrowed `(placeholder, value)` pairs.
    fn bindings(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Resolves a pipeline document from the named repo fixture and parses the result.
    fn resolved_pipeline(fixture: &str, rel: &str, pairs: &[(&str, &str)]) -> serde_json::Value {
        let text = PipelineBundle::from_repo_fixture(fixture)
            .resolve_pipeline_json(rel, &bindings(pairs))
            .unwrap();
        serde_json::from_str(&text).unwrap()
    }

    /// Resolves a payload document from the named repo fixture and parses the result.
    fn resolved_payload(fixture: &str, rel: &str, pairs: &[(&str, &str)]) -> serde_json::Value {
        let text = PipelineBundle::from_repo_fixture(fixture)
            .resolve_payload_json(rel, &bindings(pairs))
            .unwrap();
        serde_json::from_str(&text).unwrap()
    }

    #[test]
    fn ghcn_bundle_resolves_json_to_xml_pipeline() {
        let v = resolved_pipeline(
            "ghcn",
            "pipelines/json_to_xml.pipeline.json",
            &[
                ("SOURCE_PATH", "/data/in.json"),
                ("SINK_PATH", "/tmp/out.xml"),
            ],
        );
        assert_eq!(v["sources"]["paths"][0], "/data/in.json");
        assert_eq!(v["sinks"][0]["path"], "/tmp/out.xml");
        assert!(v["sources"]["schema"]["fields"].is_array());
        assert!(v["sources"].get("schema_ref").is_none());
    }

    #[test]
    fn ghcn_schemas_load() {
        let schema = PipelineBundle::from_repo_fixture("ghcn")
            .load_schema("schemas/parquet_lake.schema.json")
            .unwrap();
        assert_eq!(schema.fields.len(), 6);
        assert_eq!(schema.fields[0].name, "station_id");
    }

    #[test]
    fn ghcn_xml_to_parquet_pipeline_expands_schema_ref() {
        let v = resolved_pipeline(
            "ghcn",
            "pipelines/xml_to_parquet.pipeline.json",
            &[("SOURCE_PATH", "/in.xml"), ("SINK_PATH", "/out.parquet")],
        );
        assert_eq!(v["sinks"][0]["kind"], "parquet_file");
        assert!(v["sources"]["schema"]["fields"].is_array());
        assert!(v["sources"].get("schema_ref").is_none());
    }

    #[test]
    fn jvm_contract_resolves_ordered_paths_payload() {
        let v = resolved_payload(
            "jvm_contract",
            "payloads/ordered_paths_dataset.payload.json",
            &[("PATH_A", "/a.csv"), ("PATH_B", "/b.csv")],
        );
        assert_eq!(v["paths"][0], "/a.csv");
        assert!(v["schema"]["fields"].is_array());
    }

    #[test]
    fn jvm_contract_sql_query_dataset_pipeline_has_transform_sql() {
        let sql = PipelineBundle::from_repo_fixture("jvm_contract")
            .pipeline_transform_sql("pipelines/sql_query_dataset.pipeline.json")
            .unwrap();
        assert!(sql.contains("active = TRUE"));
        assert!(sql.contains("ORDER BY id DESC"));
    }

    #[test]
    fn jvm_contract_dataframe_centric_pipeline_expands_schema_ref() {
        let v = resolved_pipeline(
            "jvm_contract",
            "pipelines/dataframe_centric_sql.pipeline.json",
            &[("SOURCE_PATH", "/in.json"), ("SINK_PATH", "/out.parquet")],
        );
        assert_eq!(v["sinks"][0]["kind"], "parquet_file");
        assert!(v["sources"]["schema"]["fields"].is_array());
        assert!(v["sources"].get("schema_ref").is_none());
        let transform_sql = v["transform"]["sql"].as_str().unwrap();
        assert!(transform_sql.contains("score * 2.0"));
    }

    #[test]
    fn sql_parity_join_query_json_loads() {
        let sql = PipelineBundle::from_repo_fixture("sql_parity")
            .load_query_sql("queries/join_people_scores.sql.json")
            .unwrap();
        assert!(sql.contains("JOIN scores"));
        assert!(sql.contains("people p"));
    }

    #[test]
    fn student_etl_legacy_pipeline_expands_schema_refs() {
        let v = resolved_pipeline(
            "student_etl",
            "pipelines/legacy_student_etl.pipeline.json",
            &[("SOURCE_PATH", "/part-0.json")],
        );
        assert!(v["schema_student_json"]["fields"].is_array());
        assert!(v.get("schema_student_json_ref").is_none());
    }

    #[test]
    fn people_json_path_dataset_payload_expands_schema_ref() {
        let v = resolved_payload(
            "people",
            "payloads/json_path_dataset.payload.json",
            &[("SOURCE_PATH", "/in.json")],
        );
        assert_eq!(v["paths"][0], "/in.json");
        assert!(v["schema"]["fields"].is_array());
        assert_eq!(v["options"]["format"], "json");
    }

    #[test]
    fn people_excel_sheet_dataset_payload_expands_schema_ref() {
        let v = resolved_payload(
            "people",
            "payloads/excel_sheet_dataset.payload.json",
            &[("SOURCE_PATH", "/in.xlsx"), ("SHEET_NAME", "Sheet1")],
        );
        assert_eq!(v["paths"][0], "/in.xlsx");
        assert_eq!(v["options"]["format"], "excel");
        assert_eq!(v["options"]["sheet_name"], "Sheet1");
        assert!(v["schema"]["fields"].is_array());
    }

    #[test]
    fn people_csv_path_dataset_payload_expands_schema_ref() {
        let v = resolved_payload(
            "people",
            "payloads/csv_path_dataset.payload.json",
            &[("SOURCE_PATH", "/in.csv")],
        );
        assert_eq!(v["paths"][0], "/in.csv");
        assert!(v["schema"]["fields"].is_array());
        assert_eq!(v["options"]["format"], "csv");
    }

    #[test]
    fn people_csv_to_parquet_pipeline_expands_schema_ref() {
        let v = resolved_pipeline(
            "people",
            "pipelines/csv_to_parquet.pipeline.json",
            &[("SOURCE_PATH", "/in.csv"), ("SINK_PATH", "/out.parquet")],
        );
        assert_eq!(v["sinks"][0]["kind"], "parquet_file");
        assert!(v["sources"]["schema"]["fields"].is_array());
    }

    #[test]
    fn watermark_csv_watermark_ingest_body_expands_schema_ref() {
        let v = resolved_payload(
            "watermark",
            "payloads/csv_watermark_ingest.body.json",
            &[],
        );
        assert!(v["schema"]["fields"].is_array());
        assert_eq!(v["options"]["watermark_exclusive_above"], 100);
    }

    #[test]
    fn student_etl_legacy_three_paths_binds_committed_parts() {
        let bundle = PipelineBundle::from_repo_fixture("student_etl");
        // Bind each placeholder to the on-disk location of a part file under the bundle root.
        let bound: HashMap<String, String> = [
            ("PATH_A", "data/part-00000.json"),
            ("PATH_B", "data/part-00001.json"),
            ("PATH_C", "data/part-00002.json"),
        ]
        .iter()
        .map(|&(key, file)| {
            let location = bundle.root().join(file).to_string_lossy().into_owned();
            (key.to_string(), location)
        })
        .collect();
        let text = bundle
            .resolve_pipeline_json(
                "pipelines/legacy_student_etl_three_paths.pipeline.json",
                &bound,
            )
            .unwrap();
        let v: serde_json::Value = serde_json::from_str(&text).unwrap();
        assert_eq!(v["json_source_paths"].as_array().unwrap().len(), 3);
    }
}