Skip to content

Commit

Permalink
Merge pull request #3742 from DataDog/anmarchenko/metrics_collection
Browse files Browse the repository at this point in the history
[SDTEST-409] Add metrics management capabilities
  • Loading branch information
anmarchenko authored Jul 4, 2024
2 parents f0e28ec + 8c023fd commit ceadfb6
Show file tree
Hide file tree
Showing 9 changed files with 918 additions and 76 deletions.
74 changes: 46 additions & 28 deletions lib/datadog/core/telemetry/metric.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,40 @@ module Core
module Telemetry
# Telemetry metrics data model (internal Datadog metrics for client libraries)
module Metric
def self.metric_id(type, name, tags = [])
"#{type}::#{name}::#{tags.join(',')}"
end

# Base class for all metric types
class Base
attr_reader :name, :tags, :values, :common, :interval
attr_reader :name, :tags, :values, :common

# @param name [String] metric name
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash of array of "tag:val" strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
# @param interval [Integer] metrics aggregation interval in seconds
def initialize(name, tags: {}, common: true, interval: nil)
def initialize(name, tags: {}, common: true)
@name = name
@values = []
@tags = tags_to_array(tags)
@common = common
@interval = interval
end

def track(value); end
def id
@id ||= "#{type}::#{name}::#{tags.join(',')}"
end

def type; end
def track(value)
raise NotImplementedError, 'method must be implemented in subclasses'
end

def type
raise NotImplementedError, 'method must be implemented in subclasses'
end

def to_h
# @type var res: Hash[Symbol, untyped]
res = {
{
metric: name,
points: values,
type: type,
tags: tags,
common: common
}
res[:interval] = interval if interval
res
end

private
Expand All @@ -51,6 +50,29 @@ def tags_to_array(tags)
end
end

# Base class for metrics that require aggregation interval
class IntervalMetric < Base
attr_reader :interval

# @param name [String] metric name
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash of array of "tag:val" strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
# @param interval [Integer] metrics aggregation interval in seconds
def initialize(name, interval:, tags: {}, common: true)
raise ArgumentError, 'interval must be a positive number' if interval.nil? || interval <= 0

super(name, tags: tags, common: common)

@interval = interval
end

def to_h
res = super
res[:interval] = interval
res
end
end

# Count metric adds up all the submitted values in a time interval. This would be suitable for a
# metric tracking the number of website hits, for instance.
class Count < Base
Expand All @@ -60,28 +82,23 @@ def type
TYPE
end

def inc(value = 1)
track(value)
end

def dec(value = 1)
track(-value)
end

def track(value)
value = value.to_i

if values.empty?
values << [Time.now.to_i, value]
else
values[0][0] = Time.now.to_i
values[0][1] += value
end
nil
end
end

# A gauge type takes the last value reported during the interval. This type would make sense for tracking RAM or
# CPU usage, where taking the last value provides a representative picture of the host’s behavior during the time
# interval.
class Gauge < Base
class Gauge < IntervalMetric
TYPE = 'gauge'

def type
Expand All @@ -95,15 +112,16 @@ def track(value)
values[0][0] = Time.now.to_i
values[0][1] = value
end
nil
end
end

# The rate type takes the count and divides it by the length of the time interval. This is useful if you’re
# interested in the number of hits per second.
class Rate < Base
class Rate < IntervalMetric
TYPE = 'rate'

def initialize(name, tags: {}, common: true, interval: nil)
def initialize(name, interval:, tags: {}, common: true)
super

@value = 0.0
Expand All @@ -115,9 +133,8 @@ def type

def track(value = 1.0)
@value += value

rate = interval ? @value / interval : 0.0
@values = [[Time.now.to_i, rate]]
@values = [[Time.now.to_i, @value / interval]]
nil
end
end

Expand All @@ -131,6 +148,7 @@ def type

def track(value)
values << value
nil
end

# distribution metric data does not have type field
Expand Down
79 changes: 79 additions & 0 deletions lib/datadog/core/telemetry/metrics_collection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# frozen_string_literal: true

require_relative 'event'
require_relative 'metric'

module Datadog
module Core
module Telemetry
# MetricsCollection is a thread-safe collection of metrics per namespace
class MetricsCollection
attr_reader :namespace, :interval

def initialize(namespace, aggregation_interval:)
@namespace = namespace
@interval = aggregation_interval

@mutex = Mutex.new

@metrics = {}
@distributions = {}
end

def inc(metric_name, value, tags: {}, common: true)
metric = Metric::Count.new(metric_name, tags: tags, common: common)
fetch_or_add_metric(metric, value)
end

def dec(metric_name, value, tags: {}, common: true)
metric = Metric::Count.new(metric_name, tags: tags, common: common)
fetch_or_add_metric(metric, -value)
end

def gauge(metric_name, value, tags: {}, common: true)
metric = Metric::Gauge.new(metric_name, tags: tags, common: common, interval: @interval)
fetch_or_add_metric(metric, value)
end

def rate(metric_name, value, tags: {}, common: true)
metric = Metric::Rate.new(metric_name, tags: tags, common: common, interval: @interval)
fetch_or_add_metric(metric, value)
end

def distribution(metric_name, value, tags: {}, common: true)
metric = Metric::Distribution.new(metric_name, tags: tags, common: common)
fetch_or_add_distribution(metric, value)
end

def flush!(queue)
@mutex.synchronize do
queue.enqueue(Event::GenerateMetrics.new(@namespace, @metrics.values)) if @metrics.any?
queue.enqueue(Event::Distributions.new(@namespace, @distributions.values)) if @distributions.any?

@metrics = {}
@distributions = {}
end
nil
end

private

def fetch_or_add_metric(metric, value)
@mutex.synchronize do
m = (@metrics[metric.id] ||= metric)
m.track(value)
end
nil
end

def fetch_or_add_distribution(metric, value)
@mutex.synchronize do
m = (@distributions[metric.id] ||= metric)
m.track(value)
end
nil
end
end
end
end
end
83 changes: 83 additions & 0 deletions lib/datadog/core/telemetry/metrics_manager.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# frozen_string_literal: true

require_relative 'metrics_collection'

module Datadog
module Core
module Telemetry
# MetricsManager aggregates and flushes metrics and distributions
class MetricsManager
attr_reader :enabled

def initialize(aggregation_interval:, enabled:)
@interval = aggregation_interval
@enabled = enabled
@mutex = Mutex.new

@collections = {}
end

def inc(namespace, metric_name, value, tags: {}, common: true)
return unless @enabled

# collection is thread-safe internally
collection = fetch_or_create_collection(namespace)
collection.inc(metric_name, value, tags: tags, common: common)
end

def dec(namespace, metric_name, value, tags: {}, common: true)
return unless @enabled

# collection is thread-safe internally
collection = fetch_or_create_collection(namespace)
collection.dec(metric_name, value, tags: tags, common: common)
end

def gauge(namespace, metric_name, value, tags: {}, common: true)
return unless @enabled

# collection is thread-safe internally
collection = fetch_or_create_collection(namespace)
collection.gauge(metric_name, value, tags: tags, common: common)
end

def rate(namespace, metric_name, value, tags: {}, common: true)
return unless @enabled

# collection is thread-safe internally
collection = fetch_or_create_collection(namespace)
collection.rate(metric_name, value, tags: tags, common: common)
end

def distribution(namespace, metric_name, value, tags: {}, common: true)
return unless @enabled

# collection is thread-safe internally
collection = fetch_or_create_collection(namespace)
collection.distribution(metric_name, value, tags: tags, common: common)
end

def flush!(queue)
return unless @enabled

collections = @mutex.synchronize { @collections.values }
collections.each { |col| col.flush!(queue) }

nil
end

def disable!
@enabled = false
end

private

def fetch_or_create_collection(namespace)
@mutex.synchronize do
@collections[namespace] ||= MetricsCollection.new(namespace, aggregation_interval: @interval)
end
end
end
end
end
end
Loading

0 comments on commit ceadfb6

Please sign in to comment.