Schema Registry & Amazon S3 Move Cleanup Policy Questions #584

Open
jwhitehead77 opened this issue Dec 24, 2023 · 0 comments
Labels
question (Further information is requested)

Comments

@jwhitehead77

I am running the latest version of the Bitnami Kafka Docker images using Docker Compose. The stack has ZooKeeper, Kafka, Connect, and ksqlDB all configured and working with other connectors, so the setup itself is sound. I also have LocalStack deployed in a container to mimic the AWS services for my S3 bucket needs.

I needed a connector that reads CSV files stored in AWS S3 and provides parsing, filtering, and transformations so that I can produce a proper Kafka message with a schema. After trying a number of connectors that just did not cut it, I finally came across File Pulse, and I was glad to find it because it does about 90% of what I need.

However, there are a couple of things I have questions about, because I cannot yet tell whether the documentation is lacking, there is a bug, or a feature is simply missing.

First, I am curious why the schema that is created is not registered with the Schema Registry, so that the message payload can flow through without the schema attached to each message. If the schema is in the Schema Registry, it can always be fetched and provided when needed. I have my Connect workers set up with schemas enabled, so it seems that FilePulse does not support registering the schema:

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://schema-registry:8081
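
To illustrate, with schemas enabled the JsonConverter embeds the full schema in every single message instead of referencing one from the registry, roughly like this (the field names here are just placeholders from my CSV, not actual connector output):

{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "ssn", "type": "string", "optional": true },
      { "field": "first_name", "type": "string", "optional": true }
    ],
    "optional": false
  },
  "payload": {
    "ssn": "000-00-0000",
    "first_name": "Jane"
  }
}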

Second, when I use the AmazonS3MoveCleanupPolicy and set the fs.cleanup.policy.move.success.aws.prefix.path and fs.cleanup.policy.move.failure.aws.prefix.path options, the file gets moved but no longer has a key.

The file entities.csv was placed in the root of the entity-data bucket, where the connector looks for files to process. Below is a screenshot from the LocalStack UI taken after the file was processed; as you can see, the key is blank.
[Screenshot: LocalStack UI showing the moved object with an empty key]
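
Based on the success prefix configured below, my assumption was that the object would simply be moved under that prefix while keeping its file name, for example:

processed/entities.csv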

Connector Config Properties

{
  "tasks.max": "1",
  "skip.headers": "1",
  "topic": "entity-data-csv",
  "aws.s3.region": "us-east-1",
  "aws.s3.bucket.name": "entity-data",
  "aws.s3.path.style.access.enabled": "true",
  "aws.s3.default.object.storage.class":"STANDARD",
  "aws.s3.service.endpoint": "http://s3.localstack:4566",
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy",
  "fs.cleanup.policy.move.success.aws.prefix.path": "processed",
  "fs.cleanup.policy.move.failure.aws.prefix.path": "failed",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3RowFileInputReader",
  "tasks.file.status.storage.bootstrap.servers": "kafka:9092",
  "filters": "parseRow,setKey",
  "filters.parseRow.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
  "filters.parseRow.extract.column.name": "headers",
  "filters.parseRow.trim.column": "true",
  "filters.setKey.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
  "filters.setKey.field": "$key",
  "filters.setKey.value": "$value.ssn",
  "file.filter.regex.pattern": ".*\\.csv$",
  "offset.attributes.string": "name+hash"
}
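
For reference, this is roughly how I list the bucket contents through LocalStack to see where the file ends up (assuming LocalStack's default edge port 4566 is mapped on the host):

aws --endpoint-url=http://localhost:4566 s3 ls s3://entity-data/ --recursive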

This is a very awesome connector, and it saved me a lot of coding I would otherwise have had to do myself.

Looking forward to your response.

Merry Christmas and Happy New Year
Jason Whitehead

jwhitehead77 added the question label Dec 24, 2023