Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tracer] Introduce async configuration for test mode to use standard writer when needed #3158

Merged
merged 7 commits into from
Sep 27, 2023
4 changes: 2 additions & 2 deletions lib/datadog/core/workers/polling.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Core
module Workers
# Adds polling (async looping) behavior to workers
module Polling
SHUTDOWN_TIMEOUT = 1
DEFAULT_SHUTDOWN_TIMEOUT = 1

def self.included(base)
base.include(Workers::IntervalLoop)
Expand All @@ -21,7 +21,7 @@ def perform(*args)
end
end

def stop(force_stop = false, timeout = SHUTDOWN_TIMEOUT)
def stop(force_stop = false, timeout = DEFAULT_SHUTDOWN_TIMEOUT)
if running?
# Attempt graceful stop and wait
stop_loop
Expand Down
9 changes: 6 additions & 3 deletions lib/datadog/tracing/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ def ensure_priority_sampling(sampler, settings)
# process, but can take a variety of options (including
# a fully custom instance) that makes the Tracer
# initialization process complex.
def build_writer(settings, agent_settings)
def build_writer(settings, agent_settings, options = settings.tracing.writer_options)
if (writer = settings.tracing.writer)
return writer
end

Tracing::Writer.new(agent_settings: agent_settings, **settings.tracing.writer_options)
Tracing::Writer.new(agent_settings: agent_settings, **options)
end

def subscribe_to_writer_events!(writer, sampler_delegator, test_mode)
Expand Down Expand Up @@ -223,8 +223,11 @@ def build_test_mode_sampler
end

def build_test_mode_writer(settings, agent_settings)
# Flush traces synchronously, to guarantee they are written.
writer_options = settings.tracing.test_mode.writer_options || {}

return build_writer(settings, agent_settings, writer_options) if settings.tracing.test_mode.async

# Flush traces synchronously, to guarantee they are written.
Tracing::SyncWriter.new(agent_settings: agent_settings, **writer_options)
end
end
Expand Down
6 changes: 6 additions & 0 deletions lib/datadog/tracing/configuration/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,12 @@ def self.extended(base)
o.env Tracing::Configuration::Ext::Test::ENV_MODE_ENABLED
end

# Use async writer in test mode
option :async do |o|
o.type :bool
o.default false
end

option :trace_flush

option :writer_options do |o|
Expand Down
5 changes: 3 additions & 2 deletions lib/datadog/tracing/workers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class AsyncTransport
DEFAULT_TIMEOUT = 5
BACK_OFF_RATIO = 1.2
BACK_OFF_MAX = 5
SHUTDOWN_TIMEOUT = 1
DEFAULT_SHUTDOWN_TIMEOUT = 1

attr_reader \
:trace_buffer
Expand All @@ -36,6 +36,7 @@ def initialize(options = {})

# Threading
@shutdown = ConditionVariable.new
@shutdown_timeout = options.fetch(:shutdown_timeout, DEFAULT_SHUTDOWN_TIMEOUT)
@mutex = Mutex.new
@worker = nil
@run = false
Expand Down Expand Up @@ -89,7 +90,7 @@ def stop

# Block until executor shutdown is complete or until timeout seconds have passed.
def join
@worker.join(SHUTDOWN_TIMEOUT)
@worker.join(@shutdown_timeout)
end

# Enqueue an item in the trace internal buffer. This operation is thread-safe
Expand Down
4 changes: 3 additions & 1 deletion lib/datadog/tracing/workers/trace_writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ def initialize(options = {})
# Workers::Queue settings
@buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)
self.buffer = TraceBuffer.new(@buffer_size)

@shutdown_timeout = options.fetch(:shutdown_timeout, Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT)
end

# NOTE: #perform is wrapped by other modules:
Expand All @@ -119,7 +121,7 @@ def perform(traces)
nil
end

def stop(*args)
def stop(force_stop = false, timeout = @shutdown_timeout)
buffer.close if running?
super
end
Expand Down
5 changes: 4 additions & 1 deletion lib/datadog/tracing/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def initialize(options = {})
Transport::HTTP.default(**transport_options)
end

@shutdown_timeout = options.fetch(:shutdown_timeout, Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT)

# handles the thread creation after an eventual fork
@mutex_after_fork = Mutex.new
@pid = nil
Expand Down Expand Up @@ -72,7 +74,8 @@ def start_worker
transport: @transport,
buffer_size: @buff_size,
on_trace: @trace_handler,
interval: @flush_interval
interval: @flush_interval,
shutdown_timeout: @shutdown_timeout
)

@worker.start
Expand Down
2 changes: 1 addition & 1 deletion sig/datadog/tracing/component.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module Datadog
def build_sampler: (untyped settings) -> untyped

def ensure_priority_sampling: (untyped sampler, untyped settings) -> untyped
def build_writer: (untyped settings, untyped agent_settings) -> untyped
def build_writer: (untyped settings, untyped agent_settings, ?Hash[Symbol, untyped] options) -> untyped

def subscribe_to_writer_events!: (untyped writer, untyped sampler, untyped test_mode) -> (nil | untyped)

Expand Down
131 changes: 81 additions & 50 deletions spec/datadog/core/configuration/components_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -785,73 +785,104 @@

context 'set to true' do
let(:enabled) { true }
let(:sync_writer) { Datadog::Tracing::SyncWriter.new }

before do
expect(Datadog::Tracing::SyncWriter)
.to receive(:new)
.with(agent_settings: agent_settings, **writer_options)
.and_return(writer)
end
context 'and :async' do
context 'is set' do
let(:writer) { Datadog::Tracing::Writer.new }
let(:writer_options) { { transport_options: :bar } }
let(:writer_options_test_mode) { { transport_options: :baz } }

context 'and :trace_flush' do
before do
allow(settings.tracing.test_mode)
.to receive(:trace_flush)
.and_return(trace_flush)
before do
allow(settings.tracing.test_mode)
.to receive(:async)
.and_return(true)

allow(settings.tracing.test_mode)
.to receive(:writer_options)
.and_return(writer_options_test_mode)

expect(Datadog::Tracing::SyncWriter)
.not_to receive(:new)

expect(Datadog::Tracing::Writer)
.to receive(:new)
.with(agent_settings: agent_settings, **writer_options_test_mode)
.and_return(writer)
end

it_behaves_like 'event publishing writer'
end

context 'is not set' do
let(:trace_flush) { nil }
let(:sync_writer) { Datadog::Tracing::SyncWriter.new }

it_behaves_like 'new tracer' do
let(:options) do
{
writer: kind_of(Datadog::Tracing::SyncWriter)
}
before do
expect(Datadog::Tracing::SyncWriter)
.to receive(:new)
.with(agent_settings: agent_settings, **writer_options)
.and_return(writer)
end

context 'and :trace_flush' do
before do
allow(settings.tracing.test_mode)
.to receive(:trace_flush)
.and_return(trace_flush)
end
let(:writer) { sync_writer }

it_behaves_like 'event publishing writer'
end
end
context 'is not set' do
let(:trace_flush) { nil }

context 'is set' do
let(:trace_flush) { instance_double(Datadog::Tracing::Flush::Finished) }
it_behaves_like 'new tracer' do
let(:options) do
{
writer: kind_of(Datadog::Tracing::SyncWriter)
}
end
let(:writer) { sync_writer }

it_behaves_like 'new tracer' do
let(:options) do
{
trace_flush: trace_flush,
writer: kind_of(Datadog::Tracing::SyncWriter)
}
it_behaves_like 'event publishing writer'
end
end
let(:writer) { sync_writer }

it_behaves_like 'event publishing writer'
end
end
end
context 'is set' do
let(:trace_flush) { instance_double(Datadog::Tracing::Flush::Finished) }

context 'and :writer_options' do
before do
allow(settings.tracing.test_mode)
.to receive(:writer_options)
.and_return(writer_options)
end
it_behaves_like 'new tracer' do
let(:options) do
{
trace_flush: trace_flush,
writer: kind_of(Datadog::Tracing::SyncWriter)
}
end
let(:writer) { sync_writer }

context 'are set' do
let(:writer_options) { { transport_options: :bar } }
it_behaves_like 'event publishing writer'
end
end
end

it_behaves_like 'new tracer' do
let(:options) do
{
writer: writer
}
context 'and :writer_options' do
before do
allow(settings.tracing.test_mode)
.to receive(:writer_options)
.and_return(writer_options)
end
let(:writer) { sync_writer }

it_behaves_like 'event publishing writer'
context 'are set' do
let(:writer_options) { { transport_options: :bar } }

it_behaves_like 'new tracer' do
let(:options) do
{
writer: writer
}
end
let(:writer) { sync_writer }

it_behaves_like 'event publishing writer'
end
end
end
end
end
Expand Down
20 changes: 18 additions & 2 deletions spec/datadog/core/workers/polling_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@
shared_context 'graceful stop' do
before do
allow(worker).to receive(:join)
.with(described_class::SHUTDOWN_TIMEOUT)
.with(described_class::DEFAULT_SHUTDOWN_TIMEOUT)
.and_return(true)
end
end

context 'when the worker has not been started' do
before do
allow(worker).to receive(:join)
.with(described_class::SHUTDOWN_TIMEOUT)
.with(described_class::DEFAULT_SHUTDOWN_TIMEOUT)
.and_return(true)
end

Expand Down Expand Up @@ -113,6 +113,22 @@
end
end
end

context 'given shutdown timeout' do
subject(:stop) { worker.stop(false, 1000) }
include_context 'graceful stop'

before do
expect(worker).to receive(:join)
.with(1000)
.and_return(true)

worker.perform
try_wait_until { worker.running? && worker.run_loop? }
end

it { is_expected.to be true }
end
end

describe '#enabled?' do
Expand Down
15 changes: 15 additions & 0 deletions spec/datadog/tracing/configuration/settings_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,21 @@ def propagation_inject_style
end
end

describe '#async' do
subject(:enabled) { settings.tracing.test_mode.async }

it { is_expected.to be false }
end

describe '#async=' do
it 'updates the #async setting' do
expect { settings.tracing.test_mode.async = true }
.to change { settings.tracing.test_mode.async }
.from(false)
.to(true)
end
end

describe '#writer_options' do
subject(:writer_options) { settings.tracing.test_mode.writer_options }

Expand Down
16 changes: 16 additions & 0 deletions spec/datadog/tracing/workers/trace_writer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,22 @@
end
end
end

context 'given shutdown_timeout' do
let(:options) { { shutdown_timeout: 1000 } }
include_context 'shuts down the worker'

context 'and the worker has been started' do
before do
expect(writer).to receive(:join).with(1000).and_return(true)

writer.perform
try_wait_until { writer.running? && writer.run_loop? }
end

it { is_expected.to be true }
end
end
end

describe '#work_pending?' do
Expand Down
4 changes: 2 additions & 2 deletions spec/datadog/tracing/workers_integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def wait_for_flush(num = 1, period = 0.1)
expect(trace_task).to have_received(:call).once
expect(service_task).to_not have_received(:call)
expect(@shutdown_end - @shutdown_beg)
.to be < Datadog::Tracing::Workers::AsyncTransport::SHUTDOWN_TIMEOUT
.to be < Datadog::Tracing::Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT
end
end

Expand All @@ -248,7 +248,7 @@ def wait_for_flush(num = 1, period = 0.1)
it 'interrupts the worker to speed up shutdown' do
expect(@shutdown_end - @shutdown_beg)
.to be_within(5).of(
Datadog::Tracing::Workers::AsyncTransport::SHUTDOWN_TIMEOUT
Datadog::Tracing::Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT
)
end
end
Expand Down
Loading
Loading