Skip to main content

rust_data_processing/ingestion/
xml.rs

1//! Row-oriented XML interchange (`<rdp_records><record>…</record></rdp_records>`).
2//!
3//! Field names in XML elements match [`Schema`] field names. Values are encoded as UTF-8 text.
4
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;

use quick_xml::Writer;
use quick_xml::events::{BytesDecl, BytesEnd, BytesStart, BytesText, Event};
12
13use crate::error::{IngestionError, IngestionResult};
14use crate::types::{DataSet, DataType, Schema, Value};
15
/// Element name of the document root that wraps every exported record.
const ROOT: &'static str = "rdp_records";
/// Element name wrapping one row; its child elements are that row's fields.
const RECORD: &'static str = "record";
18
19fn xml_err(message: impl Into<String>) -> IngestionError {
20    IngestionError::SchemaMismatch {
21        message: message.into(),
22    }
23}
24
25fn value_to_text(v: &Value) -> String {
26    match v {
27        Value::Null => String::new(),
28        Value::Int64(i) => i.to_string(),
29        Value::Float64(f) => f.to_string(),
30        Value::Bool(b) => b.to_string(),
31        Value::Utf8(s) => s.clone(),
32    }
33}
34
35fn parse_text_to_value(raw: &str, dt: &DataType) -> IngestionResult<Value> {
36    let trimmed = raw.trim();
37    if trimmed.is_empty() {
38        return Ok(Value::Null);
39    }
40    match dt {
41        DataType::Int64 => trimmed
42            .parse::<i64>()
43            .map(Value::Int64)
44            .map_err(|_| xml_err(format!("invalid Int64 in XML: {trimmed:?}"))),
45        DataType::Float64 => trimmed
46            .parse::<f64>()
47            .map(Value::Float64)
48            .map_err(|_| xml_err(format!("invalid Float64 in XML: {trimmed:?}"))),
49        DataType::Bool => match trimmed.to_ascii_lowercase().as_str() {
50            "true" | "1" | "yes" => Ok(Value::Bool(true)),
51            "false" | "0" | "no" => Ok(Value::Bool(false)),
52            _ => Err(xml_err(format!("invalid Bool in XML: {trimmed:?}"))),
53        },
54        DataType::Utf8 => Ok(Value::Utf8(trimmed.to_string())),
55    }
56}
57
58fn row_from_field_map(
59    schema: &Schema,
60    fields: &HashMap<String, String>,
61) -> IngestionResult<Vec<Value>> {
62    let mut row = Vec::with_capacity(schema.fields.len());
63    for field in &schema.fields {
64        let raw = fields.get(&field.name).map(String::as_str).unwrap_or("");
65        row.push(parse_text_to_value(raw, &field.data_type)?);
66    }
67    Ok(row)
68}
69
70/// Write a [`DataSet`] to a single XML file (see module docs for layout).
71pub fn export_dataset_to_xml(path: &Path, ds: &DataSet) -> IngestionResult<()> {
72    let mut file = File::create(path)?;
73    let mut writer = Writer::new_with_indent(&mut file, b' ', 2);
74    writer
75        .write_event(Event::Decl(BytesDecl::new("1.0", Some("UTF-8"), None)))
76        .map_err(|e| xml_err(format!("write xml decl: {e}")))?;
77    writer
78        .write_event(Event::Start(BytesStart::new(ROOT)))
79        .map_err(|e| xml_err(format!("write xml root: {e}")))?;
80
81    for row in &ds.rows {
82        writer
83            .write_event(Event::Start(BytesStart::new(RECORD)))
84            .map_err(|e| xml_err(format!("write record start: {e}")))?;
85        for (field, cell) in ds.schema.fields.iter().zip(row.iter()) {
86            writer
87                .write_event(Event::Start(BytesStart::new(field.name.as_str())))
88                .map_err(|e| xml_err(format!("write field start: {e}")))?;
89            let text = value_to_text(cell);
90            if !text.is_empty() {
91                writer
92                    .write_event(Event::Text(BytesText::new(&text)))
93                    .map_err(|e| xml_err(format!("write field text: {e}")))?;
94            }
95            writer
96                .write_event(Event::End(BytesEnd::new(field.name.as_str())))
97                .map_err(|e| xml_err(format!("write field end: {e}")))?;
98        }
99        writer
100            .write_event(Event::End(BytesEnd::new(RECORD)))
101            .map_err(|e| xml_err(format!("write record end: {e}")))?;
102    }
103
104    writer
105        .write_event(Event::End(BytesEnd::new(ROOT)))
106        .map_err(|e| xml_err(format!("write xml root end: {e}")))?;
107    writer
108        .write_event(Event::Eof)
109        .map_err(|e| xml_err(format!("write xml eof: {e}")))?;
110    file.flush()?;
111    Ok(())
112}
113
114/// Ingest row-oriented XML into a [`DataSet`] using the provided [`Schema`].
115pub fn ingest_xml_from_path(path: impl AsRef<Path>, schema: &Schema) -> IngestionResult<DataSet> {
116    let file = File::open(path.as_ref())?;
117    let reader = BufReader::new(file);
118    let mut xml = quick_xml::Reader::from_reader(reader);
119    xml.config_mut().trim_text(true);
120
121    let mut rows: Vec<Vec<Value>> = Vec::new();
122    let mut in_record = false;
123    let mut field_values: HashMap<String, String> = HashMap::new();
124    let mut active_field: Option<String> = None;
125    let mut buf = Vec::new();
126
127    loop {
128        buf.clear();
129        match xml.read_event_into(&mut buf) {
130            Ok(Event::Start(e)) => {
131                let name = String::from_utf8_lossy(e.name().as_ref()).into_owned();
132                if name == RECORD {
133                    in_record = true;
134                    field_values.clear();
135                } else if in_record && name != ROOT {
136                    active_field = Some(name);
137                }
138            }
139            Ok(Event::Text(e)) => {
140                if let Some(ref field) = active_field {
141                    let text = e
142                        .unescape()
143                        .map_err(|err| xml_err(format!("xml text: {err}")))?;
144                    field_values
145                        .entry(field.clone())
146                        .and_modify(|v| {
147                            v.push(' ');
148                            v.push_str(&text);
149                        })
150                        .or_insert_with(|| text.into_owned());
151                }
152            }
153            Ok(Event::End(e)) => {
154                let name = String::from_utf8_lossy(e.name().as_ref()).into_owned();
155                if name == RECORD && in_record {
156                    rows.push(row_from_field_map(schema, &field_values)?);
157                    in_record = false;
158                    active_field = None;
159                } else if active_field.as_deref() == Some(name.as_str()) {
160                    active_field = None;
161                }
162            }
163            Ok(Event::Eof) => break,
164            Err(e) => return Err(xml_err(format!("xml parse: {e}"))),
165            _ => {}
166        }
167    }
168
169    Ok(DataSet::new(schema.clone(), rows))
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Field;
    use std::path::PathBuf;

    /// Per-test scratch directory under the system temp dir, removed on drop (also when
    /// the test panics). The tag keeps tests running concurrently in one process apart.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        fn new(tag: &str) -> Self {
            let dir =
                std::env::temp_dir().join(format!("rdp_xml_{tag}_{}", std::process::id()));
            std::fs::create_dir_all(&dir).unwrap();
            ScratchDir(dir)
        }

        fn file(&self, name: &str) -> PathBuf {
            self.0.join(name)
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            std::fs::remove_dir_all(&self.0).ok();
        }
    }

    fn sample_schema() -> Schema {
        Schema::new(vec![
            Field::new("stationCode", DataType::Utf8),
            Field::new("lat", DataType::Float64),
            Field::new("label", DataType::Utf8),
        ])
    }

    #[test]
    fn xml_roundtrip_preserves_rows() {
        let schema = sample_schema();
        let ds = DataSet::new(
            schema.clone(),
            vec![vec![
                Value::Utf8("ACW00011604".into()),
                Value::Float64(17.1167),
                Value::Utf8("ST JOHNS".into()),
            ]],
        );
        let scratch = ScratchDir::new("roundtrip");
        let path = scratch.file("stations.xml");
        export_dataset_to_xml(&path, &ds).unwrap();
        let back = ingest_xml_from_path(&path, &schema).unwrap();
        assert_eq!(back.row_count(), 1);
        assert_eq!(back.rows, ds.rows);
    }

    #[test]
    fn xml_roundtrip_handles_all_types_and_nulls() {
        let schema = Schema::new(vec![
            Field::new("count", DataType::Int64),
            Field::new("ratio", DataType::Float64),
            Field::new("flag", DataType::Bool),
            Field::new("note", DataType::Utf8),
        ]);
        let rows = vec![
            vec![
                Value::Int64(-42),
                Value::Float64(0.5),
                Value::Bool(true),
                Value::Utf8("a b".into()),
            ],
            vec![Value::Null, Value::Null, Value::Null, Value::Null],
        ];
        let ds = DataSet::new(schema.clone(), rows.clone());
        let scratch = ScratchDir::new("types");
        let path = scratch.file("types.xml");
        export_dataset_to_xml(&path, &ds).unwrap();
        let back = ingest_xml_from_path(&path, &schema).unwrap();
        assert_eq!(back.rows, rows);
    }

    #[test]
    fn parse_text_accepts_bool_spellings_and_blank_as_null() {
        assert_eq!(
            parse_text_to_value("YES", &DataType::Bool).unwrap(),
            Value::Bool(true)
        );
        assert_eq!(
            parse_text_to_value(" 0 ", &DataType::Bool).unwrap(),
            Value::Bool(false)
        );
        assert!(parse_text_to_value("maybe", &DataType::Bool).is_err());
        assert!(parse_text_to_value("1.5", &DataType::Int64).is_err());
        assert_eq!(
            parse_text_to_value("   ", &DataType::Int64).unwrap(),
            Value::Null
        );
    }

    #[test]
    fn xml_ingest_reads_committed_fixture_shape() {
        let manifest = Path::new(env!("CARGO_MANIFEST_DIR"));
        let path = manifest.join("tests/fixtures/ghcn/ghcn_stations_sample.json");
        if !path.exists() {
            return;
        }
        let json_schema = Schema::new(vec![
            Field::new("id", DataType::Utf8),
            Field::new("latitude", DataType::Float64),
            Field::new("longitude", DataType::Float64),
            Field::new("elevation", DataType::Float64),
            Field::new("name", DataType::Utf8),
            Field::new("state", DataType::Utf8),
        ]);
        let json_ds = crate::ingestion::json::ingest_json_from_path(&path, &json_schema).unwrap();

        // Same columns, in the same order, under XML-specific element names.
        let xml_schema = Schema::new(vec![
            Field::new("stationCode", DataType::Utf8),
            Field::new("lat", DataType::Float64),
            Field::new("lon", DataType::Float64),
            Field::new("elev_m", DataType::Float64),
            Field::new("label", DataType::Utf8),
            Field::new("region", DataType::Utf8),
        ]);
        let xml_ds = DataSet::new(xml_schema.clone(), json_ds.rows.clone());

        // Write into a temp scratch dir rather than the source tree.
        let scratch = ScratchDir::new("fixture");
        let xml_path = scratch.file("stations.xml");
        export_dataset_to_xml(&xml_path, &xml_ds).unwrap();
        let read = ingest_xml_from_path(&xml_path, &xml_schema).unwrap();
        assert_eq!(read.row_count(), json_ds.row_count());
    }
}