
Allow Special Character Handling #557

Open
e160842 opened this issue Oct 5, 2023 · 3 comments
Labels
wontfix This will not be worked on

Comments


e160842 commented Oct 5, 2023

Describe the issue
Using this connector to transfer data fails for data that includes special characters ($%^& etc.), especially when those characters appear in the header (field names) of the data.

Describe the solution you'd like
Enable the connector to handle special characters.

Describe alternatives you've considered
We currently have to add an extra step to cleanse special characters before using the connector, which is not efficient.

Additional context
Please let me know if any clarifications needed :)
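The "cleanse special characters" workaround described above could be sketched roughly as follows. This is a minimal, hypothetical pre-processing step, not part of FilePulse; the `cleanse_header` function and the sample header line are illustrative only:

```python
import re

def cleanse_header(header_line: str, separator: str = "|") -> str:
    """Replace any run of characters other than letters and digits in each
    header field with a single underscore, trimming leading/trailing ones."""
    fields = header_line.split(separator)
    cleaned = [re.sub(r"[^0-9A-Za-z]+", "_", f).strip("_") for f in fields]
    return separator.join(cleaned)

# Example header with special characters similar to those in the report:
raw = "AMS: # Alternative|AMS: % Electric/Hybrid Cars|AMS: $ Electric"
print(cleanse_header(raw))
# → AMS_Alternative|AMS_Electric_Hybrid_Cars|AMS_Electric
```

Such a step would run over the first line of each file before upload, so the connector only ever sees sanitized field names.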

@e160842 e160842 changed the title Allow Special Characters in the file Allow Special Character Handling Oct 5, 2023
@fhussonnois
Member

Hi @e160842, could you please provide the connector configuration you used to process the data? Thanks


e160842 commented Oct 31, 2023

@fhussonnois - thank you for your support. Below is a sample connector config, with proprietary data obscured. Please let me know if any parts are unclear.

apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: XXXXXXXXXXXXXXXXXXXXX
  namespace: ${namespace}
spec:
  name: XXXXXXXXXXXXXXXXXXXXXXXXXX
  class: "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
  taskMax: 1
  restartPolicy:
    type: "OnFailure"
    maxRetry: 10
  connectRest:
    endpoint: https://connector.${nspace}.xxx.cluster.local:8XXX
    authentication:
      type: bearer
      bearer:
        secretRef: v2-connectors-apikeys
  configs:
    errors.log.enable: "true"
    errors.log.include.messages: "true"
    connector.class: "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
    tasks.max: 1
    fs.listing.class: "io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing"
    aws.credentials.provider.class: "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    aws.s3.region: "us-east-1"
    aws.s3.bucket.name: "XXXXXXXXXXXXXXXXXXXXXX"
    aws.s3.bucket.prefix: "XXXXXXXXXXXXXXXXXX"
    aws.s3.default.object.storage.class: "STANDARD"
    fs.listing.filters: "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter"
    fs.listing.interval.ms: "300000"
    tasks.reader.class: "io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3RowFileInputReader"
    topic: "${env}.stage.XXXXXXXXXXXXXXXX"
    offset.policy.class: "io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy"
    filters: "ParseCSVLine,SetKey,ProvideId,AnnotateWithType"
    filters.ParseCSVLine.type: "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter"
    filters.ParseCSVLine.trim.column: "true"
    filters.ParseCSVLine.separator: "|"
    filters.ParseCSVLine.columns: "AMS: # Alternative - Hybrid Tru|AMS: % Alternative - Hybrid Tru|AMS: $ Electric/Hybrid Cars|AMS: % Electric/Hybrid Cars|AMS: # Alternative - Natural_Gas|AMS: Alternative Power>Natural Ga|Property/Realty: Home Stories|Property/Realty: Home Bath|Property/Realty: Home Bedrooms|Property/Realty: Home Total Rooms|Property/Realty: Home Exterior Wall T|BehaviorBank: Hi-tech owner|BehaviorBank: Internet/online subscri|SRVY:HH Acty/Int:Socl Caus/Con:Enviro"
    filters.SetKey.type: "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter"
    filters.SetKey.field: "$key"
    filters.SetKey.value: "$value.Number"
    filters.AnnotateWithType.type: "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter"
    filters.AnnotateWithType.field: "$._t"
    filters.AnnotateWithType.value: "XXXXXXXXXX"
    filters.ProvideId.type: "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter"
    filters.ProvideId.field: "$._id"
    filters.ProvideId.value: "$value.Number"
    fs.cleanup.policy.class: "io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy"
    fs.cleanup.policy.triggered.on: "COMMITTED"
    fs.cleanup.policy.move.success.aws.bucket.name: "SUCCESS_XXXXXXXX"
    fs.cleanup.policy.move.success.aws.prefix.path: "SUCCESS/processed"
    fs.cleanup.policy.move.failure.aws.bucket.name: "FAILURE_XXXXXX"
    fs.cleanup.policy.move.failure.aws.prefix.path: "FAILURE/error"
    file.filter.regex.pattern: ".*\\.csv|.*\\.txt$"
    offset.attributes.string: "name"
    skip.headers: "1"
    tasks.file.status.storage.class: "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore"
    tasks.file.status.storage.bootstrap.servers: "kafka.${namespace}.svc.cluster.local:9071"
    tasks.file.status.storage.topic: "${env}.prep.XXXXXXXXData-Status"
    tasks.file.status.storage.topic.partitions: "1"
    tasks.file.status.storage.topic.replication.factor: "3"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter.schemas.enable: "false"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    tasks.file.status.storage.producer.security.protocol: "SASL_SSL"
    tasks.file.status.storage.producer.ssl.endpoint.identification.algorithm: "https"
    tasks.file.status.storage.producer.sasl.mechanism: "PLAIN"
    tasks.file.status.storage.producer.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$${file:/mnt/secrets/v2-connectors-apikeys/plain.txt:username}\" password=\"$${file:/mnt/secrets/apikeys/plain.txt:password}\";"
    tasks.file.status.storage.producer.request.timeout.ms: "20000"
    tasks.file.status.storage.consumer.security.protocol: "SASL_SSL"
    tasks.file.status.storage.consumer.ssl.endpoint.identification.algorithm: "https"
    tasks.file.status.storage.consumer.sasl.mechanism: "PLAIN"
    tasks.file.status.storage.consumer.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$${file:/mnt/secretkeys/plain.txt:username}\" password=\"$${file:/mnt/secretkeys/plain.txt:password}\";"
    tasks.file.status.storage.consumer.request.timeout.ms: "20000"
    principal.service.name: "$${file:/mnt/secrets/connector-configs/s3-src:username}"
    principal.service.password: "$${file:/mnt/secrets/src:password}"
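As a standalone illustration of where the special characters end up (a hypothetical check, not connector code, assuming the `filters.ParseCSVLine.columns` value is split on the configured `|` separator): every resulting field name can carry a `#`, `%`, or `$` into the produced records.

```python
# Split a shortened version of the columns spec from the config above on the
# '|' separator and flag names containing special characters.
columns_spec = (
    "AMS: # Alternative - Hybrid Tru"
    "|AMS: % Alternative - Hybrid Tru"
    "|AMS: $ Electric/Hybrid Cars"
)
names = columns_spec.split("|")
suspect = [n for n in names if any(c in n for c in "#%$^&")]
print(len(names), len(suspect))  # → 3 3
```

In this sample, all three column names contain at least one special character, so every parsed record would expose those characters as JSON field names to downstream consumers.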

@github-actions

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the wontfix This will not be worked on label Jan 30, 2024