Skip to content
This repository has been archived by the owner on Apr 28, 2022. It is now read-only.

Commit

Permalink
Fix Lost Spans on Close (#182)
Browse files Browse the repository at this point in the history
* Make InMemorySender Asynchronous

To demonstrate lost span problem (to be shortly reported). When AppendAsync is
actually asynchronous, some spans are lost on Tracer Dispose. This change should
have no effect, but causes RemoteReporterTests.TestRemoteReporterFlushesOnClose
to fail intermittently.

Signed-off-by: phxnsharp <[email protected]>

* Do not use Task.Factory.StartNew

For some reason awaiting the task returned always completes immediately.

Signed-off-by: phxnsharp <[email protected]>

* Add Thread Safety to InMemorySender

This didn't help reliability, but I believe it is necessary because
different threads may be adding and flushing at the same time.

Signed-off-by: phxnsharp <[email protected]>

* Style cleanup

Use slightly better style for _blocker. Also, turns out async in
Flush causes other issues, removed for now.

Signed-off-by: phxnsharp <[email protected]>

* Fix Intermittent Test Failure

Found several problems that were contributing:
 - RemoteControlledSampler was using a long lived Wait() call on a thread pool thread, which was consuming Thread Pool threads and causing delays as the thread pool was exausted
 - RemoteReporter.ProcessQueueLoop does have long lived waits due to the BlockingQueue. Reworked it to work correctly on a LongLived thread without consuming thread pool threads.
 - Many of the tests were leaving behind the ProcessQueueLoop because the blocker was never released.

There is still an intermittent test failure, but that was there before I started changing code.

Signed-off-by: phxnsharp <[email protected]>
Co-authored-by: Benjamin Krämer <[email protected]>
  • Loading branch information
phxnsharp and Falco20019 committed Jul 15, 2020
1 parent 69d02ca commit c49058c
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 16 deletions.
6 changes: 3 additions & 3 deletions src/Jaeger.Core/Reporters/RemoteReporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class RemoteReporter : IReporter
_queueProcessorTask = Task.Factory.StartNew(ProcessQueueLoop, TaskCreationOptions.LongRunning);

_flushInterval = flushInterval;
_flushTask = Task.Factory.StartNew(FlushLoop, TaskCreationOptions.LongRunning);
_flushTask = Task.Run(FlushLoop);
}

public void Report(Span span)
Expand Down Expand Up @@ -123,14 +123,14 @@ private async Task FlushLoop()
while (!_commandQueue.IsAddingCompleted);
}

private async Task ProcessQueueLoop()
private void ProcessQueueLoop()
{
// This blocks until a command is available or IsCompleted=true
foreach (ICommand command in _commandQueue.GetConsumingEnumerable())
{
try
{
await command.ExecuteAsync().ConfigureAwait(false);
command.ExecuteAsync().Wait();
}
catch (SenderException ex)
{
Expand Down
6 changes: 3 additions & 3 deletions src/Jaeger.Core/Samplers/RemoteControlledSampler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ private RemoteControlledSampler(Builder builder)
/// <summary>
/// Updates <see cref="Sampler"/> to a new sampler when it is different.
/// </summary>
internal void UpdateSampler()
internal async void UpdateSampler()
{
try
{
SamplingStrategyResponse response = _samplingManager.GetSamplingStrategyAsync(_serviceName)
.ConfigureAwait(false).GetAwaiter().GetResult();
SamplingStrategyResponse response = await _samplingManager.GetSamplingStrategyAsync(_serviceName)
.ConfigureAwait(false);

_metrics.SamplerRetrieved.Inc(1);

Expand Down
39 changes: 29 additions & 10 deletions test/Jaeger.Core.Tests/Senders/InMemorySender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,46 @@ public List<Span> GetReceived()
return new List<Span>(_received);
}

public Task<int> AppendAsync(Span span, CancellationToken cancellationToken)
public async Task<int> AppendAsync(Span span, CancellationToken cancellationToken)
{
_blocker.Wait(cancellationToken);
//This serves to both make this call actually asynchronous and also to prevent the
//blocking call from consuming a Thread Pool thread.
await Task.Factory.StartNew(() => _blocker.Wait(cancellationToken),
TaskCreationOptions.LongRunning);

_appended.Add(span);
_received.Add(span);
return Task.FromResult(0);
lock (_appended)
{
_appended.Add(span);
_received.Add(span);
}
return 0;
}

public virtual Task<int> FlushAsync(CancellationToken cancellationToken)
{
int flushedSpans = _appended.Count;
_flushed.AddRange(_appended);
_appended.Clear();
//This conflicts with the way TestCloseWhenQueueFull is written. Since
//it blocks the process queue from ever ending, RemoteReporter.CloseAsync
//is guaranteed to timeout, which means cancellationToken here will already
//be set. This prevents the rest of the function from running, causing the
//test to fail.
//await Task.Delay(1, cancellationToken);

int flushedSpans;
lock (_appended )
{
flushedSpans = _appended.Count;
_flushed.AddRange(_appended);
_appended.Clear();
}

return Task.FromResult(flushedSpans);
}

public Task<int> CloseAsync(CancellationToken cancellationToken)
public async Task<int> CloseAsync(CancellationToken cancellationToken)
{
return FlushAsync(cancellationToken);
int result = await FlushAsync(cancellationToken);
AllowAppend();
return result;
}

public void BlockAppend()
Expand Down

0 comments on commit c49058c

Please sign in to comment.