-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
created clustering module for lidar points (#40)
* created clustering module * init return type annotation * silence pylint * formatting * added result check
- Loading branch information
Showing
4 changed files
with
318 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
""" | ||
Clusters together LiDAR detections. | ||
""" | ||
|
||
import math | ||
|
||
from .. import detection_cluster | ||
from .. import detection_point | ||
from .. import lidar_detection | ||
|
||
|
||
class Clustering: | ||
""" | ||
Groups together LiDAR detections into clusters. | ||
""" | ||
|
||
def __init__(self, max_cluster_distance: float) -> None: | ||
""" | ||
Initialize max distance between points in the same cluster. | ||
""" | ||
self.max_cluster_distance = max_cluster_distance | ||
|
||
self.__clockwise = False | ||
self.__last_point = None | ||
self.__last_angle = None | ||
self.cluster = [] | ||
|
||
def __calculate_distance_between_two_points( | ||
self, p1: detection_point.DetectionPoint, p2: detection_point.DetectionPoint | ||
) -> float: | ||
""" | ||
Distance calculation between two points. | ||
""" | ||
return math.sqrt((p1.x - p2.x) ** 2 + (p1.y - p2.y) ** 2) | ||
|
||
def run( | ||
self, detection: lidar_detection.LidarDetection | ||
) -> "tuple[bool, detection_cluster.DetectionCluster | None]": | ||
""" | ||
Returns a DetectionCluster consisting of LidarDetections. | ||
""" | ||
# convert to x, y coordinates | ||
detection_angle_in_radians = detection.angle * math.pi / 180 | ||
x = math.cos(detection_angle_in_radians) * detection.distance | ||
y = math.sin(detection_angle_in_radians) * detection.distance | ||
|
||
result, point = detection_point.DetectionPoint.create(x, y) | ||
if not result: | ||
return False, None | ||
|
||
if self.__last_point is None: | ||
self.__last_point = point | ||
self.__last_angle = detection.angle | ||
self.cluster.append(point) | ||
return False, None | ||
|
||
# if lidar direction changes, start a new cluster | ||
direction_switched = False | ||
current_direction = self.__clockwise | ||
if detection.angle < self.__last_angle: | ||
self.__clockwise = False | ||
elif detection.angle > self.__last_angle: | ||
self.__clockwise = True | ||
if current_direction != self.__clockwise: | ||
direction_switched = True | ||
self.__last_angle = detection.angle | ||
|
||
# check distance from last point | ||
distance_from_last_point = self.__calculate_distance_between_two_points( | ||
point, self.__last_point | ||
) | ||
self.__last_point = point | ||
|
||
# if far enough, send current cluster, initialize new one | ||
if distance_from_last_point > self.max_cluster_distance or direction_switched: | ||
result, new_cluster = detection_cluster.DetectionCluster.create(self.cluster) | ||
if not result: | ||
return False, None | ||
self.cluster = [] | ||
self.cluster.append(point) | ||
return True, new_cluster | ||
|
||
# if close enough, cluster together | ||
self.cluster.append(point) | ||
return False, None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
""" | ||
Gets detection clusters. | ||
""" | ||
|
||
import queue | ||
|
||
from modules import lidar_detection | ||
from worker import queue_wrapper | ||
from worker import worker_controller | ||
from . import clustering | ||
|
||
|
||
def clustering_worker( | ||
max_cluster_distance: float, | ||
detection_in_queue: queue_wrapper.QueueWrapper, | ||
cluster_out_queue: queue_wrapper.QueueWrapper, | ||
controller: worker_controller.WorkerController, | ||
) -> None: | ||
""" | ||
Worker process. | ||
max_cluster_distance: max distance between points in the same cluster in metres. | ||
""" | ||
clusterer = clustering.Clustering(max_cluster_distance) | ||
|
||
while not controller.is_exit_requested(): | ||
controller.check_pause() | ||
|
||
try: | ||
detection: lidar_detection.LidarDetection = detection_in_queue.queue.get_nowait() | ||
if detection is None: | ||
break | ||
except queue.Empty: | ||
continue | ||
|
||
result, value = clusterer.run(detection) | ||
if not result: | ||
continue | ||
|
||
cluster_out_queue.queue.put(value) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
""" | ||
Clustering worker integration test. | ||
""" | ||
|
||
import multiprocessing as mp | ||
import queue | ||
import random | ||
import time | ||
|
||
from modules import detection_cluster | ||
from modules import lidar_detection | ||
from modules.clustering import clustering_worker | ||
from worker import queue_wrapper | ||
from worker import worker_controller | ||
|
||
# Constants | ||
QUEUE_MAX_SIZE = 10 | ||
DELAY = 0.1 | ||
MAX_CLUSTER_DISTANCE = 3.0 # metres | ||
|
||
|
||
def simulate_detection_worker(in_queue: queue_wrapper.QueueWrapper) -> None: | ||
""" | ||
Place example lidar reading into the queue. | ||
""" | ||
random_distance = random.randint(0, 50) | ||
result, detection = lidar_detection.LidarDetection.create(random_distance, 0.0) | ||
assert result | ||
assert detection is not None | ||
|
||
in_queue.queue.put(detection) | ||
|
||
|
||
def main() -> int: | ||
""" | ||
Main function. | ||
""" | ||
|
||
# Setup | ||
controller = worker_controller.WorkerController() | ||
mp_manager = mp.Manager() | ||
|
||
detection_in_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE) | ||
cluster_out_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE) | ||
|
||
worker = mp.Process( | ||
target=clustering_worker.clustering_worker, | ||
args=( | ||
MAX_CLUSTER_DISTANCE, | ||
detection_in_queue, | ||
cluster_out_queue, | ||
controller, | ||
), | ||
) | ||
|
||
# Run | ||
worker.start() | ||
|
||
while True: | ||
simulate_detection_worker(detection_in_queue) | ||
try: | ||
input_data: detection_cluster.DetectionCluster = cluster_out_queue.queue.get_nowait() | ||
|
||
assert input_data is not None | ||
assert str(type(input_data)) == "<class 'modules.detection_cluster.DetectionCluster'>" | ||
|
||
print(input_data) | ||
|
||
except queue.Empty: | ||
time.sleep(DELAY) | ||
continue | ||
|
||
time.sleep(DELAY) | ||
|
||
# Teardown | ||
controller.request_exit() | ||
|
||
detection_in_queue.fill_and_drain_queue() | ||
|
||
worker.join() | ||
|
||
return 0 | ||
|
||
|
||
if __name__ == "__main__": | ||
result_main = main() | ||
if result_main < 0: | ||
print(f"ERROR: Status code: {result_main}") | ||
|
||
print("Done!") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
""" | ||
Test for clustering module. | ||
""" | ||
|
||
import pytest | ||
|
||
from modules import lidar_detection | ||
from modules.clustering import clustering | ||
|
||
MAX_CLUSTER_DISTANCE = 0.5 # metres | ||
|
||
# pylint: disable=redefined-outer-name | ||
|
||
|
||
@pytest.fixture() | ||
def clustering_maker() -> clustering.Clustering: # type: ignore | ||
""" | ||
Construct a clustering instance with predefined max cluster distance limit. | ||
""" | ||
clustering_instance = clustering.Clustering(MAX_CLUSTER_DISTANCE) | ||
yield clustering_instance | ||
|
||
|
||
@pytest.fixture() | ||
def cluster_member_1() -> lidar_detection.LidarDetection: # type: ignore | ||
""" | ||
Creates a LidarDetection that should be clustered with cluster_member_2 and cluster_member_3. | ||
""" | ||
result, detection = lidar_detection.LidarDetection.create(1.0, -7.0) | ||
assert result | ||
assert detection is not None | ||
yield detection | ||
|
||
|
||
@pytest.fixture() | ||
def cluster_member_2() -> lidar_detection.LidarDetection: # type: ignore | ||
""" | ||
Creates a LidarDetection that should be clustered with cluster_member_1 and cluster_member_3. | ||
""" | ||
result, detection = lidar_detection.LidarDetection.create(1.0, -9.0) | ||
assert result | ||
assert detection is not None | ||
yield detection | ||
|
||
|
||
@pytest.fixture() | ||
def cluster_member_3() -> lidar_detection.LidarDetection: # type: ignore | ||
""" | ||
Creates a LidarDetection that should be clustered with cluster_member_1 and cluster_member_2. | ||
""" | ||
result, detection = lidar_detection.LidarDetection.create(1.0, -11.0) | ||
assert result | ||
assert detection is not None | ||
yield detection | ||
|
||
|
||
@pytest.fixture() | ||
def cluster_outsider() -> lidar_detection.LidarDetection: # type: ignore | ||
""" | ||
Creates a LidarDetection that should be clustered on its own. | ||
""" | ||
result, detection = lidar_detection.LidarDetection.create(3.0, -13.0) | ||
assert result | ||
assert detection is not None | ||
yield detection | ||
|
||
|
||
class TestClustering: | ||
""" | ||
Test for the Clustering.run() method. | ||
""" | ||
|
||
def test_clustering( | ||
self, | ||
clustering_maker: clustering.Clustering, | ||
cluster_member_1: lidar_detection.LidarDetection, | ||
cluster_member_2: lidar_detection.LidarDetection, | ||
cluster_member_3: lidar_detection.LidarDetection, | ||
cluster_outsider: lidar_detection.LidarDetection, | ||
) -> None: | ||
""" | ||
Test clustering module. | ||
""" | ||
expected = None | ||
result, cluster = clustering_maker.run(cluster_member_1) | ||
assert not result | ||
assert cluster == expected | ||
|
||
expected = None | ||
result, cluster = clustering_maker.run(cluster_member_2) | ||
assert not result | ||
assert cluster == expected | ||
|
||
expected = None | ||
result, cluster = clustering_maker.run(cluster_member_3) | ||
assert not result | ||
assert cluster == expected | ||
|
||
expected = 3 | ||
result, cluster = clustering_maker.run(cluster_outsider) | ||
assert result | ||
assert cluster is not None | ||
assert len(cluster.detections) == expected |