diff --git a/README.md b/README.md index 0b0057e..8309ccc 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ from postgrestq import TaskQueue task_queue = TaskQueue(POSTGRES_CONN_STR, queue_name, reset=True) while True: - task, task_id = task_queue.get() + task, task_id, _queue_name = task_queue.get() if task is not None: # do something with task and mark it as complete afterwards task_queue.complete(task_id) @@ -70,6 +70,9 @@ while True: time.sleep(1) ``` +Notice that `get()` returns the queue name too, in case in future multi-queue is implemented. +At the moment it's always the same as the queue_name given to the class. + Or you can even use the \_\_iter\_\_() method of the class TaskQueue and loop over the queue: ```py @@ -77,7 +80,7 @@ from postgrestq import TaskQueue task_queue = TaskQueue(POSTGRES_CONN_STR, queue_name, reset=True) -for task, id_ in taskqueue: +for task, id_, queue_name in taskqueue: # do something with task and it's automatically # marked as completed by the iterator at the end # of the iteration diff --git a/postgrestq/task_queue.py b/postgrestq/task_queue.py index 8c1f949..b262188 100644 --- a/postgrestq/task_queue.py +++ b/postgrestq/task_queue.py @@ -256,7 +256,11 @@ def add_many( self.conn.commit() return ret_ids - def get(self) -> Tuple[Optional[Dict[str, Any]], Optional[UUID]]: + def get(self) -> Tuple[ + Optional[Dict[str, Any]], + Optional[UUID], + Optional[str], + ]: """Get a task from the task queue (non-blocking). This statement marks the next available task in the queue as @@ -273,7 +277,7 @@ def get(self) -> Tuple[Optional[Dict[str, Any]], Optional[UUID]]: In order to mark that task as done, you have to do: - >>> task, task_id = taskqueue.get() + >>> task, task_id, queue_name = taskqueue.get() >>> # do something >>> taskqueue.complete(task_id) @@ -287,8 +291,8 @@ def get(self) -> Tuple[Optional[Dict[str, Any]], Optional[UUID]]: Returns ------- - (task, task_id) : - The next item from the task list or (None, None) if it's + (task, task_id, queue_name) : + The next item from the task list or (None, None, None) if it's empty """ @@ -323,13 +327,13 @@ def get(self) -> Tuple[Optional[Dict[str, Any]], Optional[UUID]]: row = cur.fetchone() conn.commit() if row is None: - return None, None + return None, None, None task_id, task = row logger.info(f"Got task with id {task_id}") - return task, task_id + return task, task_id, self._queue_name def get_many(self, amount: int) -> List[ - Tuple[Optional[Dict[str, Any]], Optional[UUID]] + Tuple[Optional[Dict[str, Any]], Optional[UUID], Optional[str]], ]: """Same as get() but retrieves multiple tasks. @@ -342,8 +346,8 @@ def get_many(self, amount: int) -> List[ Returns ------- - list of (task, task_id) : - The tasks and their IDs + list of (task, task_id, queue_name) : + The tasks and their IDs, and the queue_name """ conn = self.conn @@ -375,9 +379,9 @@ def get_many(self, amount: int) -> List[ ) ret = [] - for row in cur.fetchall(): - logger.info(f"Got task with id {row[1]}") - ret.append(row) + for task, task_id in cur.fetchall(): + logger.info(f"Got task with id {task_id}") + ret.append((task, task_id, self._queue_name,)) conn.commit() return ret @@ -665,14 +669,14 @@ def __iter__( Yields ------- - (any, str) : - A tuple containing the task content and its id + (any, UUID, str) : + A tuple containing the task content, its id and the queue name """ while True: - task, id_ = self.get() + task, id_, queue_name = self.get() if id_ is not None: - yield task, id_ + yield task, id_, queue_name self.complete(id_) if self.is_empty(): logger.debug( diff --git a/tests/test_task_queue.py b/tests/test_task_queue.py index 70a24e1..f9e2b7d 100644 --- a/tests/test_task_queue.py +++ b/tests/test_task_queue.py @@ -50,19 +50,21 @@ def test_add(task_queue: TaskQueue): ) task_ids.add(tid) assert len(task_ids) == 3 - task, _ = task_queue.get() + task, _, qname = task_queue.get() assert task == TASKS[0] - task, _ = task_queue.get() + assert qname == "test_queue" + task, _, qname = task_queue.get() assert task == TASKS[1] + assert qname == "test_queue" def test_get(task_queue: TaskQueue): TASK = {"foo": 1} task_queue.add(TASK, LEASE_TIMEOUT) - task, _ = task_queue.get() + task, _, _ = task_queue.get() assert task == TASK # calling on empty queue returns None - assert task_queue.get() == (None, None) + assert task_queue.get() == (None, None, None) def test_is_empty(task_queue: TaskQueue): @@ -71,9 +73,9 @@ def test_is_empty(task_queue: TaskQueue): task_queue.add({"foo": 1}, LEASE_TIMEOUT) assert not task_queue.is_empty() - task, id_ = task_queue.get() + task, id_, _qname = task_queue.get() assert not task_queue.is_empty() - + assert _qname == "test_queue" task_queue.complete(id_) assert task_queue.is_empty() @@ -81,14 +83,16 @@ def test_is_empty(task_queue: TaskQueue): def test_complete(task_queue: TaskQueue): # boring case task_queue.add({"foo": 1}, LEASE_TIMEOUT, ttl=1) - _, id_ = task_queue.get() + _, id_, qname = task_queue.get() assert not task_queue.is_empty() + assert qname == "test_queue" task_queue.complete(id_) assert task_queue.is_empty() # interesting case: we complete the task after it expired already task_queue.add({"foo": 1}, LEASE_TIMEOUT, ttl=1) - _, id_ = task_queue.get() + _, id_, qname = task_queue.get() + assert qname == "test_queue" time.sleep(LEASE_TIMEOUT + 0.1) assert task_queue.is_empty() task_queue.complete(id_) @@ -162,13 +166,14 @@ def test_callback(task_queue: TaskQueue): def test_reschedule(task_queue: TaskQueue): task_queue.add({"foo": 1}, LEASE_TIMEOUT) - _, id_ = task_queue.get() + _, id_, qname = task_queue.get() # task queue should be empty as 'foo' is in the processing queue - assert task_queue.get() == (None, None) - + assert task_queue.get() == (None, None, None) + assert qname == "test_queue" task_queue.reschedule(id_) - task, _ = task_queue.get() + task, _, qname = task_queue.get() assert task == {"foo": 1} + assert qname == "test_queue" def test_reschedule_error(task_queue: TaskQueue): @@ -187,7 +192,7 @@ def test_full(task_queue: TaskQueue): counter = 0 while True: - task, task_id = task_queue.get() + task, task_id, qname = task_queue.get() if task is not None: task_queue.complete(task_id) counter += 1 @@ -195,6 +200,7 @@ def test_full(task_queue: TaskQueue): break assert counter == len(TASKS) + assert qname == "test_queue" def test_complete_rescheduled_task(task_queue: TaskQueue): @@ -202,12 +208,12 @@ def test_complete_rescheduled_task(task_queue: TaskQueue): task_queue.add(TASK_CONTENT, LEASE_TIMEOUT, ttl=3) # start a task and let it expire... - _, task_id = task_queue.get() + _, task_id, qname = task_queue.get() time.sleep(LEASE_TIMEOUT + 0.1) # check and put it back into task queue assert not task_queue.is_empty() - + assert qname == "test_queue" # now the task is completed, although it took a long time... task_queue.complete(task_id) @@ -221,16 +227,17 @@ def test_tolerate_double_completion(task_queue: TaskQueue): task_queue.add(TASK_CONTENT, LEASE_TIMEOUT, ttl=3) # start a task and let it expire... - task, task_id = task_queue.get() + task, task_id, qname = task_queue.get() + assert qname == "test_queue" time.sleep(LEASE_TIMEOUT + 0.1) # check and put it back into task queue assert not task_queue.is_empty() # get it again - _, task_redo_id = task_queue.get() + _, task_redo_id, qname = task_queue.get() assert task_redo_id == task_id - + assert qname == "test_queue" # now the task is completed, although it took a long time... task_queue.complete(task_id) @@ -267,7 +274,7 @@ def test_iterator(task_queue: TaskQueue): task_queue.add({"blip": "blop"}, LEASE_TIMEOUT, ttl=3) found_tasks = [] - for task, id in task_queue: + for task, id, qname in task_queue: found_tasks.append(task) assert found_tasks == [{"bla": "bla"}, {"blip": "blop"}]