(hard to track) mqtt sometimes hangs resubscribing with clean session = false #832

Open
kervel opened this issue Feb 9, 2022 · 5 comments

Comments

@kervel

kervel commented Feb 9, 2022

What version of the Stream Reactor are you reporting this issue for?

3.0.1

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Running on Kafka v2.7 (I adapted build.gradle to match my Kafka version).

What is the expected behaviour?

  • MQTT source connector using mTLS to Mosquitto. Mosquitto is configured to ignore the client ID and use the mTLS certificate subject instead.
  • clean session = false
  • running in standalone mode (in the same pod as the Mosquitto broker)
  • an unclean shutdown happens (an OOM kill in my case)
  • Kafka Connect is restarted (by Kubernetes) and picks up where it left off, without losing any messages

What was observed?

  • Kafka Connect restarts, Mosquitto sees a new connection from Kafka Connect and starts sending messages, but they never arrive on the Kafka Connect side.
  • With message logging enabled, no messages appear after the restart, even though Mosquitto logs that it is sending messages to Kafka Connect.
  • On further inspection, the connector hangs in MqttManager#connectComplete, in the call to Paho's subscribe().
  • I waited up to 20 minutes before intervening.

I found various similar (but not identical) bug reports filed against Paho mqttv3, so I modified Lenses to use mqttv5 instead, and I can no longer reproduce the issue.

The modifications are rather trivial, apart from clean session now being called clean start, but I can share them if needed.

What is your connector properties configuration (my-connector.properties)?

    name=mqtt-source
    connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
    tasks.max=1
    connect.mqtt.kcql=INSERT INTO mqtt SELECT * FROM +/event WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.BytesConverter`
    connect.mqtt.client.id=dm_source_id
    connect.mqtt.hosts=ssl://localhost:1883
    connect.mqtt.ssl.ca.cert=/opt/cert/ca.crt
    connect.mqtt.ssl.cert=/opt/cert/tls.crt
    connect.mqtt.ssl.key=/opt/cert/tls.key
    connect.mqtt.clean=false
    connect.mqtt.timeout=1000
    connect.mqtt.service.quality=2
    connect.mqtt.log.message=true
    producer.delivery.timeout.ms=20000
    producer.max.block.ms=60000
    producer.request.timeout.ms=15000

mosquitto config

    listener 1883 0.0.0.0
    allow_anonymous true
    require_certificate true
    use_identity_as_username true
    use_username_as_clientid true
    cafile /mosquitto/config/ca.crt
    certfile /mosquitto/config/server.crt
    keyfile /mosquitto/config/server.pem
    dhparamfile /mosquitto/config/dhparams.pem
    persistence true
    persistence_file /mosquitto/data/mosquitto.db
    persistent_client_expiration 2d
    queue_qos0_messages true
    max_queued_messages 10000
    log_type all
    acl_file /mosquitto/config/mosquitto.acl

Please provide full log files (redact any sensitive information)

This is on the first start:

[2022-02-09 20:38:30,720] INFO [Consumer clientId=connector-consumer-mqtt-sink-0, groupId=connect-mqtt-sink] Subscribed to topic(s): mqtt-outgoing (org.apache.kafka.clients.consumer.KafkaConsumer:965)
[2022-02-09 20:38:31,630] INFO Connected to ssl://localhost:1883 as dm_source_id (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager:50)
[2022-02-09 20:38:31,630] INFO Connected to ssl://localhost:1883 as dm_sink_id (com.datamountaineer.streamreactor.connect.mqtt.connection.MqttClientConnectionFn$:59)
[2022-02-09 20:38:31,636] WARN okay starting subscriptions (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager:153)
[2022-02-09 20:38:31,639] INFO Subscribed to topic [+/event] with QoS [2] (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager:160)

This is after a restart:

[2022-02-09 20:39:45,731] INFO [Consumer clientId=connector-consumer-mqtt-sink-0, groupId=connect-mqtt-sink] Subscribed to topic(s): mqtt-outgoing (org.apache.kafka.clients.consumer.KafkaConsumer:965)
[2022-02-09 20:39:46,619] INFO Connected to ssl://localhost:1883 as dm_sink_id (com.datamountaineer.streamreactor.connect.mqtt.connection.MqttClientConnectionFn$:59)
[2022-02-09 20:39:46,619] INFO Connected to ssl://localhost:1883 as dm_source_id (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager:50)
[2022-02-09 20:39:46,624] WARN okay starting subscriptions (com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager:153)

Here, the last log message ("Subscribed to topic ...") is missing because execution never reaches that line of code. Note that I added the "okay starting subscriptions" log statement at the beginning of the connectComplete function.

@kervel
Author

kervel commented Feb 10, 2022

Hello,
Unfortunately, in the meantime I also managed to reproduce the problem using mqttv5. It seems to be stuck here:

    "MQTT Call: dm_source_id" #45 prio=5 os_prio=0 cpu=6.39ms elapsed=151.10s tid=0x00007faa94017000 nid=0x42 in Object.wait()  [0x00007faabdd96000]
       java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(java.base@…/Native Method)
        - waiting on <0x00000000b7a6de10> (a java.lang.Object)
        at java.lang.Object.wait(java.base@…/Object.java:328)
        at org.eclipse.paho.mqttv5.client.internal.Token.waitForResponse(Token.java:177)
        - waiting to re-lock in wait() <0x00000000b7a6de10> (a java.lang.Object)
        at org.eclipse.paho.mqttv5.client.internal.Token.waitForCompletion(Token.java:130)
        at org.eclipse.paho.mqttv5.client.MqttToken.waitForCompletion(MqttToken.java:76)
        at org.eclipse.paho.mqttv5.client.MqttClient.subscribe(MqttClient.java:530)
        at org.eclipse.paho.mqttv5.client.MqttClient.subscribe(MqttClient.java:510)
        at org.eclipse.paho.mqttv5.client.MqttClient.subscribe(MqttClient.java:503)
        at com.datamountaineer.streamreactor.connect.mqtt.source.MqttManager.connectComplete(MqttManager.scala:163)
        at org.eclipse.paho.mqttv5.client.internal.ConnectActionListener.onSuccess(ConnectActionListener.java:175)
        at org.eclipse.paho.mqttv5.client.internal.CommsCallback.fireActionEvent(CommsCallback.java:358)
        at org.eclipse.paho.mqttv5.client.internal.CommsCallback.handleActionComplete(CommsCallback.java:285)
        - locked <0x00000000b784b4a8> (a org.eclipse.paho.mqttv5.client.MqttToken)
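Reading the trace bottom-up: Paho's callback thread (`MQTT Call: dm_source_id`, running `CommsCallback`) invokes `connectComplete`, which calls the synchronous `MqttClient.subscribe` and then waits on the token. One plausible explanation, not confirmed in this thread, is that completing that token depends on work the blocked callback thread would normally do (for example, draining messages the broker queued for the persistent session). The minimal sketch below models only that general pattern, using a single-threaded executor as a stand-in for the callback thread. `SelfWaitModel` and `runCallback` are hypothetical names, and no Paho classes are involved.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future, Promise, TimeoutException}
import scala.concurrent.duration.FiniteDuration

// Simplified model, NOT Paho's implementation: one "callback" thread runs
// connectComplete and is also the only thread that can deliver the
// acknowledgement a blocking subscribe() waits for.
object SelfWaitModel {

  /** Returns true if the simulated subscription is confirmed within `limit`. */
  def runCallback(blockInsideCallback: Boolean, limit: FiniteDuration): Boolean = {
    val callbackThread = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(callbackThread)
    val subAck = Promise[Unit]()        // stands in for the subscribe token
    val confirmed = Promise[Boolean]()

    // Simulated connectComplete, running on the callback thread.
    Future {
      // The ack can only be processed by the callback thread, so this task
      // is queued behind the callback that is currently running.
      Future(subAck.trySuccess(()))
      if (blockInsideCallback) {
        // Like a synchronous subscribe(): wait on the token inside the callback.
        val ok =
          try { Await.ready(subAck.future, limit); true }
          catch { case _: TimeoutException => false }
        confirmed.trySuccess(ok)
      } else {
        // Non-blocking: register a continuation and return immediately.
        subAck.future.foreach(_ => confirmed.trySuccess(true))
      }
    }

    try Await.result(confirmed.future, limit * 2)
    finally callbackThread.shutdownNow()
  }
}
```

With `blockInsideCallback = true` the model times out even though the acknowledgement is "sent", which mirrors the symptom of Mosquitto delivering while the client never observes it; with a non-blocking continuation it completes. Both workarounds discussed later in the thread (a bounded wait, or moving `subscribe` off the callback thread) break this cycle.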

@kervel
Author

kervel commented Feb 10, 2022

And... I think I found something that works reliably. I now call:

    client.setTimeToWait(1000)

before connecting the client. This way, the subscribe() call never blocks for more than 1 second. Since the subscription itself works (it always does; it's just Paho that doesn't see it), processing continues fine even when the call times out.
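The bounded-wait idea can be modeled with the Scala standard library alone. This is a hypothetical helper (`BoundedSubscribe`, `awaitConfirmation` and `Outcome` are illustrative names), with a `Future` standing in for the subscribe token. In Paho itself the limit comes from `setTimeToWait`, and a timeout surfaces as an `MqttException` rather than a `TimeoutException`, so real code would catch that instead.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration.FiniteDuration

// Hypothetical helper modeling a bounded wait on a subscription token.
// `confirmation` stands in for the token; with Paho the limit would come
// from setTimeToWait and a timeout would be reported as an MqttException.
object BoundedSubscribe {
  sealed trait Outcome
  case object Confirmed extends Outcome
  case object TimedOut extends Outcome

  /** Waits at most `limit`. A timeout is reported rather than treated as fatal;
    * a failed confirmation is rethrown by Await.result. */
  def awaitConfirmation(confirmation: Future[Unit], limit: FiniteDuration): Outcome =
    try {
      Await.result(confirmation, limit)
      Confirmed
    } catch {
      case _: TimeoutException => TimedOut
    }
}
```

A `TimedOut` result does not say whether the broker accepted the subscription. Per the observation above it usually did, so returning an outcome the caller can log as a warning seems preferable to failing the task.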

kervel changed the title from "(hard to track) mqttv3 sometimes fails to resubscribe with clean session = false" to "(hard to track) mqtt sometimes hangs resubscribing with clean session = false" on Feb 10, 2022
@harish-mohanadas

harish-mohanadas commented Dec 18, 2023

The issue seems to still exist in version 6.0.0.

With client.setTimeToWait, a call that times out throws an MqttException.

Could we check whether this is a reconnect with clean session = false, and skip subscribing in that case? client.subscribe will time out and throw on the first connection, but proceed on reconnection.

--- a/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManager.scala
+++ b/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManager.scala
@@ -50,6 +50,7 @@ class MqttManager(
   client.setCallback(this)
 
   logger.info(s"Connecting to ${settings.connection}")
+  client.setTimeToWait(5000)
   client.connect(options)
   logger.info(s"Connected to ${settings.connection} as ${settings.clientId}")
 
@@ -165,9 +166,10 @@ class MqttManager(
     val topic = sourceToTopicMap.keySet.toArray
     val qos   = Array.fill(sourceToTopicMap.keySet.size)(settings.mqttQualityOfService)
 
-    if (reconnect)
+    if (reconnect && !options.isCleanSession())
       logger.warn(s"Reconnected. Resubscribing to topic $topic...")
-    client.subscribe(topic, qos)
+    else client.subscribe(topic, qos)
+
     if (reconnect)
       logger.warn(s"Resubscribed to topic $topic with QoS $qos")
     else logger.info(s"Subscribed to topic $topic with QoS $qos")

Is there a cleaner method?
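A possible variation on this check is to key the decision on the CONNACK "session present" flag instead of on `reconnect`. After a process restart (the OOM case in the original report) a fresh client connects with `reconnect = false`, so a check on `reconnect` alone would still subscribe. The helper below is hypothetical. In Paho's v3 client the flag should be available from `IMqttToken.getSessionPresent()` on the token returned by `connectWithResult`, but please verify that against the Paho version in use.

```scala
// Hypothetical decision helper; the names are illustrative, not Stream Reactor code.
object ResubscribePolicy {

  /** @param cleanSession   the connector's clean-session setting (connect.mqtt.clean)
    * @param sessionPresent the "session present" flag from the broker's CONNACK
    * @return whether the connector should send SUBSCRIBE after connecting
    */
  def shouldSubscribe(cleanSession: Boolean, sessionPresent: Boolean): Boolean =
    // Subscriptions only survive in a persistent session the broker still holds;
    // that session may have expired (e.g. Mosquitto's persistent_client_expiration).
    cleanSession || !sessionPresent
}
```

At the protocol level resubscribing is harmless, since per the MQTT spec a SUBSCRIBE with an identical topic filter replaces the existing subscription, so defaulting to `true` whenever the broker has no session is the conservative choice.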

@harish-mohanadas

I could solve the issue by wrapping client.subscribe and the logging that follows in a Future to avoid blocking the connectComplete callback.

--- a/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManager.scala
+++ b/kafka-connect-mqtt/src/main/scala/io/lenses/streamreactor/connect/mqtt/source/MqttManager.scala
@@ -28,6 +28,9 @@ import java.util
 import java.util.Base64
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.TimeUnit
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
 import scala.jdk.CollectionConverters.ListHasAsScala
 
 class MqttManager(
@@ -167,9 +170,14 @@ class MqttManager(
 
     if (reconnect)
       logger.warn(s"Reconnected. Resubscribing to topic $topic...")
-    client.subscribe(topic, qos)
-    if (reconnect)
-      logger.warn(s"Resubscribed to topic $topic with QoS $qos")
-    else logger.info(s"Subscribed to topic $topic with QoS $qos")
+
+    Future {
+      client.subscribe(topic, qos)
+      if (reconnect)
+        logger.warn(s"Resubscribed to topic $topic with QoS $qos")
+      else logger.info(s"Subscribed to topic $topic with QoS $qos")
+    }
+
+    return
   }
 }
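The `Future` approach lets `connectComplete` return immediately, but the `Future` itself is discarded, so a failed subscribe would go unnoticed. Below is a sketch of the same idea with a dedicated single-thread executor (instead of `ExecutionContext.Implicits.global`) and explicit success/failure logging. `SubscriptionRunner`, `subscribeAsync` and the `subscribe` parameter are hypothetical stand-ins for wrapping `client.subscribe(topic, qos)`.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical helper: runs a blocking subscribe off the MQTT callback thread
// and reports the outcome instead of silently dropping a failed Future.
final class SubscriptionRunner(log: String => Unit) {
  // Dedicated single thread, so subscribes run one at a time and in order.
  private val pool: ExecutorService = Executors.newSingleThreadExecutor()
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  /** Returns immediately; `subscribe` (e.g. a call to client.subscribe) runs on `pool`. */
  def subscribeAsync(topics: Seq[String])(subscribe: Seq[String] => Unit): Future[Unit] = {
    val result = Future(subscribe(topics))
    result.onComplete {
      case Success(_) => log(s"Subscribed to ${topics.mkString(", ")}")
      case Failure(e) => log(s"Subscribing to ${topics.mkString(", ")} failed: ${e.getMessage}")
    }
    result
  }

  /** Stops accepting new work; already queued subscribes still run. */
  def shutdown(): Unit = pool.shutdown()
}
```

Joining the topics with `mkString` also avoids a small logging quirk in the diff: interpolating an `Array[String]` as `$topic` prints the array's type and hash code (something like `[Ljava.lang.String;@…`) rather than the topic names.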

@chriline

chriline commented Apr 3, 2024

I believe we're running into this very issue:
With connect.mqtt.clean=false the connector occasionally stops working and does not recover. While the "Resubscribing" message is logged, the "Resubscribed" message is not. According to the broker logs, messages are being sent.
