![]() ![]() The driverLocations stream has a relatively compact schema, and it doesn’tĬontain much data that a human would find particularly useful. Enrich driverLocations stream by joining with PostgreSQL data ¶ Next, unzip the downloaded archive:ĬREATE STREAM driverLocations ( driver_id INTEGER, latitude DOUBLE, longitude DOUBLE, speed DOUBLE ) WITH ( kafka_topic = 'driver_locations', value_format = 'json', partitions = 1, key = 'driver_id' ) CREATE STREAM riderLocations ( driver_id INTEGER, latitude DOUBLE, longitude DOUBLE ) WITH ( kafka_topic = 'rider_locations', value_format = 'json', partitions = 1, key = 'driver_id' ) ġ2. connect.properties:/connect/connect.properties ksqldb-cli : image : confluentinc/ksqldb-cli:0.6.0 container_name : ksqldb-cli depends_on : - broker - ksqldb-server entrypoint : /bin/sh tty : true postgres : image : postgres:12 hostname : postgres container_name : postgres ports : - "5432:5432" confluentinc-kafka-connect-jdbc-5.3.2:/usr/share/kafka/plugins/jdbc. ![]() version : '2' services : zookeeper : image : confluentinc/cp-zookeeper:5.3.2 hostname : zookeeper container_name : zookeeper ports : - "2181:2181" environment : ZOOKEEPER_CLIENT_PORT : 2181 ZOOKEEPER_TICK_TIME : 2000 broker : image : confluentinc/cp-enterprise-kafka:5.3.2 hostname : broker container_name : broker depends_on : - zookeeper ports : - "29092:29092" environment : KAFKA_BROKER_ID : 1 KAFKA_ZOOKEEPER_CONNECT : 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP : PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS : PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR : 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS : 0 ksqldb-server : image : confluentinc/ksqldb-server:0.6.0 hostname : ksqldb-server container_name : ksqldb-server depends_on : - broker ports : - "8088:8088" environment : KSQL_LISTENERS : KSQL_BOOTSTRAP_SERVERS : broker:9092 KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE : "true" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE : "true" KSQL_KSQL_CONNECT_WORKER_CONFIG : "/connect/connect.properties" volumes :. Enrich driverLocations stream by joining with PostgreSQL data Create streams for driver locations and rider locationsġ2. Populate PostgreSQL with vehicle/driver dataġ1. ![]() Implement a User-defined Function (UDF, UDAF, and UDTF)Ħ. ![]()
0 Comments
Leave a Reply. |