Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add basic metrics view #1604

Merged
merged 21 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a2ea6f7
feat: add basic metrics view support
xuan-cao-swi Feb 21, 2024
62d71d1
feat: merge with main
xuan-cao-swi Feb 21, 2024
3a9431e
feat: lint
xuan-cao-swi Feb 21, 2024
cc99bec
feat: metric view lint
xuan-cao-swi Feb 22, 2024
6ed867f
Merge branch 'main' into metrics-view
xuan-cao-swi Jul 11, 2024
de366c6
feat: metrics - use flatten for 1-d array MetricData
xuan-cao-swi Jul 11, 2024
cf05b7e
Update metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb
xuan-cao-swi Jul 18, 2024
f1d65d9
Update metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb
xuan-cao-swi Jul 18, 2024
749a820
update doc
xuan-cao-swi Jul 18, 2024
c96ce20
revision
xuan-cao-swi Jul 22, 2024
469f234
align the test case
xuan-cao-swi Jul 22, 2024
89ee8ce
Update metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb
xuan-cao-swi Jul 22, 2024
cbbf0d6
Update metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb
xuan-cao-swi Jul 22, 2024
fed6c66
Merge branch 'main' into metrics-view
xuan-cao-swi Jul 25, 2024
667b096
Merge branch 'main' into metrics-view
xuan-cao-swi Sep 10, 2024
3d7c20e
Update metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb
xuan-cao-swi Sep 10, 2024
9f52a97
Update metrics_sdk/lib/opentelemetry/sdk/metrics/view.rb
xuan-cao-swi Sep 10, 2024
5eb1734
Update metrics_sdk/lib/opentelemetry/sdk/metrics/view/registered_view.rb
xuan-cao-swi Sep 10, 2024
86ea4dd
refactor view and add test
xuan-cao-swi Sep 10, 2024
08f7c1e
revision
xuan-cao-swi Oct 4, 2024
d91db1a
Merge branch 'main' into metrics-view
mwear Oct 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exporter/otlp-metrics/test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
OpenTelemetry.logger = Logger.new(File::NULL)

module MockSum
def collect(start_time, end_time)
def collect(start_time, end_time, data_points)
start_time = 1_699_593_427_329_946_585 # rubocop:disable Lint/ShadowedArgument
end_time = 1_699_593_427_329_946_586 # rubocop:disable Lint/ShadowedArgument
super
Expand Down
1 change: 1 addition & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ module Metrics
require 'opentelemetry/sdk/metrics/meter'
require 'opentelemetry/sdk/metrics/meter_provider'
require 'opentelemetry/sdk/metrics/state'
require 'opentelemetry/sdk/metrics/view'
2 changes: 2 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ module Aggregation
require 'opentelemetry/sdk/metrics/aggregation/histogram_data_point'
require 'opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram'
require 'opentelemetry/sdk/metrics/aggregation/sum'
require 'opentelemetry/sdk/metrics/aggregation/last_value'
require 'opentelemetry/sdk/metrics/aggregation/drop'
37 changes: 37 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Aggregation
# Contains the implementation of the Drop aggregation
class Drop
attr_reader :aggregation_temporality

def initialize(aggregation_temporality: :delta)
@aggregation_temporality = aggregation_temporality
end

def collect(start_time, end_time, data_points)
data_points.values.map!(&:dup)
end

def update(increment, attributes, data_points)
data_points[attributes] = NumberDataPoint.new(
{},
0,
0,
0,
0
)
nil
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,24 @@ def initialize(
boundaries: DEFAULT_BOUNDARIES,
record_min_max: true
)
@data_points = {}
@aggregation_temporality = aggregation_temporality
@boundaries = boundaries && !boundaries.empty? ? boundaries.sort : nil
@record_min_max = record_min_max
end

def collect(start_time, end_time)
def collect(start_time, end_time, data_points)
if @aggregation_temporality == :delta
# Set timestamps and 'move' data point values to result.
hdps = @data_points.values.map! do |hdp|
hdps = data_points.values.map! do |hdp|
hdp.start_time_unix_nano = start_time
hdp.time_unix_nano = end_time
hdp
end
@data_points.clear
data_points.clear
hdps
else
# Update timestamps and take a snapshot.
@data_points.values.map! do |hdp|
data_points.values.map! do |hdp|
hdp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
hdp.time_unix_nano = end_time
hdp = hdp.dup
Expand All @@ -53,14 +52,14 @@ def collect(start_time, end_time)
end
end

def update(amount, attributes)
hdp = @data_points.fetch(attributes) do
def update(amount, attributes, data_points)
hdp = data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end

@data_points[attributes] = HistogramDataPoint.new(
data_points[attributes] = HistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
nil, # :time_unix_nano
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Aggregation
# Contains the implementation of the LastValue aggregation
class LastValue
attr_reader :aggregation_temporality

def initialize(aggregation_temporality: :delta)
@aggregation_temporality = aggregation_temporality
end

def collect(start_time, end_time, data_points)
if @aggregation_temporality == :delta
# Set timestamps and 'move' data point values to result.
ndps = data_points.values.map! do |ndp|
ndp.start_time_unix_nano = start_time
ndp.time_unix_nano = end_time
ndp
end
data_points.clear
ndps
else
# Update timestamps and take a snapshot.
data_points.values.map! do |ndp|
ndp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
ndp.time_unix_nano = end_time
ndp.dup
end
end
end

def update(increment, attributes, data_points)
data_points[attributes] = NumberDataPoint.new(
attributes,
nil,
nil,
increment,
nil
)
nil
end
end
end
end
end
end
13 changes: 6 additions & 7 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,30 @@ class Sum
def initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta))
# TODO: the default should be :cumulative, see issue #1555
@aggregation_temporality = aggregation_temporality
@data_points = {}
end

def collect(start_time, end_time)
def collect(start_time, end_time, data_points)
if @aggregation_temporality == :delta
# Set timestamps and 'move' data point values to result.
ndps = @data_points.values.map! do |ndp|
ndps = data_points.values.map! do |ndp|
ndp.start_time_unix_nano = start_time
ndp.time_unix_nano = end_time
ndp
end
@data_points.clear
data_points.clear
ndps
else
# Update timestamps and take a snapshot.
@data_points.values.map! do |ndp|
data_points.values.map! do |ndp|
ndp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
ndp.time_unix_nano = end_time
ndp.dup
end
end
end

def update(increment, attributes)
ndp = @data_points[attributes] || @data_points[attributes] = NumberDataPoint.new(
def update(increment, attributes, data_points)
ndp = data_points[attributes] || data_points[attributes] = NumberDataPoint.new(
attributes,
nil,
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def pull

def export(metrics)
@mutex.synchronize do
@metric_snapshots << metrics
metrics.instance_of?(Array) ? @metric_snapshots.concat(metrics) : @metric_snapshots << metrics
mwear marked this conversation as resolved.
Show resolved Hide resolved
end
SUCCESS
end
Expand Down
14 changes: 7 additions & 7 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ class MeterProvider < OpenTelemetry::Metrics::MeterProvider
Key = Struct.new(:name, :version)
private_constant(:Key)

attr_reader :resource, :metric_readers
attr_reader :resource, :metric_readers, :registered_views

def initialize(resource: OpenTelemetry::SDK::Resources::Resource.create)
@mutex = Mutex.new
@meter_registry = {}
@stopped = false
@metric_readers = []
@resource = resource
@registered_views = []
end

# Returns a {Meter} instance.
Expand Down Expand Up @@ -125,13 +126,12 @@ def register_synchronous_instrument(instrument)
end
end

# The type of the Instrument(s) (optional).
# The name of the Instrument(s). OpenTelemetry SDK authors MAY choose to support wildcard characters, with the question mark (?) matching exactly one character and the asterisk character (*) matching zero or more characters.
# The name of the Meter (optional).
# The version of the Meter (optional).
# The schema_url of the Meter (optional).
def add_view
# TODO: For each meter add this view to all applicable instruments
# The options of the Instrument(s). Useful key include: aggregation, type, unit, meter_name, meter_version, attribute_keys
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved
#
# TODO: add schema_url as part of options
def add_view(name, **options)
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved
@registered_views << View::RegisteredView.new(name, **options)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def collect
@epoch_end_time = now_in_nano
snapshot = @metric_streams.map { |ms| ms.collect(@epoch_start_time, @epoch_end_time) }
@epoch_start_time = @epoch_end_time
snapshot
snapshot.flatten!
mwear marked this conversation as resolved.
Show resolved Hide resolved
end
end

Expand Down
62 changes: 46 additions & 16 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,62 @@ def initialize(
@description = description
@unit = unit
@instrument_kind = instrument_kind
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved
@meter_provider = meter_provider
@meter_provider = meter_provider
xuan-cao-swi marked this conversation as resolved.
Show resolved Hide resolved
@instrumentation_scope = instrumentation_scope
@aggregation = aggregation
@default_aggregation = aggregation
@data_points = {}

@mutex = Mutex.new
end

def collect(start_time, end_time)
@mutex.synchronize do
MetricData.new(
@name,
@description,
@unit,
@instrument_kind,
@meter_provider.resource,
@instrumentation_scope,
@aggregation.collect(start_time, end_time),
@aggregation.aggregation_temporality,
start_time,
end_time
)
metric_data = []
mwear marked this conversation as resolved.
Show resolved Hide resolved
registered_views = find_registered_view

if registered_views.empty?
metric_data << aggregate_metric_data(start_time, end_time)
else
registered_views.each { |view| metric_data << aggregate_metric_data(start_time, end_time, aggregation: view.aggregation) }
end

metric_data
end

def update(value, attributes)
@mutex.synchronize { @aggregation.update(value, attributes) }
registered_views = find_registered_view
if registered_views.empty?
@mutex.synchronize { @default_aggregation.update(value, attributes, @data_points) }
else
registered_views.each do |view|
@mutex.synchronize do
attributes ||= {}
attributes.merge!(view.attribute_keys)
view.aggregation.update(value, attributes, @data_points)
end
end
end
end

def aggregate_metric_data(start_time, end_time, aggregation: nil)
aggregator = aggregation || @default_aggregation
MetricData.new(
@name,
@description,
@unit,
@instrument_kind,
@meter_provider.resource,
@instrumentation_scope,
aggregator.collect(start_time, end_time, @data_points),
aggregator.aggregation_temporality,
start_time,
end_time
)
end

def find_registered_view
registered_views = []
@meter_provider.registered_views.each { |view| registered_views << view if view.match_instrument(self) }
registered_views
end

def to_s
Expand Down
17 changes: 17 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/view.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
# View
xuan-cao-swi marked this conversation as resolved.
Show resolved Hide resolved
module View
end
end
end
end

require 'opentelemetry/sdk/metrics/view/registered_view'
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module View
# RegisteredView
xuan-cao-swi marked this conversation as resolved.
Show resolved Hide resolved
class RegisteredView
attr_reader :name, :aggregation, :attribute_keys

def initialize(name, **options)
@name = name
@options = options
@aggregation = options[:aggregation]
@attribute_keys = options[:attribute_keys] || {}
end

def match_instrument(metric_stream)
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved
return false if @aggregation.nil?
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved
return false if @name && @name != metric_stream.name
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved
return false if @options[:type] && @options[:type] != metric_stream.instrument_kind
return false if @options[:unit] && @options[:unit] != metric_stream.unit
return false if @options[:meter_name] && @options[:meter_name] != metric_stream.instrumentation_scope.name
return false if @options[:meter_version] && @options[:meter_version] != metric_stream.instrumentation_scope.version

true
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
counter.add(4, attributes: { 'd' => 'e' })

metric_exporter.pull
last_snapshot = metric_exporter.metric_snapshots.last
last_snapshot = metric_exporter.metric_snapshots

_(last_snapshot).wont_be_empty
_(last_snapshot[0].name).must_equal('counter')
Expand Down
Loading
Loading