При запуске скрипта ниже я добираюсь до части записи без проблем. Контейнер S3 заполняется данными, преобразование происходит правильно и т. д. Но моя задача просто выполняется и выполняется без записи данных в Redshift. Ошибок нет. Ничего в журналах, которые я могу найти, что указывало бы на какие-либо проблемы с подключением или записью.
Нет блокировок в целевой таблице. Она существует. Параметры подключения работают для получения данных, но не для их помещения. Что происходит?
Любая помощь приветствуется
various imports...
def get_secret(secret_name, region_name):
session = boto3.session.Session()
client = session.client(service_name='secretsmanager', region_name=region_name)
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
secret = get_secret_value_response['SecretString']
return eval(secret)
args = getResolvedOptions(sys.argv, ['JOB_NAME', "TempDir"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
region_name = ""
secret_name = ""
redshift_temp_dir = args["TempDir"]
credentials = get_secret(secret_name, region_name)
redshift_user = credentials['username']
redshift_password = credentials['password']
redshift_connection_options = {
"url": "",
"dbtable": "",
"user": redshift_user,
"password": redshift_password,
"redshiftTmpDir": redshift_temp_dir
}
source_df = glueContext.create_dynamic_frame.from_options(
connection_type="redshift",
connection_options=redshift_connection_options,
transformation_ctx="source_df"
)
renamed_df = source_df.toDF().withColumn("lastupdatedtimestamp", col("lastupdatedtimestamp").cast("int"))
renamed_dynamic_df = DynamicFrame.fromDF(renamed_df, glueContext, "renamed_dynamic_df")
redshift_connection_options2 = {
"url": "",
"dbtable": "",
"user": redshift_user,
"password": redshift_password,
"redshiftTmpDir": redshift_temp_dir
}
glueContext.write_dynamic_frame.from_options(
frame=renamed_dynamic_df,
connection_type="redshift",
connection_options=redshift_connection_options2,
transformation_ctx="target_df"
)
job.commit()