diff --git a/exporter/otlp-metrics/test/test_helper.rb b/exporter/otlp-metrics/test/test_helper.rb index 461b180653..9fc4466458 100644 --- a/exporter/otlp-metrics/test/test_helper.rb +++ b/exporter/otlp-metrics/test/test_helper.rb @@ -17,7 +17,7 @@ OpenTelemetry.logger = Logger.new(File::NULL) module MockSum - def collect(start_time, end_time) + def collect(start_time, end_time, data_points) start_time = 1_699_593_427_329_946_585 # rubocop:disable Lint/ShadowedArgument end_time = 1_699_593_427_329_946_586 # rubocop:disable Lint/ShadowedArgument super diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics.rb index 42807c708b..35593185f9 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics.rb @@ -20,3 +20,4 @@ module Metrics require 'opentelemetry/sdk/metrics/meter' require 'opentelemetry/sdk/metrics/meter_provider' require 'opentelemetry/sdk/metrics/state' +require 'opentelemetry/sdk/metrics/view' diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation.rb index f36aef0152..62f820eb93 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation.rb @@ -19,3 +19,5 @@ module Aggregation require 'opentelemetry/sdk/metrics/aggregation/histogram_data_point' require 'opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram' require 'opentelemetry/sdk/metrics/aggregation/sum' +require 'opentelemetry/sdk/metrics/aggregation/last_value' +require 'opentelemetry/sdk/metrics/aggregation/drop' diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb new file mode 100644 index 0000000000..f638c649a5 --- /dev/null +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module SDK + module Metrics + module Aggregation + # Contains the implementation of the Drop aggregation + class Drop + attr_reader :aggregation_temporality + + def initialize(aggregation_temporality: :delta) + @aggregation_temporality = aggregation_temporality + end + + def collect(start_time, end_time, data_points) + data_points.values.map!(&:dup) + end + + def update(increment, attributes, data_points) + data_points[attributes] = NumberDataPoint.new( + {}, + 0, + 0, + 0, + 0 + ) + nil + end + end + end + end + end +end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb index 5af469b61d..53ab4ae12a 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb @@ -25,25 +25,24 @@ def initialize( boundaries: DEFAULT_BOUNDARIES, record_min_max: true ) - @data_points = {} @aggregation_temporality = aggregation_temporality @boundaries = boundaries && !boundaries.empty? ? boundaries.sort : nil @record_min_max = record_min_max end - def collect(start_time, end_time) + def collect(start_time, end_time, data_points) if @aggregation_temporality == :delta # Set timestamps and 'move' data point values to result. - hdps = @data_points.values.map! do |hdp| + hdps = data_points.values.map! 
do |hdp| hdp.start_time_unix_nano = start_time hdp.time_unix_nano = end_time hdp end - @data_points.clear + data_points.clear hdps else # Update timestamps and take a snapshot. - @data_points.values.map! do |hdp| + data_points.values.map! do |hdp| hdp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation. hdp.time_unix_nano = end_time hdp = hdp.dup @@ -53,14 +52,14 @@ def collect(start_time, end_time) end end - def update(amount, attributes) - hdp = @data_points.fetch(attributes) do + def update(amount, attributes, data_points) + hdp = data_points.fetch(attributes) do if @record_min_max min = Float::INFINITY max = -Float::INFINITY end - @data_points[attributes] = HistogramDataPoint.new( + data_points[attributes] = HistogramDataPoint.new( attributes, nil, # :start_time_unix_nano nil, # :time_unix_nano diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/last_value.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/last_value.rb new file mode 100644 index 0000000000..b2cffb74e2 --- /dev/null +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/last_value.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module SDK + module Metrics + module Aggregation + # Contains the implementation of the LastValue aggregation + class LastValue + attr_reader :aggregation_temporality + + def initialize(aggregation_temporality: :delta) + @aggregation_temporality = aggregation_temporality + end + + def collect(start_time, end_time, data_points) + if @aggregation_temporality == :delta + # Set timestamps and 'move' data point values to result. + ndps = data_points.values.map! do |ndp| + ndp.start_time_unix_nano = start_time + ndp.time_unix_nano = end_time + ndp + end + data_points.clear + ndps + else + # Update timestamps and take a snapshot. + data_points.values.map! do |ndp| + ndp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation. + ndp.time_unix_nano = end_time + ndp.dup + end + end + end + + def update(increment, attributes, data_points) + data_points[attributes] = NumberDataPoint.new( + attributes, + nil, + nil, + increment, + nil + ) + nil + end + end + end + end + end +end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb index 16cbccc430..f9e98d3e9f 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/sum.rb @@ -16,22 +16,21 @@ class Sum def initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta)) # TODO: the default should be :cumulative, see issue #1555 @aggregation_temporality = aggregation_temporality - @data_points = {} end - def collect(start_time, end_time) + def collect(start_time, end_time, data_points) if @aggregation_temporality == :delta # Set timestamps and 'move' data point values to result. - ndps = @data_points.values.map! do |ndp| + ndps = data_points.values.map! do |ndp| ndp.start_time_unix_nano = start_time ndp.time_unix_nano = end_time ndp end - @data_points.clear + data_points.clear ndps else # Update timestamps and take a snapshot. - @data_points.values.map! do |ndp| + data_points.values.map! do |ndp| ndp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation. 
ndp.time_unix_nano = end_time ndp.dup @@ -39,8 +38,8 @@ def collect(start_time, end_time) end end - def update(increment, attributes) - ndp = @data_points[attributes] || @data_points[attributes] = NumberDataPoint.new( + def update(increment, attributes, data_points) + ndp = data_points[attributes] || data_points[attributes] = NumberDataPoint.new( attributes, nil, nil, diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb index 8291ed9fe7..d0d8ccb902 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/in_memory_metric_pull_exporter.rb @@ -25,7 +25,7 @@ def pull def export(metrics, timeout: nil) @mutex.synchronize do - @metric_snapshots << metrics + @metric_snapshots.concat(Array(metrics)) end SUCCESS end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb index 205ff5db0d..1f3f867052 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb @@ -14,7 +14,7 @@ class MeterProvider < OpenTelemetry::Metrics::MeterProvider Key = Struct.new(:name, :version) private_constant(:Key) - attr_reader :resource, :metric_readers + attr_reader :resource, :metric_readers, :registered_views def initialize(resource: OpenTelemetry::SDK::Resources::Resource.create) @mutex = Mutex.new @@ -22,6 +22,7 @@ def initialize(resource: OpenTelemetry::SDK::Resources::Resource.create) @stopped = false @metric_readers = [] @resource = resource + @registered_views = [] end # Returns a {Meter} instance. @@ -125,13 +126,30 @@ def register_synchronous_instrument(instrument) end end - # The type of the Instrument(s) (optional). - # The name of the Instrument(s). OpenTelemetry SDK authors MAY choose to support wildcard characters, with the question mark (?) matching exactly one character and the asterisk character (*) matching zero or more characters. - # The name of the Meter (optional). - # The version of the Meter (optional). - # The schema_url of the Meter (optional). - def add_view - # TODO: For each meter add this view to all applicable instruments + # A View provides SDK users with the flexibility to customize the metrics that are output by the SDK. + # + # Example: + # + # OpenTelemetry.meter_provider.add_view('test', :aggregation => Aggregation::Drop.new, + # :type => :counter, :unit => 'smidgen', + # :meter_name => 'test', :meter_version => '1.0') + # + # + # @param [String] name Name of the view. + # @param [optional Hash] options For more precise matching, {View} and {MetricsStream} + # options may include: + # aggregation: An instance of an aggregation class, e.g. {ExplicitBucketHistogram}, {Sum}, {LastValue} + # type: A Symbol representing the instrument kind, e.g. :observable_gauge, :counter + # unit: A String matching an instrumentation unit, e.g. 'smidgen' + # meter_name: A String matching a meter name, e.g. meter_provider.meter('sample_meter_name', version: '1.2.0'), would be 'sample_meter_name' + # meter_version: A String matching a meter version, e.g. 
meter_provider.meter('sample_meter_name', version: '1.2.0'), would be '1.2.0' + # + # @return [nil] returns nil + # + def add_view(name, **options) + # TODO: add schema_url as part of options + @registered_views << View::RegisteredView.new(name, **options) + nil end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_store.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_store.rb index de2ecb83b6..b89eb09161 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_store.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_store.rb @@ -23,7 +23,8 @@ def initialize def collect @mutex.synchronize do @epoch_end_time = now_in_nano - snapshot = @metric_streams.map { |ms| ms.collect(@epoch_start_time, @epoch_end_time) } + # snapshot = @metric_streams.map { |ms| ms.collect(@epoch_start_time, @epoch_end_time) } + snapshot = @metric_streams.flat_map { |ms| ms.collect(@epoch_start_time, @epoch_end_time) } @epoch_start_time = @epoch_end_time snapshot end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb index 7f98115dce..05033f522c 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb @@ -30,30 +30,61 @@ def initialize( @instrument_kind = instrument_kind @meter_provider = meter_provider @instrumentation_scope = instrumentation_scope - @aggregation = aggregation + @default_aggregation = aggregation + @data_points = {} + @registered_views = [] + find_registered_view @mutex = Mutex.new end def collect(start_time, end_time) @mutex.synchronize do - MetricData.new( - @name, - @description, - @unit, - @instrument_kind, - @meter_provider.resource, - @instrumentation_scope, - @aggregation.collect(start_time, end_time), - @aggregation.aggregation_temporality, - start_time, - end_time - ) + metric_data = [] + if @registered_views.empty? + metric_data << aggregate_metric_data(start_time, end_time) + else + @registered_views.each { |view| metric_data << aggregate_metric_data(start_time, end_time, aggregation: view.aggregation) } + end + + metric_data end end def update(value, attributes) - @mutex.synchronize { @aggregation.update(value, attributes) } + if @registered_views.empty? + @mutex.synchronize { @default_aggregation.update(value, attributes, @data_points) } + else + @registered_views.each do |view| + @mutex.synchronize do + attributes ||= {} + attributes.merge!(view.attribute_keys) + view.aggregation.update(value, attributes, @data_points) if view.valid_aggregation? + end + end + end + end + + def aggregate_metric_data(start_time, end_time, aggregation: nil) + aggregator = aggregation || @default_aggregation + MetricData.new( + @name, + @description, + @unit, + @instrument_kind, + @meter_provider.resource, + @instrumentation_scope, + aggregator.collect(start_time, end_time, @data_points), + aggregator.aggregation_temporality, + start_time, + end_time + ) + end + + def find_registered_view + return if @meter_provider.nil? 
+ + @meter_provider.registered_views.each { |view| @registered_views << view if view.match_instrument?(self) } end def to_s diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/view.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/view.rb new file mode 100644 index 0000000000..9606f383b1 --- /dev/null +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/view.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module SDK + module Metrics + # A View provides SDK users with the flexibility to customize the metrics that are output by the SDK. + module View + end + end + end +end + +require 'opentelemetry/sdk/metrics/view/registered_view' diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/view/registered_view.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/view/registered_view.rb new file mode 100644 index 0000000000..881f0f261e --- /dev/null +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/view/registered_view.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module SDK + module Metrics + module View + # RegisteredView is an internal class used to match Views with a given {MetricStream} + class RegisteredView + attr_reader :name, :aggregation, :attribute_keys, :regex + + def initialize(name, **options) + @name = name + @options = options + @aggregation = options[:aggregation] + @attribute_keys = options[:attribute_keys] || {} + + generate_regex_pattern(name) + end + + def match_instrument?(metric_stream) + return false if @name && !name_match(metric_stream.name) + return false if @options[:type] && @options[:type] != metric_stream.instrument_kind + return false if @options[:unit] && @options[:unit] != metric_stream.unit + return false if @options[:meter_name] && @options[:meter_name] != metric_stream.instrumentation_scope.name + return false if @options[:meter_version] && @options[:meter_version] != metric_stream.instrumentation_scope.version + + true + end + + def name_match(stream_name) + !!@regex&.match(stream_name) + end + + def valid_aggregation? 
+          @aggregation.class.name.rpartition('::')[0] == 'OpenTelemetry::SDK::Metrics::Aggregation'
+        end
+
+        private
+
+        def generate_regex_pattern(view_name)
+          regex_pattern = Regexp.escape(view_name)
+
+          regex_pattern.gsub!('\*', '.*')
+          regex_pattern.gsub!('\?', '.')
+
+          @regex = Regexp.new("^#{regex_pattern}$")
+        rescue StandardError
+          @regex = nil
+        end
+      end
+    end
+  end
+end
diff --git a/metrics_sdk/test/integration/in_memory_metric_pull_exporter_test.rb b/metrics_sdk/test/integration/in_memory_metric_pull_exporter_test.rb
index bed4bdf935..3e060e79dc 100644
--- a/metrics_sdk/test/integration/in_memory_metric_pull_exporter_test.rb
+++ b/metrics_sdk/test/integration/in_memory_metric_pull_exporter_test.rb
@@ -26,7 +26,7 @@
       counter.add(4, attributes: { 'd' => 'e' })
 
       metric_exporter.pull
-      last_snapshot = metric_exporter.metric_snapshots.last
+      last_snapshot = metric_exporter.metric_snapshots
 
       _(last_snapshot).wont_be_empty
       _(last_snapshot[0].name).must_equal('counter')
diff --git a/metrics_sdk/test/integration/periodic_metric_reader_test.rb b/metrics_sdk/test/integration/periodic_metric_reader_test.rb
index 7bf00a08fc..5abdfbbf8d 100644
--- a/metrics_sdk/test/integration/periodic_metric_reader_test.rb
+++ b/metrics_sdk/test/integration/periodic_metric_reader_test.rb
@@ -34,7 +34,7 @@
 
       _(snapshot.size).must_equal(2)
 
-      first_snapshot = snapshot[0]
+      first_snapshot = snapshot
       _(first_snapshot[0].name).must_equal('counter')
       _(first_snapshot[0].unit).must_equal('smidgen')
       _(first_snapshot[0].description).must_equal('a small amount of something')
diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/drop_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/drop_test.rb
new file mode 100644
index 0000000000..8c873107d9
--- /dev/null
+++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/drop_test.rb
@@ -0,0 +1,41 @@
+# frozen_string_literal: true
+
+# Copyright The OpenTelemetry Authors
+#
+# SPDX-License-Identifier: Apache-2.0
+
+require 'test_helper'
+
+describe OpenTelemetry::SDK::Metrics::Aggregation::Drop do
+  let(:data_points) { {} }
+  let(:drop_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Drop.new(aggregation_temporality: aggregation_temporality) }
+  let(:aggregation_temporality) { :delta }
+
+  # Time in nano
+  let(:start_time) { (Time.now.to_r * 1_000_000_000).to_i }
+  let(:end_time) { ((Time.now + 60).to_r * 1_000_000_000).to_i }
+
+  it 'does not set the timestamps' do
+    drop_aggregation.update(0, {}, data_points)
+    ndp = drop_aggregation.collect(start_time, end_time, data_points)[0]
+    _(ndp.start_time_unix_nano).must_equal(0)
+    _(ndp.time_unix_nano).must_equal(0)
+  end
+
+  it 'collects zeroed data points for all updates' do
+    drop_aggregation.update(1, {}, data_points)
+    drop_aggregation.update(2, {}, data_points)
+
+    drop_aggregation.update(2, { 'foo' => 'bar' }, data_points)
+    drop_aggregation.update(2, { 'foo' => 'bar' }, data_points)
+
+    ndps = drop_aggregation.collect(start_time, end_time, data_points)
+
+    _(ndps.size).must_equal(2)
+    _(ndps[0].value).must_equal(0)
+    _(ndps[0].attributes).must_equal({})
+
+    _(ndps[1].value).must_equal(0)
+    _(ndps[1].attributes).must_equal({})
+  end
+end
diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb
index 06c0e7b799..52ae019269 100644
--- a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb
+++ 
b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram_test.rb @@ -7,6 +7,7 @@ require 'test_helper' describe OpenTelemetry::SDK::Metrics::Aggregation::ExplicitBucketHistogram do + let(:data_points) { {} } let(:ebh) do OpenTelemetry::SDK::Metrics::Aggregation::ExplicitBucketHistogram.new( aggregation_temporality: aggregation_temporality, @@ -23,19 +24,19 @@ describe '#collect' do it 'returns all the data points' do - ebh.update(0, {}) - ebh.update(1, {}) - ebh.update(5, {}) - ebh.update(6, {}) - ebh.update(10, {}) - - ebh.update(-10, 'foo' => 'bar') - ebh.update(1, 'foo' => 'bar') - ebh.update(22, 'foo' => 'bar') - ebh.update(55, 'foo' => 'bar') - ebh.update(80, 'foo' => 'bar') - - hdps = ebh.collect(start_time, end_time) + ebh.update(0, {}, data_points) + ebh.update(1, {}, data_points) + ebh.update(5, {}, data_points) + ebh.update(6, {}, data_points) + ebh.update(10, {}, data_points) + + ebh.update(-10, { 'foo' => 'bar' }, data_points) + ebh.update(1, { 'foo' => 'bar' }, data_points) + ebh.update(22, { 'foo' => 'bar' }, data_points) + ebh.update(55, { 'foo' => 'bar' }, data_points) + ebh.update(80, { 'foo' => 'bar' }, data_points) + + hdps = ebh.collect(start_time, end_time, data_points) _(hdps.size).must_equal(2) _(hdps[0].attributes).must_equal({}) _(hdps[0].count).must_equal(5) @@ -55,34 +56,34 @@ end it 'sets the timestamps' do - ebh.update(0, {}) - hdp = ebh.collect(start_time, end_time)[0] + ebh.update(0, {}, data_points) + hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.start_time_unix_nano).must_equal(start_time) _(hdp.time_unix_nano).must_equal(end_time) end it 'calculates the count' do - ebh.update(0, {}) - ebh.update(0, {}) - ebh.update(0, {}) - ebh.update(0, {}) - hdp = ebh.collect(start_time, end_time)[0] + ebh.update(0, {}, data_points) + ebh.update(0, {}, data_points) + ebh.update(0, {}, data_points) + ebh.update(0, {}, data_points) + hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.count).must_equal(4) end it 'does not aggregate between collects with default delta aggregation' do - ebh.update(0, {}) - ebh.update(1, {}) - ebh.update(5, {}) - ebh.update(6, {}) - ebh.update(10, {}) - hdps = ebh.collect(start_time, end_time) - - ebh.update(0, {}) - ebh.update(1, {}) - ebh.update(5, {}) - ebh.update(6, {}) - ebh.update(10, {}) + ebh.update(0, {}, data_points) + ebh.update(1, {}, data_points) + ebh.update(5, {}, data_points) + ebh.update(6, {}, data_points) + ebh.update(10, {}, data_points) + hdps = ebh.collect(start_time, end_time, data_points) + + ebh.update(0, {}, data_points) + ebh.update(1, {}, data_points) + ebh.update(5, {}, data_points) + ebh.update(6, {}, data_points) + ebh.update(10, {}, data_points) # Assert that the recent update does not # impact the already collected metrics _(hdps[0].count).must_equal(5) @@ -91,7 +92,7 @@ _(hdps[0].max).must_equal(10) _(hdps[0].bucket_counts).must_equal([1, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0]) - hdps = ebh.collect(start_time, end_time) + hdps = ebh.collect(start_time, end_time, data_points) # Assert that we are not accumulating values # between calls to collect _(hdps[0].count).must_equal(5) @@ -105,18 +106,18 @@ let(:aggregation_temporality) { :not_delta } it 'allows metrics to accumulate' do - ebh.update(0, {}) - ebh.update(1, {}) - ebh.update(5, {}) - ebh.update(6, {}) - ebh.update(10, {}) - hdps = ebh.collect(start_time, end_time) - - ebh.update(0, {}) - ebh.update(1, {}) - ebh.update(5, {}) - ebh.update(6, {}) - ebh.update(10, {}) + ebh.update(0, {}, data_points) + 
ebh.update(1, {}, data_points) + ebh.update(5, {}, data_points) + ebh.update(6, {}, data_points) + ebh.update(10, {}, data_points) + hdps = ebh.collect(start_time, end_time, data_points) + + ebh.update(0, {}, data_points) + ebh.update(1, {}, data_points) + ebh.update(5, {}, data_points) + ebh.update(6, {}, data_points) + ebh.update(10, {}, data_points) # Assert that the recent update does not # impact the already collected metrics _(hdps[0].count).must_equal(5) @@ -125,7 +126,7 @@ _(hdps[0].max).must_equal(10) _(hdps[0].bucket_counts).must_equal([1, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0]) - hdps1 = ebh.collect(start_time, end_time) + hdps1 = ebh.collect(start_time, end_time, data_points) # Assert that we are accumulating values # and not just capturing the delta since # the previous collect call @@ -148,38 +149,38 @@ describe '#update' do it 'accumulates across the default boundaries' do - ebh.update(0, {}) + ebh.update(0, {}, data_points) - ebh.update(1, {}) - ebh.update(5, {}) + ebh.update(1, {}, data_points) + ebh.update(5, {}, data_points) - ebh.update(6, {}) - ebh.update(10, {}) + ebh.update(6, {}, data_points) + ebh.update(10, {}, data_points) - ebh.update(11, {}) - ebh.update(25, {}) + ebh.update(11, {}, data_points) + ebh.update(25, {}, data_points) - ebh.update(26, {}) - ebh.update(50, {}) + ebh.update(26, {}, data_points) + ebh.update(50, {}, data_points) - ebh.update(51, {}) - ebh.update(75, {}) + ebh.update(51, {}, data_points) + ebh.update(75, {}, data_points) - ebh.update(76, {}) - ebh.update(100, {}) + ebh.update(76, {}, data_points) + ebh.update(100, {}, data_points) - ebh.update(101, {}) - ebh.update(250, {}) + ebh.update(101, {}, data_points) + ebh.update(250, {}, data_points) - ebh.update(251, {}) - ebh.update(500, {}) + ebh.update(251, {}, data_points) + ebh.update(500, {}, data_points) - ebh.update(501, {}) - ebh.update(1000, {}) + ebh.update(501, {}, data_points) + ebh.update(1000, {}, data_points) - ebh.update(1001, {}) + ebh.update(1001, {}, data_points) - hdp = ebh.collect(start_time, end_time)[0] + hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.bucket_counts).must_equal([1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1]) _(hdp.sum).must_equal(4040) _(hdp.min).must_equal(0) @@ -190,8 +191,8 @@ let(:boundaries) { [4, 2, 1] } it 'sorts it' do - ebh.update(0, {}) - _(ebh.collect(start_time, end_time)[0].explicit_bounds).must_equal([1, 2, 4]) + ebh.update(0, {}, data_points) + _(ebh.collect(start_time, end_time, data_points)[0].explicit_bounds).must_equal([1, 2, 4]) end end @@ -199,8 +200,8 @@ let(:record_min_max) { false } it 'does not record min max values' do - ebh.update(-1, {}) - hdp = ebh.collect(start_time, end_time)[0] + ebh.update(-1, {}, data_points) + hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.min).must_be_nil _(hdp.min).must_be_nil end @@ -210,14 +211,14 @@ let(:boundaries) { [0, 2, 4] } it 'aggregates' do - ebh.update(-1, {}) - ebh.update(0, {}) - ebh.update(1, {}) - ebh.update(2, {}) - ebh.update(3, {}) - ebh.update(4, {}) - ebh.update(5, {}) - hdp = ebh.collect(start_time, end_time)[0] + ebh.update(-1, {}, data_points) + ebh.update(0, {}, data_points) + ebh.update(1, {}, data_points) + ebh.update(2, {}, data_points) + ebh.update(3, {}, data_points) + ebh.update(4, {}, data_points) + ebh.update(5, {}, data_points) + hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.bucket_counts).must_equal([2, 2, 2, 1]) end @@ -227,9 +228,9 @@ let(:boundaries) { [0] } it 'aggregates' do - ebh.update(-1, {}) - ebh.update(1, {}) - hdp = 
ebh.collect(start_time, end_time)[0] + ebh.update(-1, {}, data_points) + ebh.update(1, {}, data_points) + hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.bucket_counts).must_equal([1, 1]) end @@ -239,9 +240,9 @@ let(:boundaries) { [] } it 'aggregates but does not record bucket counts' do - ebh.update(-1, {}) - ebh.update(3, {}) - hdp = ebh.collect(start_time, end_time)[0] + ebh.update(-1, {}, data_points) + ebh.update(3, {}, data_points) + hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.bucket_counts).must_be_nil _(hdp.explicit_bounds).must_be_nil @@ -256,9 +257,9 @@ let(:boundaries) { nil } it 'aggregates but does not record bucket counts' do - ebh.update(-1, {}) - ebh.update(3, {}) - hdp = ebh.collect(start_time, end_time)[0] + ebh.update(-1, {}, data_points) + ebh.update(3, {}, data_points) + hdp = ebh.collect(start_time, end_time, data_points)[0] _(hdp.bucket_counts).must_be_nil _(hdp.explicit_bounds).must_be_nil diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/last_value_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/last_value_test.rb new file mode 100644 index 0000000000..8714b698d5 --- /dev/null +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/last_value_test.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +describe OpenTelemetry::SDK::Metrics::Aggregation::LastValue do + let(:data_points) { {} } + let(:last_value_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new(aggregation_temporality: aggregation_temporality) } + let(:aggregation_temporality) { :delta } + + # Time in nano + let(:start_time) { (Time.now.to_r * 1_000_000_000).to_i } + let(:end_time) { ((Time.now + 60).to_r * 1_000_000_000).to_i } + + it 'sets the timestamps' do + last_value_aggregation.update(0, {}, data_points) + ndp = last_value_aggregation.collect(start_time, end_time, data_points)[0] + _(ndp.start_time_unix_nano).must_equal(start_time) + _(ndp.time_unix_nano).must_equal(end_time) + end + + it 'aggregates and collects should collect the last value' do + last_value_aggregation.update(1, {}, data_points) + last_value_aggregation.update(2, {}, data_points) + + last_value_aggregation.update(2, { 'foo' => 'bar' }, data_points) + last_value_aggregation.update(2, { 'foo' => 'bar' }, data_points) + + ndps = last_value_aggregation.collect(start_time, end_time, data_points) + _(ndps[0].value).must_equal(2) + _(ndps[0].attributes).must_equal({}, data_points) + + _(ndps[1].value).must_equal(2) + _(ndps[1].attributes).must_equal('foo' => 'bar') + end +end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb index 3c0a3931e3..18e07ec32d 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/aggregation/sum_test.rb @@ -7,6 +7,7 @@ require 'test_helper' describe OpenTelemetry::SDK::Metrics::Aggregation::Sum do + let(:data_points) { {} } let(:sum_aggregation) { OpenTelemetry::SDK::Metrics::Aggregation::Sum.new(aggregation_temporality: aggregation_temporality) } let(:aggregation_temporality) { :delta } @@ -15,38 +16,38 @@ let(:end_time) { ((Time.now + 60).to_r * 1_000_000_000).to_i } it 'sets the timestamps' do - sum_aggregation.update(0, {}) - ndp = sum_aggregation.collect(start_time, end_time)[0] + sum_aggregation.update(0, {}, data_points) + ndp = 
sum_aggregation.collect(start_time, end_time, data_points)[0] _(ndp.start_time_unix_nano).must_equal(start_time) _(ndp.time_unix_nano).must_equal(end_time) end it 'aggregates and collects' do - sum_aggregation.update(1, {}) - sum_aggregation.update(2, {}) + sum_aggregation.update(1, {}, data_points) + sum_aggregation.update(2, {}, data_points) - sum_aggregation.update(2, 'foo' => 'bar') - sum_aggregation.update(2, 'foo' => 'bar') + sum_aggregation.update(2, { 'foo' => 'bar' }, data_points) + sum_aggregation.update(2, { 'foo' => 'bar' }, data_points) - ndps = sum_aggregation.collect(start_time, end_time) + ndps = sum_aggregation.collect(start_time, end_time, data_points) _(ndps[0].value).must_equal(3) - _(ndps[0].attributes).must_equal({}) + _(ndps[0].attributes).must_equal({}, data_points) _(ndps[1].value).must_equal(4) _(ndps[1].attributes).must_equal('foo' => 'bar') end it 'does not aggregate between collects' do - sum_aggregation.update(1, {}) - sum_aggregation.update(2, {}) - ndps = sum_aggregation.collect(start_time, end_time) + sum_aggregation.update(1, {}, data_points) + sum_aggregation.update(2, {}, data_points) + ndps = sum_aggregation.collect(start_time, end_time, data_points) - sum_aggregation.update(1, {}) + sum_aggregation.update(1, {}, data_points) # Assert that the recent update does not # impact the already collected metrics _(ndps[0].value).must_equal(3) - ndps = sum_aggregation.collect(start_time, end_time) + ndps = sum_aggregation.collect(start_time, end_time, data_points) # Assert that we are not accumulating values # between calls to collect _(ndps[0].value).must_equal(1) @@ -56,16 +57,16 @@ let(:aggregation_temporality) { :not_delta } it 'allows metrics to accumulate' do - sum_aggregation.update(1, {}) - sum_aggregation.update(2, {}) - ndps = sum_aggregation.collect(start_time, end_time) + sum_aggregation.update(1, {}, data_points) + sum_aggregation.update(2, {}, data_points) + ndps = sum_aggregation.collect(start_time, end_time, data_points) - sum_aggregation.update(1, {}) + sum_aggregation.update(1, {}, data_points) # Assert that the recent update does not # impact the already collected metrics _(ndps[0].value).must_equal(3) - ndps = sum_aggregation.collect(start_time, end_time) + ndps = sum_aggregation.collect(start_time, end_time, data_points) # Assert that we are accumulating values # and not just capturing the delta since # the previous collect call diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/counter_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/counter_test.rb index c00e2eba7b..ff5c3edfe2 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/counter_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/counter_test.rb @@ -20,7 +20,7 @@ it 'counts' do counter.add(1, attributes: { 'foo' => 'bar' }) metric_exporter.pull - last_snapshot = metric_exporter.metric_snapshots.last + last_snapshot = metric_exporter.metric_snapshots _(last_snapshot[0].name).must_equal('counter') _(last_snapshot[0].unit).must_equal('smidgen') diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/histogram_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/histogram_test.rb index bddc7f5568..771ffaef83 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/histogram_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/histogram_test.rb @@ -21,7 +21,7 @@ histogram.record(5, attributes: { 'foo' => 'bar' }) histogram.record(6, attributes: { 'foo' => 'bar' }) 
metric_exporter.pull - last_snapshot = metric_exporter.metric_snapshots.last + last_snapshot = metric_exporter.metric_snapshots _(last_snapshot[0].name).must_equal('histogram') _(last_snapshot[0].unit).must_equal('smidgen') diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/up_down_counter_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/up_down_counter_test.rb index 456c2c5052..687ad27a89 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/up_down_counter_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/instrument/up_down_counter_test.rb @@ -21,7 +21,7 @@ up_down_counter.add(1, attributes: { 'foo' => 'bar' }) up_down_counter.add(-2, attributes: { 'foo' => 'bar' }) metric_exporter.pull - last_snapshot = metric_exporter.metric_snapshots.last + last_snapshot = metric_exporter.metric_snapshots _(last_snapshot[0].name).must_equal('up_down_counter') _(last_snapshot[0].unit).must_equal('smidgen') diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/meter_provider_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/meter_provider_test.rb index f781d613b3..a82513a3c4 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/meter_provider_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/meter_provider_test.rb @@ -134,8 +134,26 @@ end end - # TODO: OpenTelemetry.meter_provider.add_view describe '#add_view' do + it 'adds a view with aggregation' do + OpenTelemetry.meter_provider.add_view('test', aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::Drop.new) + + registered_views = OpenTelemetry.meter_provider.instance_variable_get(:@registered_views) + + _(registered_views.size).must_equal 1 + _(registered_views[0].class).must_equal ::OpenTelemetry::SDK::Metrics::View::RegisteredView + _(registered_views[0].name).must_equal 'test' + _(registered_views[0].aggregation.class).must_equal ::OpenTelemetry::SDK::Metrics::Aggregation::Drop + end + + it 'add a view without aggregation but aggregation as nil' do + OpenTelemetry.meter_provider.add_view('test') + + registered_views = OpenTelemetry.meter_provider.instance_variable_get(:@registered_views) + + _(registered_views.size).must_equal 1 + _(registered_views[0].aggregation).must_be_nil + end end private diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb new file mode 100644 index 0000000000..4638485777 --- /dev/null +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb @@ -0,0 +1,148 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +describe OpenTelemetry::SDK::Metrics::View::RegisteredView do + describe '#registered_view' do + before { reset_metrics_sdk } + + it 'emits metrics with no data_points if view is drop' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + + meter = OpenTelemetry.meter_provider.meter('test') + OpenTelemetry.meter_provider.add_view('counter', aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::Drop.new) + + counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something') + + counter.add(1) + counter.add(2, attributes: { 'a' => 'b' }) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots + + _(last_snapshot).wont_be_empty + 
_(last_snapshot[0].name).must_equal('counter') + _(last_snapshot[0].unit).must_equal('smidgen') + _(last_snapshot[0].description).must_equal('a small amount of something') + + _(last_snapshot[0].instrumentation_scope.name).must_equal('test') + + _(last_snapshot[0].data_points[0].value).must_equal 0 + _(last_snapshot[0].data_points[0].start_time_unix_nano).must_equal 0 + _(last_snapshot[0].data_points[0].time_unix_nano).must_equal 0 + + _(last_snapshot[0].data_points[1].value).must_equal 0 + _(last_snapshot[0].data_points[1].start_time_unix_nano).must_equal 0 + _(last_snapshot[0].data_points[1].time_unix_nano).must_equal 0 + end + + it 'emits metrics with only last value in data_points if view is last_value' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + + meter = OpenTelemetry.meter_provider.meter('test') + OpenTelemetry.meter_provider.add_view('counter', aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new) + + counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something') + + counter.add(1) + counter.add(2) + counter.add(3) + counter.add(4) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots + + _(last_snapshot[0].data_points).wont_be_empty + _(last_snapshot[0].data_points[0].value).must_equal 4 + end + + it 'emits metrics with sum of value in data_points if view is last_value but not matching to instrument' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + OpenTelemetry.meter_provider.add_metric_reader(metric_exporter) + + meter = OpenTelemetry.meter_provider.meter('test') + OpenTelemetry.meter_provider.add_view('retnuoc', aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new) + + counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something') + + counter.add(1) + counter.add(2) + counter.add(3) + counter.add(4) + + metric_exporter.pull + last_snapshot = metric_exporter.metric_snapshots + + _(last_snapshot[0].data_points).wont_be_empty + _(last_snapshot[0].data_points[0].value).must_equal 10 + end + end + + describe '#registered_view select instrument' do + let(:registered_view) { OpenTelemetry::SDK::Metrics::View::RegisteredView.new(nil, aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new) } + let(:instrumentation_scope) do + OpenTelemetry::SDK::InstrumentationScope.new('test_scope', '1.0.1') + end + + let(:metric_stream) do + OpenTelemetry::SDK::Metrics::State::MetricStream.new('test', 'description', 'smidgen', :counter, nil, instrumentation_scope, nil) + end + + it 'registered view with matching name' do + registered_view.instance_variable_set(:@name, 'test') + registered_view.send(:generate_regex_pattern, 'test') + _(registered_view.match_instrument?(metric_stream)).must_equal true + end + + it 'registered view with matching type' do + registered_view.instance_variable_set(:@options, { type: :counter }) + _(registered_view.match_instrument?(metric_stream)).must_equal true + end + + it 'registered view with matching version' do + registered_view.instance_variable_set(:@options, { meter_version: '1.0.1' }) + _(registered_view.match_instrument?(metric_stream)).must_equal true + end + + it 'registered view with matching meter_name' do + registered_view.instance_variable_set(:@options, { meter_name: 'test_scope' }) + 
_(registered_view.match_instrument?(metric_stream)).must_equal true + end + + it 'do not registered view with unmatching name and matching type' do + registered_view.instance_variable_set(:@options, { type: :counter }) + registered_view.instance_variable_set(:@name, 'tset') + _(registered_view.match_instrument?(metric_stream)).must_equal false + end + + describe '#name_match' do + it 'name_match_for_wild_card' do + registered_view.instance_variable_set(:@name, 'log*2024?.txt') + registered_view.send(:generate_regex_pattern, 'log*2024?.txt') + _(registered_view.name_match('logfile20242.txt')).must_equal true + _(registered_view.name_match('log2024a.txt')).must_equal true + _(registered_view.name_match('log_test_2024.txt')).must_equal false + end + + it 'name_match_for_*' do + registered_view.instance_variable_set(:@name, '*') + registered_view.send(:generate_regex_pattern, '*') + _(registered_view.name_match('*')).must_equal true + _(registered_view.name_match('aaaaaaaaa')).must_equal true + _(registered_view.name_match('!@#$%^&')).must_equal true + end + end + end +end
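
For reference, a minimal end-to-end sketch of the View API this change introduces, assembled from the add_view docstring and the registered_view tests above. It assumes the opentelemetry-sdk and opentelemetry-metrics-sdk gems are installed; the meter, instrument, and view names are illustrative only, and Drop can be swapped for LastValue, Sum, or ExplicitBucketHistogram.

require 'opentelemetry-sdk'
require 'opentelemetry-metrics-sdk'

OpenTelemetry::SDK.configure

exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new
OpenTelemetry.meter_provider.add_metric_reader(exporter)

# Drop every measurement from counters whose name matches 'http.*' on the 'demo' meter
# ('*' matches zero or more characters, '?' exactly one).
OpenTelemetry.meter_provider.add_view(
  'http.*',
  aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Drop.new,
  type: :counter,
  meter_name: 'demo'
)

meter = OpenTelemetry.meter_provider.meter('demo')
counter = meter.create_counter('http.requests', unit: 'requests', description: 'served requests')
counter.add(1)
counter.add(2, attributes: { 'path' => '/users' })

exporter.pull
exporter.metric_snapshots.each do |metric_data|
  # With the Drop view matched, every collected data point is zeroed out.
  metric_data.data_points.each { |dp| puts "#{metric_data.name}: #{dp.value}" }
end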
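
The aggregation classes no longer keep their own @data_points hash; the owner (normally MetricStream) holds it and passes it into update and collect, which is what lets several views share a single stream. A small sketch of that calling convention in isolation, exercising Sum directly the way the unit tests above do; the local variable names are illustrative.

require 'opentelemetry-metrics-sdk'

sum = OpenTelemetry::SDK::Metrics::Aggregation::Sum.new(aggregation_temporality: :delta)
data_points = {} # keyed by attributes hash; owned by the caller, not by the aggregation

sum.update(1, {}, data_points)
sum.update(2, {}, data_points)
sum.update(5, { 'foo' => 'bar' }, data_points)

# Timestamps are integers in nanoseconds, as in the tests.
start_time = (Time.now.to_r * 1_000_000_000).to_i
end_time   = ((Time.now + 60).to_r * 1_000_000_000).to_i

ndps = sum.collect(start_time, end_time, data_points)
ndps.each { |ndp| puts "#{ndp.attributes.inspect} => #{ndp.value}" } # {} => 3, {"foo"=>"bar"} => 5

# With :delta temporality, collect moves the points out and clears the hash,
# so the next interval starts from scratch.
puts data_points.empty? # => true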
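
View names support simple wildcards via generate_regex_pattern: '*' matches zero or more characters and '?' matches exactly one. A quick sketch against RegisteredView directly; it is an internal class, so this is for illustration only and the names are made up.

require 'opentelemetry-metrics-sdk'

view = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
  'http.server.*',
  aggregation: OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new
)

puts view.name_match('http.server.duration') # => true
puts view.name_match('http.client.duration') # => false

catch_all = OpenTelemetry::SDK::Metrics::View::RegisteredView.new(
  '*',
  aggregation: OpenTelemetry::SDK::Metrics::Aggregation::Sum.new
)
puts catch_all.name_match('anything at all') # => true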