Skip to main content

rust_data_processing/kafka/
mod.rs

1//! Kafka **streaming ELT** (Extract → Load → Transform).
2//!
//! Kafka is **not** a file connector. RDP (`rust_data_processing`) treats it the way
//! stream-processing frameworks such as Flink and Kafka Streams do:
4//!
5//! 1. **Extract** — poll a bounded window from a topic (`poll_kafka_window`) or accept records from
6//!    your host consumer (`elt_load_kafka_records`).
7//! 2. **Load** — land raw/semi-structured rows to storage (Parquet, Postgres `COPY`, object store)
8//!    with offsets preserved — **no heavy transform in the hot path**.
9//! 3. **Transform** — run Polars SQL / pipeline JSON on landed data in a **separate** job or stage.
10//!
//! The **poll window** (`Vec<KafkaStreamRecord>`) is the unit of backpressure and checkpoint
//! sizing, not a batch-ETL batch.
12//!
13//! Enable native I/O with **`--features kafka`**. See [`docs/KAFKA_ELT.md`](../../docs/KAFKA_ELT.md).
14
15#[cfg(feature = "kafka")]
16mod builder;
17#[cfg(feature = "kafka")]
18mod consumer;
19#[cfg(feature = "kafka")]
20mod load;
21#[cfg(feature = "kafka")]
22mod producer;
23#[cfg(feature = "kafka")]
24mod record;
25
26#[cfg(feature = "kafka")]
27pub use builder::{KafkaConsumerBuilder, KafkaProducerBuilder};
28#[cfg(feature = "kafka")]
29pub use consumer::{poll_kafka_window, poll_kafka_window_loaded};
30#[cfg(feature = "kafka")]
31pub use load::{
32    elt_load_kafka_records, elt_load_kafka_records_json, ingest_from_external_kafka_batches,
33    ingest_from_external_kafka_batches_json,
34};
35#[cfg(feature = "kafka")]
36pub use producer::export_dataset_to_kafka;
37#[cfg(feature = "kafka")]
38pub use record::{BytesTopicBatch, KafkaHeader, KafkaStreamRecord};
39
40/// Back-compat aliases.
41#[cfg(feature = "kafka")]
42pub use consumer::{consume_micro_batch, consume_micro_batch_as_dataset};
43
/// **Extract** side of the streaming ELT flow: poll the next bounded window of records from a
/// native or test consumer.
///
/// Every implementor automatically gets the legacy [`KafkaBatchSource::next_batch`] alias via
/// the blanket impl in this module.
#[cfg(feature = "kafka")]
pub trait KafkaStreamSource {
    /// Polls the next window of records from the source.
    ///
    /// `max_records` is the requested upper bound on the window size. The window is the unit
    /// of backpressure and checkpoint sizing, not a batch-ETL batch.
    ///
    /// # Errors
    ///
    /// Returns `Err` (the crate's ingestion error) when the source cannot produce a window.
    // NOTE(review): whether an empty `Vec` means "nothing available yet" or "end of stream"
    // is not specified here and looks implementation-defined — confirm per implementor.
    fn poll_window(
        &mut self,
        max_records: usize,
    ) -> crate::error::IngestionResult<Vec<KafkaStreamRecord>>;
}
52
/// Back-compat trait name.
///
/// Legacy alias for [`KafkaStreamSource`]. `next_batch` forwards straight to
/// [`KafkaStreamSource::poll_window`]. Because of the blanket impl below, every
/// `KafkaStreamSource` (including unsized ones such as `dyn KafkaStreamSource`) implements
/// this trait automatically, so there is nothing to implement by hand.
#[cfg(feature = "kafka")]
pub trait KafkaBatchSource: KafkaStreamSource {
    /// Alias for [`KafkaStreamSource::poll_window`], kept for source compatibility.
    fn next_batch(
        &mut self,
        max_records: usize,
    ) -> crate::error::IngestionResult<Vec<KafkaStreamRecord>> {
        <Self as KafkaStreamSource>::poll_window(self, max_records)
    }
}

// Blanket impl: every stream source also answers to the legacy name. The `?Sized` bound lets
// trait objects use `next_batch` too.
#[cfg(feature = "kafka")]
impl<S> KafkaBatchSource for S
where
    S: KafkaStreamSource + ?Sized,
{
}
66
67#[cfg(not(feature = "kafka"))]
68mod disabled {
69    use crate::error::{IngestionError, IngestionResult};
70    use crate::types::{DataSet, Schema};
71
72    fn disabled_err() -> IngestionError {
73        IngestionError::SchemaMismatch {
74            message: "kafka support is disabled; enable Cargo feature 'kafka'".to_string(),
75        }
76    }
77
78    pub fn elt_load_kafka_records_json(_json: &str, _schema: &Schema) -> IngestionResult<DataSet> {
79        Err(disabled_err())
80    }
81}
82
83#[cfg(not(feature = "kafka"))]
84pub use disabled::elt_load_kafka_records_json;