PySpark: ошибка формата при потоковом чтении темы Kafka (Debezium), ETL

Я успешно создал подключение к базе данных mariadb с помощью debezium и kafka

Когда я попытался транслировать тему с помощью pyspark, вот что я получил

Будет ли это проблемой, когда я захочу загрузить/транслировать эти данные в целевую базу данных? По сути, я пытаюсь сделать так: база данных mariadb -> kafka-debezium -> pyspark -> база данных mariadb

Вот мой код, с помощью которого я получил этот вывод:

Это мой docker compose

Вот что я надеялся увидеть, я получил этот вывод с помощью контейнера реестра схем

а затем

Но я не понимаю, как реплицировать его в pyspark

-------------------------------------------
Batch: 0
-------------------------------------------
+------+--------------------------------------------------------------------------------------------------------------------------+
|key   |value                                                                                                                     |
+------+--------------------------------------------------------------------------------------------------------------------------+
||MaxDoe1.4.2.Final\nmysqlmariadb\btruebasecampemployees mysql-bin.000032�r�ȯݭd                   |
||\bJane\bMary1.4.2.Final\nmysqlmariadb\btruebasecampemployees mysql-bin.000032�r�ȯݭd               |
||\nAliceJohnson1.4.2.Final\nmysqlmariadb\blastbasecampemployees mysql-bin.000032�r�ȯݭd            |
||MaxDoe\bMaxxDoe1.4.2.Final\nmysqlmariadb���߭d\nfalsebasecampemployees mysql-bin.000032�\bu���߭d|
||\bMaxxDoeMaxDoe1.4.2.Final\nmysqlmariadb���߭d\nfalsebasecampemployees mysql-bin.000032�\ru��߭d |
||\bMaxxDoeMaxDoe1.4.2.Final\nmysqlmariadb���߭d\nfalsebasecampemployees mysql-bin.000032�\ru����d|
+------+--------------------------------------------------------------------------------------------------------------------------+
#pysparkkafka1

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import traceback
import time

# Reads a Debezium CDC topic from Kafka and prints it to the console for a
# bounded amount of time.
#
# NOTE: the compose file configures Debezium with
# VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter, so each message
# value is Confluent-Avro-encoded (magic byte + 4-byte schema id + Avro
# payload). CAST(value AS STRING) therefore produces the garbled text seen in
# the output; proper decoding requires fetching the schema from the Schema
# Registry (e.g. spark-avro's from_avro after stripping the 5-byte header, or
# the ABRiS library) before writing to a target database.

spark = None  # bound before `try` so the `finally` block is safe even if
              # session creation itself raises

try:
    # Initialize the Spark session
    spark = SparkSession.builder \
        .appName("Kafka Spark Integration") \
        .getOrCreate()

    print("Spark session started")

    # Define Kafka topic and server
    kafka_topic = "mariadb.basecamp.employees"
    kafka_server = "kafka:9092"  # Replace with your Kafka server address if different

    # Print the Kafka topic
    print(f"Reading from Kafka topic: {kafka_topic}")

    # Read from the Kafka topic, starting from the earliest retained offset
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_server) \
        .option("subscribe", kafka_topic) \
        .option("startingOffsets", "earliest") \
        .load()

    print("DataFrame created from Kafka topic")

    # Select key and value and convert from bytes to string.
    # (See the note above: for Avro-encoded values this is lossy/garbled.)
    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    print("DataFrame transformed")

    # Display the dataframe in console
    query = df.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", False) \
        .start()

    print("Query started")

    # Let the stream run for up to 60 seconds, then shut it down cleanly.
    # awaitTermination(timeout) blocks until the query stops or the timeout
    # elapses — this replaces the manual isActive/sleep polling loop.
    query.awaitTermination(60)

    query.stop()
    print("Query stopped")

except Exception as e:
    print("An error occurred:", e)
    traceback.print_exc()

finally:
    # Only stop the session if it was actually created; previously this line
    # raised NameError when SparkSession.builder failed.
    if spark is not None:
        spark.stop()
        print("Spark session stopped")
version: '3.8'

services:
  mariadb:
    image: mariadb:10.5
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABASE: mydatabase
      # NOTE: MYSQL_USER/MYSQL_PASSWORD were previously set to root/password.
      # The mariadb image rejects MYSQL_USER=root (the root account already
      # exists and is configured via MYSQL_ROOT_PASSWORD), causing a startup
      # error. Use a distinct name here if a non-root application user is
      # needed.
    volumes:
      - ./mariadb-config:/etc/mysql/mariadb.conf.d
      # Windows host path; single-quoted so the backslashes stay literal.
      - 'D:\mariakafka\my.cnf:/etc/mysql/my.cnf'
      - ./mysql:/var/lib/mysql
    ports:
      - "3306:3306"

  phpmyadmin:
    # Web UI for MariaDB; PMA_ARBITRARY=1 allows entering any host at login.
    image: phpmyadmin
    restart: always
    ports:
      - "8080:80"
    environment:
      PMA_ARBITRARY: "1"
      UPLOAD_LIMIT: "2G"
      MEMORY_LIMIT: "2G"

  postgres:
    # PostgreSQL from the Debezium project's images.
    image: debezium/postgres:13
    restart: always
    environment:
      POSTGRES_USER: docker
      POSTGRES_PASSWORD: docker
      POSTGRES_DB: exampledb
    volumes:
      - ./postgres:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  pgadmin:
    # Web UI for Postgres; reachable on host port 5050.
    image: dpage/pgadmin4
    restart: always
    environment:
      PGADMIN_DEFAULT_EMAIL: [email protected]
      PGADMIN_DEFAULT_PASSWORD: root
    ports:
      - "5050:80"
    depends_on:
      - postgres
 
  zookeeper:
    # Coordination service used by the Kafka broker and the Schema Registry.
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-server:5.5.1
    restart: always
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # Two listeners: containers on the compose network keep using
      # kafka:9092; clients on the Docker host use localhost:29092.
      # Previously only kafka:9092 was advertised, so the published port was
      # unusable from the host (clients are redirected to the advertised
      # name, which the host cannot resolve). Also removed
      # KAFKA_LISTENER_NAME, which is not a recognized broker setting.
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - "9092:9092"
      - "29092:29092"

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    restart: always
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      # Bind on all interfaces. The previous value listed two host-specific
      # listeners on the same port (schema-registry:8087 and localhost:8087),
      # which the registry cannot bind simultaneously.
      - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8087
    ports:
      - "8087:8087"
    depends_on: [zookeeper, kafka]

  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_status
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # Must match the Schema Registry listener port (8087 — see the
      # schema-registry service and the working console-consumer command);
      # these previously pointed at 8081, where nothing listens.
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8087
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8087
      OFFSET_FLUSH_INTERVAL_MS: 60000
      OFFSET_FLUSH_TIMEOUT_MS: 5000
      SHUTDOWN_TIMEOUT: 10000
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - schema-registry
    volumes:
      # Windows host path; single-quoted so the backslashes stay literal.
      - 'D:\mariakafka\kafka\config:/kafka/config'
      - D:/mariakafka/mysql-connector:/usr/share/java/kafka-connect

  pyspark:
    # Jupyter notebook with PySpark.
    # NOTE(review): spark-sql-kafka-0-10_2.12:3.0.0 must match the Spark
    # version shipped inside jupyter/pyspark-notebook:latest — a newer image
    # ships a newer Spark and this pinned 3.0.0 connector may fail to load.
    # Pin the image tag or align the package version; verify with
    # spark.version inside the container.
    image: jupyter/pyspark-notebook:latest
    ports:
      - "8888:8888"
    environment:
      - PYSPARK_SUBMIT_ARGS=--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 --jars /home/jovyan/work/mysql-connector-j-9.0.0.jar pyspark-shell
    volumes:
      - D:/mariakafka/pyspark:/home/jovyan/work
      - D:/mysql-connector-j-9.0.0.jar:/home/jovyan/work/mysql-connector-j-9.0.0.jar
    depends_on:
      - kafka

volumes:
  # NOTE(review): declared but not referenced by any service above — the
  # mariadb and postgres services use bind mounts instead.
  postgres_data:
  mariadb_data:

networks:
  # NOTE(review): defined but no service is attached to it.
  flink-network:
    driver: bridge

# Open a shell in the container with the given ID (from `docker ps`);
# presumably one that ships kafka-avro-console-consumer, e.g. the
# schema-registry container — confirm with `docker ps`.
docker exec -it  3179874d15c23934fc55b841a5650d6e07a33a72cbdd74de308615a0c11c45e0 bash
# Consume the Debezium topic with Avro decoding via the Schema Registry —
# this is why the JSON output below is readable while the raw CAST in
# PySpark was not.
kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic mariadb.basecamp.employees --from-beginning --property schema.registry.url=http://schema-registry:8087

{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"true"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623364},"transaction":null}
{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":2,"first_name":"Jane","last_name":"Mary"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"true"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623367},"transaction":null}
{"before":null,"after":{"mariadb.basecamp.employees.Value":{"employee_id":3,"first_name":"Alice","last_name":"Johnson"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":0,"snapshot":{"string":"last"},"db":"basecamp","table":{"string":"employees"},"server_id":0,"gtid":null,"file":"mysql-bin.000032","pos":342,"row":0,"thread":null,"query":null},"op":"r","ts_ms":{"long":1724124623369},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724126944000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":549,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724126944214},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724127005000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":855,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724127006261},"transaction":null}
{"before":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Maxx","last_name":"Doe"}},"after":{"mariadb.basecamp.employees.Value":{"employee_id":1,"first_name":"Max","last_name":"Doe"}},"source":{"version":"1.4.2.Final","connector":"mysql","name":"mariadb","ts_ms":1724127005000,"snapshot":{"string":"false"},"db":"basecamp","table":{"string":"employees"},"server_id":1,"gtid":null,"file":"mysql-bin.000032","pos":855,"row":0,"thread":{"long":0},"query":null},"op":"u","ts_ms":{"long":1724127912824},"transaction":null}
Мартьян
Вопрос задан 10 февраля 2024 г.

1 Ответ

Ваш ответ

Загрузить файл.