Skip to content

Commit

Permalink
Add a cloudwatch argument to count_queued_submissions
Browse files Browse the repository at this point in the history
This steals liberally from check-celery-queues.py
  • Loading branch information
jibsheet committed Jun 21, 2018
1 parent a123c92 commit 0c2b33e
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 0 deletions.
104 changes: 104 additions & 0 deletions queue/management/commands/count_queued_submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@

from __future__ import unicode_literals

from itertools import izip_longest
from queue.models import Submission

import backoff
import boto3
import botocore
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.db.models import Count
Expand All @@ -27,6 +31,11 @@ def add_arguments(self, parser):
action='store_true',
help='Submit New Relic custom metrics'
)
parser.add_argument(
'--cloudwatch',
action='store_true',
help='Submit CloudWatch custom metrics'
)

def handle(self, *args, **options):
"""
Expand All @@ -48,6 +57,10 @@ def handle(self, *args, **options):
if use_newrelic:
self.send_nr_metrics(queue_counts)

use_cloudwatch = options.get('cloudwatch')
if use_cloudwatch:
self.send_cloudwatch_metrics(queue_counts)

def pretty_print_queues(self, queue_counts):
"""
Send a tabulated log output of the queues and the counts to the console
Expand All @@ -73,3 +86,94 @@ def send_nr_metrics(self, queue_counts):
'Custom/XQueueLength/{}[submissions]'.format(queue['queue_name']),
queue['queue_count'],
application=nr_app)

def send_cloudwatch_metrics(self, queue_counts):
    """
    Publish queue lengths to AWS CloudWatch and create or update one alarm per queue.

    ``queue_counts`` is an iterable of mappings that each provide a
    'queue_name' and a 'queue_count'. Namespace, thresholds and the alarm
    action are read from settings.CLOUDWATCH_QUEUE_COUNT_METRICS.
    """
    cw_client = CwBotoWrapper()
    config = settings.CLOUDWATCH_QUEUE_COUNT_METRICS
    metric_name = 'queue_length'
    dimension_name = 'queue'
    environment = config['environment']
    deployment = config['deployment']
    namespace = "xqueue/{}-{}".format(environment, deployment)

    # Work through the queues 10 at a time to stay under AWS limits.
    for batch in grouper(queue_counts, 10):
        # grouper pads the last batch with None; drop the padding.
        batch = [entry for entry in batch if entry is not None]

        metric_data = [
            {
                'MetricName': metric_name,
                'Dimensions': [{
                    "Name": dimension_name,
                    "Value": entry['queue_name']
                }],
                'Value': entry['queue_count']
            }
            for entry in batch
        ]
        if metric_data:
            cw_client.put_metric_data(Namespace=namespace, MetricData=metric_data)

        for entry in batch:
            queue_name = entry['queue_name']
            dimensions = [{'Name': dimension_name, 'Value': queue_name}]

            # A per-queue entry in 'thresholds' overrides the default threshold.
            default_threshold = config['default_threshold']
            threshold = (
                config['thresholds'][queue_name]
                if queue_name in config['thresholds']
                else default_threshold
            )

            actions = [config['sns_arn']]
            alarm_name = "{}-{} {} queue length over threshold".format(
                environment, deployment, queue_name)

            alarm_spec = {
                'AlarmName': alarm_name,
                'AlarmDescription': alarm_name,
                'Namespace': namespace,
                'MetricName': metric_name,
                'Dimensions': dimensions,
                # Period is in seconds; 6 periods of 600s evaluate the
                # maximum over one hour.
                'Period': 600,
                'EvaluationPeriods': 6,
                'TreatMissingData': "notBreaching",
                'Threshold': threshold,
                'ComparisonOperator': "GreaterThanThreshold",
                'Statistic': "Maximum",
                # The same SNS target is used for every alarm state change.
                'InsufficientDataActions': actions,
                'OKActions': actions,
                'AlarmActions': actions,
            }

            print('Creating or updating alarm "{}"'.format(alarm_name))
            cw_client.put_metric_alarm(**alarm_spec)


class CwBotoWrapper(object):
    """
    Wrapper around a boto3 CloudWatch client whose calls are retried on ClientError.

    Each wrapped call is decorated with backoff's exponential wait generator
    and is given ``max_tries`` as its retry limit.
    """
    # Passed to backoff as max_tries for every wrapped call.
    max_tries = 5

    # One retry policy shared by all wrapped client calls below.
    _retry_on_client_error = backoff.on_exception(
        backoff.expo,
        botocore.exceptions.ClientError,
        max_tries=max_tries,
    )

    def __init__(self):
        """Create the underlying boto3 'cloudwatch' client."""
        self.client = boto3.client('cloudwatch')

    @_retry_on_client_error
    def put_metric_data(self, *args, **kwargs):
        """Forward all arguments to the client's put_metric_data and return its result."""
        return self.client.put_metric_data(*args, **kwargs)

    @_retry_on_client_error
    def put_metric_alarm(self, *args, **kwargs):
        """Forward all arguments to the client's put_metric_alarm and return its result."""
        return self.client.put_metric_alarm(*args, **kwargs)


# Stolen right from the itertools recipes
# https://docs.python.org/3/library/itertools.html#itertools-recipes
def grouper(iterable, n, fillvalue=None):
    """
    Collect data into fixed-length chunks or blocks.

    Returns an iterator of n-tuples taken in order from ``iterable``; when the
    input runs out part-way through a tuple, the remaining slots are filled
    with ``fillvalue``.

    Example: grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    """
    # The same iterator object is repeated n times, so each slot of a tuple
    # pulls the next item from the one underlying iterator.
    shared_iterator = iter(iterable)
    slots = [shared_iterator] * n
    return izip_longest(*slots, fillvalue=fillvalue)
30 changes: 30 additions & 0 deletions queue/management/commands/tests/test_count_queued_submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,33 @@ def test_push_to_new_relic(self, mock_newrelic_agent):
mock_newrelic_agent.record_custom_metric.call_count)

mock_newrelic_agent.record_custom_metric.has_calls(expected_nr_calls, any_order=True)

@patch('boto3.client')
def test_push_to_cloudwatch(self, mock_boto3):
    """
    Run the command with --cloudwatch and verify the calls made on the boto3 client.

    boto3.client is patched, so every call the command makes on the client is
    recorded in ``mock_boto3.mock_calls``. The test checks the single
    put_metric_data payload and the name of the alarm created for each queue.
    """
    self._create_submission(queue_name="test-pull")
    self._create_submission(queue_name="test2")
    self._create_submission(queue_name="test2")
    call_command('count_queued_submissions', '--cloudwatch', stdout=self.stdout)
    self.assertRegexpMatches(self.stdout.getvalue(), r'test2\s*2\s*\ntest-pull\s*1')

    # Sort the recorded client calls by the API method they targeted.
    metric_data_kwargs = []
    metric_alarm_kwargs = []
    for name, _args, kwargs in mock_boto3.mock_calls:
        if 'put_metric_data' in name:
            metric_data_kwargs.append(kwargs)
        if 'put_metric_alarm' in name:
            metric_alarm_kwargs.append(kwargs)

    # Two queues fit in one batch, so exactly one put_metric_data call is
    # expected. Asserting the count keeps the payload checks below from being
    # skipped silently if the call is never made.
    self.assertEqual(len(metric_data_kwargs), 1)
    self.assertEqual(len(metric_data_kwargs[0]['MetricData']), 2)
    self.assertEqual(metric_data_kwargs[0],
                     {'Namespace': u'xqueue/dev-stack',
                      'MetricData': [
                          {u'Dimensions': [{u'Name': u'queue', u'Value': u'test2'}],
                           u'Value': 2,
                           u'MetricName': u'queue_length'
                           },
                          {u'Dimensions': [{u'Name': u'queue', u'Value': u'test-pull'}],
                           u'Value': 1,
                           u'MetricName': u'queue_length'}]})

    self.assertEqual(len(metric_alarm_kwargs), 2)
    self.assertEqual(metric_alarm_kwargs[0]['AlarmName'], u'dev-stack test2 queue length over threshold')
    self.assertEqual(metric_alarm_kwargs[1]['AlarmName'], u'dev-stack test-pull queue length over threshold')
16 changes: 16 additions & 0 deletions xqueue/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,19 @@

# This is the list of users managed by update_users
USERS = None

# If you use count_queued_submissions to submit data to AWS CloudWatch you'll need to
# provide some information for how to construct the metrics and alarms.
# It will store metrics in a namespace of xqueue/environment-deployment and create an alarm
# for each queue, using default_threshold as the alarm threshold. If you want a different
# threshold for a given queue, thresholds is a dictionary of "queue name": custom limit.
# All alarms share the sns_arn.
# Read by the count_queued_submissions command when it is run with --cloudwatch.
CLOUDWATCH_QUEUE_COUNT_METRICS = {
# environment and deployment are combined into the metric namespace
# ("xqueue/<environment>-<deployment>") and prefix every alarm name.
'environment': 'dev',
'deployment': 'stack',
# Passed as the alarm, OK and insufficient-data action of every alarm.
'sns_arn': 'arn:aws:sns:::',
# Alarm threshold for any queue that has no entry in 'thresholds'.
'default_threshold': 50,
# Per-queue overrides of default_threshold, keyed by queue name.
'thresholds': {
'test-pull': 100
}
}

0 comments on commit 0c2b33e

Please sign in to comment.