Investigate Sudden Surge in "too far in past" errors in M3 Aggregator #4178

Open
atibdialpad opened this issue Jan 5, 2023 · 1 comment

We are using the fleet-of-m3-coordinators-and-m3-aggregators topology to aggregate metrics before sending them to a downstream Grafana remote storage:
https://m3db.io/docs/how_to/any_remote_storage/#fleet-of-m3-coordinators-and-m3-aggregators

Apps --> OTel Collector (Prometheus receiver) --> M3 Coordinator --> M3 Aggregators --> M3 Coordinator (aggregated metrics) --> Prometheus remote write to Grafana

We are observing a sudden surge in M3 aggregation errors of the type "too far in past" after around 30 hours of traffic (roughly a day and a half of prod traffic). I am not sure it is caused by load alone, since 1) the cluster looks very stable for the first full day of prod traffic, and 2) the increase in these errors is not gradual. It really is a sudden increase, and it eventually causes the M3 Coordinators to OOM and crash.

The surge in these errors is accompanied by an increase in the ingest latency metric:
[Screenshot, 2023-01-05 11:25:03 AM: ingest latency]

Topology Details:

25 OTel Collector pods -> 25 M3 Coordinator nodes -> 24 M3 Aggregator nodes (512 shards, RF 2)

M3 Agg End to End details dashboard snapshot.pdf
M3 Coordinator Dashboard snapshot.pdf

I suspect something might be wrong with my configs

M3 Agg Log example:

{"level":"error","ts":1672883758.452722,"msg":"could not process message","error":"datapoint for aggregation too far in past: off_by=28m19.550704118s, timestamp=2023-01-05T01:22:23Z, past_limit=2023-01-05T01:50:43Z, timestamp_unix_nanos=1672881743902000000, past_limit_unix_nanos=1672883443452704118","errorCauses":[{"error":"datapoint for aggregation too far in past: off_by=28m19.550704118s, timestamp=2023-01-05T01:22:23Z, past_limit=2023-01-05T01:50:43Z, timestamp_unix_nanos=1672881743902000000, past_limit_unix_nanos=1672883443452704118"}],"shard":854,"proto":"type:TIMED_METRIC_WITH_METADATAS timed_metric_with_metadatas:<metric:<type:GAUGE id:\"u'\\n\\000\\010\\000__name__\\032\\000varz_disconnect_end_bucket\\n\\000__rollup__\\004\\000true\\007\\000appname\\n\\000uber-voice\\013\\000client_type\\007\\000dialpad\\007\\000country\\002\\000mx\\010\\000instance\\014\\0000.0.0.0:8080\\003\\000job\\r\\000log-processor\\002\\000le\\005\\0000.025\\006\\000module\\006\\000answer\\007\\000version\\n\\0002212-02-49\" time_nanos:1672881743902000000 > metadatas:<metadatas:<cutover_nanos:1672771058627771735 metadata:<pipelines:<aggregation_id:<id:128 > storage_policies:<resolution:<window_size:15000000000 precision:1000000000 > retention:<period:7200000000000 > > pipeline:<> > > > > > "}
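
The metric `id` in the log line above is hard to read because the tags are binary-encoded. The bytes look like a little-endian layout: a 2-byte header (`u'`, i.e. 0x2775 = 10101), a 2-byte tag count, then length-prefixed name/value pairs. That layout is inferred from the bytes in this one log line, not taken from the M3 source, so the following Python sketch is only a reading aid; `decode_tags` and `HEADER_MAGIC` are names made up here.

```python
import struct

# Assumption: the first two bytes (b"u'") are a little-endian magic number.
HEADER_MAGIC = 10101  # 0x2775


def decode_tags(raw: bytes) -> dict:
    """Decode a length-prefixed tag blob into a name -> value mapping."""
    magic, count = struct.unpack_from("<HH", raw, 0)
    if magic != HEADER_MAGIC:
        raise ValueError(f"unexpected header magic: {magic}")
    offset = 4
    tags = {}
    for _ in range(count):
        pair = []
        for _ in range(2):  # tag name first, then tag value
            (length,) = struct.unpack_from("<H", raw, offset)
            offset += 2
            pair.append(raw[offset:offset + length].decode("utf-8"))
            offset += length
        tags[pair[0]] = pair[1]
    return tags


# Metric ID bytes copied from the log line (octal escapes as printed there).
RAW_ID = (
    b"u'\n\000\010\000__name__\032\000varz_disconnect_end_bucket"
    b"\n\000__rollup__\004\000true\007\000appname\n\000uber-voice"
    b"\013\000client_type\007\000dialpad\007\000country\002\000mx"
    b"\010\000instance\014\0000.0.0.0:8080\003\000job\r\000log-processor"
    b"\002\000le\005\0000.025\006\000module\006\000answer"
    b"\007\000version\n\0002212-02-49"
)

for name, value in decode_tags(RAW_ID).items():
    print(f"{name}={value}")
```

Decoded this way, the rejected datapoint is a rolled-up series (`__rollup__=true`) named `varz_disconnect_end_bucket` that still carries an `instance` tag.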

Configs:
M3 Coordinator

listenAddress: 0.0.0.0:7201

logging:
  level: info

metrics:
  scope:
    prefix: "m3coordinator"
  prometheus:
    handlerPath: /metrics
    listenAddress: 0.0.0.0:3030
  sanitization: prometheus
  samplingRate: 1.0

backend: prom-remote

prometheusRemoteBackend: 
  endpoints:
    # There must be one endpoint for unaggregated metrics (retention=0, resolution=0); otherwise M3 would throw
    # an error.
    # We have a mapping rule that drops unaggregated metrics (see downsample -> mappingRules below), so listing
    # this endpoint here does NOT route unaggregated metrics to it.
    - name: unaggregated 
      address: http://nginx-reverse-proxy.monitoring.svc.cluster.local:9092/
          
    # Use the following endpoint for directing pre-aggregated metric to a self-host prometheus instance for testing.
    # address: http://prometheus-m3-pre-agg.m3aggregation.svc.cluster.local:9090/api/v1/write
        
    - name: nginx-sidecar
      address: http://nginx-reverse-proxy.monitoring.svc.cluster.local:9092/
     # Use the following endpoint for directing aggregated metric to a self-host prometheus instance for testing.
     # address: http://prometheus-agg.m3aggregation.svc.cluster.local:9090/api/v1/write
          
      storagePolicy:
        retention: 2h
        resolution: 15s
        downsample:
          all: false
          # all: false means that not every metric goes through downsampling, only those that pass the filter
  connectTimeout: 15s # Default is 5s, increase to 15s
  maxIdleConns: 500 # Default is 100, increase to 500

clusterManagement:
  etcd:
    env: m3aggregation
    zone: embedded
    service: m3db
    cacheDir: /var/lib/m3kv
    etcdClusters:
      - zone: embedded
        endpoints:
          - etcd-0.etcd:2379
          - etcd-1.etcd:2379
          - etcd-2.etcd:2379

downsample:
  rules:
    # Mapping rule to drop unaggregated metrics.
    mappingRules:
      - name: "Drop unaggregate metric"
        filter: "__name__:*"
        drop: True

    rollupRules:
      # Exclude instance label for non VARZ metrics.
      # eg. web client metric includes a device id in its 'instance' label.
      # eg. K8s metrics includes a pod id in its 'instance' label.
      - name: "Exclude instance for _count"
        filter: "__name__:[!v]??[!z]*_count instance:*"
        transforms:
        - rollup:
            metricName: "{{ .MetricName }}"
            excludeBy: ["instance"]
            aggregations: ["Sum"]
        storagePolicies:
        - resolution: 15s
          retention: 2h
      
      - name: "Exclude instance for _sum"
        filter: "__name__:[!v]??[!z]*_sum instance:*"
        transforms:
        - rollup:
            metricName: "{{ .MetricName }}"
            excludeBy: ["instance"]
            aggregations: ["Sum"]
        storagePolicies:
        - resolution: 15s
          retention: 2h
    
      - name: "Exclude instance for _bucket"
        filter: "__name__:[!v]??[!z]*_bucket instance:*"
        transforms:
        - rollup:
            metricName: "{{ .MetricName }}"
            excludeBy: ["instance"]
            aggregations: ["Sum"]
        storagePolicies:
        - resolution: 15s
          retention: 2h
    
      - name: "Exclude instance for _total"
        filter: "__name__:[!v]??[!z]*_total instance:*"
        transforms:
        - rollup:
            metricName: "{{ .MetricName }}"
            excludeBy: ["instance"]
            aggregations: ["Sum"]
        storagePolicies:
        - resolution: 15s
          retention: 2h
        
      # todo: figure out how to filter out varz metrics
      # We still need this for self monitoring gauge metrics. 
      - name: "Exclude instance for gauge"
        filter: "__name__:!*{_count,_sum,_bucket,_total} instance:*"
        transforms:
        - rollup:
            metricName: "{{ .MetricName }}"
            excludeBy: ["instance"]
            aggregations: ["Last"]
        storagePolicies:
        - resolution: 15s
          retention: 2h
      
      # VARZ's target id is log_processor_instance.
      # Here we use another set of rollup rules to exclude two labels.
      - name: "Exclude log_processor_instance for _count"
        filter: "__name__:varz_*_count log_processor_instance:*"
        transforms:
        - rollup:
            metricName: "{{ .MetricName }}"
            excludeBy: ["log_processor_instance"]
            aggregations: ["Sum"]
        storagePolicies:
        - resolution: 15s
          retention: 2h
      
      - name: "Exclude log_processor_instance for _sum"
        filter: "__name__:varz_*_sum log_processor_instance:*"
        transforms:
        - rollup:
            metricName: "{{ .MetricName }}"
            excludeBy: ["log_processor_instance"]
            aggregations: ["Sum"]
        storagePolicies:
        - resolution: 15s
          retention: 2h
    
      - name: "Exclude log_processor_instance for _bucket"
        filter: "__name__:varz_*_bucket log_processor_instance:*"
        transforms:
        - rollup:
            metricName: "{{ .MetricName }}"
            excludeBy: ["log_processor_instance"]
            aggregations: ["Sum"]
        storagePolicies:
        - resolution: 15s
          retention: 2h
    
      - name: "Exclude log_processor_instance for _total"
        filter: "__name__:varz_*_total log_processor_instance:*"
        transforms:
        - rollup:
            metricName: "{{ .MetricName }}"
            excludeBy: ["log_processor_instance"]
            aggregations: ["Sum"]
        storagePolicies:
        - resolution: 15s
          retention: 2h
      
      # todo: figure out how to apply this for varz metrics
      # - name: "Exclude instance & log_processor_instance for gauge"
      #   filter: "__name__:!*{_count,_sum,_bucket,_total} instance:* log_processor_instance:*"
      #   transforms:
      #   - rollup:
      #       metricName: "{{ .MetricName }}"
      #       excludeBy: ["instance", "log_processor_instance"]
      #       aggregations: ["Last"]
      #   storagePolicies:
      #   - resolution: 15s
      #     retention: 2h


  matcher:
    requireNamespaceWatchOnInit: false

  remoteAggregator:
    client:
      type: m3msg
      m3msg:
        producer:
          writer:
            topicName: aggregator_ingest
            topicServiceOverride:
              zone: embedded
              environment: m3aggregation
            placement:
              isStaged: true
            placementServiceOverride:
              namespaces:
                placement: /placement
            connection:
              numConnections: 64
            messagePool:
              size: 16384
              watermark:
                low: 0.7
                high: 1.0

# This is for configuring the ingestion server that will receive metrics from the m3aggregators on port 7507
ingest:
  ingester:
    workerPoolSize: 10000
    opPool:
      size: 10000
    retry:
      maxRetries: 1
      jitter: true
    logSampleRate: 0.01
  m3msg:
    server:
      listenAddress: "0.0.0.0:7507"
      retry:
        maxBackoff: 10s
        jitter: true
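
The rollup filters in the coordinator config above use glob patterns such as `[!v]??[!z]*_count` to approximate "not a varz metric". As a rough sanity check, the Python sketch below runs the two `_count` patterns through `fnmatch`. This is only an approximation: M3 has its own filter matcher, `fnmatch` is assumed to behave the same for these particular patterns, and the metric names are hypothetical.

```python
from fnmatch import fnmatchcase

# Patterns copied from the rollup rules in the coordinator config.
NON_VARZ_COUNT = "[!v]??[!z]*_count"  # "Exclude instance for _count"
VARZ_COUNT = "varz_*_count"           # "Exclude log_processor_instance for _count"


def matches(name: str) -> tuple:
    """Return (matches non-varz pattern, matches varz pattern) for a metric name."""
    return (fnmatchcase(name, NON_VARZ_COUNT), fnmatchcase(name, VARZ_COUNT))


# Hypothetical metric names, chosen only to exercise the patterns.
for name in ["http_requests_count", "varz_calls_count", "vault_tokens_count", "up_count"]:
    print(name, matches(name))
```

Under this approximation, a name that merely starts with `v` (such as the hypothetical `vault_tokens_count`) or is shorter than four characters before the suffix (such as `up_count`) matches neither pattern, so the non-varz filter excludes more than just `varz_*` names.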

M3 Agg

logging:
  level: info

metrics:
  scope:
    prefix: m3aggregator
  prometheus:
    onError: none
    handlerPath: /metrics
    listenAddress: 0.0.0.0:6002
    timerType: histogram
  sanitization: prometheus
  samplingRate: 1.0
  extended: none

http:
  listenAddress: 0.0.0.0:6001
  readTimeout: 60s
  writeTimeout: 60s

m3msg:
  server:
    listenAddress: 0.0.0.0:6000
    retry:
      maxBackoff: 30s
  consumer:
    messagePool:
      size: 16384

kvClient:
  etcd:
    env: m3aggregation
    zone: embedded
    service: m3aggregator
    cacheDir: /var/lib/m3kv
    etcdClusters:
      - zone: embedded
        endpoints:
          - etcd-0.etcd:2379
          - etcd-1.etcd:2379
          - etcd-2.etcd:2379

runtimeOptions:
  kvConfig:
    environment: m3aggregation
    zone: embedded
  writeValuesPerMetricLimitPerSecondKey: write-values-per-metric-limit-per-second
  writeValuesPerMetricLimitPerSecond: 0
  writeNewMetricLimitClusterPerSecondKey: write-new-metric-limit-cluster-per-second
  writeNewMetricLimitClusterPerSecond: 0
  writeNewMetricNoLimitWarmupDuration: 0

aggregator:
  hostID:
    resolver: environment
    envVarName: M3AGGREGATOR_HOST_ID
  instanceID:
    type: host_id
  verboseErrors: true
  metricPrefix: ""
  counterPrefix: ""
  timerPrefix: ""
  gaugePrefix: ""
  aggregationTypes:
    counterTransformFnType: empty
    timerTransformFnType: suffix
    gaugeTransformFnType: empty
    aggregationTypesPool:
      size: 1024
    quantilesPool:
      buckets:
        - count: 256
          capacity: 4
        - count: 128
          capacity: 8
  stream:
    eps: 0.001
    capacity: 32
    streamPool:
      size: 4096
    samplePool:
      size: 4096
    floatsPool:
      buckets:
        - count: 4096
          capacity: 16
        - count: 2048
          capacity: 32
        - count: 1024
          capacity: 64
  client:
    type: m3msg
    m3msg:
      producer:
        writer:
          topicName: aggregator_ingest
          topicServiceOverride:
            zone: embedded
            environment: m3aggregation
          placement:
            isStaged: true
          placementServiceOverride:
            namespaces:
              placement: /placement
          messagePool:
            size: 16384
            watermark:
              low: 0.7
              high: 1
          messageRetry:
            initialBackoff: 5s # Chronosphere setting.
            maxBackoff: 5s # Chronosphere setting.
  placementManager:
    kvConfig:
      namespace: /placement
      environment: m3aggregation
      zone: embedded
    placementWatcher:
      key: m3aggregator
      initWatchTimeout: 10s
  hashType: murmur32
  bufferDurationBeforeShardCutover: 10m
  bufferDurationAfterShardCutoff: 10m
  bufferDurationForFutureTimedMetric: 10m # Allow test to write into future.
  bufferDurationForPastTimedMetric: 5m # Don't wait too long for timed metrics to flush.
  resignTimeout: 1m
  flushTimesManager:
    kvConfig:
      environment: m3aggregation
      zone: embedded
    flushTimesKeyFmt: shardset/%d/flush
    flushTimesPersistRetrier:
      initialBackoff: 100ms
      backoffFactor: 2.0
      maxBackoff: 30s
      maxRetries: 0
  electionManager:
    election:
      leaderTimeout: 10s
      resignTimeout: 10s
      ttlSeconds: 10
    serviceID:
      name: m3aggregator
      environment: m3aggregation
      zone: embedded
    electionKeyFmt: shardset/%d/lock
    campaignRetrier:
      initialBackoff: 100ms
      backoffFactor: 2.0
      maxBackoff: 2s
      forever: true
      jitter: true
    changeRetrier:
      initialBackoff: 100ms
      backoffFactor: 2.0
      maxBackoff: 5s
      forever: true
      jitter: true
    resignRetrier:
      initialBackoff: 100ms
      backoffFactor: 2.0
      maxBackoff: 5s
      forever: true
      jitter: true
    campaignStateCheckInterval: 1s
    shardCutoffCheckOffset: 30s
  flushManager:
    checkEvery: 1s
    jitterEnabled: true
    maxJitters:
      - flushInterval: 5s
        maxJitterPercent: 1.0
      - flushInterval: 10s
        maxJitterPercent: 0.5
      - flushInterval: 1m
        maxJitterPercent: 0.5
      - flushInterval: 10m
        maxJitterPercent: 0.5
      - flushInterval: 1h
        maxJitterPercent: 0.25
    numWorkersPerCPU: 0.5
    flushTimesPersistEvery: 10s
    maxBufferSize: 5m
    forcedFlushWindowSize: 10s
  flush:
    handlers:
      - dynamicBackend:
          name: m3msg
          hashType: murmur32
          producer:
            buffer:
              maxBufferSize: 1000000000 # Max buffer size before m3msg starts dropping data.
            writer:
              topicName: aggregated_metrics
              topicServiceOverride:
                zone: embedded
                environment: m3aggregation
              messagePool:
                size: 16384
                watermark:
                  low: 0.7
                  high: 1
  passthrough:
    enabled: true
  forwarding:
    maxSingleDelay: 30s # Chronosphere setting.
    maxConstDelay: 5m # Need to add some buffer window, since timed metrics by default are delayed by 1min.
  entryTTL: 11h
  entryCheckInterval: 5m
  maxTimerBatchSizePerWrite: 140
  defaultStoragePolicies: []
  maxNumCachedSourceSets: 2
  discardNaNAggregatedValues: true
  entryPool:
    size: 4096
  counterElemPool:
    size: 4096
  timerElemPool:
    size: 4096
  gaugeElemPool:
    size: 0

Performance issues

If the issue is performance related, please provide the following information along with a description of the issue that you're experiencing:

  1. What service is experiencing the performance issue? (M3Coordinator, M3DB, M3Aggregator, etc)
  2. Approximately how many datapoints per second is the service handling?
  3. What is the approximate series cardinality that the service is handling in a given time window, i.e., how many unique time series are being measured?
  4. What is the hardware configuration (number CPU cores, amount of RAM, disk size and types, etc) that the service is running on? Is the service the only process running on the host or is it colocated with other software?
  5. What is the configuration of the service? Please include any YAML files, as well as namespace / placement configuration (with any sensitive information anonymized if necessary).
  6. How are you using the service? For example, are you performing read/writes to the service via Prometheus, or are you using a custom script?

In addition to the above information, CPU and heap profiles are always greatly appreciated.

CPU / Heap Profiles

CPU and heap profiles are critical to helping us debug performance issues. All our services run with the net/http/pprof server enabled by default.

Instructions for obtaining CPU / heap profiles for various services are below, please attach these profiles to the issue whenever possible.

M3Coordinator

CPU
curl <HOST_NAME>:<PORT(default 7201)>/debug/pprof/profile?seconds=5 > m3coord_cpu.out

Heap
curl <HOST_NAME>:<PORT(default 7201)>/debug/pprof/heap > m3coord_heap.out

M3DB

CPU
curl <HOST_NAME>:<PORT(default 9004)>/debug/pprof/profile?seconds=5 > m3db_cpu.out

Heap
curl <HOST_NAME>:<PORT(default 9004)>/debug/pprof/heap > m3db_heap.out

M3DB Grafana Dashboard Screenshots

If the service experiencing performance issues is M3DB and you're monitoring it using Prometheus, any screenshots you could provide using this dashboard would be helpful.


atibdialpad commented Jan 5, 2023

I noticed a couple of deviations between my config and the standard configs in the repo:
entryTTL: 11h in the aggregator config. It is set to 1 hour in almost all the config examples in the repo.
bufferDurationForPastTimedMetric: 5m, as opposed to 10 seconds in almost all the config examples in the repo.
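
To relate `bufferDurationForPastTimedMetric` to the error in the aggregator log, here is a small Python sketch using only numbers from this issue. The reading that the past limit is "now minus (buffer + resolution)" is an assumption that happens to fit these numbers; it has not been confirmed against the M3 source.

```python
NANOS = 1_000_000_000

# Values copied from the aggregator log line in this issue.
log_ts_nanos = 1672883758452722000      # "ts":1672883758.452722
timestamp_nanos = 1672881743902000000   # timestamp_unix_nanos
past_limit_nanos = 1672883443452704118  # past_limit_unix_nanos

# Values from the aggregator config and the storage policy.
buffer_past_nanos = 5 * 60 * NANOS      # bufferDurationForPastTimedMetric: 5m
resolution_nanos = 15 * NANOS           # resolution: 15s


def fmt(nanos: int) -> str:
    """Format a nanosecond duration the way the log prints off_by."""
    minutes, rem = divmod(nanos, 60 * NANOS)
    return f"{minutes}m{rem // NANOS}.{rem % NANOS:09d}s"


# How late the datapoint was relative to the limit (the log's off_by).
off_by_nanos = past_limit_nanos - timestamp_nanos
# How far the limit sits behind the moment the error was logged.
window_nanos = log_ts_nanos - past_limit_nanos

print("off_by:", fmt(off_by_nanos))
print("window behind log time:", fmt(window_nanos))
print("buffer + resolution:", fmt(buffer_past_nanos + resolution_nanos))
```

The window comes out at about 5m15s, which equals the configured 5m buffer plus the 15s resolution, while the rejected datapoint was a further 28 minutes older than that. Under the same assumption, the 10-second buffer from the repo examples would give a window of only 25s.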
