Add sweep module
The sweep module contains a sweep class and a class for the nearest neighbor sweep alorithm.
This commit is contained in:
136
sweep.py
Normal file
136
sweep.py
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
|
||||||
|
"""The sweep module defines a base class sweep for various sweep algorithms. It also contains a
|
||||||
|
child-class for each individual algorithm."""
|
||||||
|
|
||||||
|
import copy
|
||||||
|
|
||||||
|
import linesegment
|
||||||
|
import point
|
||||||
|
|
||||||
|
class Sweep():
|
||||||
|
|
||||||
|
"The Sweep class is the base class for each individual sweep algorithm"
|
||||||
|
|
||||||
|
# pylint: disable=too-many-instance-attributes
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._sss = None # The sweep status structure
|
||||||
|
self._es = None # The event structure
|
||||||
|
self._input = None # The input values
|
||||||
|
self._ready = False # If the sweep is reade to be executed
|
||||||
|
self._running = False # If the sweep is currently running
|
||||||
|
self._num_steps = {} # The number of steps that were needed to solve the problem
|
||||||
|
self._result = None # The result of the sweep
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""Start the performing the algorithm. Return True if it was successful"""
|
||||||
|
if not self._ready:
|
||||||
|
print("Error: Sweep is not yet ready to be performed")
|
||||||
|
return False
|
||||||
|
self._running = True
|
||||||
|
return True
|
||||||
|
|
||||||
|
class SweepNearestNeighbors(Sweep):
|
||||||
|
|
||||||
|
"""Calculate the pair of points inside a set, that have the minimal distance between each
|
||||||
|
other inside this set"""
|
||||||
|
|
||||||
|
def __init__(self, point_set=None):
|
||||||
|
super().__init__()
|
||||||
|
if isinstance(point_set, list):
|
||||||
|
for pnt in point_set:
|
||||||
|
if not isinstance(pnt, point.Point):
|
||||||
|
raise Exception(self, "Inputs have to be of type point.Point!")
|
||||||
|
self._input = copy.deepcopy(point_set)
|
||||||
|
self._ready = True
|
||||||
|
elif point_set is not None:
|
||||||
|
raise Exception(self, "Wrong input type for point_set")
|
||||||
|
self._sss = []
|
||||||
|
self._es = []
|
||||||
|
self._result = {"Success": False, \
|
||||||
|
"MinDistSoFar": None, \
|
||||||
|
"Startpoint": None, \
|
||||||
|
"Endpoint": None}
|
||||||
|
|
||||||
|
def exec(self):
|
||||||
|
"""Execute the sweep. Returns True on success and False on failure."""
|
||||||
|
if not self.start():
|
||||||
|
return False
|
||||||
|
|
||||||
|
while self._running:
|
||||||
|
self.step()
|
||||||
|
|
||||||
|
return self._result
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""Start the performing the algorithm. Return True if it was successful"""
|
||||||
|
if not super().start():
|
||||||
|
return False
|
||||||
|
|
||||||
|
self._result["Success"] = False
|
||||||
|
self._sss = []
|
||||||
|
# Populate event structure
|
||||||
|
self._es = copy.deepcopy(self._input)
|
||||||
|
# Sort the input elements
|
||||||
|
self._num_steps["sort"] = point.sort_point_set(self._es)
|
||||||
|
self._num_steps["sweep"] = 0
|
||||||
|
return True
|
||||||
|
|
||||||
|
def step(self):
|
||||||
|
"""Perform a single step of the algorithm."""
|
||||||
|
if not self._running:
|
||||||
|
return
|
||||||
|
|
||||||
|
event = self._es.pop(0)
|
||||||
|
|
||||||
|
if self._result["MinDistSoFar"] is not None:
|
||||||
|
while len(self._sss) > 0 and \
|
||||||
|
self._sss[0].get_x() <= (event.get_x() - self._result["MinDistSoFar"]):
|
||||||
|
del self._sss[0]
|
||||||
|
for pnt in self._sss:
|
||||||
|
if self._result["MinDistSoFar"] is None:
|
||||||
|
self._result["MinDistSoFar"] = linesegment.LineSegment(pnt, event).get_length()
|
||||||
|
elif (event.get_y() - self._result["MinDistSoFar"]) < \
|
||||||
|
pnt.get_y() < (event.get_y() + self._result["MinDistSoFar"]):
|
||||||
|
distance = linesegment.LineSegment(pnt, event).get_length()
|
||||||
|
if self._result["MinDistSoFar"] > distance > 0:
|
||||||
|
self._result["MinDistSoFar"] = distance
|
||||||
|
self._result["Startpoint"] = pnt
|
||||||
|
self._result["Endpoint"] = event
|
||||||
|
self._num_steps["sweep"] += 1
|
||||||
|
|
||||||
|
self._sss.append(event)
|
||||||
|
|
||||||
|
if len(self._es) == 0:
|
||||||
|
self._result["Success"] = True
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
def get_result_string(self, print_result=False):
|
||||||
|
"""Pack the result into a string."""
|
||||||
|
result_string = ""
|
||||||
|
if self._running:
|
||||||
|
result_string = "Sweep is still running."
|
||||||
|
elif not self._result["Success"]:
|
||||||
|
result_string = "Sweep was not successful."
|
||||||
|
else:
|
||||||
|
result_string = "Sweep successfully completed\n"
|
||||||
|
result_string += " Summary:\n"
|
||||||
|
result_string += " Number of Inputs:" + str(len(self._input)) + "\n"
|
||||||
|
result_string += " Inputs: ["
|
||||||
|
for pnt in self._input:
|
||||||
|
result_string += pnt.str() + ", "
|
||||||
|
result_string += "]\n"
|
||||||
|
result_string += " Nearest Neighbors: "
|
||||||
|
result_string += linesegment.LineSegment(self._result["Startpoint"], \
|
||||||
|
self._result["Endpoint"]).str()
|
||||||
|
result_string += "\n"
|
||||||
|
result_string += " Distance: " + str(self._result["MinDistSoFar"]) + "\n"
|
||||||
|
result_string += " Number of steps:\n"
|
||||||
|
result_string += " Sort:" + str(self._num_steps["sort"]) +"\n"
|
||||||
|
result_string += " Sweep:" + str(self._num_steps["sweep"]) +"\n"
|
||||||
|
|
||||||
|
|
||||||
|
if print_result:
|
||||||
|
print(result_string)
|
||||||
|
return result_string
|
||||||
Reference in New Issue
Block a user