Я пытаюсь понять, как запустить простой Apache Beam с помощью Python, используя AWS Managed Apache Flink в качестве средства запуска
Приложение даже не может запуститься из-за следующего исключения, хотя практически нет информации/руководств, которые позволили бы его избежать (обычно есть только примеры приложений Java Beam для Managed Flink для AWS):
code задания, который я пытаюсь запустить, следующий:
(возможно, что сам code не работает, но у меня даже не было возможности запустить его до того, как было выдано исключение)
Конфигурация соответствует рекомендациям в документации AWS, zip-архив создается соответствующим образом, а также предоставляются все необходимые разрешения для служб. pyflink-dependencies.jar
содержит только одну зависимость для aws-msk-iam-auth
.
--------------------------- Python Process Started --------------------------
Traceback (most recent call last):
File "/tmp/flink-web-5d98ef0c-f0f3-429e-ba53-60655903bff0/flink-web-upload/dac34d18-5e05-4c0c-ba49-f1b9d60dc8f9_code/main.py", line 53, in module
main()
File "/tmp/flink-web-5d98ef0c-f0f3-429e-ba53-60655903bff0/flink-web-upload/dac34d18-5e05-4c0c-ba49-f1b9d60dc8f9_code/main.py", line 36, in main
p
File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py", line 1110, in __ror__
return self.transform.__ror__(pvalueish, self.label)
File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py", line 623, in __ror__
result = p.apply(self, pvalueish, label)
File "/usr/local/lib/python3.11/site-packages/apache_beam/pipeline.py", line 679, in apply
return self.apply(transform, pvalueish)
File "/usr/local/lib/python3.11/site-packages/apache_beam/pipeline.py", line 741, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/runner.py", line 203, in apply
return self.apply_PTransform(transform, input, options)
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/runner.py", line 207, in apply_PTransform
return transform.expand(input)
File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 751, in expand
with ExternalTransform.service(expansion_service) as service:
File "/usr/local/lib/python3.11/contextlib.py", line 137, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 807, in service
with expansion_service as stub:
File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/external.py", line 1013, in __enter__
self._path_to_jar = subprocess_server.JavaJarServer.local_jar(
File "/usr/local/lib/python3.11/site-packages/apache_beam/utils/subprocess_server.py", line 376, in local_jar
os.makedirs(cache_dir)
File "frozen os", line 215, in makedirs
File "frozen os", line 225, in makedirs
PermissionError: [Errno 13] Permission denied: '/opt/amazon/.apache_beam'
--------------------------- Python Process Exited ---------------------------
import logging
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
def format_as_json(element):
import json
return json.dumps(element)
def main():
options = {
'output_bucket': 'test',
'topic_name': 'test',
"bootstrap_servers": "servers...."
}
topic = options.topic_name
bootstrap_servers = options.bootstrap_servers
s3_bucket_name = options.output_bucket
consumer_config = {
"bootstrap.servers": bootstrap_servers,
"security.protocol": "SASL_SSL",
"sasl.mechanism": "AWS_MSK_IAM",
"sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
"sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}
pipeline_options = PipelineOptions()
pipeline_options._all_options['streaming'] = True
pipeline_options._all_options['enable_streaming_engine'] = True
with beam.Pipeline(options=pipeline_options) as p:
s = (
p
| 'ReadFromKafka' >> ReadFromKafka(consumer_config=consumer_config, topics=[topic])
| 'Format as JSON' >> beam.Map(format_as_json)
| 'Print' >> beam.Map(lambda x: log_value(x))
| 'Write to S3' >> WriteToText(
file_path_prefix=f's3://{s3_bucket_name}/flink/new/',
file_name_suffix='.json'
)
)
def log_value(x):
logging.info(x)
return x
if __name__ == "__main__":
main()