Skip to content

Commit

Permalink
Merge pull request #3158 from DataDog/anmarchenko/configurable_test_mode
Browse files Browse the repository at this point in the history
[Tracer] Introduce async configuration for test mode to use standard writer when needed
  • Loading branch information
anmarchenko authored Sep 27, 2023
2 parents ce73a51 + d0f3808 commit 157fee6
Show file tree
Hide file tree
Showing 14 changed files with 226 additions and 64 deletions.
4 changes: 2 additions & 2 deletions lib/datadog/core/workers/polling.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Core
module Workers
# Adds polling (async looping) behavior to workers
module Polling
SHUTDOWN_TIMEOUT = 1
DEFAULT_SHUTDOWN_TIMEOUT = 1

def self.included(base)
base.include(Workers::IntervalLoop)
Expand All @@ -21,7 +21,7 @@ def perform(*args)
end
end

def stop(force_stop = false, timeout = SHUTDOWN_TIMEOUT)
def stop(force_stop = false, timeout = DEFAULT_SHUTDOWN_TIMEOUT)
if running?
# Attempt graceful stop and wait
stop_loop
Expand Down
9 changes: 6 additions & 3 deletions lib/datadog/tracing/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ def ensure_priority_sampling(sampler, settings)
# process, but can take a variety of options (including
# a fully custom instance) that makes the Tracer
# initialization process complex.
def build_writer(settings, agent_settings)
def build_writer(settings, agent_settings, options = settings.tracing.writer_options)
if (writer = settings.tracing.writer)
return writer
end

Tracing::Writer.new(agent_settings: agent_settings, **settings.tracing.writer_options)
Tracing::Writer.new(agent_settings: agent_settings, **options)
end

def subscribe_to_writer_events!(writer, sampler_delegator, test_mode)
Expand Down Expand Up @@ -223,8 +223,11 @@ def build_test_mode_sampler
end

def build_test_mode_writer(settings, agent_settings)
# Flush traces synchronously, to guarantee they are written.
writer_options = settings.tracing.test_mode.writer_options || {}

return build_writer(settings, agent_settings, writer_options) if settings.tracing.test_mode.async

# Flush traces synchronously, to guarantee they are written.
Tracing::SyncWriter.new(agent_settings: agent_settings, **writer_options)
end
end
Expand Down
6 changes: 6 additions & 0 deletions lib/datadog/tracing/configuration/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,12 @@ def self.extended(base)
o.env Tracing::Configuration::Ext::Test::ENV_MODE_ENABLED
end

# Use async writer in test mode
option :async do |o|
o.type :bool
o.default false
end

option :trace_flush

option :writer_options do |o|
Expand Down
5 changes: 3 additions & 2 deletions lib/datadog/tracing/workers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class AsyncTransport
DEFAULT_TIMEOUT = 5
BACK_OFF_RATIO = 1.2
BACK_OFF_MAX = 5
SHUTDOWN_TIMEOUT = 1
DEFAULT_SHUTDOWN_TIMEOUT = 1

attr_reader \
:trace_buffer
Expand All @@ -36,6 +36,7 @@ def initialize(options = {})

# Threading
@shutdown = ConditionVariable.new
@shutdown_timeout = options.fetch(:shutdown_timeout, DEFAULT_SHUTDOWN_TIMEOUT)
@mutex = Mutex.new
@worker = nil
@run = false
Expand Down Expand Up @@ -89,7 +90,7 @@ def stop

# Block until executor shutdown is complete or until timeout seconds have passed.
def join
@worker.join(SHUTDOWN_TIMEOUT)
@worker.join(@shutdown_timeout)
end

# Enqueue an item in the trace internal buffer. This operation is thread-safe
Expand Down
4 changes: 3 additions & 1 deletion lib/datadog/tracing/workers/trace_writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ def initialize(options = {})
# Workers::Queue settings
@buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)
self.buffer = TraceBuffer.new(@buffer_size)

@shutdown_timeout = options.fetch(:shutdown_timeout, Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT)
end

# NOTE: #perform is wrapped by other modules:
Expand All @@ -119,7 +121,7 @@ def perform(traces)
nil
end

def stop(*args)
def stop(force_stop = false, timeout = @shutdown_timeout)
buffer.close if running?
super
end
Expand Down
5 changes: 4 additions & 1 deletion lib/datadog/tracing/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def initialize(options = {})
Transport::HTTP.default(**transport_options)
end

@shutdown_timeout = options.fetch(:shutdown_timeout, Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT)

# handles the thread creation after an eventual fork
@mutex_after_fork = Mutex.new
@pid = nil
Expand Down Expand Up @@ -72,7 +74,8 @@ def start_worker
transport: @transport,
buffer_size: @buff_size,
on_trace: @trace_handler,
interval: @flush_interval
interval: @flush_interval,
shutdown_timeout: @shutdown_timeout
)

@worker.start
Expand Down
2 changes: 1 addition & 1 deletion sig/datadog/tracing/component.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module Datadog
def build_sampler: (untyped settings) -> untyped

def ensure_priority_sampling: (untyped sampler, untyped settings) -> untyped
def build_writer: (untyped settings, untyped agent_settings) -> untyped
def build_writer: (untyped settings, untyped agent_settings, ?Hash[Symbol, untyped] options) -> untyped

def subscribe_to_writer_events!: (untyped writer, untyped sampler, untyped test_mode) -> (nil | untyped)

Expand Down
131 changes: 81 additions & 50 deletions spec/datadog/core/configuration/components_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -785,73 +785,104 @@

context 'set to true' do
let(:enabled) { true }
let(:sync_writer) { Datadog::Tracing::SyncWriter.new }

before do
expect(Datadog::Tracing::SyncWriter)
.to receive(:new)
.with(agent_settings: agent_settings, **writer_options)
.and_return(writer)
end
context 'and :async' do
context 'is set' do
let(:writer) { Datadog::Tracing::Writer.new }
let(:writer_options) { { transport_options: :bar } }
let(:writer_options_test_mode) { { transport_options: :baz } }

context 'and :trace_flush' do
before do
allow(settings.tracing.test_mode)
.to receive(:trace_flush)
.and_return(trace_flush)
before do
allow(settings.tracing.test_mode)
.to receive(:async)
.and_return(true)

allow(settings.tracing.test_mode)
.to receive(:writer_options)
.and_return(writer_options_test_mode)

expect(Datadog::Tracing::SyncWriter)
.not_to receive(:new)

expect(Datadog::Tracing::Writer)
.to receive(:new)
.with(agent_settings: agent_settings, **writer_options_test_mode)
.and_return(writer)
end

it_behaves_like 'event publishing writer'
end

context 'is not set' do
let(:trace_flush) { nil }
let(:sync_writer) { Datadog::Tracing::SyncWriter.new }

it_behaves_like 'new tracer' do
let(:options) do
{
writer: kind_of(Datadog::Tracing::SyncWriter)
}
before do
expect(Datadog::Tracing::SyncWriter)
.to receive(:new)
.with(agent_settings: agent_settings, **writer_options)
.and_return(writer)
end

context 'and :trace_flush' do
before do
allow(settings.tracing.test_mode)
.to receive(:trace_flush)
.and_return(trace_flush)
end
let(:writer) { sync_writer }

it_behaves_like 'event publishing writer'
end
end
context 'is not set' do
let(:trace_flush) { nil }

context 'is set' do
let(:trace_flush) { instance_double(Datadog::Tracing::Flush::Finished) }
it_behaves_like 'new tracer' do
let(:options) do
{
writer: kind_of(Datadog::Tracing::SyncWriter)
}
end
let(:writer) { sync_writer }

it_behaves_like 'new tracer' do
let(:options) do
{
trace_flush: trace_flush,
writer: kind_of(Datadog::Tracing::SyncWriter)
}
it_behaves_like 'event publishing writer'
end
end
let(:writer) { sync_writer }

it_behaves_like 'event publishing writer'
end
end
end
context 'is set' do
let(:trace_flush) { instance_double(Datadog::Tracing::Flush::Finished) }

context 'and :writer_options' do
before do
allow(settings.tracing.test_mode)
.to receive(:writer_options)
.and_return(writer_options)
end
it_behaves_like 'new tracer' do
let(:options) do
{
trace_flush: trace_flush,
writer: kind_of(Datadog::Tracing::SyncWriter)
}
end
let(:writer) { sync_writer }

context 'are set' do
let(:writer_options) { { transport_options: :bar } }
it_behaves_like 'event publishing writer'
end
end
end

it_behaves_like 'new tracer' do
let(:options) do
{
writer: writer
}
context 'and :writer_options' do
before do
allow(settings.tracing.test_mode)
.to receive(:writer_options)
.and_return(writer_options)
end
let(:writer) { sync_writer }

it_behaves_like 'event publishing writer'
context 'are set' do
let(:writer_options) { { transport_options: :bar } }

it_behaves_like 'new tracer' do
let(:options) do
{
writer: writer
}
end
let(:writer) { sync_writer }

it_behaves_like 'event publishing writer'
end
end
end
end
end
Expand Down
20 changes: 18 additions & 2 deletions spec/datadog/core/workers/polling_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@
shared_context 'graceful stop' do
before do
allow(worker).to receive(:join)
.with(described_class::SHUTDOWN_TIMEOUT)
.with(described_class::DEFAULT_SHUTDOWN_TIMEOUT)
.and_return(true)
end
end

context 'when the worker has not been started' do
before do
allow(worker).to receive(:join)
.with(described_class::SHUTDOWN_TIMEOUT)
.with(described_class::DEFAULT_SHUTDOWN_TIMEOUT)
.and_return(true)
end

Expand Down Expand Up @@ -113,6 +113,22 @@
end
end
end

context 'given shutdown timeout' do
subject(:stop) { worker.stop(false, 1000) }
include_context 'graceful stop'

before do
expect(worker).to receive(:join)
.with(1000)
.and_return(true)

worker.perform
try_wait_until { worker.running? && worker.run_loop? }
end

it { is_expected.to be true }
end
end

describe '#enabled?' do
Expand Down
15 changes: 15 additions & 0 deletions spec/datadog/tracing/configuration/settings_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,21 @@ def propagation_inject_style
end
end

describe '#async' do
subject(:enabled) { settings.tracing.test_mode.async }

it { is_expected.to be false }
end

describe '#async=' do
it 'updates the #async setting' do
expect { settings.tracing.test_mode.async = true }
.to change { settings.tracing.test_mode.async }
.from(false)
.to(true)
end
end

describe '#writer_options' do
subject(:writer_options) { settings.tracing.test_mode.writer_options }

Expand Down
16 changes: 16 additions & 0 deletions spec/datadog/tracing/workers/trace_writer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,22 @@
end
end
end

context 'given shutdown_timeout' do
let(:options) { { shutdown_timeout: 1000 } }
include_context 'shuts down the worker'

context 'and the worker has been started' do
before do
expect(writer).to receive(:join).with(1000).and_return(true)

writer.perform
try_wait_until { writer.running? && writer.run_loop? }
end

it { is_expected.to be true }
end
end
end

describe '#work_pending?' do
Expand Down
4 changes: 2 additions & 2 deletions spec/datadog/tracing/workers_integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def wait_for_flush(num = 1, period = 0.1)
expect(trace_task).to have_received(:call).once
expect(service_task).to_not have_received(:call)
expect(@shutdown_end - @shutdown_beg)
.to be < Datadog::Tracing::Workers::AsyncTransport::SHUTDOWN_TIMEOUT
.to be < Datadog::Tracing::Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT
end
end

Expand All @@ -248,7 +248,7 @@ def wait_for_flush(num = 1, period = 0.1)
it 'interrupts the worker to speed up shutdown' do
expect(@shutdown_end - @shutdown_beg)
.to be_within(5).of(
Datadog::Tracing::Workers::AsyncTransport::SHUTDOWN_TIMEOUT
Datadog::Tracing::Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT
)
end
end
Expand Down
Loading

0 comments on commit 157fee6

Please sign in to comment.