rust_data_processing/kafka/
mod.rs1#[cfg(feature = "kafka")]
16mod builder;
17#[cfg(feature = "kafka")]
18mod consumer;
19#[cfg(feature = "kafka")]
20mod load;
21#[cfg(feature = "kafka")]
22mod producer;
23#[cfg(feature = "kafka")]
24mod record;
25
26#[cfg(feature = "kafka")]
27pub use builder::{KafkaConsumerBuilder, KafkaProducerBuilder};
28#[cfg(feature = "kafka")]
29pub use consumer::{poll_kafka_window, poll_kafka_window_loaded};
30#[cfg(feature = "kafka")]
31pub use load::{
32 elt_load_kafka_records, elt_load_kafka_records_json, ingest_from_external_kafka_batches,
33 ingest_from_external_kafka_batches_json,
34};
35#[cfg(feature = "kafka")]
36pub use producer::export_dataset_to_kafka;
37#[cfg(feature = "kafka")]
38pub use record::{BytesTopicBatch, KafkaHeader, KafkaStreamRecord};
39
40#[cfg(feature = "kafka")]
42pub use consumer::{consume_micro_batch, consume_micro_batch_as_dataset};
43
44#[cfg(feature = "kafka")]
46pub trait KafkaStreamSource {
47 fn poll_window(
48 &mut self,
49 max_records: usize,
50 ) -> crate::error::IngestionResult<Vec<KafkaStreamRecord>>;
51}
52
53#[cfg(feature = "kafka")]
55pub trait KafkaBatchSource: KafkaStreamSource {
56 fn next_batch(
57 &mut self,
58 max_records: usize,
59 ) -> crate::error::IngestionResult<Vec<KafkaStreamRecord>> {
60 self.poll_window(max_records)
61 }
62}
63
64#[cfg(feature = "kafka")]
65impl<T: KafkaStreamSource + ?Sized> KafkaBatchSource for T {}
66
67#[cfg(not(feature = "kafka"))]
68mod disabled {
69 use crate::error::{IngestionError, IngestionResult};
70 use crate::types::{DataSet, Schema};
71
72 fn disabled_err() -> IngestionError {
73 IngestionError::SchemaMismatch {
74 message: "kafka support is disabled; enable Cargo feature 'kafka'".to_string(),
75 }
76 }
77
78 pub fn elt_load_kafka_records_json(_json: &str, _schema: &Schema) -> IngestionResult<DataSet> {
79 Err(disabled_err())
80 }
81}
82
83#[cfg(not(feature = "kafka"))]
84pub use disabled::elt_load_kafka_records_json;