
"vector validate" is stuck when running with kafka source and elasticsearch sinks #20367

Open
Skunnyk opened this issue Apr 24, 2024 · 4 comments
Labels
sink: elasticsearch Anything `elasticsearch` sink related source: kafka Anything `kafka` source related type: bug A code related bug.

Comments

Skunnyk commented Apr 24, 2024

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

Hi, first, thank you for your work on Vector :)

I have a problem with Vector 0.37.1 (Debian package, but I can also trigger it with 0.36): vector validate gets stuck the majority of the time.
It seems to be related to the combination of a kafka source and an elasticsearch sink; other configurations I have tested show no problem so far. This may be related to the discussion in issue #19333.

A minimal configuration that triggers the problem is attached.

11:47:55 [email protected]:/etc/vector# vector validate
√ Loaded ["/etc/vector/vector.yaml"]
√ Component configuration
√ Health check "es" <--- # stuck indefinitely
^C^Z <--- # no way to ctrl+c/stop, only kill -9 works
[2]+  Stopped                 vector validate

Sometimes it works:

11:47:57 [email protected]:/etc/vector# vector validate
√ Loaded ["/etc/vector/vector.yaml"]
√ Component configuration
√ Health check "es"
------------------------------------
                           Validated

I can see an open Kafka connection to port 9092 while validate is stuck.
The offset=Invalid metadata="" in the DEBUG output is strange; the Kafka cluster itself works correctly.
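One way to confirm the lingering broker connection while validate is hung (a sketch, assuming a Linux host with the iproute2 `ss` tool; port 9092 matches the bootstrap_servers in the configuration below):

```shell
# Show established TCP connections to the Kafka broker port (9092)
# while `vector validate` is stuck; prints only a header when none match.
conns=$(ss -tn state established '( dport = :9092 )')
echo "$conns"
```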

Thank you,

Configuration

sources:
  kafka:
    type: kafka
    bootstrap_servers: "broker1:9092,broker2:9092"
    group_id: vector
    topics:
      - syslog-vector

sinks:
  es:
    type: "elasticsearch"
    inputs: ["kafka"]
    endpoints:
      - "http://elasticsearch:9200/"
    mode: bulk
    bulk:
      index: "vector-%Y-%m-%d"

api:
  enabled: true
  address: "127.0.0.1:8686"

Version

0.37.1

Debug Output

2024-04-24T09:54:20.667711Z DEBUG vector::app: Internal log rate limit configured. internal_log_rate_secs=10                              
2024-04-24T09:54:20.667758Z  INFO vector::app: Log level is enabled. level="trace"                                                                                                                                   
2024-04-24T09:54:20.667838Z DEBUG vector::app: messaged="Building runtime." worker_threads=4                                                                                                                         
2024-04-24T09:54:20.667933Z TRACE mio::poll: registering event source with poller: token=Token(1), interests=READABLE    
√ Loaded ["/etc/vector/vector.yaml"]                                                                      
2024-04-24T09:54:20.672046Z DEBUG vector::topology::builder: Building new source. component=kafka                                                                                                                    
2024-04-24T09:54:20.673311Z TRACE source{component_kind="source" component_id=kafka component_type=kafka}: rdkafka::client: Create new librdkafka client 0x7f6489341400    
2024-04-24T09:54:20.673384Z TRACE source{component_kind="source" component_id=kafka component_type=kafka}: rdkafka::util: Destroying topic partition list: 0x7f64895f0530                                            
2024-04-24T09:54:20.673399Z TRACE source{component_kind="source" component_id=kafka component_type=kafka}: rdkafka::util: Destroyed topic partition list: 0x7f64895f0530
2024-04-24T09:54:20.673425Z DEBUG vector::topology::builder: Building new sink. component=es
2024-04-24T09:54:20.675601Z TRACE rdkafka::consumer::stream_consumer: Starting stream consumer wake loop: 0x7f6489341400
2024-04-24T09:54:20.681276Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T09:54:20.687608Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T09:54:20.693947Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://elasticsearch:9200/ method=GET version=HTTP/1.1 headers={"user-agent": "Vector/0.37.1 (x86_64-unknown-linux-gnu cb6635a 2024-04-09 13:45:06.561412437)", "accept-encoding": "identity"} body=[empty]                        
2024-04-24T09:54:20.694011Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::pool: checkout waiting for idle connection: ("http", elasticsearch:9200)
2024-04-24T09:54:20.694068Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::connect::http: Http::connect; scheme=Some("http"), host=Some("elasticsearch"), port=Some(Port(9200))
2024-04-24T09:54:20.694949Z DEBUG hyper::client::connect::dns: resolving host="elasticsearch"
2024-04-24T09:54:20.697128Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::connect::http: connecting to 10.20.0.100:9200
2024-04-24T09:54:20.697190Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: mio::poll: registering event source with poller: token=Token(140069773503104), interests=READABLE | WRITABLE
2024-04-24T09:54:20.697606Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::connect::http: connected to 10.20.0.100:9200
2024-04-24T09:54:20.697634Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::conn: client handshake Http1
2024-04-24T09:54:20.697650Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::client: handshake complete, spawning background dispatcher task
2024-04-24T09:54:20.697726Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::pool: checkout dropped for ("http", elasticsearch:9200)
2024-04-24T09:54:20.697733Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Busy }
2024-04-24T09:54:20.697802Z TRACE encode_headers: hyper::proto::h1::role: Client::encode method=GET, body=None
2024-04-24T09:54:20.697838Z DEBUG hyper::proto::h1::io: flushed 185 bytes
2024-04-24T09:54:20.697876Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: KeepAlive, keep_alive: Busy }
2024-04-24T09:54:20.699867Z TRACE hyper::proto::h1::conn: Conn::read_head
2024-04-24T09:54:20.699898Z TRACE hyper::proto::h1::io: received 653 bytes
2024-04-24T09:54:20.699981Z TRACE parse_headers: hyper::proto::h1::role: Response.parse bytes=653
2024-04-24T09:54:20.700021Z TRACE parse_headers: hyper::proto::h1::role: Response.parse Complete(121)
2024-04-24T09:54:20.700102Z DEBUG hyper::proto::h1::io: parsed 3 headers
2024-04-24T09:54:20.700174Z DEBUG hyper::proto::h1::conn: incoming body is content-length (532 bytes)
2024-04-24T09:54:20.700259Z TRACE hyper::proto::h1::decode: decode; state=Length(532)
2024-04-24T09:54:20.700280Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: vector::internal_events::http_client: HTTP response. status=200 OK version=HTTP/1.1 headers={"x-elastic-product": "Elasticsearch", "content-type": "application/json; charset=UTF-8", "content-length": "532"} body=[532 bytes]
2024-04-24T09:54:20.700310Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-04-24T09:54:20.700403Z TRACE hyper::proto::h1::conn: maybe_notify; read_from_io blocked
2024-04-24T09:54:20.700458Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T09:54:20.700480Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T09:54:20.700508Z TRACE hyper::client::pool: pool dropped, dropping pooled (("http", elasticsearch:9200))
2024-04-24T09:54:20.700575Z TRACE hyper::proto::h1::dispatch: client tx closed
2024-04-24T09:54:20.700601Z TRACE hyper::proto::h1::conn: State::close_read()
2024-04-24T09:54:20.700633Z TRACE hyper::proto::h1::conn: State::close_write()
2024-04-24T09:54:20.700691Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Closed, writing: Closed, keep_alive: Disabled }
2024-04-24T09:54:20.700713Z TRACE hyper::proto::h1::conn: shut down IO complete
2024-04-24T09:54:20.700780Z TRACE mio::poll: deregistering event source from poller    
2024-04-24T09:54:20.701133Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector::sinks::elasticsearch::common: Auto-detected Elasticsearch API version. version=7
2024-04-24T09:54:20.706750Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T09:54:20.712262Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
√ Component configuration
2024-04-24T09:54:20.718159Z TRACE tower::buffer::worker: worker polling for next message
2024-04-24T09:54:20.718204Z TRACE vector::validate: Healthcheck for es starting.
2024-04-24T09:54:20.718317Z DEBUG http: vector::internal_events::http_client: Sending HTTP request. uri=http://elasticsearch:9200/_cluster/health method=GET version=HTTP/1.1 headers={"user-agent": "Vector/0.37.1 (x86_64-unknown-linux-gnu cb6635a 2024-04-09 13:45:06.561412437)", "accept-encoding": "identity"} body=[empty]
2024-04-24T09:54:20.718354Z TRACE http: hyper::client::pool: checkout waiting for idle connection: ("http", elasticsearch:9200)
2024-04-24T09:54:20.718411Z TRACE http: hyper::client::connect::http: Http::connect; scheme=Some("http"), host=Some("elasticsearch"), port=Some(Port(9200))
2024-04-24T09:54:20.718506Z DEBUG hyper::client::connect::dns: resolving host="elasticsearch"
2024-04-24T09:54:20.719112Z DEBUG http: hyper::client::connect::http: connecting to 10.20.0.100:9200
2024-04-24T09:54:20.719162Z TRACE http: mio::poll: registering event source with poller: token=Token(140069749659520), interests=READABLE | WRITABLE    
2024-04-24T09:54:20.719604Z DEBUG http: hyper::client::connect::http: connected to 10.20.0.100:9200
2024-04-24T09:54:20.719726Z TRACE http: hyper::client::conn: client handshake Http1
2024-04-24T09:54:20.720184Z TRACE http: hyper::client::client: handshake complete, spawning background dispatcher task
2024-04-24T09:54:20.720346Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Busy }
2024-04-24T09:54:20.720492Z TRACE http: hyper::client::pool: checkout dropped for ("http", elasticsearch:9200)
2024-04-24T09:54:20.720646Z TRACE encode_headers: hyper::proto::h1::role: Client::encode method=GET, body=None
2024-04-24T09:54:20.720809Z DEBUG hyper::proto::h1::io: flushed 200 bytes
2024-04-24T09:54:20.720947Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: KeepAlive, keep_alive: Busy }
2024-04-24T09:54:20.739522Z TRACE hyper::proto::h1::conn: Conn::read_head
2024-04-24T09:54:20.739536Z TRACE hyper::proto::h1::io: received 510 bytes
2024-04-24T09:54:20.739545Z TRACE parse_headers: hyper::proto::h1::role: Response.parse bytes=510
2024-04-24T09:54:20.739555Z TRACE parse_headers: hyper::proto::h1::role: Response.parse Complete(121)
2024-04-24T09:54:20.739566Z DEBUG hyper::proto::h1::io: parsed 3 headers
2024-04-24T09:54:20.739571Z DEBUG hyper::proto::h1::conn: incoming body is content-length (389 bytes)
2024-04-24T09:54:20.739581Z TRACE hyper::proto::h1::decode: decode; state=Length(389)
2024-04-24T09:54:20.739586Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-04-24T09:54:20.739591Z TRACE hyper::proto::h1::conn: maybe_notify; read_from_io blocked
2024-04-24T09:54:20.739599Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T09:54:20.739604Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T09:54:20.739616Z TRACE http: hyper::client::pool: put; add idle connection for ("http", elasticsearch:9200)
2024-04-24T09:54:20.739624Z DEBUG http: hyper::client::pool: pooling idle connection for ("http", elasticsearch:9200)
2024-04-24T09:54:20.739642Z DEBUG http: vector::internal_events::http_client: HTTP response. status=200 OK version=HTTP/1.1 headers={"x-elastic-product": "Elasticsearch", "content-type": "application/json; charset=UTF-8", "content-length": "389"} body=[389 bytes]
2024-04-24T09:54:20.739670Z  INFO vector::topology::builder: Healthcheck passed.
2024-04-24T09:54:20.739684Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
√ Health check "es"
2024-04-24T09:54:20.739712Z TRACE vector::validate: Healthcheck for es done.
2024-04-24T09:54:20.739764Z TRACE rdkafka::util: Destroying queue: 0x7f64894504c0    
2024-04-24T09:54:20.739776Z TRACE rdkafka::util: Destroyed queue: 0x7f64894504c0    
2024-04-24T09:54:20.739788Z TRACE rdkafka::consumer::base_consumer: Destroying consumer: 0x7f6489341400     
2024-04-24T09:54:20.739815Z TRACE rdkafka::consumer: Running pre-rebalance with Assign(TPL {syslog-vector/0: offset=Invalid metadata="", error=Ok(()); syslog-vector/1: offset=Invalid metadata="", error=Ok(()); syslog-vector/2: offset=Invalid metadata="", error=Ok(()); syslog-vector/3: offset=Invalid metadata="", error=Ok(()); syslog-vector/4: offset=Invalid metadata="", error=Ok(()); syslog-vector/5: offset=Invalid metadata="", error=Ok(()); syslog-vector/6: offset=Invalid metadata="", error=Ok(()); syslog-vector/7: offset=Invalid metadata="", error=Ok(()); syslog-vector/8: offset=Invalid metadata="", error=Ok(()); syslog-vector/9: offset=Invalid metadata="", error=Ok(())})
2024-04-24T09:54:20.739819Z TRACE tower::buffer::worker: worker polling for next message
2024-04-24T09:54:20.739874Z TRACE tower::buffer::worker: buffer already closed
2024-04-24T09:54:20.740630Z TRACE hyper::client::pool: pool closed, canceling idle interval
2024-04-24T09:54:20.740716Z TRACE hyper::proto::h1::dispatch: client tx closed
2024-04-24T09:54:20.740825Z TRACE hyper::proto::h1::conn: State::close_read()
2024-04-24T09:54:20.740838Z TRACE hyper::proto::h1::conn: State::close_write()
2024-04-24T09:54:20.740848Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Closed, writing: Closed, keep_alive: Disabled }
2024-04-24T09:54:20.740875Z TRACE hyper::proto::h1::conn: shut down IO complete
2024-04-24T09:54:20.740894Z TRACE mio::poll: deregistering event source from poller  
<stuck here>

Example Data

No response

Additional Context

No response

References

#19333

@Skunnyk Skunnyk added the type: bug A code related bug. label Apr 24, 2024
Author

Skunnyk commented Apr 24, 2024

Here is a trace from a run where validate works (once in a while; the behavior is random). We can see that everything is fine on the rdkafka side 🤔

2024-04-24T10:09:24.856154Z DEBUG vector::app: Internal log rate limit configured. internal_log_rate_secs=10                              
2024-04-24T10:09:24.856210Z  INFO vector::app: Log level is enabled. level="trace"                                                                                                                                   
2024-04-24T10:09:24.856264Z DEBUG vector::app: messaged="Building runtime." worker_threads=4                                                                                                                         
2024-04-24T10:09:24.856354Z TRACE mio::poll: registering event source with poller: token=Token(1), interests=READABLE    
√ Loaded ["/etc/vector/vector.yaml"]                                                                      
2024-04-24T10:09:24.860493Z DEBUG vector::topology::builder: Building new source. component=kafka                                                                                                                    
2024-04-24T10:09:24.861738Z TRACE source{component_kind="source" component_id=kafka component_type=kafka}: rdkafka::client: Create new librdkafka client 0x7f28d6141400    
2024-04-24T10:09:24.861797Z TRACE rdkafka::consumer::stream_consumer: Starting stream consumer wake loop: 0x7f28d6141400    
2024-04-24T10:09:24.861885Z TRACE source{component_kind="source" component_id=kafka component_type=kafka}: rdkafka::util: Destroying topic partition list: 0x7f28d63f0530                           
2024-04-24T10:09:24.861906Z TRACE source{component_kind="source" component_id=kafka component_type=kafka}: rdkafka::util: Destroyed topic partition list: 0x7f28d63f0530    
2024-04-24T10:09:24.861938Z DEBUG vector::topology::builder: Building new sink. component=es                                                                                                                         
2024-04-24T10:09:24.869500Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T10:09:24.877121Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T10:09:24.883296Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://elasticsearch:9200/ method=GET version=HTTP/1.1 headers={"user-agent": "Vector/0.37.1 (x86_64-unknown-linux-gnu cb6635a 2024-04-09 13:45:06.561412437)", "accept-encoding": "identity"} body=[empty]                        
2024-04-24T10:09:24.883361Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::pool: checkout waiting for idle connection: ("http", elasticsearch:9200)   
2024-04-24T10:09:24.883414Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::connect::http: Http::connect; scheme=Some("http"), host=Some("elasticsearch"), port=Some(Port(9200))
2024-04-24T10:09:24.884344Z DEBUG hyper::client::connect::dns: resolving host="elasticsearch"
2024-04-24T10:09:24.886658Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::connect::http: connecting to 10.20.0.100:9200                                        
2024-04-24T10:09:24.886716Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: mio::poll: registering event source with poller: token=Token(139813365213824), interests=READABLE | WRITABLE
2024-04-24T10:09:24.887021Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::connect::http: connected to 10.20.0.100:9200
2024-04-24T10:09:24.887047Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::conn: client handshake Http1
2024-04-24T10:09:24.887061Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::client: handshake complete, spawning background dispatcher task
2024-04-24T10:09:24.887128Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::pool: checkout dropped for ("http", elasticsearch:9200)
2024-04-24T10:09:24.887138Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Busy }
2024-04-24T10:09:24.887201Z TRACE encode_headers: hyper::proto::h1::role: Client::encode method=GET, body=None
2024-04-24T10:09:24.887244Z DEBUG hyper::proto::h1::io: flushed 185 bytes
2024-04-24T10:09:24.887257Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: KeepAlive, keep_alive: Busy }
2024-04-24T10:09:24.889167Z TRACE hyper::proto::h1::conn: Conn::read_head
2024-04-24T10:09:24.889189Z TRACE hyper::proto::h1::io: received 653 bytes
2024-04-24T10:09:24.889221Z TRACE parse_headers: hyper::proto::h1::role: Response.parse bytes=653
2024-04-24T10:09:24.889241Z TRACE parse_headers: hyper::proto::h1::role: Response.parse Complete(121)
2024-04-24T10:09:24.889258Z DEBUG hyper::proto::h1::io: parsed 3 headers
2024-04-24T10:09:24.889269Z DEBUG hyper::proto::h1::conn: incoming body is content-length (532 bytes)
2024-04-24T10:09:24.889296Z TRACE hyper::proto::h1::decode: decode; state=Length(532)
2024-04-24T10:09:24.889346Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-04-24T10:09:24.889361Z TRACE hyper::proto::h1::conn: maybe_notify; read_from_io blocked
2024-04-24T10:09:24.889326Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: vector::internal_events::http_client: HTTP response. status=200 OK version=HTTP/1.1 headers={"x-elastic-product": "Elasticsearch", "content-type": "application/json; charset=UTF-8", "content-length": "532"} body=[532 bytes]
2024-04-24T10:09:24.889382Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T10:09:24.890196Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T10:09:24.890212Z TRACE hyper::client::pool: pool dropped, dropping pooled (("http", elasticsearch:9200))
2024-04-24T10:09:24.890232Z TRACE hyper::proto::h1::dispatch: client tx closed
2024-04-24T10:09:24.890241Z TRACE hyper::proto::h1::conn: State::close_read()
2024-04-24T10:09:24.890249Z TRACE hyper::proto::h1::conn: State::close_write()
2024-04-24T10:09:24.890261Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Closed, writing: Closed, keep_alive: Disabled }
2024-04-24T10:09:24.890289Z TRACE hyper::proto::h1::conn: shut down IO complete
2024-04-24T10:09:24.890304Z TRACE mio::poll: deregistering event source from poller    
2024-04-24T10:09:24.890353Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector::sinks::elasticsearch::common: Auto-detected Elasticsearch API version. version=7
2024-04-24T10:09:24.895920Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T10:09:24.901408Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T10:09:24.907324Z TRACE tower::buffer::worker: worker polling for next message
√ Component configuration
2024-04-24T10:09:24.907394Z TRACE vector::validate: Healthcheck for es starting.
2024-04-24T10:09:24.907435Z DEBUG http: vector::internal_events::http_client: Sending HTTP request. uri=http://elasticsearch:9200/_cluster/health method=GET version=HTTP/1.1 headers={"user-agent": "Vector/0.37.1 (x86_64-unknown-linux-gnu cb6635a 2024-04-09 13:45:06.561412437)", "accept-encoding": "identity"} body=[empty]
2024-04-24T10:09:24.907461Z TRACE http: hyper::client::pool: checkout waiting for idle connection: ("http", elasticsearch:9200)
2024-04-24T10:09:24.907522Z TRACE http: hyper::client::connect::http: Http::connect; scheme=Some("http"), host=Some("elasticsearch"), port=Some(Port(9200))
2024-04-24T10:09:24.907563Z DEBUG hyper::client::connect::dns: resolving host="elasticsearch"
2024-04-24T10:09:24.908195Z DEBUG http: hyper::client::connect::http: connecting to 10.20.0.100:9200
2024-04-24T10:09:24.908243Z TRACE http: mio::poll: registering event source with poller: token=Token(139813332916352), interests=READABLE | WRITABLE    
2024-04-24T10:09:24.908561Z DEBUG http: hyper::client::connect::http: connected to 10.20.0.100:9200
2024-04-24T10:09:24.908581Z TRACE http: hyper::client::conn: client handshake Http1
2024-04-24T10:09:24.908593Z TRACE http: hyper::client::client: handshake complete, spawning background dispatcher task
2024-04-24T10:09:24.908606Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Busy }
2024-04-24T10:09:24.908615Z TRACE http: hyper::client::pool: checkout dropped for ("http", elasticsearch:9200)
2024-04-24T10:09:24.908630Z TRACE encode_headers: hyper::proto::h1::role: Client::encode method=GET, body=None
2024-04-24T10:09:24.908652Z DEBUG hyper::proto::h1::io: flushed 200 bytes
2024-04-24T10:09:24.908658Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: KeepAlive, keep_alive: Busy }
2024-04-24T10:09:24.927680Z TRACE hyper::proto::h1::conn: Conn::read_head
2024-04-24T10:09:24.927697Z TRACE hyper::proto::h1::io: received 510 bytes
2024-04-24T10:09:24.927706Z TRACE parse_headers: hyper::proto::h1::role: Response.parse bytes=510
2024-04-24T10:09:24.927714Z TRACE parse_headers: hyper::proto::h1::role: Response.parse Complete(121)
2024-04-24T10:09:24.927727Z DEBUG hyper::proto::h1::io: parsed 3 headers
2024-04-24T10:09:24.927732Z DEBUG hyper::proto::h1::conn: incoming body is content-length (389 bytes)
2024-04-24T10:09:24.927742Z TRACE hyper::proto::h1::decode: decode; state=Length(389)
2024-04-24T10:09:24.927747Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-04-24T10:09:24.927752Z TRACE hyper::proto::h1::conn: maybe_notify; read_from_io blocked
2024-04-24T10:09:24.927759Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T10:09:24.927765Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T10:09:24.927776Z TRACE http: hyper::client::pool: put; add idle connection for ("http", elasticsearch:9200)
2024-04-24T10:09:24.927786Z DEBUG http: hyper::client::pool: pooling idle connection for ("http", elasticsearch:9200)
2024-04-24T10:09:24.927802Z DEBUG http: vector::internal_events::http_client: HTTP response. status=200 OK version=HTTP/1.1 headers={"x-elastic-product": "Elasticsearch", "content-type": "application/json; charset=UTF-8", "content-length": "389"} body=[389 bytes]   
2024-04-24T10:09:24.927834Z  INFO vector::topology::builder: Healthcheck passed.
2024-04-24T10:09:24.927846Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
√ Health check "es"
2024-04-24T10:09:24.927875Z TRACE vector::validate: Healthcheck for es done.
2024-04-24T10:09:24.927917Z TRACE rdkafka::util: Destroying queue: 0x7f28d62504c0    
2024-04-24T10:09:24.927920Z TRACE tower::buffer::worker: worker polling for next message
2024-04-24T10:09:24.927930Z TRACE rdkafka::util: Destroyed queue: 0x7f28d62504c0    
2024-04-24T10:09:24.927939Z TRACE tower::buffer::worker: buffer already closed
2024-04-24T10:09:24.927947Z TRACE rdkafka::consumer::base_consumer: Destroying consumer: 0x7f28d6141400     
2024-04-24T10:09:24.928702Z TRACE hyper::client::pool: pool closed, canceling idle interval
2024-04-24T10:09:24.928720Z TRACE hyper::proto::h1::dispatch: client tx closed
2024-04-24T10:09:24.928725Z TRACE hyper::proto::h1::conn: State::close_read()
2024-04-24T10:09:24.928729Z TRACE hyper::proto::h1::conn: State::close_write()
2024-04-24T10:09:24.928734Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Closed, writing: Closed, keep_alive: Disabled }
2024-04-24T10:09:24.928764Z TRACE hyper::proto::h1::conn: shut down IO complete
2024-04-24T10:09:24.928772Z TRACE mio::poll: deregistering event source from poller    
2024-04-24T10:09:24.964436Z TRACE rdkafka::consumer::base_consumer: Consumer destroyed: 0x7f28d6141400    
2024-04-24T10:09:24.964460Z TRACE rdkafka::util: Destroying client: 0x7f28d6141400    
2024-04-24T10:09:24.964984Z TRACE rdkafka::util: Destroyed client: 0x7f28d6141400    
2024-04-24T10:09:24.965053Z TRACE rdkafka::consumer::stream_consumer: Shut down stream consumer wake loop: 0x7f28d6141400    
------------------------------------
                           Validated

Author

Skunnyk commented Apr 24, 2024

I'm a bit speechless.
I can't really reproduce the problem on another setup.
After investigation, and on an intuition, I decided to add latency on the network interface:
we have 0.2 ms between Vector and our source and sink (yes, a good network).

Adding 15 ms of delay on the Vector host:

tc qdisc add dev ens192 root netem delay 15ms

And, like magic, vector validate works every time. Wow. If I remove the delay, it gets stuck again.

Can somebody with the problem try this workaround? 🙏
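For anyone trying the workaround, a sketch of the matching commands to verify and then remove the artificial delay (substitute your own interface name for ens192; these require root and are not run here):

```shell
# Add 15 ms of egress delay on the interface (requires root).
tc qdisc add dev ens192 root netem delay 15ms

# Verify the netem qdisc is in place.
tc qdisc show dev ens192

# Remove the delay again when done testing.
tc qdisc del dev ens192 root netem
```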

@jszwedko
Member

Huh, now that is strange. Are Kafka and Elasticsearch already up and running when Vector starts up?

Author

Skunnyk commented Apr 25, 2024

Yes, they are up during validate and during startup.

vector validate also tests the sources, even though there is no output about that in the console.

Note: if I run vector normally, everything is OK; it consumes from Kafka and writes to ES.

@jszwedko jszwedko added sink: elasticsearch Anything `elasticsearch` sink related source: kafka Anything `kafka` source related labels Apr 25, 2024