Skip to content

Commit

Permalink
Filtering CUPS redundent events
Browse files Browse the repository at this point in the history
  • Loading branch information
anxuae committed Oct 29, 2019
1 parent 5bc011d commit 7487cc9
Showing 1 changed file with 36 additions and 6 deletions.
42 changes: 36 additions & 6 deletions cups_notify/listener.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# -*- coding: utf-8 -*-

import socket
import threading
import os.path as osp
from contextlib import closing
from xml.etree import ElementTree
try:
from http.server import HTTPServer, BaseHTTPRequestHandler
Expand All @@ -13,6 +16,13 @@
from cups_notify import event


def find_free_port():
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.bind(('', 0))
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return sock.getsockname()[1]


class NotificationHandler(BaseHTTPRequestHandler):

def get_chunk_size(self):
Expand Down Expand Up @@ -49,10 +59,15 @@ def do_PUT(self):
else:
chunk_data = self.get_chunk_data(chunk_size)
root = ElementTree.fromstring(chunk_data.decode('utf-8'))

evts = []
for channel in root.iterfind('channel'):
for item in reversed([e for e in channel.iterfind('item')]):
txt = ElementTree.tostring(item, encoding='utf8')
self.server.callback(dict((elem.tag, elem.text) for elem in item.iter() if elem.text.strip()))
data = dict((elem.tag, elem.text) for elem in item.iter() if elem.text.strip())
evts.append(event.CupsEvent(data))

if evts:
self.server.notify(evts)

self.send_response(200)
self.end_headers()
Expand All @@ -61,13 +76,13 @@ def do_PUT(self):
class NotificationListerner(HTTPServer):

def __init__(self, cups_conn, callback, filters=None, address='localhost'):
HTTPServer.__init__(self, (address, 9988), NotificationHandler)
HTTPServer.__init__(self, (address, find_free_port()), NotificationHandler)
self._conn = cups_conn
self._thread = None
self._filters = filters or [event.CUPS_EVT_ALL]
self._rss_uri = 'rss://{}:{}'.format(self.server_address[0], self.server_address[1])

self.callback = callback
self._last_guid = -1
self._callback = callback

def start(self):
"""Start the notification server.
Expand All @@ -81,7 +96,11 @@ def start(self):
self.cancel_subscriptions()

# Renew notifications subscription
cups_uri = "ipp://localhost:{}".format(cups.getPort())
if osp.exists(cups.getServer()):
hostname = 'localhost'
else:
hostname = cups.getServer()
cups_uri = "ipp://{}:{}".format(hostname, cups.getPort())
self._conn.createSubscription(cups_uri,
recipient_uri=self._rss_uri,
events=self._filters)
Expand All @@ -93,6 +112,17 @@ def is_running(self):
return self._thread.is_alive()
return False

def notify(self, evts):
"""Notify the subscriber for new events. Events are filtered
because CUPS server re-sends all previously sent events.
"""
valid_evts = []
for evt in evts:
if evt.guid > self._last_guid:
valid_evts.append(evt.guid)
self._callback(evt)
self._last_guid = max(valid_evts)

def cancel_subscriptions(self):
"""Cancel all subscriptions related to the server URI.
"""
Expand Down

0 comments on commit 7487cc9

Please sign in to comment.