Задание по склеиванию зависло на write_dynamic_frame.from_options()

1
6

При запуске скрипта ниже я добираюсь до части записи без проблем. Контейнер S3 заполняется данными, преобразование происходит правильно и т. д. Но моя задача просто выполняется и выполняется без записи данных в Redshift. Ошибок нет. Ничего в журналах, которые я могу найти, что указывало бы на какие-либо проблемы с подключением или записью.

Нет блокировок в целевой таблице. Она существует. Параметры подключения работают для получения данных, но не для их помещения. Что происходит?

Любая помощь приветствуется

various imports...

def get_secret(secret_name, region_name):
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region_name)
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    secret = get_secret_value_response['SecretString']
    return eval(secret)  

args = getResolvedOptions(sys.argv, ['JOB_NAME', "TempDir"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

region_name = "" 
secret_name = ""  
redshift_temp_dir = args["TempDir"]
credentials = get_secret(secret_name, region_name)
redshift_user = credentials['username']
redshift_password = credentials['password']

redshift_connection_options = {
    "url": "",
    "dbtable": "",
    "user": redshift_user,
    "password": redshift_password,
    "redshiftTmpDir": redshift_temp_dir
}

source_df = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options=redshift_connection_options,
    transformation_ctx="source_df"
)

renamed_df = source_df.toDF().withColumn("lastupdatedtimestamp", col("lastupdatedtimestamp").cast("int"))
renamed_dynamic_df = DynamicFrame.fromDF(renamed_df, glueContext, "renamed_dynamic_df")

redshift_connection_options2 = {
    "url": "",
    "dbtable": "",
    "user": redshift_user,
    "password": redshift_password,
    "redshiftTmpDir": redshift_temp_dir
}

glueContext.write_dynamic_frame.from_options(
    frame=renamed_dynamic_df,
    connection_type="redshift",
    connection_options=redshift_connection_options2,
    transformation_ctx="target_df"
)

job.commit()

Екатерина
Вопрос задан5 февраля 2024 г.

1 Ответ

2

Зависание задания на этапе write_dynamic_frame.from_options() в Apache Spark может быть вызвано разными причинами. Давайте рассмотрим наиболее вероятные:

1. Проблемы с выводом данных:

  • Неверная конфигурация вывода:Проверьте конфигурацию вывода в write_dynamic_frame.from_options(). Убедитесь, что вы указываете правильный формат, путь и другие параметры.
  • Недостаточно ресурсов: Если вы выводите большое количество данных, возможно, не хватает ресурсов (памяти, дискового пространства) для завершения операции. 
  • Проблемы с системой хранения:Проблемы с файловой системой (например, ошибки записи, отсутствие прав доступа) могут препятствовать завершению операции вывода.

2. Проблемы с обработкой данных:

  • Ошибки в логике кода: Проверьте свою логику обработки данных в предыдущих этапах задания. Возможны ошибки, которые приводят к неправильному формату данных или зацикливанию.
  • Нехватка памяти: Если у вас слишком много данных в памяти, может возникнуть нехватка памяти и задание зависнет.

3. Проблемы с кластером Spark:

  • Нехватка ресурсов: Проверьте наличие достаточного количества узлов и ресурсов (памяти, процессоров) в кластере Spark.
  • Нестабильная связь: Проблемы с сетью между узлами кластера могут привести к зависанию задания.

Как отладить проблему:

  1. Просмотрите журналы Spark:Проверьте журналы Spark на наличие ошибок или предупреждений. 
  2. Проверьте конфигурацию задания:Убедитесь, что вы правильно настроили конфигурацию задания, включая параметры вывода и другие релевантные параметры.
  3. Уменьшите масштаб задания:Попробуйте уменьшить количество данных или разбить задание на несколько меньших заданий.
  4. Используйте инструменты профилирования: Используйте инструменты профилирования Spark (например, Spark UI) для выявления узких мест и проблем с производительностью.
  5. Проверьте настройки кластера:Убедитесь, что кластер Spark настроен правильно и имеет достаточно ресурсов для выполнения вашего задания.

Пример отладки кода:

from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Создание  SparkSession
spark = SparkSession.builder.appName("MySparkApp").getOrCreate()

# Загрузка  данных
df = spark.read.format("parquet").load("path/to/data.parquet")

# Обработка  данных
df = df.withColumn("new_column", col("column1") + lit(1))

# Запись  в  файловую  систему
df.write.format("parquet").mode("overwrite").save("path/to/output.parquet")

Проверьте следующие моменты:

  • Правильность путей:path/to/data.parquet и path/to/output.parquet.
  • Существование файла  data.parquet и прав доступа к папке path/to/output.parquet.
  • Наличие достаточно ресурсов в кластере Spark для обработки данных.

Важно: Устранение проблем с зависанием задания может требовать глубокого понимания конфигурации Spark и обработки данных. Изучите документацию Apache Spark и используйте инструменты отладки для выявления и устранения проблем.

 

Ираклий
Ответ получен4 сентября 2024 г.

Ваш ответ

Загрузить файл.