Commit 106abaa

minor performance improvements and cleanup

kasperg3 committed Mar 5, 2024
1 parent ea1f997 commit 106abaa
Showing 5 changed files with 43 additions and 291 deletions.
90 changes: 17 additions & 73 deletions trajallocpy/ACBBA.py
@@ -1,4 +1,5 @@
import copy
import itertools
import math

Check failure on line 3 in trajallocpy/ACBBA.py (GitHub Actions / test (3.10), Ruff F401): trajallocpy/ACBBA.py:3:8: `math` imported but unused
import random
import time
@@ -105,85 +106,28 @@ def getCij(self):
c = 0
reverse = None
best_task = None
# try all tasks
for j, task in self.tasks.items():
# If already in the bundle list
if j in self.bundle or self.removal_list.get(j, 0) > self.removal_threshold:
continue # Do not include if already in the bundle or if the removal threshold is exceeded
else:
for n in range(len(self.path) + 1):
S_pj, should_be_reversed = calculatePathRewardWithNewTask(
j, n, self.state, self.tasks, self.path, self.environment, self.use_single_point_estimation
)
c_ijn = S_pj - S_p

if c_ijn > c and c_ijn > self.y.get(j, -1): # TODO: Figure out if this is bad for the general solution
c = c_ijn # Store the cost
best_pos = n
reverse = should_be_reversed
best_task = j
# Collect the tasks which should be considered for planning
keys_above_threshold = [key for key, value in self.removal_list.items() if value > self.removal_threshold]
tasks_to_check = set(self.tasks.keys()).difference(self.bundle).difference(keys_above_threshold)
# Combine the tasks and positions to check

for n, j in itertools.product(range(len(self.path) + 1), tasks_to_check):
S_pj, should_be_reversed = calculatePathRewardWithNewTask(
j, n, self.state, self.tasks, self.path, self.environment, self.use_single_point_estimation
)
c_ijn = S_pj - S_p

if c_ijn > c: # and c_ijn > self.y.get(j, -1): # TODO: Figure out if this is bad for the general solution
c = c_ijn # Store the cost
best_pos = n
reverse = should_be_reversed
best_task = j
# reverse the best task if necessary
if reverse:
self.tasks[best_task].reverse()

return best_task, best_pos, c
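
The refactor above flattens the old nested task-and-position loops into one itertools.product pass; the same flattening appears in CBBA.getCij below. A minimal standalone sketch (illustrative, not repository code) of the equivalence:

import itertools

path = ["t1", "t2"]
tasks_to_check = {"t3", "t4"}

# old shape: tasks outer, insertion positions inner
nested = [(n, j) for j in tasks_to_check for n in range(len(path) + 1)]
# new shape: one flat pass over the cartesian product
flat = list(itertools.product(range(len(path) + 1), tasks_to_check))

assert set(nested) == set(flat)  # identical candidate set

Only the visit order differs, which matters solely for tie-breaking when two insertions score equally.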

# TODO this might be worth looking more into.
# def getCij(self):
# # Calculate Sp_i
# S_p = calculatePathReward(self.state, self.getPathTasks(), self.environment, self.Lambda)
# # init
# best_pos = None
# c = 0
# reverse = None
# best_task = None
# executor_tasks = []

# for j, task in self.tasks.items():
# if j in self.bundle or self.removal_list.get(j, 0) > self.removal_threshold:
# continue # Do not include if already in the bundle or if the removal threshold is exceeded
# else:
# for n in range(len(self.path) + 1):
# executor_tasks.append((j, n))

# for test in executor_tasks:
# j, n = test
# S_pj, should_be_reversed = calculatePathRewardWithNewTask(j, n, self.state, self.tasks, self.path, self.environment)
# c_ijn = S_pj - S_p
# if c_ijn > c and c_ijn > self.y.get(j, -1):
# c = c_ijn
# best_pos = n
# reverse = should_be_reversed
# best_task = j

# # reverse the task with max reward if necessary
# if reverse:
# self.tasks[j].reverse()

# return best_task, best_pos, c

###
# with ThreadPoolExecutor() as executor:
# executor_tasks = []
# for j, task in self.tasks.items():
# if j in self.bundle or self.removal_list.get(j, 0) > self.removal_threshold:
# continue # Do not include if already in the bundle or if the removal threshold is exceeded
# else:
# for n in range(len(self.path) + 1):
# executor_tasks.append((j, n, self.state, self.tasks, self.path, None, self.use_single_point_estimation))
# for j, n, S_pj, should_be_reversed in executor.map(calculate_and_return, *zip(*executor_tasks)):
# c_ijn = S_pj - S_p
# if c_ijn > c and c_ijn > self.y.get(j, -1):
# c = c_ijn
# best_pos = n
# reverse = should_be_reversed
# best_task = j
# # reverse the task with max reward if necessary
# if reverse:
# self.tasks[best_task].reverse()
# return best_task, best_pos, c
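
The parallel experiment above hinges on the zip(*executor_tasks) transpose idiom: executor.map takes one iterable per positional argument, so a list of argument tuples must be unpacked column-wise. A standalone illustration (not repository code):

from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    return a + b

pairs = [(1, 2), (3, 4), (5, 6)]
with ThreadPoolExecutor() as executor:
    # zip(*pairs) yields (1, 3, 5) and (2, 4, 6), one iterable per parameter
    print(list(executor.map(add, *zip(*pairs))))  # [3, 7, 11]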

def build_bundle(self):
if self.tasks is None:
return
@@ -206,7 +150,7 @@ def __update_time(self, task):
self.t[task] = time.monotonic()

def __action_rule(self, k, j, task, z_kj, y_kj, t_kj, z_ij, y_ij, t_ij) -> BidInformation:
eps = 20
eps = np.finfo(float).eps
i = self.id
sender_info = BidInformation(y=y_kj, z=z_kj, t=t_kj, j=j, k=self.id)
own_info = BidInformation(y=y_ij, z=z_ij, t=t_ij, j=j, k=self.id)
4 changes: 4 additions & 0 deletions trajallocpy/Agent.py
@@ -5,6 +5,8 @@
from multiprocessing import Pool
from typing import List

import numpy as np

from trajallocpy.Task import TrajectoryTask


@@ -66,11 +68,13 @@ def getTravelPath(position, assigned_tasks, environment):
return full_path


@cache
def getTravelCost(start, end, environment):
return distanceToCost(getDistance(start, end, environment))
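
Here @cache (functools.cache) memoizes getTravelCost, so repeated queries for the same point pair skip the distance computation entirely; this is the main performance improvement in the commit. A minimal sketch of the mechanism with a stand-in function (not the repository one); note that cached arguments must be hashable, e.g. tuples rather than lists or numpy arrays, and the cache grows without bound as unique pairs accumulate:

from functools import cache

@cache
def travel_cost(start: tuple, end: tuple) -> float:
    print("computing")  # runs once per unique (start, end) pair
    return sum((a - b) ** 2 for a, b in zip(start, end)) ** 0.5

travel_cost((0.0, 0.0), (3.0, 4.0))  # prints "computing", returns 5.0
travel_cost((0.0, 0.0), (3.0, 4.0))  # cache hit, returns 5.0 immediately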


def getTimeDiscountedReward(cost, Lambda, task: TrajectoryTask):
# return np.exp((Lambda - 1) * cost) * task.reward
return Lambda ** (cost) * task.reward
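
Both the active return and the commented-out alternative are exponential decays: Lambda ** cost equals exp(cost * ln(Lambda)), whereas the alternative is exp((Lambda - 1) * cost). With Lambda = 0.95, ln(0.95) is about -0.0513 versus Lambda - 1 = -0.05, so the two nearly coincide for small costs and drift apart as cost grows. An illustrative comparison:

import math

Lambda, cost = 0.95, 50
print(Lambda**cost)                   # ~0.0769
print(math.exp((Lambda - 1) * cost))  # ~0.0821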


226 changes: 16 additions & 210 deletions trajallocpy/CBBA.py
@@ -1,4 +1,5 @@
import copy
import itertools
import math
import random
from functools import cache
@@ -94,22 +95,20 @@ def getCij(self):
best_pos = np.zeros(self.task_num, dtype=int)
c = np.zeros(self.task_num)
reverse = np.zeros(self.task_num)
# try all tasks
for j in range(self.task_num):
# If already in bundle list
if j in self.bundle or self.removal_list[j] > self.removal_threshold:
c[j] = 0 # Minimum Score
else:
# for each j calculate the path reward at each location in the local path
for n in range(len(self.path) + 1):
S_pj, should_be_reversed = Agent.calculatePathRewardWithNewTask(
j, n, self.state, self.tasks, self.path, self.environment, self.use_single_point_estimation
)
c_ijn = S_pj - S_p
if c[j] <= c_ijn:
c[j] = c_ijn # Store the cost
best_pos[j] = n
reverse[j] = should_be_reversed

# Collect the tasks which should be considered for planning
ignore_tasks = [key for key, value in enumerate(self.removal_list) if value > self.removal_threshold]
tasks_to_check = set(range(len(self.tasks))).difference(self.bundle).difference(ignore_tasks)

for n, j in itertools.product(range(len(self.path) + 1), tasks_to_check):
S_pj, should_be_reversed = Agent.calculatePathRewardWithNewTask(
j, n, self.state, self.tasks, self.path, self.environment, self.use_single_point_estimation
)
c_ijn = S_pj - S_p
if c[j] <= c_ijn:
c[j] = c_ijn # Store the cost
best_pos[j] = n
reverse[j] = should_be_reversed

return (best_pos, c, reverse)

@@ -269,6 +268,7 @@ def update_task(self):

n_bar = len(self.bundle)
# Get n_bar
# TODO: this can be done at the start of each bundle build instead of here
for n in range(len(self.bundle)):
b_n = self.bundle[n]
if self.winning_agents[b_n] != self.id and n_bar > n:
@@ -318,197 +318,3 @@ def __leave(self):
Do nothing
"""
pass


#!/usr/bin/env python3
import math
from dataclasses import dataclass
from functools import cache
from multiprocessing import Pool
from typing import List

from trajallocpy.Task import TrajectoryTask


@dataclass
class config:
id: int
position: list
capacity: int # time in seconds
max_velocity: float = 3 # m/s
max_acceleration: float = 1 # m/s^2


# class Agent:


@dataclass
class BidInformation:
y: float
z: int
t: float
j: int
k: int
# winning_score: float
# winning_agent: int
# timestamp: float
# task_id: int
# sender_id: int


def distanceToCost(dist, max_velocity=3, max_acceleration=1):
# Velocity ramp
d_a = (max_velocity**2) / max_acceleration
result = math.sqrt(4 * dist / max_acceleration) if dist < d_a else max_velocity / max_acceleration + dist / max_velocity
return result
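
The branch realizes a triangular versus trapezoidal velocity profile: d_a = v**2 / a is the distance consumed by accelerating to top speed and braking back to rest, and the v/a term in the long-leg case accounts for both ramps. A worked check with the default limits (illustrative):

# d_a = 3**2 / 1 = 9 m with the defaults above
print(distanceToCost(4))   # 4.0 s: top speed never reached, t = sqrt(4 * 4 / 1)
print(distanceToCost(30))  # 13.0 s: cruise phase, t = v/a + d/v = 3 + 10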


@cache
def getDistance(start, end, environment=None):
# If there is no environment defined, use Euclidean distance
if environment is None:
# This is an optimised way of calculating the Euclidean distance: https://stackoverflow.com/questions/37794849/efficient-and-precise-calculation-of-the-euclidean-distance
dist = [(a - b) ** 2 for a, b in zip(start, end)]
dist = math.sqrt(sum(dist))
else:
path, dist = environment.find_shortest_path(start, end, free_space_after=False, verify=False)
return dist


def getTravelPath(position, assigned_tasks, environment):
full_path = []
if len(assigned_tasks) > 0:
path, dist = environment.find_shortest_path(position, assigned_tasks[0].start, free_space_after=False, verify=False)
full_path.extend(path)
for i in range(len(assigned_tasks) - 1):
full_path.extend(assigned_tasks[i].trajectory.coords)
path, dist = environment.find_shortest_path(assigned_tasks[i].end, assigned_tasks[i + 1].start, free_space_after=False, verify=False)
full_path.extend(path)
full_path.extend(assigned_tasks[-1].trajectory.coords)
return full_path


def getTravelCost(start, end, environment):
return distanceToCost(getDistance(start, end, environment))


def getTimeDiscountedReward(cost, Lambda, task: TrajectoryTask):
return Lambda ** (cost) * task.reward


def getMinTravelCost(point, task: TrajectoryTask, environment):
result = getTravelCost(point, task.start, environment)
distance_to_end = getTravelCost(point, task.end, environment)
shouldBeReversed = False
if result > distance_to_end:
result = distance_to_end
shouldBeReversed = True
return result, shouldBeReversed
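
getMinTravelCost compares the cost of approaching either end of a task and flags whether the task should be traversed in reverse. A usage sketch with a hypothetical task object (not from the source):

cost, should_reverse = getMinTravelCost((0.0, 0.0), task, environment)
if should_reverse:
    task.reverse()  # traverse end -> start, since the end is cheaper to reach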


def calculatePathRewardWithNewTask(j, n, state, tasks, path, environment, use_single_point_estimation=False, Lambda=0.95):
temp_path = list(path)
temp_path.insert(n, j)
# print(j)
is_reversed = False
# travel cost to first task
travel_cost = getTravelCost(state, tasks[temp_path[0]].start, environment)
S_p = getTimeDiscountedReward(travel_cost, Lambda, tasks[temp_path[0]])

for p_idx in range(len(temp_path) - 1):
previous_task = tasks[temp_path[p_idx]]
next_task = tasks[temp_path[p_idx + 1]]
if use_single_point_estimation:
# Use a single point instead of greedily optimising the direction
travel_cost += getTravelCost(previous_task.end, next_task.start, environment)
else:
if p_idx == n - 1:
# The task is inserted at n, when evaluating the task use n-1 to determine whether it should be reversed
temp_cost, is_reversed = getMinTravelCost(previous_task.end, next_task, environment)
travel_cost += temp_cost

if p_idx == n:
# the task after the inserted one uses is_reversed to determine where to travel from
if is_reversed:
# the inserted task is traversed end -> start, so depart from its original start
travel_cost += getTravelCost(previous_task.start, next_task.start, environment)
else:
travel_cost += getTravelCost(previous_task.end, next_task.start, environment)
else:
travel_cost += getTravelCost(previous_task.end, next_task.start, environment)
# Scale the travel cost with the reward/priority
S_p += getTimeDiscountedReward(travel_cost, Lambda, next_task)

# Add the cost for returning home
travel_cost += getTravelCost(tasks[temp_path[-1]].end, state, environment)
S_p += getTimeDiscountedReward(travel_cost, Lambda, tasks[temp_path[-1]])
return S_p, is_reversed


def calculate_and_return(j, n, state, tasks, path, environment, use_single_point_estimation):
S_pj, should_be_reversed = calculatePathRewardWithNewTask(j, n, state, tasks, path, environment, use_single_point_estimation)
return j, n, S_pj, should_be_reversed


# This is only used for evaluations!
def getTotalPathLength(position, task_list, environment):
total_length = 0
if len(task_list) != 0:
# Add the cost of travelling to the first task
total_length = getDistance(position, task_list[0].start, environment)
# The cost of travelling between tasks
for t_index in range(len(task_list) - 1):
total_length += getDistance(task_list[t_index].end, task_list[t_index + 1].start, environment)
# The cost of executing the task
for t_index in range(len(task_list)):
total_length += task_list[t_index].length
# Add the cost of returning home
total_length += getDistance(position, task_list[-1].end, environment)
return total_length


def getTotalTaskLength(task_list):
task_length = 0
for t_index in range(len(task_list)):
task_length += task_list[t_index].length
return task_length


def getTotalTravelCost(position, task_list: List[TrajectoryTask], environment):
total_cost = 0
if len(task_list) != 0:
# Add the cost of travelling to the first task
total_cost = getTravelCost(position, task_list[0].start, environment)
# The cost of travelling between tasks
for t_index in range(len(task_list) - 1):
total_cost += getTravelCost(task_list[t_index].end, task_list[t_index + 1].start, environment)
# The cost of executing the task
for t_index in range(len(task_list)):
total_cost += distanceToCost(task_list[t_index].length)
# Add the cost of returning home
total_cost += getTravelCost(position, task_list[-1].end, environment)
return total_cost


# S_i calculation of the agent
def calculatePathReward(position, task_list: List[TrajectoryTask], environment, Lambda=0.95):
S_p = 0

if len(task_list) > 0:
travel_cost = getTravelCost(position, task_list[0].start, environment)
S_p += getTimeDiscountedReward(travel_cost, Lambda, task_list[0])
for t_index in range(len(task_list) - 1):
travel_cost += getTravelCost(task_list[t_index].end, task_list[t_index + 1].start, environment)
S_p += getTimeDiscountedReward(travel_cost, Lambda, task_list[t_index + 1])
return S_p
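
In closed form (illustrative notation, not from the source), the loop computes S_p = sum_k Lambda**C_k * r_k, where C_k is the cumulative travel cost from the agent's position through task k and r_k is that task's reward. Because travel_cost accumulates across iterations, each task is discounted by the total time spent reaching it, not just its own leg.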


def getTrajectory(task_list: List[TrajectoryTask]):
trajectory = []
if len(task_list) > 0:
trajectory.append(task_list[0].start)
for t_index in range(len(task_list) - 1):
trajectory.extend(task_list[t_index].trajectory.coords)
trajectory.append(task_list[t_index].end)
trajectory.extend(task_list[-1].trajectory.coords)
trajectory.append(task_list[-1].end)
return trajectory
