Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed tracing for Sidekiq #2513

Merged
merged 15 commits into from
Apr 13, 2023
Merged
1 change: 1 addition & 0 deletions docs/GettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -2015,6 +2015,7 @@ end

| Key | Description | Default |
| --- | ----------- | ------- |
| `distributed_tracing` | Enabling [distributed tracing](#distributed-tracing) creates a parent-child relationship between the `sidekiq.push` span and the `sidekiq.job` span. <br /><br />**Important**: *Enabling `distributed_tracing` for asynchronous processing can result in drastic changes in your trace graph. Such cases include long-running jobs, retried jobs, and jobs scheduled in the far future. Make sure to inspect your traces after enabling this feature.* | `false` |
| `tag_args` | Enable tagging of job arguments. `true` for on, `false` for off. | `false` |
| `error_handler` | Custom error handler invoked when a job raises an error. Provided `span` and `error` as arguments. Sets error on the span by default. Useful for ignoring transient errors. | `proc { \|span, error\| span.set_error(error) unless span.nil? }` |
| `quantize` | Hash containing options for quantization of job arguments. | `{}` |
Expand Down
13 changes: 10 additions & 3 deletions lib/datadog/tracing/contrib/sidekiq/client_tracer.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
require_relative '../../metadata/ext'
require_relative '../analytics'
require_relative 'distributed/propagation'
require_relative 'ext'
require_relative 'tracing'
require_relative 'utils'

module Datadog
module Tracing
module Contrib
module Sidekiq
# Tracer is a Sidekiq client-side middleware which traces job enqueues/pushes
class ClientTracer
include Tracing
include Utils

def initialize(options = {})
@sidekiq_service = options[:client_service_name] || configuration[:client_service_name]
Expand All @@ -20,7 +21,9 @@ def initialize(options = {})
def call(worker_class, job, queue, redis_pool)
resource = job_resource(job)

Datadog::Tracing.trace(Ext::SPAN_PUSH, service: @sidekiq_service) do |span|
Datadog::Tracing.trace(Ext::SPAN_PUSH, service: @sidekiq_service) do |span, trace_op|
propagation.inject!(trace_op, job) if configuration[:distributed_tracing]
marcotc marked this conversation as resolved.
Show resolved Hide resolved

span.resource = resource

span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_COMPONENT)
Expand Down Expand Up @@ -50,6 +53,10 @@ def call(worker_class, job, queue, redis_pool)
# Returns the settings stored under the :sidekiq key of the global
# Datadog tracing configuration.
def configuration
  tracing_settings = Datadog.configuration.tracing
  tracing_settings[:sidekiq]
end

# Returns the Sidekiq propagation helper, creating it on first use and
# reusing the same instance on subsequent calls.
def propagation
  @propagation || (@propagation = Contrib::Sidekiq::Distributed::Propagation.new)
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Settings < Contrib::Configuration::Settings
option :client_service_name
option :error_handler, default: Tracing::SpanOperation::Events::DEFAULT_ON_ERROR
option :quantize, default: {}
option :distributed_tracing, default: false
end
end
end
Expand Down
38 changes: 38 additions & 0 deletions lib/datadog/tracing/contrib/sidekiq/distributed/propagation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

require_relative '../../../distributed/fetcher'
require_relative '../../../distributed/propagation'
require_relative '../../../distributed/b3_multi'
require_relative '../../../distributed/b3_single'
require_relative '../../../distributed/datadog'
require_relative '../../../distributed/none'
require_relative '../../../distributed/trace_context'
require_relative '../../../configuration/ext'

module Datadog
  module Tracing
    module Contrib
      module Sidekiq
        module Distributed
          # Propagation helper for the Sidekiq integration.
          #
          # The Sidekiq client and server tracers pass the job hash to
          # `inject!` / `extract`, so the job hash (not HTTP headers) is the
          # carrier for the propagated trace data.
          #
          # Registers one propagator per supported style: B3 multi-header,
          # B3 single-header, Datadog, W3C Trace Context, and "none".
          class Propagation < Tracing::Distributed::Propagation
            def initialize
              styles = Tracing::Configuration::Ext::Distributed
              distributed = Tracing::Distributed
              fetcher = distributed::Fetcher

              # Insertion order is kept identical to the style list above.
              propagators = {
                styles::PROPAGATION_STYLE_B3_MULTI_HEADER => distributed::B3Multi.new(fetcher: fetcher),
                styles::PROPAGATION_STYLE_B3_SINGLE_HEADER => distributed::B3Single.new(fetcher: fetcher),
                styles::PROPAGATION_STYLE_DATADOG => distributed::Datadog.new(fetcher: fetcher),
                styles::PROPAGATION_STYLE_TRACE_CONTEXT => distributed::TraceContext.new(fetcher: fetcher),
                styles::PROPAGATION_STYLE_NONE => distributed::None.new
              }

              super(propagation_styles: propagators)
            end
          end
        end
      end
    end
  end
end
18 changes: 16 additions & 2 deletions lib/datadog/tracing/contrib/sidekiq/server_tracer.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
require_relative '../../metadata/ext'
require_relative '../analytics'
require_relative 'ext'
require_relative 'tracing'
require_relative 'utils'
require_relative '../utils/quantization/hash'
require_relative 'distributed/propagation'

module Datadog
module Tracing
module Contrib
module Sidekiq
# Tracer is a Sidekiq server-side middleware which traces executed jobs
class ServerTracer
include Tracing
include Utils

QUANTIZE_SHOW_ALL = { args: { show: :all } }.freeze

Expand All @@ -19,9 +20,16 @@ def initialize(options = {})
@error_handler = options[:error_handler] || configuration[:error_handler]
end

# rubocop:disable Metrics/MethodLength
# rubocop:disable Metrics/AbcSize
def call(worker, job, queue)
resource = job_resource(job)

if configuration[:distributed_tracing]
trace_digest = propagation.extract(job)
Datadog::Tracing.continue_trace!(trace_digest)
end

service = worker_config(resource, :service_name) || @sidekiq_service
# DEV-2.0: Remove `tag_args`, as `quantize` can fulfill the same contract
tag_args = worker_config(resource, :tag_args) || configuration[:tag_args]
Expand Down Expand Up @@ -68,10 +76,16 @@ def call(worker, job, queue)

yield
end
# rubocop:enable Metrics/MethodLength
# rubocop:enable Metrics/AbcSize
end

private

# Memoized accessor for the Sidekiq propagation helper: the instance is
# built on the first call and returned unchanged afterwards.
def propagation
  @propagation ||= begin
    propagator_class = Contrib::Sidekiq::Distributed::Propagation
    propagator_class.new
  end
end

def quantize_args(quantize, args)
quantize_options = quantize && quantize[:args]
quantize_options ||= {}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

require 'yaml'

require_relative '../../../core'
Expand All @@ -8,7 +10,7 @@ module Tracing
module Contrib
module Sidekiq
# Common functionality used by both client-side and server-side tracers.
module Tracing
module Utils
protected

# If class is wrapping something else, the interesting resource info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
end

Sidekiq::Testing.server_middleware.clear
Sidekiq::Extensions.enable_delay! if Sidekiq::VERSION > '5.0.0'
end

it 'traces job push' do
Expand Down Expand Up @@ -68,6 +67,8 @@
pending 'Broken in Ruby 3.1.0-preview1, see https://github.com/mperham/sidekiq/issues/5064'
end

Sidekiq::Extensions.enable_delay! if Sidekiq::VERSION > '5.0.0'

stub_const(
'DelayableClass',
Class.new do
Expand Down
215 changes: 215 additions & 0 deletions spec/datadog/tracing/contrib/sidekiq/distributed_tracing_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
require 'datadog/tracing/contrib/support/spec_helper'

require 'datadog/tracing/contrib/sidekiq/integration'
require 'datadog/tracing/contrib/sidekiq/client_tracer'
require 'datadog/tracing/contrib/sidekiq/server_tracer'

require 'sidekiq/testing'
require_relative 'support/legacy_test_helpers' if Sidekiq::VERSION < '4'
require 'sidekiq/api'

RSpec.describe 'Sidekiq distributed tracing' do
# Wraps every example in Sidekiq's `fake!` testing mode, with the Datadog
# ServerTracer registered in the testing server middleware chain.
around do |example|
Sidekiq::Testing.fake! do
# Clear the chain first so only the middleware added below is registered.
Sidekiq::Testing.server_middleware.clear
Sidekiq::Testing.server_middleware do |chain|
chain.add(Datadog::Tracing::Contrib::Sidekiq::ServerTracer)
end

example.run
end
end

# After each example: reset the :sidekiq integration settings (each context
# sets `distributed_tracing` in its own `before`) and clear all queued jobs.
after do
Datadog.configuration.tracing[:sidekiq].reset!
Sidekiq::Queues.clear_all
end

# Eagerly (`let!`) stubs the `EmptyWorker` constant with a Sidekiq worker
# whose `perform` takes no arguments and does nothing.
let!(:empty_worker) do
stub_const(
'EmptyWorker',
Class.new do
include Sidekiq::Worker
def perform; end
end
)
end

context 'when distributed tracing enabled' do
before do
Datadog.configure do |c|
c.tracing.instrument :sidekiq, distributed_tracing: true
end
end

context 'when dispatching' do
# Enqueues a job and checks both the push span and the propagation keys
# written into the enqueued job hash.
# NOTE(review): `span` and `tracer` are not defined in this file; presumably
# they come from the shared tracer spec helpers — confirm.
it 'propagates through serialized job' do
EmptyWorker.perform_async

# The job hash as stored in the fake queue.
job = EmptyWorker.jobs.first

# Push (producer) span attributes.
expect(span).to be_root_span
expect(span.service).to eq(tracer.default_service)
expect(span.resource).to eq('EmptyWorker')
expect(span.get_tag('sidekiq.job.queue')).to eq('default')
expect(span.status).to eq(0)
expect(span.get_tag('component')).to eq('sidekiq')
expect(span.get_tag('operation')).to eq('push')
expect(span.get_tag('span.kind')).to eq('producer')

# Trace data is stored in the job hash as strings; no origin key is expected.
expect(job['x-datadog-trace-id']).to eq(span.trace_id.to_s)
expect(job['x-datadog-parent-id']).to eq(span.id.to_s)
expect(job['x-datadog-sampling-priority']).to eq('1')
expect(job['x-datadog-tags']).to eq('_dd.p.dm=-0')
expect(job).not_to include 'x-datadog-origin'
end
end

context 'when receiving' do
let(:trace_id) { Datadog::Tracing::Utils.next_id }
let(:span_id) { Datadog::Tracing::Utils.next_id }
let(:jid) { '123abc' }

# Pushes a hand-built job hash carrying x-datadog-* keys straight into the
# fake queue, runs it, and checks that the job span continues that trace.
# NOTE(review): `span`, `trace` and `tracer` are not defined in this file;
# presumably shared spec helpers — confirm.
it 'continues trace from serialized job' do
Sidekiq::Queues.push(
EmptyWorker.queue,
EmptyWorker.to_s,
EmptyWorker.sidekiq_options.merge(
'args' => [],
'class' => EmptyWorker.to_s,
'jid' => jid,
'x-datadog-trace-id' => trace_id.to_s,
'x-datadog-parent-id' => span_id.to_s,
'x-datadog-sampling-priority' => '2',
'x-datadog-tags' => '_dd.p.dm=-99',
'x-datadog-origin' => 'my-origin'
)
)

EmptyWorker.perform_one

# The job span adopts the trace id and parent id carried in the job hash.
expect(span.trace_id).to eq(trace_id)
expect(span.parent_id).to eq(span_id)
expect(span.service).to eq(tracer.default_service)
expect(span.resource).to eq('EmptyWorker')
expect(span.get_tag('sidekiq.job.queue')).to eq('default')
expect(span.status).to eq(0)
expect(span.get_tag('component')).to eq('sidekiq')
expect(span.get_tag('operation')).to eq('job')
expect(span.get_tag('span.kind')).to eq('consumer')

# Trace-level data mirrors the values pushed above.
expect(trace.send(:meta)['_dd.p.dm']).to eq('-99')
expect(trace.sampling_priority).to eq(2)
expect(trace.origin).to eq('my-origin')
end
end

context 'round trip' do
# Enqueues and then performs one job with distributed tracing enabled; the
# job span must be a child of the push span within the same trace.
it 'creates 2 spans for a distributed trace' do
EmptyWorker.perform_async
EmptyWorker.perform_one

expect(spans).to have(2).items

# NOTE(review): relies on `spans` returning the job span before the push
# span; the helper is not defined in this file — confirm its ordering.
job_span, push_span = spans

expect(push_span).to be_root_span
expect(push_span.get_tag('sidekiq.job.id')).to eq(job_span.get_tag('sidekiq.job.id'))

expect(job_span.trace_id).to eq(push_span.trace_id)
expect(job_span.parent_id).to eq(push_span.id)
end
end
end

context 'when distributed tracing disabled' do
before do
Datadog.configure do |c|
c.tracing.instrument :sidekiq, distributed_tracing: false
end
end

context 'when dispatching' do
# With distributed tracing disabled, enqueuing still produces a push span
# but no x-datadog-* keys are added to the job hash.
# NOTE(review): `span` and `tracer` are not defined in this file; presumably
# shared spec helpers — confirm.
it 'does not propagate through serialized job' do
EmptyWorker.perform_async

# The job hash as stored in the fake queue.
job = EmptyWorker.jobs.first

# Push (producer) span attributes.
expect(span).to be_root_span
expect(span.service).to eq(tracer.default_service)
expect(span.resource).to eq('EmptyWorker')
expect(span.get_tag('sidekiq.job.queue')).to eq('default')
expect(span.status).to eq(0)
expect(span.get_tag('component')).to eq('sidekiq')
expect(span.get_tag('operation')).to eq('push')
expect(span.get_tag('span.kind')).to eq('producer')

# None of the propagation keys may be present.
expect(job).to_not include('x-datadog-trace-id')
expect(job).to_not include('x-datadog-parent-id')
expect(job).to_not include('x-datadog-sampling-priority')
expect(job).to_not include('x-datadog-tags')
expect(job).to_not include('x-datadog-origin')
end
end

context 'when receiving' do
let(:trace_id) { Datadog::Tracing::Utils.next_id }
let(:span_id) { Datadog::Tracing::Utils.next_id }
let(:jid) { '123abc' }

# With distributed tracing disabled, x-datadog-* keys present in the job
# hash must be ignored: the job span starts a new trace of its own.
# NOTE(review): `span`, `trace` and `tracer` are not defined in this file;
# presumably shared spec helpers — confirm.
it 'does not continue trace from serialized job' do
Sidekiq::Queues.push(
EmptyWorker.queue,
EmptyWorker.to_s,
EmptyWorker.sidekiq_options.merge(
'args' => [],
'class' => EmptyWorker.to_s,
'jid' => jid,
'x-datadog-trace-id' => trace_id.to_s,
'x-datadog-parent-id' => span_id.to_s,
'x-datadog-sampling-priority' => '2',
# NOTE(review): the enabled-case example uses '_dd.p.dm=-99'; the missing
# minus sign here does not affect the assertions below — confirm intent.
'x-datadog-tags' => '_dd.p.dm=99',
'x-datadog-origin' => 'my-origin'
)
)

EmptyWorker.perform_one

# The job span is a root span and does not reuse the pushed trace id.
expect(span).to be_root_span
expect(span.trace_id).not_to eq(trace_id)
expect(span.parent_id).to eq(0)
expect(span.service).to eq(tracer.default_service)
expect(span.resource).to eq('EmptyWorker')
expect(span.get_tag('sidekiq.job.queue')).to eq('default')
expect(span.status).to eq(0)
expect(span.get_tag('component')).to eq('sidekiq')
expect(span.get_tag('operation')).to eq('job')
expect(span.get_tag('span.kind')).to eq('consumer')

# Trace-level data differs from every value pushed in the job hash.
expect(trace.send(:meta)['_dd.p.dm']).to eq('-0')
expect(trace.sampling_priority).to eq(1)
expect(trace.origin).to be_nil
end
end

context 'round trip' do
# Enqueues and then performs one job with distributed tracing disabled: the
# push span and the job span must be root spans of two separate traces,
# linked only by the shared `sidekiq.job.id` tag.
it 'creates 2 spans with separate traces' do
  EmptyWorker.perform_async
  EmptyWorker.perform_one

  expect(spans).to have(2).items

  # NOTE(review): relies on `spans` returning the job span before the push
  # span; the helper is not defined in this file — confirm its ordering.
  job_span, push_span = spans

  expect(push_span.trace_id).to_not eq(job_span.trace_id)
  expect(push_span.get_tag('sidekiq.job.id')).to eq(job_span.get_tag('sidekiq.job.id'))

  # Previously this pair asserted `job_span.resource` a second time, leaving
  # the push span's resource unchecked.
  expect(push_span).to be_root_span
  expect(push_span.resource).to eq('EmptyWorker')

  expect(job_span).to be_root_span
  expect(job_span.resource).to eq('EmptyWorker')
end
end
end
end
Loading