Running csv example results in an error #511

Open
pmizenin opened this issue Aug 22, 2023 · 1 comment
Labels: wontfix (This will not be worked on)

Comments


pmizenin commented Aug 22, 2023

Docker Compose version v2.19.1
Docker version 23.0.1
Darwin Kernel Version 22.1.0: Sun Oct 9 20:14:30 PDT 2022; root:xnu-8792.41.9~2/RELEASE_ARM64_T8103 arm64 arm
Working with commit 75304f9

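Posting the CSV example connector configuration is accepted by the Connect REST API, but processing the example file then fails with an error. The request and its response: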

~/Projects/kafka-connect-file-pulse % curl -sX PUT http://localhost:8083/connectors/connect-file-pulse-quickstart-csv/config \
    -d @examples/connect-file-pulse-quickstart-csv.json \
    --header "Content-Type: application/json" | jq
{
  "name": "connect-file-pulse-quickstart-csv",
  "config": {
    "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "filters": "ParseCSVLine, Drop",
    "filters.Drop.if": "{{ equals($value.artist, 'U2') }}",
    "filters.Drop.invert": "true",
    "filters.Drop.type": "io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
    "filters.ParseCSVLine.extract.column.name": "headers",
    "filters.ParseCSVLine.trim.column": "true",
    "filters.ParseCSVLine.separator": ";",
    "filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
    "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
    "fs.cleanup.policy.triggered.on": "COMMITTED",
    "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
    "fs.listing.directory.path": "/tmp/kafka-connect/examples/",
    "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
    "fs.listing.interval.ms": "10000",
    "file.filter.regex.pattern": ".*\\.csv$",
    "offset.policy.class": "io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy",
    "offset.attributes.string": "name",
    "skip.headers": "1",
    "topic": "connect-file-pulse-quickstart-csv",
    "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
    "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
    "tasks.file.status.storage.bootstrap.servers": "broker:29092",
    "tasks.file.status.storage.topic": "connect-file-pulse-status",
    "tasks.file.status.storage.topic.partitions": "10",
    "tasks.file.status.storage.topic.replication.factor": "1",
    "tasks.max": "1",
    "name": "connect-file-pulse-quickstart-csv"
  },
  "tasks": [],
  "type": "source"
}
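For context, the filter chain in this configuration is meant to split each `;`-delimited line into named columns and then keep only rows whose `artist` is `U2` (`filters.Drop.if` combined with `filters.Drop.invert=true`). Below is a minimal Python model of that intended behavior, not FilePulse code: `parse_csv_line` and `keep_record` are hypothetical names, and the sketch assumes the column names come from the first element of the `headers` field, as the record printed in the log suggests.

```python
import re

# Hypothetical model of the quickstart pipeline "ParseCSVLine, Drop".
# This is NOT the FilePulse API; it only illustrates the intended behavior.

FILE_REGEX = re.compile(r".*\.csv$")  # "file.filter.regex.pattern" after JSON unescaping
SEPARATOR = ";"                       # "filters.ParseCSVLine.separator"


def parse_csv_line(record: dict) -> dict:
    """Split 'message' into columns named by the first entry of 'headers' (assumption)."""
    columns = [c.strip() for c in record["headers"][0].split(SEPARATOR)]
    values = [v.strip() for v in record["message"].split(SEPARATOR)]  # trim.column=true
    return dict(zip(columns, values))


def keep_record(record: dict) -> bool:
    """Model of Drop.if = equals($value.artist, 'U2') with invert=true.

    A drop filter removes a record when its condition holds; invert=true flips
    the condition, so only records whose artist equals 'U2' are kept.
    """
    return record["artist"] == "U2"


if __name__ == "__main__":
    # Input record as printed in the worker log.
    raw = {
        "headers": ["title;album;duration;release;artist;type"],
        "message": "40;War;02:38;1983;U2;Rock",
    }
    parsed = parse_csv_line(raw)
    print(parsed)
    print(keep_record(parsed))                                      # True
    print(bool(FILE_REGEX.match("quickstart-musics-dataset.csv")))  # True
```

Under these assumptions the logged line parses to a record with `artist` set to `U2` and is kept, which is the outcome the example appears to be designed for.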

Here's the error from the Connect worker log:

2023-08-22 22:38:10 connect | 2023-08-22 20:38:10,852 ERROR [task-thread-connect-file-pulse-quickstart-csv-0] Error occurred while executing filter 'DelimitedRowFilter' on record='[name: headers, type: ARRAY, value: [title;album;duration;release;artist;type], name: message, type: STRING, value: 40;War;02:38;1983;U2;Rock]' (io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline)
2023-08-22 22:38:10 connect | 2023-08-22 20:38:10,950 INFO [task-thread-connect-file-pulse-quickstart-csv-0] Closed iterator for: [uri=file:/tmp/kafka-connect/examples/quickstart-musics-dataset.csv, name='quickstart-musics-dataset.csv', contentLength=6588, lastModified=1688789014000, contentDigest=[digest=1466679696, algorithm='CRC32'], userDefinedMetadata={system.inode=3021071, system.hostname=edd3d7bfb82b}] (io.streamthoughts.kafka.connect.filepulse.source.DelegateFileInputIterator)
2023-08-22 22:38:10 connect | 2023-08-22 20:38:10,951 ERROR [task-thread-connect-file-pulse-quickstart-csv-0] Error while processing source file '[uri=file:/tmp/kafka-connect/examples/quickstart-musics-dataset.csv, name='quickstart-musics-dataset.csv', contentLength=6588, lastModified=1688789014000, contentDigest=[digest=1466679696, algorithm='CRC32'], userDefinedMetadata={system.inode=3021071, system.hostname=edd3d7bfb82b}]' (io.streamthoughts.kafka.connect.filepulse.source.FileObjectStateReporter)
2023-08-22 22:38:10 connect | io.streamthoughts.kafka.connect.filepulse.expression.accessor.AccessException: Cannot access to field 'artist'
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.accessor.TypedStructAccessor.read(TypedStructAccessor.java:60)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.accessor.TypedStructAccessor.read(TypedStructAccessor.java:27)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.accessor.PropertyAccessors.evaluateReaders(PropertyAccessors.java:104)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.accessor.PropertyAccessors.readPropertyValue(PropertyAccessors.java:65)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.PropertyExpression.readValue(PropertyExpression.java:84)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionArgument.evaluate(ExpressionArgument.java:46)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.function.AbstractExpressionFunctionInstance.lambda$invoke$0(AbstractExpressionFunctionInstance.java:40)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.IntPipeline$Head.forEachOrdered(IntPipeline.java:603)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.function.AbstractExpressionFunctionInstance.invoke(AbstractExpressionFunctionInstance.java:38)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionFunctionExecutor.execute(ExpressionFunctionExecutor.java:50)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.FunctionExpression.readValue(FunctionExpression.java:58)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.SubstitutionExpression$ReplacementExpression.readValue(SubstitutionExpression.java:200)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.SubstitutionExpression$ReplacementExpression.readValue(SubstitutionExpression.java:215)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.SubstitutionExpression.readValue(SubstitutionExpression.java:105)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.SubstitutionExpression.readValue(SubstitutionExpression.java:93)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.condition.ExpressionFilterCondition.apply(ExpressionFilterCondition.java:63)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.condition.FilterCondition.lambda$revert$1(FilterCondition.java:45)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.DropFilter.apply(DropFilter.java:44)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.apply(DefaultRecordFilterPipeline.java:160)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.lambda$apply$1(DefaultRecordFilterPipeline.java:173)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
2023-08-22 22:38:10 connect |     at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1239)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.apply(DefaultRecordFilterPipeline.java:176)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:132)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:100)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:176)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:200)
2023-08-22 22:38:10 connect |     at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:296)
2023-08-22 22:38:10 connect |     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:253)
2023-08-22 22:38:10 connect |     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
2023-08-22 22:38:10 connect |     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
2023-08-22 22:38:10 connect |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-08-22 22:38:10 connect |     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-08-22 22:38:10 connect |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-08-22 22:38:10 connect |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-08-22 22:38:10 connect |     at java.base/java.lang.Thread.run(Thread.java:829)
2023-08-22 22:38:10 connect | 2023-08-22 20:38:10,956 ERROR [task-thread-connect-file-pulse-quickstart-csv-0] Caught unexpected error while processing file. Ignore and continue (io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask)
2023-08-22 22:38:10 connect | io.streamthoughts.kafka.connect.filepulse.expression.accessor.AccessException: Cannot access to field 'artist'
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.accessor.TypedStructAccessor.read(TypedStructAccessor.java:60)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.accessor.TypedStructAccessor.read(TypedStructAccessor.java:27)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.accessor.PropertyAccessors.evaluateReaders(PropertyAccessors.java:104)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.accessor.PropertyAccessors.readPropertyValue(PropertyAccessors.java:65)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.PropertyExpression.readValue(PropertyExpression.java:84)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionArgument.evaluate(ExpressionArgument.java:46)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.function.AbstractExpressionFunctionInstance.lambda$invoke$0(AbstractExpressionFunctionInstance.java:40)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.IntPipeline$Head.forEachOrdered(IntPipeline.java:603)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.function.AbstractExpressionFunctionInstance.invoke(AbstractExpressionFunctionInstance.java:38)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionFunctionExecutor.execute(ExpressionFunctionExecutor.java:50)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.FunctionExpression.readValue(FunctionExpression.java:58)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.SubstitutionExpression$ReplacementExpression.readValue(SubstitutionExpression.java:200)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.SubstitutionExpression$ReplacementExpression.readValue(SubstitutionExpression.java:215)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.SubstitutionExpression.readValue(SubstitutionExpression.java:105)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.expression.SubstitutionExpression.readValue(SubstitutionExpression.java:93)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.condition.ExpressionFilterCondition.apply(ExpressionFilterCondition.java:63)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.condition.FilterCondition.lambda$revert$1(FilterCondition.java:45)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.DropFilter.apply(DropFilter.java:44)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.apply(DefaultRecordFilterPipeline.java:160)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.lambda$apply$1(DefaultRecordFilterPipeline.java:173)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
2023-08-22 22:38:10 connect |     at java.base/java.util.LinkedList$LLSpliterator.forEachRemaining(LinkedList.java:1239)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
2023-08-22 22:38:10 connect |     at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.apply(DefaultRecordFilterPipeline.java:176)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:132)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:100)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:176)
2023-08-22 22:38:10 connect |     at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:200)
2023-08-22 22:38:10 connect |     at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:296)
2023-08-22 22:38:10 connect |     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:253)
2023-08-22 22:38:10 connect |     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
2023-08-22 22:38:10 connect |     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
2023-08-22 22:38:10 connect |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-08-22 22:38:10 connect |     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-08-22 22:38:10 connect |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-08-22 22:38:10 connect |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-08-22 22:38:10 connect |     at java.base/java.lang.Thread.run(Thread.java:829)
2023-08-22 22:38:10 connect | 2023-08-22 20:38:10,958 INFO [task-thread-connect-file-pulse-quickstart-csv-0] Completed all object files. FilePulse source task is transitioning to IDLE state while waiting for new reconfiguration request from source connector. (io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask)
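The record printed just before the stack trace carries only `headers` and `message` fields, and the exception reports that `artist` cannot be accessed while `DropFilter` evaluates its condition. A minimal Python model of that symptom follows; `AccessError`, `read_field` and `drop_condition` are hypothetical stand-ins, not FilePulse classes, and the sketch does not explain why the parsed columns were missing when the condition ran.

```python
class AccessError(Exception):
    """Hypothetical stand-in for FilePulse's AccessException."""


def read_field(record: dict, name: str):
    # Model a strict field accessor: a missing field raises instead of returning None.
    if name not in record:
        raise AccessError(f"Cannot access to field '{name}'")
    return record[name]


def drop_condition(record: dict) -> bool:
    # Model of the condition {{ equals($value.artist, 'U2') }}
    return read_field(record, "artist") == "U2"


# The record as printed in the log line preceding the stack trace.
unparsed = {
    "headers": ["title;album;duration;release;artist;type"],
    "message": "40;War;02:38;1983;U2;Rock",
}

if __name__ == "__main__":
    try:
        drop_condition(unparsed)
    except AccessError as err:
        print(err)  # Cannot access to field 'artist'
```

In this model the same condition succeeds once the record has an `artist` field, for example `drop_condition({"artist": "U2"})` returns `True`.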


This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions bot added the wontfix (This will not be worked on) label on Nov 21, 2023