
Sink __consumer_offsets topic into the S3 bucket using S3 Lenses Plugins #924

Open
cchidhu opened this issue Mar 21, 2023 · 2 comments

Comments


cchidhu commented Mar 21, 2023

Issue Guidelines

We're attempting to use the S3 Connector to back up our Kafka Cluster for Disaster Recovery purposes. However, we're encountering an issue when trying to sink the __consumer_offsets topic into the S3 bucket. We've tried different configurations for the S3 Connector for this topic, but it continues to fail due to the binary format of the data. We're unsure which value converter to use for this topic. We've included the connector configuration and a trace from the SinkTask for reference. Can you please provide some advice on how to resolve this issue?

Please review these questions before submitting an issue.

What version of the Stream Reactor are you reporting this issue for?

2-8-4.0.0

Are you running the correct version of Kafka/Confluent for the Stream Reactor release?

Yes

Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?

Yes, S3 Sink Connector

Have you read the docs? Yes

What is the expected behaviour? Data from the __consumer_offsets topic should be written to the S3 bucket.

What was observed? The SinkTask throws an exception while sinking the binary data from the __consumer_offsets topic.

What is your Connect cluster configuration (connect-avro-distributed.properties)?

What is your connector properties configuration (my-connector.properties)?

[screenshot of connector properties configuration]

Please provide full log files (redact any sensitive information)

[screenshot of SinkTask exception trace]

@davidsloan
Collaborator

Providing a solution for this is on our roadmap.

@elapocha

elapocha commented Jun 5, 2024

I managed to back up __consumer_offsets with the following connector configuration:
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=2
topics=__consumer_offsets
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=...
connect.s3.aws.secret.key=...
connect.s3.aws.region=eu-central-1
connect.s3.kcql=INSERT INTO msk-s3-xxxxxx-consumer:my_workload SELECT * FROM __consumer_offsets STOREAS AVRO PROPERTIES('store.envelope'=true, 'store.envelope.key'=true, 'store.envelope.headers'=true, 'store.envelope.value'=true, 'store.envelope.metadata'=true)
flush.count=50000
flush.interval=200
errors.log.enable=true
connect.s3.errors.log.include.messages=true
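For anyone registering the connector over the Kafka Connect REST API instead of a properties file, the same settings can be posted as JSON. A minimal sketch, assuming a Connect worker reachable at localhost:8083 and a connector name of s3-consumer-offsets-backup (both hypothetical); bucket, credentials, and region come from your own environment:

# s3-offsets-backup.json (hypothetical file name)
{
  "name": "s3-consumer-offsets-backup",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "tasks.max": "2",
    "topics": "__consumer_offsets",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "connect.s3.aws.auth.mode": "Credentials",
    "connect.s3.aws.access.key": "...",
    "connect.s3.aws.secret.key": "...",
    "connect.s3.aws.region": "eu-central-1",
    "connect.s3.kcql": "INSERT INTO msk-s3-xxxxxx-consumer:my_workload SELECT * FROM __consumer_offsets STOREAS AVRO PROPERTIES('store.envelope'=true, 'store.envelope.key'=true, 'store.envelope.headers'=true, 'store.envelope.value'=true, 'store.envelope.metadata'=true)",
    "flush.count": "50000",
    "flush.interval": "200",
    "errors.log.enable": "true",
    "connect.s3.errors.log.include.messages": "true"
  }
}

# create the connector on the worker
curl -X POST -H "Content-Type: application/json" --data @s3-offsets-backup.json http://localhost:8083/connectors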

However, __consumer_offsets is a special topic, and a Source Connector writing the data back from S3 to Kafka throws org.apache.kafka.common.errors.InvalidTopicException: The request attempted to perform an operation on an invalid topic.
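That error appears to be expected behaviour, since __consumer_offsets is an internal topic and the brokers reject produce requests to it. One alternative worth considering, assuming the consumer groups still exist on the recovered cluster and are not actively consuming, is to re-apply the offsets recovered from S3 through the consumer group tooling instead of producing to the topic. A minimal sketch with hypothetical broker, group, topic, partition, and offset values:

# reset a single group's position on one partition to an offset recovered from the backup
# (the group must have no running members for the reset to be accepted)
kafka-consumer-groups.sh --bootstrap-server broker:9092 \
  --group my-consumer-group \
  --topic orders:0 \
  --reset-offsets --to-offset 12345 \
  --execute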
