// rust_data_processing/ingestion/xml.rs
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;

use quick_xml::Writer;
use quick_xml::events::{BytesDecl, BytesEnd, BytesStart, BytesText, Event};

use crate::error::{IngestionError, IngestionResult};
use crate::types::{DataSet, DataType, Schema, Value};

/// Element that holds a single row; each of its children is one field of that row.
const RECORD: &str = "record";
/// Document root element that wraps every `RECORD` element.
const ROOT: &str = "rdp_records";

19fn xml_err(message: impl Into<String>) -> IngestionError {
20 IngestionError::SchemaMismatch {
21 message: message.into(),
22 }
23}
24
25fn value_to_text(v: &Value) -> String {
26 match v {
27 Value::Null => String::new(),
28 Value::Int64(i) => i.to_string(),
29 Value::Float64(f) => f.to_string(),
30 Value::Bool(b) => b.to_string(),
31 Value::Utf8(s) => s.clone(),
32 }
33}
34
35fn parse_text_to_value(raw: &str, dt: &DataType) -> IngestionResult<Value> {
36 let trimmed = raw.trim();
37 if trimmed.is_empty() {
38 return Ok(Value::Null);
39 }
40 match dt {
41 DataType::Int64 => trimmed
42 .parse::<i64>()
43 .map(Value::Int64)
44 .map_err(|_| xml_err(format!("invalid Int64 in XML: {trimmed:?}"))),
45 DataType::Float64 => trimmed
46 .parse::<f64>()
47 .map(Value::Float64)
48 .map_err(|_| xml_err(format!("invalid Float64 in XML: {trimmed:?}"))),
49 DataType::Bool => match trimmed.to_ascii_lowercase().as_str() {
50 "true" | "1" | "yes" => Ok(Value::Bool(true)),
51 "false" | "0" | "no" => Ok(Value::Bool(false)),
52 _ => Err(xml_err(format!("invalid Bool in XML: {trimmed:?}"))),
53 },
54 DataType::Utf8 => Ok(Value::Utf8(trimmed.to_string())),
55 }
56}
57
58fn row_from_field_map(
59 schema: &Schema,
60 fields: &HashMap<String, String>,
61) -> IngestionResult<Vec<Value>> {
62 let mut row = Vec::with_capacity(schema.fields.len());
63 for field in &schema.fields {
64 let raw = fields.get(&field.name).map(String::as_str).unwrap_or("");
65 row.push(parse_text_to_value(raw, &field.data_type)?);
66 }
67 Ok(row)
68}
69
70pub fn export_dataset_to_xml(path: &Path, ds: &DataSet) -> IngestionResult<()> {
72 let mut file = File::create(path)?;
73 let mut writer = Writer::new_with_indent(&mut file, b' ', 2);
74 writer
75 .write_event(Event::Decl(BytesDecl::new("1.0", Some("UTF-8"), None)))
76 .map_err(|e| xml_err(format!("write xml decl: {e}")))?;
77 writer
78 .write_event(Event::Start(BytesStart::new(ROOT)))
79 .map_err(|e| xml_err(format!("write xml root: {e}")))?;
80
81 for row in &ds.rows {
82 writer
83 .write_event(Event::Start(BytesStart::new(RECORD)))
84 .map_err(|e| xml_err(format!("write record start: {e}")))?;
85 for (field, cell) in ds.schema.fields.iter().zip(row.iter()) {
86 writer
87 .write_event(Event::Start(BytesStart::new(field.name.as_str())))
88 .map_err(|e| xml_err(format!("write field start: {e}")))?;
89 let text = value_to_text(cell);
90 if !text.is_empty() {
91 writer
92 .write_event(Event::Text(BytesText::new(&text)))
93 .map_err(|e| xml_err(format!("write field text: {e}")))?;
94 }
95 writer
96 .write_event(Event::End(BytesEnd::new(field.name.as_str())))
97 .map_err(|e| xml_err(format!("write field end: {e}")))?;
98 }
99 writer
100 .write_event(Event::End(BytesEnd::new(RECORD)))
101 .map_err(|e| xml_err(format!("write record end: {e}")))?;
102 }
103
104 writer
105 .write_event(Event::End(BytesEnd::new(ROOT)))
106 .map_err(|e| xml_err(format!("write xml root end: {e}")))?;
107 writer
108 .write_event(Event::Eof)
109 .map_err(|e| xml_err(format!("write xml eof: {e}")))?;
110 file.flush()?;
111 Ok(())
112}
113
114pub fn ingest_xml_from_path(path: impl AsRef<Path>, schema: &Schema) -> IngestionResult<DataSet> {
116 let file = File::open(path.as_ref())?;
117 let reader = BufReader::new(file);
118 let mut xml = quick_xml::Reader::from_reader(reader);
119 xml.config_mut().trim_text(true);
120
121 let mut rows: Vec<Vec<Value>> = Vec::new();
122 let mut in_record = false;
123 let mut field_values: HashMap<String, String> = HashMap::new();
124 let mut active_field: Option<String> = None;
125 let mut buf = Vec::new();
126
127 loop {
128 buf.clear();
129 match xml.read_event_into(&mut buf) {
130 Ok(Event::Start(e)) => {
131 let name = String::from_utf8_lossy(e.name().as_ref()).into_owned();
132 if name == RECORD {
133 in_record = true;
134 field_values.clear();
135 } else if in_record && name != ROOT {
136 active_field = Some(name);
137 }
138 }
139 Ok(Event::Text(e)) => {
140 if let Some(ref field) = active_field {
141 let text = e
142 .unescape()
143 .map_err(|err| xml_err(format!("xml text: {err}")))?;
144 field_values
145 .entry(field.clone())
146 .and_modify(|v| {
147 v.push(' ');
148 v.push_str(&text);
149 })
150 .or_insert_with(|| text.into_owned());
151 }
152 }
153 Ok(Event::End(e)) => {
154 let name = String::from_utf8_lossy(e.name().as_ref()).into_owned();
155 if name == RECORD && in_record {
156 rows.push(row_from_field_map(schema, &field_values)?);
157 in_record = false;
158 active_field = None;
159 } else if active_field.as_deref() == Some(name.as_str()) {
160 active_field = None;
161 }
162 }
163 Ok(Event::Eof) => break,
164 Err(e) => return Err(xml_err(format!("xml parse: {e}"))),
165 _ => {}
166 }
167 }
168
169 Ok(DataSet::new(schema.clone(), rows))
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Field;

    fn sample_schema() -> Schema {
        Schema::new(vec![
            Field::new("stationCode", DataType::Utf8),
            Field::new("lat", DataType::Float64),
            Field::new("label", DataType::Utf8),
        ])
    }

    /// Creates a per-process scratch directory under the system temp dir, so tests never
    /// write into the source tree.
    fn scratch_dir(tag: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!("rdp_xml_{tag}_{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    #[test]
    fn xml_roundtrip_preserves_rows() {
        let schema = sample_schema();
        let ds = DataSet::new(
            schema.clone(),
            vec![vec![
                Value::Utf8("ACW00011604".into()),
                Value::Float64(17.1167),
                Value::Utf8("ST JOHNS".into()),
            ]],
        );
        let dir = scratch_dir("roundtrip");
        let path = dir.join("stations.xml");
        export_dataset_to_xml(&path, &ds).unwrap();
        let back = ingest_xml_from_path(&path, &schema).unwrap();
        std::fs::remove_dir_all(&dir).ok();

        assert_eq!(back.row_count(), 1);
        // Every cell must survive the round trip, not just the first one.
        assert_eq!(back.rows, ds.rows);
    }

    #[test]
    fn xml_ingest_reads_committed_fixture_shape() {
        let manifest = Path::new(env!("CARGO_MANIFEST_DIR"));
        let path = manifest.join("tests/fixtures/ghcn/ghcn_stations_sample.json");
        if !path.exists() {
            return;
        }
        let json_schema = Schema::new(vec![
            Field::new("id", DataType::Utf8),
            Field::new("latitude", DataType::Float64),
            Field::new("longitude", DataType::Float64),
            Field::new("elevation", DataType::Float64),
            Field::new("name", DataType::Utf8),
            Field::new("state", DataType::Utf8),
        ]);
        let json_ds =
            crate::ingestion::json::ingest_json_from_path(&path, &json_schema).unwrap();

        // Same cells, relabelled positionally with the XML-side column names.
        let xml_schema = Schema::new(vec![
            Field::new("stationCode", DataType::Utf8),
            Field::new("lat", DataType::Float64),
            Field::new("lon", DataType::Float64),
            Field::new("elev_m", DataType::Float64),
            Field::new("label", DataType::Utf8),
            Field::new("region", DataType::Utf8),
        ]);
        let xml_ds = DataSet::new(
            xml_schema.clone(),
            json_ds.rows.iter().map(|row| row[..6].to_vec()).collect(),
        );

        let dir = scratch_dir("fixture");
        let xml_path = dir.join("roundtrip.xml");
        export_dataset_to_xml(&xml_path, &xml_ds).unwrap();
        let read = ingest_xml_from_path(&xml_path, &xml_schema).unwrap();
        std::fs::remove_dir_all(&dir).ok();

        assert_eq!(read.row_count(), json_ds.row_count());
    }
}