Как запустить Python Apache Beam с помощью AWS Managed Apache Flink?

1
6

Я пытаюсь понять, как запустить простой Apache Beam с помощью Python, используя AWS Managed Apache Flink в качестве средства запуска

Приложение даже не может запуститься из-за следующего исключения, хотя практически нет информации/руководств, которые позволили бы его избежать (обычно есть только примеры приложений Java Beam для Managed Flink для AWS):

code задания, который я пытаюсь запустить, следующий:

(возможно, что сам code не работает, но у меня даже не было возможности запустить его до того, как было выдано исключение)

Конфигурация соответствует рекомендациям в документации AWS, zip-архив создается соответствующим образом, а также предоставляются все необходимые разрешения для служб. pyflink-dependencies.jar содержит только одну зависимость для aws-msk-iam-auth.

--------------------------- Python Process Started --------------------------  
Traceback (most recent call last):  
    File "/tmp/flink-web-5d98ef0c-f0f3-429e-ba53-60655903bff0/flink-web-upload/dac34d18-5e05-4c0c-ba49-f1b9d60dc8f9_code/main.py", line 53, in module  
        main()  
    File "/tmp/flink-web-5d98ef0c-f0f3-429e-ba53-60655903bff0/flink-web-upload/dac34d18-5e05-4c0c-ba49-f1b9d60dc8f9_code/main.py", line 36, in main  
        p  
    File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py", line 1110, in __ror__  
        return self.transform.__ror__(pvalueish, self.label)  
    File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py", line 623, in __ror__  
        result = p.apply(self, pvalueish, label)  
    File "/usr/local/lib/python3.11/site-packages/apache_beam/pipeline.py", line 679, in apply  
        return self.apply(transform, pvalueish)  
    File "/usr/local/lib/python3.11/site-packages/apache_beam/pipeline.py", line 741, in apply  
        pvalueish_result = self.runner.apply(transform, pvalueish, self._options)  
    File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/runner.py", line 203, in apply  
        return self.apply_PTransform(transform, input, options)  
    File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/runner.py", line 207, in apply_PTransform  
        return transform.expand(input)  
    File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 751, in expand  
        with ExternalTransform.service(expansion_service) as service:  
    File "/usr/local/lib/python3.11/contextlib.py", line 137, in __enter__  
        return next(self.gen)  
    File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 807, in service  
        with expansion_service as stub:  
    File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 1013, in __enter__  
        self._path_to_jar = subprocess_server.JavaJarServer.local_jar(  
    File "/usr/local/lib/python3.11/site-packages/apache_beam/utils/subprocess_server.py", line 376, in local_jar  
        os.makedirs(cache_dir)  
    File "frozen os", line 215, in makedirs  
    File "frozen os", line 225, in makedirs  
PermissionError: [Errno 13] Permission denied: '/opt/amazon/.apache_beam'  
--------------------------- Python Process Exited ---------------------------  
import logging

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions



def format_as_json(element):
    import json
    return json.dumps(element)


def main():
    options = {
        'output_bucket': 'test',
        'topic_name': 'test',
        "bootstrap_servers": "servers...."
    }
    topic = options.topic_name
    bootstrap_servers = options.bootstrap_servers
    s3_bucket_name = options.output_bucket
    consumer_config = {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "AWS_MSK_IAM",
        "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }

    pipeline_options = PipelineOptions()
    pipeline_options._all_options['streaming'] = True
    pipeline_options._all_options['enable_streaming_engine'] = True

    with beam.Pipeline(options=pipeline_options) as p:
        s = (
                p
                | 'ReadFromKafka' >> ReadFromKafka(consumer_config=consumer_config, topics=[topic])
                | 'Format as JSON' >> beam.Map(format_as_json)
                | 'Print' >> beam.Map(lambda x: log_value(x))
                | 'Write to S3' >> WriteToText(
                    file_path_prefix=f's3://{s3_bucket_name}/flink/new/',
                    file_name_suffix='.json'
                )
        )


def log_value(x):
    logging.info(x)
    return x


if __name__ == "__main__":
    main()
Герасим
Вопрос задан3 марта 2024 г.

1 Ответ

Ваш ответ

Загрузить файл.