Я успешно создал подключение к базе данных mariadb с помощью debezium и kafka
Когда я попытался транслировать тему с помощью pyspark, вот что я получил
Будет ли это проблемой, когда я захочу загрузить/транслировать эти данные в целевую базу данных? По сути, я пытаюсь сделать так: база данных mariadb -> kafka-debezium -> pyspark -> база данных mariadb
Вот мой код, с помощью которого я получил этот вывод:
Это мой docker compose
Вот что я надеялся увидеть — этот вывод я получил с помощью контейнера реестра схем (schema registry)
а затем
Но я не понимаю, как реплицировать его в pyspark
-------------------------------------------
Batch: 0
-------------------------------------------
+------+--------------------------------------------------------------------------------------------------------------------------+
|key |value |
+------+--------------------------------------------------------------------------------------------------------------------------+
||MaxDoe1.4.2.Final\nmysqlmariadb\btruebasecampemployees mysql-bin.000032�r�ȯݭd |
||\bJane\bMary1.4.2.Final\nmysqlmariadb\btruebasecampemployees mysql-bin.000032�r�ȯݭd |
||\nAliceJohnson1.4.2.Final\nmysqlmariadb\blastbasecampemployees mysql-bin.000032�r�ȯݭd |
||MaxDoe\bMaxxDoe1.4.2.Final\nmysqlmariadb���߭d\nfalsebasecampemployees mysql-bin.000032�\bu���߭d|
||\bMaxxDoeMaxDoe1.4.2.Final\nmysqlmariadb���߭d\nfalsebasecampemployees mysql-bin.000032�\ru��߭d |
||\bMaxxDoeMaxDoe1.4.2.Final\nmysqlmariadb���߭d\nfalsebasecampemployees mysql-bin.000032�\ru����d|
+------+--------------------------------------------------------------------------------------------------------------------------+
# pysparkkafka1
#
# Streams raw Debezium CDC records from a Kafka topic and prints each
# micro-batch to the console, then stops after a fixed timeout.
#
# NOTE(review): the topic payload is Avro-encoded (Confluent wire format:
# 1 magic byte + 4-byte schema id + Avro body), so CAST(value AS STRING)
# below yields mojibake like in the Batch 0 output above. To get readable
# records, decode with spark-avro's from_avro() after stripping the 5-byte
# header (fetching the schema from the Schema Registry), or reconfigure the
# Debezium connector to use JSON converters — TODO confirm which approach
# fits the target MariaDB sink.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import traceback
import time

spark = None  # so the finally-block is safe if session creation fails
try:
    # Initialize the Spark session
    spark = (
        SparkSession.builder
        .appName("Kafka Spark Integration")
        .getOrCreate()
    )
    print("Spark session started")

    # Define Kafka topic and server
    kafka_topic = "mariadb.basecamp.employees"
    kafka_server = "kafka:9092"  # Replace with your Kafka server address if different

    # Print the Kafka topic
    print(f"Reading from Kafka topic: {kafka_topic}")

    # Read from the Kafka topic, starting from the earliest retained offset
    df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_server)
        .option("subscribe", kafka_topic)
        .option("startingOffsets", "earliest")
        .load()
    )
    print("DataFrame created from Kafka topic")

    # Select key and value and convert from bytes to string
    # (this is a byte-wise cast, NOT Avro decoding — see NOTE above)
    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    print("DataFrame transformed")

    # Display the dataframe in console
    query = (
        df.writeStream
        .outputMode("append")
        .format("console")
        .option("truncate", False)
        .start()
    )
    print("Query started")

    # Poll the streaming query for up to `timeout` seconds, then stop it.
    timeout = 60  # 1 minute timeout
    start_time = time.time()
    while time.time() - start_time < timeout:
        if query.isActive:
            print("Streaming...")
            time.sleep(10)  # Check every 10 seconds
        else:
            break
    query.stop()
    print("Query stopped")
except Exception as e:
    print("An error occurred:", e)
    traceback.print_exc()
finally:
    # spark stays None if SparkSession.builder raised before assignment
    if spark is not None:
        spark.stop()
    print("Spark session stopped")
# Docker Compose stack: MariaDB -> Debezium (Kafka Connect) -> Kafka,
# with a Confluent Schema Registry for Avro, admin UIs (phpMyAdmin,
# pgAdmin), a Postgres target, and a Jupyter/PySpark notebook consumer.
version: '3.8'
services:
  mariadb:
    image: mariadb:10.5
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABASE: mydatabase
      # NOTE(review): MYSQL_USER should not be 'root' — the official image
      # already creates root and may refuse this setting; use a dedicated
      # replication user for Debezium instead. TODO confirm and rename.
      MYSQL_USER: root
      MYSQL_PASSWORD: password
    volumes:
      - ./mariadb-config:/etc/mysql/mariadb.conf.d
      - D:\mariakafka\my.cnf:/etc/mysql/my.cnf
      - ./mysql:/var/lib/mysql
    ports:
      - "3306:3306"

  phpmyadmin:
    image: phpmyadmin
    restart: always
    ports:
      - 8080:80
    environment:
      - PMA_ARBITRARY=1
      - UPLOAD_LIMIT=2G
      - MEMORY_LIMIT=2G

  postgres:
    image: debezium/postgres:13
    restart: always
    volumes:
      - ./postgres:/var/lib/postgresql/data
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=docker
      - POSTGRES_PASSWORD=docker
      - POSTGRES_DB=exampledb

  pgadmin:
    image: dpage/pgadmin4
    restart: always
    environment:
      PGADMIN_DEFAULT_EMAIL: [email protected]
      PGADMIN_DEFAULT_PASSWORD: root
    ports:
      - "5050:80"
    depends_on:
      - postgres

  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-server:5.5.1
    restart: always
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # Advertised as kafka:9092 — reachable by other containers on the
      # compose network; host clients need an extra listener if required.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"
      KAFKA_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - "9092:9092"

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    restart: always
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      # Registry listens on 8087 (not the Confluent default 8081).
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8087,http://localhost:8087
    ports:
      - 8087:8087
    depends_on: [zookeeper, kafka]

  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_status
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # FIX: the registry listens on 8087 (see schema-registry service),
      # not the default 8081 — the converters must point at the same port.
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8087
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8087
      OFFSET_FLUSH_INTERVAL_MS: 60000
      OFFSET_FLUSH_TIMEOUT_MS: 5000
      SHUTDOWN_TIMEOUT: 10000
    ports:
      - 8083:8083
    depends_on:
      - kafka
      - schema-registry
    volumes:
      - D:\mariakafka\kafka\config:/kafka/config
      - D:/mariakafka/mysql-connector:/usr/share/java/kafka-connect

  pyspark:
    image: jupyter/pyspark-notebook:latest
    ports:
      - "8888:8888"
    environment:
      - PYSPARK_SUBMIT_ARGS=--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 --jars /home/jovyan/work/mysql-connector-j-9.0.0.jar pyspark-shell
    volumes:
      - D:/mariakafka/pyspark:/home/jovyan/work
      - D:/mysql-connector-j-9.0.0.jar:/home/jovyan/work/mysql-connector-j-9.0.0.jar
    depends_on:
      - kafka

# NOTE(review): these named volumes and the network are declared but not
# referenced by any service above.
volumes:
  postgres_data:
  mariadb_data:

networks:
  flink-network:
    driver: bridge
# Open an interactive shell in a running container by its full ID
# (presumably the Schema Registry / Kafka tools container — verify).
docker exec -it 3179874d15c23934fc55b841a5650d6e07a33a72cbdd74de308615a0c11c45e0 bash
# Consume the Debezium topic from the beginning, decoding Avro values via
# the Schema Registry on port 8087 — this is why this output is readable
# JSON, unlike the raw CAST(value AS STRING) in the PySpark job above.
kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic mariadb.basecamp.employees --from-beginning --property schema.registry.url=http://schema-registry:8087
{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"true"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623364},"transaction":null}
{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":2,"first_name":"Jane","last_name":"Mary"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"true"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623367},"transaction":null}
{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":3,"first_name":"Alice","last_name":"Johnson"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"last"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623369},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724126944000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":549,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724126944214},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724127005000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":855,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724127006261},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724127005000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":855,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724127912824},"transaction":null}
Мартьян
Вопрос задан 10 февраля 2024 г.