Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed tracing for Sidekiq #2513

Merged
merged 15 commits into from
Apr 13, 2023
Merged
2 changes: 1 addition & 1 deletion Appraisals
Original file line number Diff line number Diff line change
Expand Up @@ -1650,7 +1650,7 @@ elsif ruby_version?('3.2')
gem 'semantic_logger', '~> 4.0'
gem 'sequel'
gem 'shoryuken'
gem 'sidekiq'
gem 'sidekiq', '~> 6' # TODO: Support sidekiq 7.x
gem 'sneakers', '>= 2.12.0'
gem 'sqlite3', '>= 1.4.2'
gem 'sucker_punch'
Expand Down
9 changes: 8 additions & 1 deletion lib/datadog/tracing/contrib/sidekiq/client_tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative '../../metadata/ext'
require_relative '../analytics'
require_relative 'distributed/propagation'
require_relative 'ext'
require_relative 'tracing'

Expand All @@ -22,7 +23,9 @@ def initialize(options = {})
def call(worker_class, job, queue, redis_pool)
resource = job_resource(job)

Datadog::Tracing.trace(Ext::SPAN_PUSH, service: @sidekiq_service) do |span|
Datadog::Tracing.trace(Ext::SPAN_PUSH, service: @sidekiq_service) do |span, trace_op|
propagation.inject!(trace_op, job) if configuration[:distributed_tracing]
marcotc marked this conversation as resolved.
Show resolved Hide resolved

span.resource = resource

span.set_tag(Datadog::Tracing::Metadata::Ext::TAG_COMPONENT, Ext::TAG_COMPONENT)
Expand Down Expand Up @@ -50,6 +53,10 @@ def call(worker_class, job, queue, redis_pool)
def configuration
Datadog.configuration.tracing[:sidekiq]
end

def propagation
Sidekiq::Distributed::Propagation::INSTANCE
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Settings < Contrib::Configuration::Settings
option :client_service_name
option :error_handler, default: Tracing::SpanOperation::Events::DEFAULT_ON_ERROR
option :quantize, default: {}
option :distributed_tracing, default: false
end
end
end
Expand Down
17 changes: 17 additions & 0 deletions lib/datadog/tracing/contrib/sidekiq/distributed/fetcher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: false
# typed: false

require_relative '../../../distributed/fetcher'

module Datadog
module Tracing
module Contrib
module Sidekiq
module Distributed
class Fetcher < Tracing::Distributed::Fetcher
end
end
end
end
end
end
40 changes: 40 additions & 0 deletions lib/datadog/tracing/contrib/sidekiq/distributed/propagation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# frozen_string_literal: true
# typed: true

require_relative 'fetcher'
require_relative '../../../distributed/propagation'
require_relative '../../../distributed/b3_multi'
require_relative '../../../distributed/b3_single'
require_relative '../../../distributed/datadog'
require_relative '../../../distributed/none'
require_relative '../../../distributed/trace_context'

module Datadog
module Tracing
module Contrib
module Sidekiq
module Distributed
# Extracts and injects propagation through HTTP headers.
class Propagation < Tracing::Distributed::Propagation
def initialize
super(
propagation_styles: {
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_MULTI_HEADER =>
Tracing::Distributed::B3Multi.new(fetcher: Fetcher),
TonyCTHsu marked this conversation as resolved.
Show resolved Hide resolved
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_SINGLE_HEADER =>
Tracing::Distributed::B3Single.new(fetcher: Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_DATADOG =>
Tracing::Distributed::Datadog.new(fetcher: Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_TRACE_CONTEXT =>
Tracing::Distributed::TraceContext.new(fetcher: Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_NONE => Tracing::Distributed::None.new
})
end

INSTANCE = Propagation.new
TonyCTHsu marked this conversation as resolved.
Show resolved Hide resolved
end
end
end
end
end
end
14 changes: 14 additions & 0 deletions lib/datadog/tracing/contrib/sidekiq/server_tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require_relative 'ext'
require_relative 'tracing'
require_relative '../utils/quantization/hash'
require_relative 'distributed/propagation'

module Datadog
module Tracing
Expand All @@ -21,9 +22,16 @@ def initialize(options = {})
@error_handler = options[:error_handler] || configuration[:error_handler]
end

# rubocop:disable Metrics/MethodLength
# rubocop:disable Metrics/AbcSize
def call(worker, job, queue)
resource = job_resource(job)

if configuration[:distributed_tracing]
trace_digest = propagation.extract(job)
Datadog::Tracing.continue_trace!(trace_digest)
end

service = worker_config(resource, :service_name) || @sidekiq_service
# DEV-2.0: Remove `tag_args`, as `quantize` can fulfill the same contract
tag_args = worker_config(resource, :tag_args) || configuration[:tag_args]
Expand Down Expand Up @@ -68,10 +76,16 @@ def call(worker, job, queue)

yield
end
# rubocop:enable Metrics/MethodLength
# rubocop:enable Metrics/AbcSize
end

private

def propagation
Sidekiq::Distributed::Propagation::INSTANCE
TonyCTHsu marked this conversation as resolved.
Show resolved Hide resolved
end

def quantize_args(quantize, args)
quantize_options = quantize && quantize[:args]
quantize_options ||= {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
end

Sidekiq::Testing.server_middleware.clear
Sidekiq::Extensions.enable_delay! if Sidekiq::VERSION > '5.0.0'
end

it 'traces job push' do
Expand Down Expand Up @@ -68,6 +67,8 @@
pending 'Broken in Ruby 3.1.0-preview1, see https://github.com/mperham/sidekiq/issues/5064'
end

Sidekiq::Extensions.enable_delay! if Sidekiq::VERSION > '5.0.0'

stub_const(
'DelayableClass',
Class.new do
Expand Down
119 changes: 119 additions & 0 deletions spec/datadog/tracing/contrib/sidekiq/distributed_tracing_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# typed: ignore
TonyCTHsu marked this conversation as resolved.
Show resolved Hide resolved

require 'datadog/tracing/contrib/support/spec_helper'

require 'datadog/tracing/contrib/sidekiq/integration'
require 'datadog/tracing/contrib/sidekiq/client_tracer'
require 'datadog/tracing/contrib/sidekiq/server_tracer'

require 'sidekiq/testing'
require 'sidekiq/api'

RSpec.describe 'Sidekiq distributed tracing' do
around do |example|
Sidekiq::Testing.fake! do
Sidekiq::Testing.server_middleware.clear
Sidekiq::Testing.server_middleware do |chain|
chain.add(Datadog::Tracing::Contrib::Sidekiq::ServerTracer)
end

example.run
end
end

before do
Datadog.configure do |c|
c.tracing.instrument :sidekiq, distributed_tracing: true
end
end

after do
Datadog.configuration.tracing[:sidekiq].reset!
Sidekiq::Queues.clear_all
end

let!(:empty_worker) do
stub_const(
'EmptyWorker',
Class.new do
include Sidekiq::Worker
def perform; end
end
)
end

context 'when dispatching' do
it do
EmptyWorker.perform_async

job = EmptyWorker.jobs.first

expect(span).to be_root_span
expect(span.service).to eq(tracer.default_service)
expect(span.resource).to eq('EmptyWorker')
expect(span.get_tag('sidekiq.job.queue')).to eq('default')
expect(span.status).to eq(0)
expect(span.get_tag('component')).to eq('sidekiq')
expect(span.get_tag('operation')).to eq('push')
expect(span.get_tag('span.kind')).to eq('producer')

expect(job['x-datadog-trace-id']).to eq(span.trace_id.to_s)
expect(job['x-datadog-parent-id']).to eq(span.id.to_s)
expect(job['x-datadog-sampling-priority']).to eq('1')
# expect(job["x-datadog-tags"]).to eq("_dd.p.dm=-0")
end
end

context 'when receiving' do
let(:trace_id) { Datadog::Tracing::Utils.next_id }
let(:span_id) { Datadog::Tracing::Utils.next_id }
let(:jid) { '123abc' }

it do
Sidekiq::Queues.push(
EmptyWorker.queue,
EmptyWorker.to_s,
EmptyWorker.sidekiq_options.merge(
'args' => [],
'class' => EmptyWorker.to_s,
'jid' => jid,
'x-datadog-trace-id' => trace_id.to_s,
'x-datadog-parent-id' => span_id.to_s,
'x-datadog-sampling-priority' => '1',
'x-datadog-tags' => '_dd.p.dm=-0',
)
)

EmptyWorker.perform_one

expect(span.trace_id).to eq(trace_id)
expect(span.parent_id).to eq(span_id)
expect(span.service).to eq(tracer.default_service)
expect(span.resource).to eq('EmptyWorker')
expect(span.get_tag('sidekiq.job.queue')).to eq('default')
expect(span.status).to eq(0)
expect(span.get_tag('component')).to eq('sidekiq')
expect(span.get_tag('operation')).to eq('job')
expect(span.get_tag('span.kind')).to eq('consumer')

# "x-datadog-sampling-priority" => "1",
# "x-datadog-tags" => "_dd.p.dm=-0",
end
end

context 'round trip' do
it do
EmptyWorker.perform_async
EmptyWorker.perform_one

expect(spans).to have(2).items

job_span, push_span = spans

expect(push_span).to be_root_span

expect(job_span.trace_id).to eq(push_span.trace_id)
expect(job_span.parent_id).to eq(push_span.id)
end
end
end
5 changes: 3 additions & 2 deletions spec/datadog/tracing/contrib/sidekiq/server_tracer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
Sidekiq::Testing.server_middleware do |chain|
chain.add(Datadog::Tracing::Contrib::Sidekiq::ServerTracer)
end

Sidekiq::Extensions.enable_delay! if Sidekiq::VERSION > '5.0.0'
end

it 'traces async job run' do
Expand All @@ -26,6 +24,7 @@
expect(spans).to have(2).items

span, _push = spans

expect(span.service).to eq(tracer.default_service)
expect(span.resource).to eq('EmptyWorker')
expect(span.get_tag('sidekiq.job.queue')).to eq('default')
Expand Down Expand Up @@ -184,6 +183,8 @@ def perform(id) end
pending 'Broken in Ruby 3.1.0-preview1, see https://github.com/mperham/sidekiq/issues/5064'
end

Sidekiq::Extensions.enable_delay! if Sidekiq::VERSION > '5.0.0'

stub_const(
'DelayableClass',
Class.new do
Expand Down