Rust (`rust-data-processing` / `rdp_jvm_sys`) performs all I/O. Python and Java are thin wrappers: only connection strings and pipeline JSON cross the boundary, and the wrappers do not reimplement drivers.
Cloud auth: URIs go in pipeline JSON; tokens and keys do not. Rust reads system/OS environment variables on the process that loads the native library (this is not Java-specific). Set them with a shell `export`, Docker `--env-file`, or Kubernetes `env` / Secrets.
Per-platform guides: AMAZON_S3.md · AZURE_ADLS.md · SNOWFLAKE.md · overview: CLOUD_AUTH.md.

Fake credentials below are placeholders only; do not use them in production.
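Tokens and keys must stay out of pipeline JSON, so a cheap pre-submit check on the JSON can help. The Python sketch below is a hypothetical helper: `find_secret_fields` and its key deny-list are assumptions and are not part of rdp. It flags key names that look like credentials. It inspects key names only, so it does not flag database URLs with an embedded user and password (the DB connectors in this guide accept those by design).

```python
import json

# Hypothetical deny-list of key fragments that suggest a pasted credential.
SUSPECT_KEYS = ("password", "secret", "token", "account_key", "private_key")


def find_secret_fields(node, path="$"):
    """Return JSON paths whose key name looks like a credential."""
    hits = []
    if isinstance(node, dict):
        for key, value in node.items():
            child = f"{path}.{key}"
            if any(fragment in key.lower() for fragment in SUSPECT_KEYS):
                hits.append(child)
            hits.extend(find_secret_fields(value, child))
    elif isinstance(node, list):
        for i, item in enumerate(node):
            hits.extend(find_secret_fields(item, f"{path}[{i}]"))
    return hits


pipeline = json.loads("""
{"sinks": [{"kind": "object_store", "uri": "s3://demo-bucket-us-east-1/rdp/out/",
            "aws_secret_access_key": "FAKE"}]}
""")
print(find_secret_fields(pipeline))  # ['$.sinks[0].aws_secret_access_key']
```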
| Connector | Shared connection / URI | Primary auth (examples) |
|---|---|---|
| PostgreSQL | `postgresql://etl_user:FAKE_PG_PASS@db01.example.com:5432/analytics?sslmode=require` | User + password in URL |
| Oracle | `oracle://etl_user:FAKE_ORA_PASS@db01.example.com:1521/ORCLPDB1` | User + password in URL (ConnectorX) |
| SQL Server | `mssql://etl_user:FAKE_SQL_PASS@db01.example.com:1433/warehouse?encrypt=true` | User + password in URL |
| Snowflake | Account `https://xy12345.us-east-1.snowflakecomputing.com` · stage `s3://demo-bucket-us-east-1/snowflake-stage/rdp/` | OS env: stage `AWS_*` + optional `SNOWFLAKE_*` (SNOWFLAKE.md) |
| Databricks | Workspace `https://dbc-a1b2c3d4-e5f6.cloud.databricks.com` · warehouse `abfss://datalake@storacc01.dfs.core.windows.net/unity/` | Storage: OS env `AZURE_*` or `AWS_*` for the warehouse URI (CLOUD_AUTH.md); a PAT is only used outside the in-tree sink I/O |
| Spark | Master `spark://spark-master.example.com:7077` · handoff `s3://demo-bucket-us-east-1/spark-handoff/out.parquet` | Cluster auth lives in your Spark submit config (Kerberos, token, etc.), not on the Rust FFI boundary |
| Amazon S3 | `s3://demo-bucket-us-east-1/rdp/incoming/part-00000.parquet` | OS env: `AWS_*` (or IAM role on host/pod); AMAZON_S3.md |
| Google Cloud Storage | `gs://demo-gcs-project/rdp/incoming/part-00000.parquet` | OS env: `GOOGLE_APPLICATION_CREDENTIALS`; CLOUD_AUTH.md |
| Azure Blob / ADLS | `abfss://container@storacc01.dfs.core.windows.net/rdp/incoming/part-00000.parquet` | OS env: `AZURE_*`; AZURE_ADLS.md |
| SFTP | `sftp://etl_user:FAKE_SFTP_PASS@sftp.example.com:22/rdp/incoming/data.parquet` | User + password, or `SFTP_PRIVATE_KEY_PATH` env; see CLOUD_AUTH.md |
| FTP | `ftp://etl_user:FAKE_FTP_PASS@ftp.example.com:21/rdp/incoming/data.parquet` | User + password (`FTP_PASSWORD` env); see CLOUD_AUTH.md |
| Kafka (streaming) | Bootstrap servers `localhost:9092` + topic `events` | SASL/SSL via librdkafka config; see KAFKA_ELT.md (ELT, not batch ETL) |
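The table implies a routing rule for batch sources:

- object-store schemes go in `sources.object_store_uris`;
- SFTP/FTP go in `sources.file_transfer_uris`;
- Oracle and SQL Server go in `sources.db_reads`;
- local files go in `sources.paths`.

A minimal Python sketch of that rule follows. `source_field_for` is a hypothetical helper. PostgreSQL is left out because this guide shows it only as a sink and through `ingest_from_db`. `db_reads` entries are objects with `url` and `query`, so the caller still has to wrap those URLs. Any other scheme raises `ValueError`.

```python
from urllib.parse import urlsplit

# Scheme -> pipeline source field, following the connector tables in this guide.
SOURCE_FIELD_BY_SCHEME = {
    "s3": "object_store_uris",
    "gs": "object_store_uris",
    "gcs": "object_store_uris",  # alias accepted in validation
    "abfss": "object_store_uris",
    "azure": "object_store_uris",
    "sftp": "file_transfer_uris",
    "ftp": "file_transfer_uris",
    "ftps": "file_transfer_uris",
    "oracle": "db_reads",
    "mssql": "db_reads",
}


def source_field_for(location: str) -> str:
    """Return the sources.* field a location belongs in (hypothetical helper)."""
    scheme = urlsplit(location).scheme.lower()
    if scheme == "":
        return "paths"  # plain local path such as /path/to/file.csv
    try:
        return SOURCE_FIELD_BY_SCHEME[scheme]
    except KeyError:
        raise ValueError(f"no batch source field for scheme {scheme!r}") from None


print(source_field_for("gs://demo-gcs-project/rdp/incoming/part-00000.parquet"))
```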
Warehouse SQL (same text in all languages where applicable):

```sql
SELECT id, name, amount FROM demo.fact_scores WHERE amount > 0 LIMIT 100000;
```

Build features (validated by `scripts/check_jvm_full_features.py` for JVM, Rust, and Python):

- Rust integration: `integration_full` (`db_connectorx` + `cloud_connectors` + `excel`).
- Python integration: `integration_full` (`db` + `cloud`).
- JVM / CI: `rdp_jvm_sys --features full` enables all batch connectors below (`db_connectorx`, `sink_postgres`, `sink_oracle`, `cloud_connectors`, `sql`, `excel`).
- Kafka streaming ELT: add `--features kafka`; see KAFKA_ELT.md (poll windows, not file batches).
Tri-language tests under `integration_testing/` run Java, Python, and Rust against Docker emulators. Rust performs all protocol I/O; Java and Python call `rdp_run_pipeline_json` or the Kafka FFI entry points in `librdp_jvm_sys`.
| Suite | Run command | Protocols / pattern |
|---|---|---|
| CloudConnectors | `python3 integration_testing/CloudConnectors/run_cloud_tests.py --no-rancher` | S3 (MinIO), GCS (fake-gcs), Azure (Azurite): CSV → transform → `kind: object_store` export → read-back via `object_store_uris`. SFTP / FTP: import seeded CSV via `file_transfer_uris`. |
| Kafka | `python3 integration_testing/Kafka/run_kafka_tests.py --no-rancher` | One Uber CSV row per message → `rdp_kafka_export_dataset_json` → `rdp_kafka_poll_window_loaded_json` (Redpanda). |
| Snowflake / Databricks / Spark | `run_snowflake_tests.py`, `run_databricks_tests.py`, `run_spark_tests.py` | Platform `kind` sinks → MinIO / Spark verification. |
| SQL Server / Oracle / PostgreSQL | `run_mssql_tests.py`, `run_oracle_tests.py`, `PostgreSQL/run_tests.py` | DB sink or ConnectorX import. |
Prerequisites: run `python3 integration_testing/scripts/build_libs/build_all_libs.py` and fetch the Uber sample CSV (`download_uber_data.py --sample`). See `integration_testing/README.md` and `integration_testing/integration_testing_details.md`.

Reference: `integration_testing/scripts/cloud_pipeline.py`, used by all three languages in CloudConnectors.
Export (local CSV → transform → cloud Parquet):

```json
{
  "pipeline_spec_version": 1,
  "sources": {
    "paths": ["/path/to/uber_nyc_pickups_sample.csv"],
    "schema": { "fields": [ … raw CSV columns … ] },
    "options": { "format": "csv", "max_rows": 500 }
  },
  "transform": { "sql": "SELECT \"Date/Time\" AS pickup_time, Lat AS lat, Lon AS lon, Base AS base_code FROM df" },
  "sinks": [{ "kind": "object_store", "uri": "s3://rdp-cloud-s3/out.parquet", "format": "parquet" }],
  "orchestration": { "max_ingested_rows": 500 }
}
```

Integration URIs (local emulators; see `integration_testing/CloudConnectors/.env.example`):
| Protocol | Export / read URI | Auth (process env) |
|---|---|---|
| S3 | `s3://rdp-cloud-s3/out.parquet` | `AWS_*` → MinIO `:9000` |
| GCS | `gs://rdp-cloud-gcs/out.parquet` | `GOOGLE_APPLICATION_CREDENTIALS`, `STORAGE_EMULATOR_HOST` / `gcs_base_url` → fake-gcs `:4443` |
| Azure | `azure://rdp-cloud-azure/out.parquet` | `AZURE_STORAGE_USE_EMULATOR=true`, `AZURE_ENDPOINT` → Azurite `:10000` |
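The emulator settings above are plain process environment variables. A test harness can therefore layer them onto its own environment before it starts the process that loads the native library.

The sketch below is a hypothetical Python helper. It sets only the endpoint and emulator flags stated in this guide. Credential values, such as the MinIO `AWS_*` keys or the fake GCS service-account path, are left to `integration_testing/CloudConnectors/.env.example`.

```python
import os

# Endpoint/emulator values stated in this guide; credentials are deliberately not guessed.
EMULATOR_ENV = {
    "s3": {"AWS_ENDPOINT": "http://127.0.0.1:9000"},
    "gcs": {"STORAGE_EMULATOR_HOST": "http://127.0.0.1:4443"},
    "azure": {
        "AZURE_STORAGE_USE_EMULATOR": "true",
        "AZURE_ENDPOINT": "http://127.0.0.1:10000",
    },
}


def emulator_env(protocol, base=None):
    """Return a copy of base (default: os.environ) with the emulator variables applied."""
    env = dict(os.environ if base is None else base)
    env.update(EMULATOR_ENV[protocol])
    if protocol == "azure":
        # In emulator mode Rust uses the well-known Azurite key; the guide warns
        # against passing a truncated AZURE_STORAGE_ACCOUNT_KEY, so drop any key.
        env.pop("AZURE_STORAGE_ACCOUNT_KEY", None)
    return env


print(sorted(emulator_env("azure", {}).items()))
```

Pass the result as `env=` to `subprocess.run` when you launch the Java, Python, or Rust test process. Credentials remain the process's concern and never appear in the pipeline JSON.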
Read-back uses `sources.object_store_uris` with the curated schema (the transformed column names) and a local `parquet_file` sink for verification.
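A read-back spec can be derived from the export spec:

- the export URI moves into `sources.object_store_uris`;
- the schema switches to the curated column names from the transform SQL;
- the sink becomes a local `parquet_file`.

The Python sketch below builds that dict. `read_back_pipeline` is hypothetical. The field shape (`name` / `data_type`) is an assumption borrowed from the Python ingestion example later in this guide, and so are the column types.

```python
import json

# Curated columns from the export transform; the data types are assumptions.
CURATED_FIELDS = [
    {"name": "pickup_time", "data_type": "utf8"},
    {"name": "lat", "data_type": "float64"},
    {"name": "lon", "data_type": "float64"},
    {"name": "base_code", "data_type": "utf8"},
]


def read_back_pipeline(export_uri, fields, local_path, max_rows=500):
    """Hypothetical helper: cloud Parquet in, local parquet_file sink out."""
    return {
        "pipeline_spec_version": 1,
        "sources": {
            "paths": [],
            "object_store_uris": [export_uri],
            "schema": {"fields": fields},
            "options": {"format": "parquet", "max_rows": max_rows},
        },
        "sinks": [{"kind": "parquet_file", "path": local_path}],
    }


spec = read_back_pipeline(
    "s3://rdp-cloud-s3/out.parquet", CURATED_FIELDS, "/tmp/rdp-cloud-s3-readback.parquet"
)
print(json.dumps(spec, indent=2))
```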
Reference: `cloud_pipeline.import_from_file_transfer` lists the remote file in `sources.file_transfer_uris`, and Rust's file-transfer connector downloads it.

```json
{
  "sources": {
    "paths": [],
    "file_transfer_uris": ["sftp://rdp:rdp_sftp_secret@127.0.0.1:2222/upload/incoming.csv"],
    "schema": { "fields": [ … ] },
    "options": { "format": "csv", "max_rows": 500 }
  },
  "sinks": [{ "kind": "parquet_file", "path": "/tmp/rdp-cloud-sftp-import.parquet" }]
}
```

Reference: `integration_testing/scripts/kafka_stream.py`.
Dataset JSON uses tagged Rust `Value` cells, for example `{"Utf8": "…"}` and `{"Float64": 40.7}`.

```python
# Python integration helper (FFI to rdp_kafka_export_dataset_json / rdp_kafka_poll_window_loaded_json)
import sys

sys.path.insert(0, "integration_testing/scripts")  # kafka_stream.py lives here; run from the repo root
from kafka_stream import verify_uber_kafka_stream

count = verify_uber_kafka_stream("integration_testing/data/uber_nyc_pickups_sample.csv", max_rows=500)
```

Java: `KafkaEltStreamExample.java` · Rust: `integration_testing/Kafka/rust/src/rdp_kafka.rs`.
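The Python sketch below shows what "one Uber CSV row per message" looks like as tagged cells. It converts CSV text into rows of `{"Utf8": …}` / `{"Float64": …}` cells.

Only those two tags appear in this guide. The column-to-tag mapping, the sample row, and the list-of-cells row shape are all assumptions; check `kafka_stream.py` for the exact dataset JSON envelope.

```python
import csv
import io

# Assumed mapping for the Uber sample columns; only Utf8 and Float64 tags are used.
TAG_BY_COLUMN = {"Date/Time": "Utf8", "Lat": "Float64", "Lon": "Float64", "Base": "Utf8"}


def tag_cell(raw: str, tag: str) -> dict:
    """Wrap one CSV field in a tagged Value cell."""
    if tag == "Float64":
        return {"Float64": float(raw)}
    if tag == "Utf8":
        return {"Utf8": raw}
    raise ValueError(f"unsupported tag {tag!r}")


def tagged_rows(csv_text: str) -> list:
    """Turn CSV text into a list of rows, each a list of tagged cells."""
    reader = csv.DictReader(io.StringIO(csv_text))
    columns = reader.fieldnames  # reads the header line
    return [[tag_cell(row[c], TAG_BY_COLUMN[c]) for c in columns] for row in reader]


sample = "Date/Time,Lat,Lon,Base\n4/1/2014 0:11:00,40.769,-73.9549,B02512\n"  # illustrative row
print(tagged_rows(sample))
```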
| Layer | Connector-only usage |
|---|---|
| Rust | `ingest_from_db` / sink `postgresql://` |
| Python | `ingest_from_db(conn, query, schema)` (feature `db`) |
| Java | `rdp_run_pipeline_json` with a `kind: postgresql` sink (`postgresql://` libpq URL) |
```rust
use rust_data_processing::ingestion::{ingest_from_db_infer, IngestionOptions};
// --features db_connectorx

const URL: &str = "postgresql://etl_user:FAKE_PG_PASS@db01.example.com:5432/analytics?cxprotocol=binary";
const SQL: &str = "SELECT id, name, amount FROM public.fact_scores WHERE amount > 0 LIMIT 100000";

// `?` needs a Result-returning fn; assumes the crate's error converts into Box<dyn Error>.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ds = ingest_from_db_infer(URL, SQL, &IngestionOptions::default())?;
    println!("rows={}", ds.row_count());
    Ok(())
}
```

```python
import rust_data_processing as rdp

URL = "postgresql://etl_user:FAKE_PG_PASS@db01.example.com:5432/analytics?cxprotocol=binary"
SQL = "SELECT id, name, amount FROM public.fact_scores WHERE amount > 0 LIMIT 100000"
ds = rdp.ingest_from_db_infer(URL, SQL)  # extension built with: maturin build --features db
print("rows", ds.row_count())
```

```json
{
  "pipeline_spec_version": 1,
  "sources": { "paths": ["{{LOCAL_OR_CLOUD_PATH}}"], "schema": { "fields": [] }, "options": { "format": "parquet" } },
  "sinks": [
    {
      "kind": "postgresql",
      "url": "postgresql://etl_user:FAKE_PG_PASS@db01.example.com:5432/analytics?sslmode=require",
      "table": "public.fact_scores_curated"
    }
  ]
}
```

```java
JSONObject root = RdpNativeJson.invokeRunPipelineJson(linker, lookup, arena, pipelineJson);
```

| Layer | Notes |
|---|---|
| Rust / Python | ConnectorX `oracle://` (`ingest_from_db` / `--features db`) |
| Java | `sources.db_reads` (read) and `kind: oracle` sink (write); build `rdp_jvm_sys` with `--features full` (includes `db_connectorx` + `sink_oracle`) |
```rust
const URL: &str = "oracle://etl_user:FAKE_ORA_PASS@db01.example.com:1521/ORCLPDB1";
const SQL: &str = "SELECT id, name, amount FROM demo.fact_scores WHERE ROWNUM <= 100000";
// ingest_from_db_infer(URL, SQL, &IngestionOptions::default())?;
```

```python
URL = "oracle://etl_user:FAKE_ORA_PASS@db01.example.com:1521/ORCLPDB1"
SQL = "SELECT id, name, amount FROM demo.fact_scores WHERE ROWNUM <= 100000"
# ds = rdp.ingest_from_db_infer(URL, SQL)  # --features db
```

Rust runs warehouse SQL via ConnectorX and row loads via OCI; use `oracle://` URLs for both. Build the native library with DB read and sink enabled (CI uses `--features full`):

```shell
cargo build -p rdp-jvm-sys --features full
```

Read (export to Parquet):
```json
{
  "pipeline_spec_version": 1,
  "sources": {
    "paths": [],
    "db_reads": [
      {
        "url": "oracle://etl_user:FAKE_ORA_PASS@db01.example.com:1521/ORCLPDB1",
        "query": "SELECT id, name, amount FROM demo.fact_scores WHERE ROWNUM <= 100000"
      }
    ],
    "schema_ref": "schemas/your_fact_scores.schema.json",
    "options": {}
  },
  "transform": { "sql": "SELECT id, name, amount FROM df" },
  "sinks": [{ "kind": "parquet_file", "path": "/var/rdp/curated/oracle_fact.parquet" }]
}
```

Write (CSV/Parquet → Oracle table, same pattern as the PostgreSQL sink):
```json
{
  "pipeline_spec_version": 1,
  "sources": { "paths": ["{{LOCAL_OR_CLOUD_PATH}}"], "schema": { "fields": [] }, "options": { "format": "csv" } },
  "transform": { "sql": "SELECT id, name, amount FROM df" },
  "sinks": [
    {
      "kind": "oracle",
      "url": "oracle://etl_user:FAKE_ORA_PASS@db01.example.com:1521/ORCLPDB1",
      "table": "DEMO.FACT_SCORES_CURATED",
      "create_table_if_missing": true,
      "truncate_before_load": true
    }
  ]
}
```

```java
JSONObject root = RdpNativeJson.invokeRunPipelineJson(linker, lookup, arena, pipelineJson);
// db_reads: root.optJSONArray("db_source_results")
// oracle sink: root.getJSONObject("interchange").optJSONArray("sink_results")
```

Without `db_connectorx`, `db_reads` returns `DB_CONNECTORX_NOT_BUILT`. Without `sink_oracle`, a `kind: oracle` sink returns `ORACLE_SINK_NOT_BUILT`. Use `--features full` for both. If you cannot rebuild, export the query results to a local file and read that file through `sources.paths`.
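The Java comments above read `db_source_results` at the root and Oracle sink results under `interchange.sink_results`. The Python sketch below performs the same lookups and also scans for the two not-built codes.

The helpers are hypothetical. This guide does not say which result field carries an error code, so `not_built_codes` searches every string value. The sample result is invented for illustration.

```python
# Error codes named in this guide for connectors missing from the native build.
NOT_BUILT_CODES = {"DB_CONNECTORX_NOT_BUILT", "ORACLE_SINK_NOT_BUILT"}


def db_source_results(root: dict) -> list:
    return root.get("db_source_results") or []


def sink_results(root: dict) -> list:
    return (root.get("interchange") or {}).get("sink_results") or []


def not_built_codes(node) -> set:
    """Search all string values for a *_NOT_BUILT code (its location is not assumed)."""
    if isinstance(node, dict):
        return set().union(*(not_built_codes(v) for v in node.values()))
    if isinstance(node, list):
        return set().union(*(not_built_codes(v) for v in node))
    return {node} & NOT_BUILT_CODES if isinstance(node, str) else set()


# Invented result shape, for illustration only.
result = {
    "db_source_results": [{"error": {"code": "DB_CONNECTORX_NOT_BUILT"}}],
    "interchange": {"sink_results": [{"kind": "parquet_file"}]},
}
missing = not_built_codes(result)
if missing:
    print("rebuild rdp_jvm_sys with --features full:", sorted(missing))
```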
```rust
const URL: &str = "mssql://etl_user:FAKE_SQL_PASS@db01.example.com:1433/warehouse?encrypt=true";
const SQL: &str = "SELECT TOP (100000) id, name, amount FROM dbo.fact_scores WHERE amount > 0";
```

```python
URL = "mssql://etl_user:FAKE_SQL_PASS@db01.example.com:1433/warehouse?encrypt=true"
SQL = "SELECT TOP (100000) id, name, amount FROM dbo.fact_scores WHERE amount > 0"
# ds = rdp.ingest_from_db_infer(URL, SQL)
```

Same as Oracle: `sources.db_reads` with a ConnectorX `mssql://` URL:
```json
{
  "pipeline_spec_version": 1,
  "sources": {
    "paths": [],
    "db_reads": [
      {
        "url": "mssql://etl_user:FAKE_SQL_PASS@db01.example.com:1433/warehouse?encrypt=true",
        "query": "SELECT TOP (100000) id, name, amount FROM dbo.fact_scores WHERE amount > 0"
      }
    ],
    "schema_ref": "schemas/your_fact_scores.schema.json",
    "options": {}
  },
  "sinks": [{ "kind": "parquet_file", "path": "/var/rdp/curated/mssql_fact.parquet" }]
}
```

```java
JSONObject root = RdpNativeJson.invokeRunPipelineJson(linker, lookup, arena, pipelineJson);
```

Snowflake auth: stage I/O uses the object-store scheme of `stage_uri` (usually S3, so `AWS_*`). The optional `COPY INTO` step reads `SNOWFLAKE_USER` / `SNOWFLAKE_PASSWORD` from the same OS process, never from pipeline JSON. Full guide: SNOWFLAKE.md. Cross-cloud deployment: CLOUD_AUTH.md.
Shared stage URI (Rust writes Parquet here): `s3://demo-bucket-us-east-1/snowflake-stage/rdp/load.parquet`
```rust
use rust_data_processing::ingestion::{write_dataset_to_snowflake_stage, copy_into_table_from_stage};
// --features cloud_connectors

const ACCOUNT: &str = "https://xy12345.us-east-1.snowflakecomputing.com";
const STAGE: &str = "s3://demo-bucket-us-east-1/snowflake-stage/rdp/load.parquet";
// write_dataset_to_snowflake_stage(STAGE, &ds)?;
// copy_into_table_from_stage(ACCOUNT, Some("COMPUTE_WH"), Some("DEMO_DB"), Some("CURATED"), "FACT_SCORES", STAGE, Some("ETL_ROLE"))?;
```

```python
# Same ACCOUNT and STAGE; use JVM pipeline JSON or Rust until PyO3 exposes Snowflake helpers.
STAGE = "s3://demo-bucket-us-east-1/snowflake-stage/rdp/load.parquet"
```

```json
{
  "kind": "snowflake",
  "account_url": "https://xy12345.us-east-1.snowflakecomputing.com",
  "warehouse": "COMPUTE_WH",
  "database": "DEMO_DB",
  "schema": "CURATED",
  "table": "FACT_SCORES",
  "stage_uri": "s3://demo-bucket-us-east-1/snowflake-stage/rdp/",
  "role": "ETL_ROLE"
}
```

See `PlatformConnectorsPipelineExample.java`.
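Stage credentials and the `COPY INTO` login both come from the OS environment. A quick preflight before running a `kind: snowflake` sink can therefore catch a missing variable early.

The Python sketch below is hypothetical and covers only the S3 stage case (`AWS_*`) described above. An IAM role on the host or pod can satisfy S3 auth with no `AWS_*` variable at all, so treat that message as a hint rather than an error.

```python
import os
from urllib.parse import urlsplit


def snowflake_preflight(sink: dict, env=None, copy_into: bool = True) -> list:
    """Return human-readable warnings for a kind=snowflake sink (hypothetical helper)."""
    env = os.environ if env is None else env
    warnings = []
    scheme = urlsplit(sink["stage_uri"]).scheme
    # Only the S3 stage case is described in this guide.
    if scheme == "s3" and not any(name.startswith("AWS_") for name in env):
        warnings.append("no AWS_* variables for s3:// stage I/O (fine if an IAM role is attached)")
    if copy_into:
        for name in ("SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD"):
            if not env.get(name):
                warnings.append(f"{name} is not set (needed for COPY INTO)")
    return warnings


sink = {"kind": "snowflake", "stage_uri": "s3://demo-bucket-us-east-1/snowflake-stage/rdp/"}
for warning in snowflake_preflight(sink):
    print("warning:", warning)
```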
Databricks: in-tree writes go to the warehouse (`abfss://` or `s3://`) via `object_store`, not via the Databricks REST API with a PAT. `workspace_url` and `catalog_uri` in pipeline JSON are metadata only. Azure AD / storage credentials: CLOUD_AUTH.md (Databricks sink section).

Workspace auth (outside in-tree I/O), for notebooks, jobs, or Spark drivers you operate separately:
| Method | Example |
|---|---|
| Personal access token | `dapiFAKE_DATABRICKS_PAT_abcdef0123456789` (HTTP header / env in your tools) |
| OAuth (Azure AD) | App registration + client secret in Databricks workspace settings |
Shared warehouse path: `abfss://datalake@storacc01.dfs.core.windows.net/unity/curated/fact_scores/`
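The shared path follows a warehouse/namespace/table/ layout, which the Python example below reproduces with an f-string for `curated/fact_scores`. A slightly more general sketch of that layout follows.

`delta_table_path` is a hypothetical helper that mirrors the documented result; it is not a binding for Rust's `delta_table_uri`. How it handles a missing namespace is an assumption.

```python
from typing import Optional


def delta_table_path(warehouse: str, namespace: Optional[str], table: str) -> str:
    """Join warehouse, optional namespace, and table, ending with a slash."""
    parts = [warehouse.rstrip("/")]
    if namespace:
        parts.append(namespace.strip("/"))
    parts.append(table.strip("/"))
    return "/".join(parts) + "/"


WH = "abfss://datalake@storacc01.dfs.core.windows.net/unity/"
print(delta_table_path(WH, "curated", "fact_scores"))
# abfss://datalake@storacc01.dfs.core.windows.net/unity/curated/fact_scores/
```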
```rust
use rust_data_processing::ingestion::{delta_table_uri, write_dataset_to_delta_table};

const WH: &str = "abfss://datalake@storacc01.dfs.core.windows.net/unity/";

fn main() {
    // Same layout as the shared warehouse path: <WH>/curated/fact_scores/
    let table_uri = delta_table_uri(WH, Some("curated"), "fact_scores");
    // write_dataset_to_delta_table(&table_uri, &ds)?;
}
```

```python
WH = "abfss://datalake@storacc01.dfs.core.windows.net/unity/"
TABLE_URI = f"{WH.rstrip('/')}/curated/fact_scores/"  # same layout as Rust delta_table_uri
```

```json
{
  "kind": "databricks",
  "workspace_url": "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com",
  "catalog_uri": "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com/api/2.1/unity-catalog/iceberg",
  "warehouse": "abfss://datalake@storacc01.dfs.core.windows.net/unity/",
  "namespace": "main.curated",
  "table": "fact_scores"
}
```

Spark: Rust does not embed `SparkSession`. It writes Parquet to `handoff_uri`, and your Spark driver reads it from there.
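On the driver side, Spark only needs the `handoff_uri` that Rust reports in `sink_results`. The Python sketch below pulls it out of a parsed `rdp_run_pipeline_json` result.

The helper is hypothetical. This guide shows `sink_results` both at the top level (Spark notes) and under `interchange` (Oracle example), so the sketch checks both places. The sample result is invented.

```python
def spark_handoff_uris(root: dict) -> list:
    """Collect handoff_uri values from sink results (hypothetical helper)."""
    results = (
        (root.get("interchange") or {}).get("sink_results")
        or root.get("sink_results")
        or []
    )
    return [r["handoff_uri"] for r in results if r.get("handoff_uri")]


# Invented result, for illustration only.
result = {"interchange": {"sink_results": [
    {"kind": "spark", "handoff_uri": "s3://demo-bucket-us-east-1/spark-handoff/out.parquet"},
]}}
uris = spark_handoff_uris(result)
print(uris)
# A PySpark driver would then load the handoff with spark.read.parquet(uris[0]).
```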
Auth: the Rust write to `handoff_uri` follows CLOUD_AUTH.md (S3 / Azure / `file://`). Spark cluster login is separate; see CLOUD_AUTH.md (Apache Spark handoff section).

Auth in your Spark app (not in Rust):
| Deployment | Typical credentials |
|---|---|
| YARN / Kerberos | Principal + keytab |
| Databricks | PAT or OAuth (cluster config) |
| `local[*]` demo | Often none |
Shared: `handoff_uri` = `s3://demo-bucket-us-east-1/spark-handoff/out.parquet` · `master` = `spark://spark-master.example.com:7077`

Sink JSON (`kind: spark`):

```json
{
  "kind": "spark",
  "master": "spark://spark-master.example.com:7077",
  "app_name": "rdp-demo",
  "handoff_uri": "s3://demo-bucket-us-east-1/spark-handoff/out.parquet"
}
```

```java
// rdp_run_pipeline_json → sink_results[].handoff_uri written by Rust
// spark.read().parquet(handoffUri) in your separate Spark module
```

```python
HANDOFF = "s3://demo-bucket-us-east-1/spark-handoff/out.parquet"
MASTER = "spark://spark-master.example.com:7077"
# Same pipeline JSON as Java via shared fixtures, or PySpark after the Rust write
```

Amazon S3 requires authentication. The `s3://bucket/key` string in pipeline JSON is only the location; it contains no keys or tokens. Full auth guide (OS env, Docker, K8s, IAM): AMAZON_S3.md. Cross-cloud notes: CLOUD_AUTH.md.
URI (location only): `s3://demo-bucket-us-east-1/rdp/incoming/part-00000.parquet`

Integration (MinIO): `s3://rdp-cloud-s3/out.parquet` with `AWS_ENDPOINT=http://127.0.0.1:9000`; see Integration validation.
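An object-store URI carries only a scheme, a bucket or container, and a key, which is what "location only" means here. The hypothetical Python helper below splits the URI forms used in this guide. The account in an `abfss://` host is addressing, not a credential.

```python
from urllib.parse import urlsplit


def object_location(uri: str) -> tuple:
    """Split an object-store URI into (scheme, bucket_or_container, key)."""
    parts = urlsplit(uri)
    scheme = parts.scheme.lower()
    if scheme in ("s3", "gs", "gcs", "azure"):
        bucket = parts.netloc
    elif scheme == "abfss":
        # abfss://CONTAINER@ACCOUNT.dfs.core.windows.net/PATH
        bucket = parts.netloc.split("@", 1)[0]
    else:
        raise ValueError(f"not an object-store URI: {uri}")
    return scheme, bucket, parts.path.lstrip("/")


print(object_location("s3://demo-bucket-us-east-1/rdp/incoming/part-00000.parquet"))
# ('s3', 'demo-bucket-us-east-1', 'rdp/incoming/part-00000.parquet')
```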
```rust
use rust_data_processing::ingestion::{ingest_from_object_store_uri, export_dataset_to_object_store_uri, IngestionOptions};
// --features cloud_connectors

const URI: &str = "s3://demo-bucket-us-east-1/rdp/incoming/part-00000.parquet";
// IngestionFormat must also be in scope; `..Default::default()` fills the remaining options.
// let ds = ingest_from_object_store_uri(URI, &schema, &IngestionOptions { format: Some(IngestionFormat::Parquet), ..Default::default() })?;
// export_dataset_to_object_store_uri("s3://demo-bucket-us-east-1/rdp/out/result.parquet", &ds)?;
```

```python
import rust_data_processing as rdp

URI = "s3://demo-bucket-us-east-1/rdp/incoming/part-00000.parquet"
schema = [{"name": "id", "data_type": "int64"}, {"name": "name", "data_type": "utf8"}]
ds = rdp.ingest_from_object_store_uri(URI, schema, {"format": "parquet"})  # --features cloud
```

```json
{
  "sources": {
    "paths": [],
    "object_store_uris": ["s3://demo-bucket-us-east-1/rdp/incoming/part-00000.parquet"],
    "schema": { "fields": [] },
    "options": { "format": "parquet" }
  },
  "sinks": [{ "kind": "object_store", "uri": "s3://demo-bucket-us-east-1/rdp/out/", "format": "parquet" }]
}
```

Google Cloud Storage auth: `GOOGLE_APPLICATION_CREDENTIALS` or GCE/GKE workload identity on the process running Rust, never pipeline JSON. See CLOUD_AUTH.md (Google Cloud Storage section).
| Method | Typical setup |
|---|---|
| Service account JSON | `GOOGLE_APPLICATION_CREDENTIALS=/path/to/fake-service-account.json` |
| Workload identity | Metadata on GCE/GKE; no path in JSON |
URI: `gs://demo-gcs-project/rdp/incoming/part-00000.parquet` (the alias `gcs://` is accepted in validation).

Local emulator (integration tests): set `STORAGE_EMULATOR_HOST=http://127.0.0.1:4443` and `gcs_base_url`. Alternatively, point `GOOGLE_APPLICATION_CREDENTIALS` at a fake service-account file that sets `gcs_base_url`. When those variables are set, Rust uses the GCS JSON API against fake-gcs-server. Validated in `integration_testing/CloudConnectors/` → `gs://rdp-cloud-gcs/out.parquet`.
```rust
const URI: &str = "gs://demo-gcs-project/rdp/incoming/part-00000.parquet";
// ingest_from_object_store_uri(URI, &schema, &opts)?;
```

```python
URI = "gs://demo-gcs-project/rdp/incoming/part-00000.parquet"
# ds = rdp.ingest_from_object_store_uri(URI, schema, {"format": "parquet"})
```

```json
"object_store_uris": ["gs://demo-gcs-project/rdp/incoming/part-00000.parquet"]
```

Azure Blob / ADLS: the full guide is AZURE_ADLS.md (service principal env vars, managed identity, Docker/K8s, Java with no secrets in JSON). Cross-cloud notes: CLOUD_AUTH.md.

Quick reference:
| Method | Variables (on the process running `rdp_jvm_sys` / Rust / Python) |
|---|---|
| Service principal | `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`, `AZURE_STORAGE_ACCOUNT_NAME` |
| Account key | `AZURE_STORAGE_ACCOUNT_NAME`, `AZURE_STORAGE_ACCOUNT_KEY` |
| Managed identity / CLI | Identity on the host or `az login`; see CLOUD_AUTH.md |
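The quick-reference rows can double as a diagnostic: check which variable set is complete on the process before running a pipeline. The Python sketch below is hypothetical. Its precedence order (emulator, then service principal, then account key) is an assumption for illustration and does not describe how Rust chooses.

```python
import os

# Variable sets from the quick-reference table.
AZURE_METHODS = [
    ("service principal", ("AZURE_TENANT_ID", "AZURE_CLIENT_ID",
                           "AZURE_CLIENT_SECRET", "AZURE_STORAGE_ACCOUNT_NAME")),
    ("account key", ("AZURE_STORAGE_ACCOUNT_NAME", "AZURE_STORAGE_ACCOUNT_KEY")),
]


def azure_auth_method(env=None) -> str:
    """Name the first complete variable set found (assumed order, not Rust's logic)."""
    env = os.environ if env is None else env
    if env.get("AZURE_STORAGE_USE_EMULATOR", "").lower() == "true":
        return "emulator"
    for name, required in AZURE_METHODS:
        if all(env.get(var) for var in required):
            return name
    return "managed identity / CLI (or nothing configured)"


print(azure_auth_method())
```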
URI: `abfss://container@storacc01.dfs.core.windows.net/rdp/incoming/part-00000.parquet`. The form `azure://container/path` is also accepted (the Azurite integration uses `azure://rdp-cloud-azure/out.parquet`).

Local emulator (integration tests): `AZURE_STORAGE_USE_EMULATOR=true`, `AZURE_ENDPOINT=http://127.0.0.1:10000`. Do not set a truncated `AZURE_STORAGE_ACCOUNT_KEY`; Rust uses the well-known Azurite key when emulator mode is on. Validated in `integration_testing/CloudConnectors/`.

```rust
const URI: &str = "abfss://container@storacc01.dfs.core.windows.net/rdp/incoming/part-00000.parquet";
```

```python
URI = "abfss://container@storacc01.dfs.core.windows.net/rdp/incoming/part-00000.parquet"
```

```json
"object_store_uris": ["abfss://container@storacc01.dfs.core.windows.net/rdp/incoming/part-00000.parquet"]
```

SFTP auth: CLOUD_AUTH.md (SFTP section). Pipeline field: `sources.file_transfer_uris` (not `object_store_uris`).
URL: `sftp://etl_user:FAKE_SFTP_PASS@sftp.example.com:22/rdp/incoming/data.parquet`

| Auth | Example |
|---|---|
| Password | URL userinfo or `SFTP_PASSWORD` env (overrides the URL password) |
| SSH private key | `SFTP_PRIVATE_KEY_PATH`; not in JSON |
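The Python sketch below illustrates the password rule from the table, where `SFTP_PASSWORD` overrides the password in the URL userinfo. `resolve_sftp_auth` is hypothetical. It mirrors only that documented override and does not model how Rust chooses between a password and `SFTP_PRIVATE_KEY_PATH`.

```python
import os
from urllib.parse import unquote, urlsplit


def resolve_sftp_auth(url: str, env=None):
    """Return (user, password, private_key_path) following the documented SFTP rules."""
    env = os.environ if env is None else env
    parts = urlsplit(url)
    url_password = unquote(parts.password) if parts.password else None
    password = env.get("SFTP_PASSWORD") or url_password  # env wins over URL userinfo
    return parts.username, password, env.get("SFTP_PRIVATE_KEY_PATH")


URL = "sftp://etl_user:FAKE_SFTP_PASS@sftp.example.com:22/rdp/incoming/data.parquet"
print(resolve_sftp_auth(URL, env={}))  # ('etl_user', 'FAKE_SFTP_PASS', None)
```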
```rust
use rust_data_processing::ingestion::{ingest_from_file_transfer_uri, IngestionOptions};
// cargo run --features cloud_connectors --example file_transfer_ingest -- 'sftp://...'

const URI: &str = "sftp://etl_user:FAKE_SFTP_PASS@sftp.example.com:22/rdp/incoming/data.parquet";
// `schema` describes the remote file and is defined elsewhere.
let ds = ingest_from_file_transfer_uri(URI, &schema, &IngestionOptions::default())?;
```

```python
import rust_data_processing as rdp  # maturin build --features cloud

URI = "sftp://etl_user:FAKE_SFTP_PASS@sftp.example.com:22/rdp/incoming/data.parquet"
# schema: list of {"name", "data_type"} dicts, as in the S3 example
ds = rdp.ingest_from_file_transfer_uri(URI, schema, {"format": "parquet"})
```

```json
"file_transfer_uris": ["sftp://etl_user:FAKE_SFTP_PASS@sftp.example.com:22/rdp/incoming/data.parquet"]
```

See `SftpFtpConnectorsExample.java` and the fixtures in `tests/fixtures/file_transfer/`. Docker-validated in `integration_testing/CloudConnectors/` (seeded CSV on SFTP/FTP containers).
FTP auth: CLOUD_AUTH.md (FTP / FTPS section). Pipeline field: `sources.file_transfer_uris`.

URL: `ftp://etl_user:FAKE_FTP_PASS@ftp.example.com:21/rdp/incoming/data.parquet` · FTPS: `ftps://…` (default port 990)
| Mode | Notes |
|---|---|
| Plain FTP | `ftp://` |
| Explicit TLS (FTPS) | `ftps://`; requires `cloud_connectors` (rustls) |
| Anonymous | Omit the user; set `FTP_USER` if needed (discouraged) |
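FTP and SFTP URLs may carry a password in the userinfo, so mask it before logging a source. The hypothetical Python helper below does that and also makes the port explicit. It uses 22 and 21 (the standard SFTP and FTP ports) and the 990 FTPS default stated above.

```python
from urllib.parse import urlsplit

DEFAULT_PORTS = {"sftp": 22, "ftp": 21, "ftps": 990}


def redacted_transfer_url(url: str) -> str:
    """Return the URL with any password masked and the effective port made explicit."""
    parts = urlsplit(url)
    port = parts.port or DEFAULT_PORTS[parts.scheme]
    if parts.password:
        userinfo = f"{parts.username}:***@"
    elif parts.username:
        userinfo = f"{parts.username}@"
    else:
        userinfo = ""
    return f"{parts.scheme}://{userinfo}{parts.hostname}:{port}{parts.path}"


print(redacted_transfer_url("ftp://etl_user:FAKE_FTP_PASS@ftp.example.com:21/rdp/incoming/data.parquet"))
# ftp://etl_user:***@ftp.example.com:21/rdp/incoming/data.parquet
```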
```rust
const URI: &str = "ftp://etl_user:FAKE_FTP_PASS@ftp.example.com:21/rdp/incoming/data.parquet";
let ds = ingest_from_file_transfer_uri(URI, &schema, &IngestionOptions::default())?;
```

```python
ds = rdp.ingest_from_file_transfer_uri(
    "ftp://etl_user:FAKE_FTP_PASS@ftp.example.com:21/rdp/incoming/data.parquet",
    schema,
    {"format": "parquet"},
)
```

```json
"file_transfer_uris": ["ftp://etl_user:FAKE_FTP_PASS@ftp.example.com:21/rdp/incoming/data.parquet"]
```

Kafka is not a batch file connector: it polls windows from a topic, lands the rows, and transforms them separately. Full guide: KAFKA_ELT.md.
| Language | Entry point | Integration reference |
|---|---|---|
| Rust | `export_dataset_to_kafka`, `poll_kafka_window_loaded` | `integration_testing/Kafka/rust/src/rdp_kafka.rs` |
| Python | `rdp.export_dataset_to_kafka`, `rdp.poll_kafka_window_loaded` (feature `kafka`) | `integration_testing/scripts/kafka_stream.py` |
| Java | `rdp_kafka_export_dataset_json`, `rdp_kafka_poll_window_loaded_json` (build with `full,kafka`) | `KafkaStreamIntegrationTest`, `KafkaEltStreamExample.java` |
Run: `python3 integration_testing/Kafka/run_kafka_tests.py --no-rancher` (Redpanda on `127.0.0.1:19092`).

docs/java/EXAMPLES.md · `PlatformConnectorsPipelineExample.java` · fixture bundle `tests/fixtures/cloud_connectors/`.
- AMAZON_S3.md (`AWS_*`, IAM role, Docker, K8s)
- AZURE_ADLS.md (`AZURE_*`, Docker, K8s)
- SNOWFLAKE.md (`COPY INTO`)
- docs/java/EXAMPLES.md: connector cookbook with warehouse vs Polars SQL
- python-wrapper/API.md: Python ingestion API
- docs/adr/006-jvm-orchestration-pipeline-json.md: pipeline sink taxonomy