Skip to content

Commit

Permalink
Add the queue name to the get method to handle multi-queue in the future
Browse files Browse the repository at this point in the history
  • Loading branch information
jacopo committed May 24, 2024
1 parent e3cff4e commit 1f14244
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 37 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ from postgrestq import TaskQueue

task_queue = TaskQueue(POSTGRES_CONN_STR, queue_name, reset=True)
while True:
task, task_id = task_queue.get()
task, task_id, _queue_name = task_queue.get()
if task is not None:
# do something with task and mark it as complete afterwards
task_queue.complete(task_id)
Expand All @@ -70,14 +70,17 @@ while True:
time.sleep(1)
```

Notice that `get()` returns the queue name too, in case in future multi-queue is implemented.
At the moment it's always the same as the queue_name given to the class.

Or you can even use the \_\_iter\_\_() method of the class TaskQueue and loop over the queue:

```py
from postgrestq import TaskQueue

task_queue = TaskQueue(POSTGRES_CONN_STR, queue_name, reset=True)

for task, id_ in taskqueue:
for task, id_, queue_name in taskqueue:
# do something with task and it's automatically
# marked as completed by the iterator at the end
# of the iteration
Expand Down
36 changes: 20 additions & 16 deletions postgrestq/task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,11 @@ def add_many(
self.conn.commit()
return ret_ids

def get(self) -> Tuple[Optional[Dict[str, Any]], Optional[UUID]]:
def get(self) -> Tuple[
Optional[Dict[str, Any]],
Optional[UUID],
Optional[str],
]:
"""Get a task from the task queue (non-blocking).
This statement marks the next available task in the queue as
Expand All @@ -273,7 +277,7 @@ def get(self) -> Tuple[Optional[Dict[str, Any]], Optional[UUID]]:
In order to mark that task as done, you have to do:
>>> task, task_id = taskqueue.get()
>>> task, task_id, queue_name = taskqueue.get()
>>> # do something
>>> taskqueue.complete(task_id)
Expand All @@ -287,8 +291,8 @@ def get(self) -> Tuple[Optional[Dict[str, Any]], Optional[UUID]]:
Returns
-------
(task, task_id) :
The next item from the task list or (None, None) if it's
(task, task_id, queue_name) :
The next item from the task list or (None, None, None) if it's
empty
"""
Expand Down Expand Up @@ -323,13 +327,13 @@ def get(self) -> Tuple[Optional[Dict[str, Any]], Optional[UUID]]:
row = cur.fetchone()
conn.commit()
if row is None:
return None, None
return None, None, None
task_id, task = row
logger.info(f"Got task with id {task_id}")
return task, task_id
return task, task_id, self._queue_name

def get_many(self, amount: int) -> List[
Tuple[Optional[Dict[str, Any]], Optional[UUID]]
Tuple[Optional[Dict[str, Any]], Optional[UUID], Optional[str]],
]:
"""Same as get() but retrieves multiple tasks.
Expand All @@ -342,8 +346,8 @@ def get_many(self, amount: int) -> List[
Returns
-------
list of (task, task_id) :
The tasks and their IDs
list of (task, task_id, queue_name) :
The tasks and their IDs, and the queue_name
"""
conn = self.conn
Expand Down Expand Up @@ -375,9 +379,9 @@ def get_many(self, amount: int) -> List[
)

ret = []
for row in cur.fetchall():
logger.info(f"Got task with id {row[1]}")
ret.append(row)
for task, task_id in cur.fetchall():
logger.info(f"Got task with id {task_id}")
ret.append((task, task_id, self._queue_name,))
conn.commit()
return ret

Expand Down Expand Up @@ -665,14 +669,14 @@ def __iter__(
Yields
-------
(any, str) :
A tuple containing the task content and its id
(any, UUID, str) :
A tuple containing the task content, its id and the queue name
"""
while True:
task, id_ = self.get()
task, id_, queue_name = self.get()
if id_ is not None:
yield task, id_
yield task, id_, queue_name
self.complete(id_)
if self.is_empty():
logger.debug(
Expand Down
45 changes: 26 additions & 19 deletions tests/test_task_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,21 @@ def test_add(task_queue: TaskQueue):
)
task_ids.add(tid)
assert len(task_ids) == 3
task, _ = task_queue.get()
task, _, qname = task_queue.get()
assert task == TASKS[0]
task, _ = task_queue.get()
assert qname == "test_queue"
task, _, qname = task_queue.get()
assert task == TASKS[1]
assert qname == "test_queue"


def test_get(task_queue: TaskQueue):
TASK = {"foo": 1}
task_queue.add(TASK, LEASE_TIMEOUT)
task, _ = task_queue.get()
task, _, _ = task_queue.get()
assert task == TASK
# calling on empty queue returns None
assert task_queue.get() == (None, None)
assert task_queue.get() == (None, None, None)


def test_is_empty(task_queue: TaskQueue):
Expand All @@ -71,24 +73,26 @@ def test_is_empty(task_queue: TaskQueue):
task_queue.add({"foo": 1}, LEASE_TIMEOUT)
assert not task_queue.is_empty()

task, id_ = task_queue.get()
task, id_, _qname = task_queue.get()
assert not task_queue.is_empty()

assert _qname == "test_queue"
task_queue.complete(id_)
assert task_queue.is_empty()


def test_complete(task_queue: TaskQueue):
# boring case
task_queue.add({"foo": 1}, LEASE_TIMEOUT, ttl=1)
_, id_ = task_queue.get()
_, id_, qname = task_queue.get()
assert not task_queue.is_empty()
assert qname == "test_queue"
task_queue.complete(id_)
assert task_queue.is_empty()

# interesting case: we complete the task after it expired already
task_queue.add({"foo": 1}, LEASE_TIMEOUT, ttl=1)
_, id_ = task_queue.get()
_, id_, qname = task_queue.get()
assert qname == "test_queue"
time.sleep(LEASE_TIMEOUT + 0.1)
assert task_queue.is_empty()
task_queue.complete(id_)
Expand Down Expand Up @@ -162,13 +166,14 @@ def test_callback(task_queue: TaskQueue):

def test_reschedule(task_queue: TaskQueue):
task_queue.add({"foo": 1}, LEASE_TIMEOUT)
_, id_ = task_queue.get()
_, id_, qname = task_queue.get()
# task queue should be empty as 'foo' is in the processing queue
assert task_queue.get() == (None, None)

assert task_queue.get() == (None, None, None)
assert qname == "test_queue"
task_queue.reschedule(id_)
task, _ = task_queue.get()
task, _, qname = task_queue.get()
assert task == {"foo": 1}
assert qname == "test_queue"


def test_reschedule_error(task_queue: TaskQueue):
Expand All @@ -187,27 +192,28 @@ def test_full(task_queue: TaskQueue):

counter = 0
while True:
task, task_id = task_queue.get()
task, task_id, qname = task_queue.get()
if task is not None:
task_queue.complete(task_id)
counter += 1
if task_queue.is_empty():
break

assert counter == len(TASKS)
assert qname == "test_queue"


def test_complete_rescheduled_task(task_queue: TaskQueue):
TASK_CONTENT = {"sloth": 1}
task_queue.add(TASK_CONTENT, LEASE_TIMEOUT, ttl=3)

# start a task and let it expire...
_, task_id = task_queue.get()
_, task_id, qname = task_queue.get()
time.sleep(LEASE_TIMEOUT + 0.1)

# check and put it back into task queue
assert not task_queue.is_empty()

assert qname == "test_queue"
# now the task is completed, although it took a long time...
task_queue.complete(task_id)

Expand All @@ -221,16 +227,17 @@ def test_tolerate_double_completion(task_queue: TaskQueue):
task_queue.add(TASK_CONTENT, LEASE_TIMEOUT, ttl=3)

# start a task and let it expire...
task, task_id = task_queue.get()
task, task_id, qname = task_queue.get()
assert qname == "test_queue"
time.sleep(LEASE_TIMEOUT + 0.1)

# check and put it back into task queue
assert not task_queue.is_empty()

# get it again
_, task_redo_id = task_queue.get()
_, task_redo_id, qname = task_queue.get()
assert task_redo_id == task_id

assert qname == "test_queue"
# now the task is completed, although it took a long time...
task_queue.complete(task_id)

Expand Down Expand Up @@ -267,7 +274,7 @@ def test_iterator(task_queue: TaskQueue):
task_queue.add({"blip": "blop"}, LEASE_TIMEOUT, ttl=3)

found_tasks = []
for task, id in task_queue:
for task, id, qname in task_queue:
found_tasks.append(task)
assert found_tasks == [{"bla": "bla"}, {"blip": "blop"}]

Expand Down

0 comments on commit 1f14244

Please sign in to comment.