[Bug]: MalformedInputException in MQTT Source connector in Azure K8s #939

Open
mukkchir opened this issue May 5, 2023 · 14 comments

@mukkchir

mukkchir commented May 5, 2023

Hi,

I have deployed a Strimzi Kafka cluster in Azure K8s. After deploying the MQTT source connector, I get the following error from
kubectl describe kctr -n kafka:

Error Trace

java.nio.charset.MalformedInputException: Input length = 1
              at java.base/java.nio.charset.CoderResult.throwException(CoderResult.java:274)
              at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
              at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:188)
              at java.base/java.io.InputStreamReader.read(InputStreamReader.java:177)
              at java.base/java.io.BufferedReader.fill(BufferedReader.java:162)
              at java.base/java.io.BufferedReader.read(BufferedReader.java:183)
              at scala.io.BufferedSource.$anonfun$iter$2(BufferedSource.scala:41)
              at scala.io.Codec.wrap(Codec.scala:74)
              at scala.io.BufferedSource.$anonfun$iter$1(BufferedSource.scala:41)
              at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
              at scala.collection.Iterator$$anon$27.next(Iterator.scala:1135)
              at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:637)
              at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
              at scala.io.Source.hasNext(Source.scala:253)
              at scala.collection.Iterator.isEmpty(Iterator.scala:466)
              at scala.collection.Iterator.isEmpty$(Iterator.scala:466)
              at scala.io.Source.isEmpty(Source.scala:205)
              at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1165)
              at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1164)
              at scala.io.Source.mkString(Source.scala:205)
              at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1179)
              at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1190)
              at scala.io.Source.mkString(Source.scala:205)
              at com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceTask.start(MqttSourceTask.scala:48)
              at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:274)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
              at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
              at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
              at java.base/java.lang.Thread.run(Thread.java:833)

MQTT source connector config

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mqtt-hm-source-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
  tasksMax: 1
  config:
    topics: test
    connect.mqtt.clean: false
    connect.mqtt.kcql: INSERT INTO test SELECT * FROM `MY_TOPIC` WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
    connect.mqtt.converter.throw.on.error: true
    connect.mqtt.client.id: "CLIENT_ID"
    connect.mqtt.hosts: "ssl://mqtt.broker.com:8883"  # this is a third-party connection
    connect.mqtt.ssl.ca.cert: "/mycerts/ca_file.pem"
    connect.mqtt.ssl.cert: "/mycerts/crt_file.pem.crt"
    connect.mqtt.ssl.key: "/mycerts/key_file.pem.key"
    connect.mqtt.service.quality: 1
    connect.progress.enabled: true
    connect.mqtt.log.message: true
    errors.log.include.messages: true
    errors.log.enable: true
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false 

Strimzi Kafka config

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: messaging
spec:
  kafka:
    version: 3.4.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: nodeport
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.4"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

Steps to reproduce in Azure K8s

  1. kubectl create namespace kafka
  2. kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
  3. kubectl create -f ABOVE_KAFKA_CONFIG.yaml -n kafka
  4. kubectl create -f ABOVE_MQTT_SOURCE_CONFIG.yaml -n kafka
  5. kubectl get kctr -n kafka (connector status appears not ready)
  6. kubectl describe kctr -n kafka (here the error trace can be seen)
  7. kubectl delete kctr MQTT_CONNECTOR_NAME -n kafka (to re-apply the connector with changes, it is better to delete the existing connector and repeat steps 4–6)

Previously used versions

Kafka - 3.2.0
Strimzi cluster operator - 0.29.0
Stream Reactor Kafka connector - 4.0.0

Currently used versions

Kafka - 3.4.0
Strimzi operator - 0.34.0
Stream Reactor Kafka connector - 4.2.0

With both sets of versions, the same error persisted. I think I'm missing some configuration, but I'm not sure what it is. Any guidance would be greatly appreciated.

@davidsloan
Collaborator

@mukkchir thanks for the detailed report. Is it possible that your default charset is set to something different to UTF-8?

Please could you try this build that I have prepared, with a possible fix.

Let me know how it goes.

@mukkchir
Author

@davidsloan you are really a lifesaver and huge kudos to you for giving me a quick fix. It's working!
I'm now able to connect to the stream successfully. I would like to keep this ticket open for a few more days as I'm doing some tests on Kafka and the connector you provided. I will report back to this thread in case of any errors/exceptions.

@mukkchir
Author

mukkchir commented May 15, 2023

@davidsloan The connector is constantly looping: it logs "Error handling message ... on topic", then re-subscribes to the topic. The error trace is below:


2023-05-15 08:24:04,304 DEBUG [mqtt-hm-source-connector|task-0] Message received on topic [TOPIC_NAME]. Message id =[3] , isDuplicate=false, payload=ENCRYPTED_DATA (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager) [MQTT Call: afaf2189-2fc7-4288-8ed4-0c90846c5ef3]
2023-05-15 08:24:04,304 ERROR [mqtt-hm-source-connector|task-0] Error handling message with id:3 on topic:TOPIC_NAME (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager) [MQTT Call: afaf2189-2fc7-4288-8ed4-0c90846c5ef3]
org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:249)
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.$anonfun$handleObject$2(JsonSimpleConverter.scala:142)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:258)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.handleObject(JsonSimpleConverter.scala:142)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:117)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.$anonfun$handleObject$1(JsonSimpleConverter.scala:135)
	at scala.collection.immutable.List.map(List.scala:246)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.handleObject(JsonSimpleConverter.scala:133)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:117)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.$anonfun$handleObject$1(JsonSimpleConverter.scala:135)
	at scala.collection.immutable.List.map(List.scala:246)
    at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.handleObject(JsonSimpleConverter.scala:133)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:117)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:98)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter.convert(JsonSimpleConverter.scala:46)
	at com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager.messageArrived(MqttManager.scala:122)
	at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:519)
	at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:417)
	at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:214)
	at java.base/java.lang.Thread.run(Thread.java:833)
2023-05-15 08:24:04,305 WARN [mqtt-hm-source-connector|task-0] Connection lost. Re-connecting is set to true (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager) [MQTT Call: afaf2189-2fc7-4288-8ed4-0c90846c5ef3]
MqttException (0) - org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
	at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:228)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:249)
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.$anonfun$handleObject$2(JsonSimpleConverter.scala:142)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:258)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.handleObject(JsonSimpleConverter.scala:142)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:117)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.$anonfun$handleObject$1(JsonSimpleConverter.scala:135)
	at scala.collection.immutable.List.map(List.scala:246)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.handleObject(JsonSimpleConverter.scala:133)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:117)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.$anonfun$handleObject$1(JsonSimpleConverter.scala:135)
	at scala.collection.immutable.List.map(List.scala:246)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.handleObject(JsonSimpleConverter.scala:133)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:117)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter$.convert(JsonSimpleConverter.scala:98)
	at com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter.convert(JsonSimpleConverter.scala:46)
	at com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager.messageArrived(MqttManager.scala:122)
	at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:519)
	at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:417)
	at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:214)
	... 1 more

I'm not sure what is happening. How do I properly handle this in the connector config? Also, what happens if tasks.max is set to 2 or more?

@davidsloan
Collaborator

> I'm not sure what is happening. How do I properly handle this in the connector config? Also, what happens if tasks.max is set to 2 or more?

@mukkchir are you able to send me some samples of your data? This would help with troubleshooting the issue. You can find me on our community Slack if you are not comfortable sharing it here. It is not obvious to me what the problem is without more information.

@mukkchir
Author

mukkchir commented May 15, 2023

@davidsloan Sure! Here is the sample data on which the error is occurring:

{
   "_id":{
      "$oid":"VARCHAR"
   },
   "capability":"diagnostics",
   "data":{
      "diagnostics":{
         "tire_pressures":[
            {
               "data":{
                  "location":"front_left",
                  "pressure":{
                     "unit":"kilopascals",
                     "value":{
                        "$numberDouble":"230.0"
                     }
                  }
               },
               "timestamp":"2022-11-17T12:33:22.000Z"
            },
            {
               "data":{
                  "location":"front_right",
                  "pressure":{
                     "unit":"kilopascals",
                     "value":{
                        "$numberDouble":"230.0"
                     }
                  }
               },
               "timestamp":"2022-11-17T12:33:19.000Z"
            },
            {
               "data":{
                  "location":"rear_left",
                  "pressure":{
                     "unit":"kilopascals",
                     "value":{
                        "$numberDouble":"257.5"
                     }
                  }
               },
               "timestamp":"2022-11-17T12:33:50.000Z"
            },
            {
               "data":{
                  "location":"rear_right",
                  "pressure":{
                     "unit":"kilopascals",
                     "value":{
                        "$numberDouble":"252.5"
                     }
                  }
               },
               "timestamp":"2022-11-17T12:33:10.000Z"
            }
         ]
      }
   },
   "message_id":"VARCHAR",
   "property":"tire_pressures",
   "version":{
      "$numberInt":"1"
   },
   "num":"VARCHAR"
}

@davidsloan
Collaborator

davidsloan commented May 15, 2023

I'm sorry, I'm not seeing a problem with this input.

I've put together a quick test here which you can see is working:
#942

Is this message any different from previous messages? Are there any schema changes (missing/added fields, etc.) compared to the preceding message?

@mukkchir
Author

mukkchir commented May 15, 2023

@davidsloan This is just sample data. Since the data is encrypted, I'm working on decrypting it. I will post the actual data soon. Is there a config option to move on to the next message if the previous message is faulty?

@spike83

spike83 commented May 16, 2023

@davidsloan I've got the same issue with the JMS source connector. When can we expect a release with the fix included? Thank you very much 🙏.

@mukkchir
Author

@davidsloan It seems like it's the same data that I posted here.

@davidsloan
Collaborator

@mukkchir this is message id 3, correct? Are message id 1 & 2 any different?

@davidsloan
Collaborator

@mukkchir it seems like the issue might be that a different charset encoding is being configured by default. Please can you try setting this environment variable in the Connect container:

LC_ALL=en_US.UTF-8

And let me know how you get on.

@erezblm

erezblm commented Jun 10, 2023

Hey,

@davidsloan
I was getting the exact same MalformedInputException error using a Strimzi and AKS deployment.
Your uploaded package fixed the issue for me, so thanks a lot for that; I'm waiting for an official release with the fix.

I tried LC_ALL=en_US.UTF-8, but it was ignored by bash at the start of the Connect container:
"bash: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)"

Thanks.

@erezblm

erezblm commented Aug 16, 2023

Hi @davidsloan, I'm now using the sink connector and I have the same problem with malformed input.
Does your fix handle only the source connector?
Any idea if an official release with those fixes will be out soon?

Thanks a lot, Erez.

@sunmeplz

sunmeplz commented Sep 5, 2023

I'm using the MQTT connector inside a Debezium Kafka Connect Docker container; the issue was solved by adding the LANG=en_US.UTF-8 variable to the container.
