Skip to content

Commit

Permalink
added multi threading
Browse files Browse the repository at this point in the history
  • Loading branch information
kasperg3 committed Mar 21, 2024
1 parent 106abaa commit 272271a
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 80 deletions.
37 changes: 29 additions & 8 deletions trajallocpy/Agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,31 @@ def getMinTravelCost(point, task: TrajectoryTask, environment):
return result, shouldBeReversed


def test_calculatePathRewardWithNewTask(environment, agent, taskCurr, taskPrev, timePrev, taskNext, timeNext, Lambda):
# TODO
# * Keep track of the time/distance of each task in the path and store it in a vector, just like the path
# * Iterate through the vector and calculate

if taskPrev == None: # First task in the path
dt = getMinTravelCost(agent.state, taskCurr, environment)
minStart = max(taskCurr.start_time, agent.availability_time + dt)
else: # Not the first in the task
dt = getMinTravelCost(taskPrev.end, taskCurr, environment)
minStart = max(taskCurr.start_time, timePrev + distanceToCost(taskPrev.length) + dt) # i have to have time to do task at j-1 and go to task m

if taskNext == None:
maxStart = taskCurr.end_time
else: # Not the last task in the path and we can still make the promised task
dt = getMinTravelCost(taskCurr.end, taskNext, environment)
maxStart = min(taskCurr.end_time, timeNext - distanceToCost(taskCurr.length) - dt)

reward = getTimeDiscountedReward(dt, Lambda, taskCurr)
penalty = getTravelCost(agent.state, taskCurr.start, environment)
score = reward - penalty

return score, minStart, maxStart


def calculatePathRewardWithNewTask(j, n, state, tasks, path, environment, use_single_point_estimation=False, Lambda=0.95):
temp_path = list(path)
temp_path.insert(n, j)
Expand All @@ -96,7 +121,7 @@ def calculatePathRewardWithNewTask(j, n, state, tasks, path, environment, use_si
# travel cost to first task
travel_cost = getTravelCost(state, tasks[temp_path[0]].start, environment)
S_p = getTimeDiscountedReward(travel_cost, Lambda, tasks[temp_path[0]])

best_time = 0
# Use a single point instead of greedily optimising the direction
for p_idx in range(len(temp_path) - 1):
previous_task = tasks[temp_path[p_idx]]
Expand All @@ -107,9 +132,10 @@ def calculatePathRewardWithNewTask(j, n, state, tasks, path, environment, use_si
if p_idx == n - 1:
# The task is inserted at n, when evaluating the task use n-1 to determine whether it should be reversed
temp_cost, is_reversed = getMinTravelCost(previous_task.end, next_task, environment)

travel_cost += temp_cost

if p_idx == n:
elif p_idx == n:
# the task after has to use the is_reversed bool to determine where to travel from
if is_reversed:
travel_cost += getTravelCost(previous_task.end, next_task.start, environment)
Expand All @@ -123,12 +149,7 @@ def calculatePathRewardWithNewTask(j, n, state, tasks, path, environment, use_si
# Add the cost for returning home
travel_cost += getTravelCost(tasks[temp_path[-1]].end, state, environment)
S_p += getTimeDiscountedReward(travel_cost, Lambda, tasks[temp_path[-1]])
return S_p, is_reversed


def calculate_and_return(j, n, state, tasks, path, environment, use_single_point_estimation):
S_pj, should_be_reversed = calculatePathRewardWithNewTask(j, n, state, tasks, path, environment, use_single_point_estimation)
return j, n, S_pj, should_be_reversed
return (S_p, is_reversed, best_time)


# This is only used for evaluations!
Expand Down
105 changes: 63 additions & 42 deletions trajallocpy/CBBA.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
import copy
import itertools
import math
import multiprocessing
import random
from functools import cache
from queue import Queue
from typing import List

import numpy as np

from trajallocpy import Agent
from trajallocpy.Task import TrajectoryTask

EPSILON = 1e-6


class BundleResult:
def __init__(self, agent: Agent):
self.bundle = agent.bundle
self.path = agent.path
self.winning_agents = agent.winning_agents
self.winning_bids = agent.winning_bids
self.id = agent.id


class agent:
def __init__(
Expand Down Expand Up @@ -51,6 +64,8 @@ def __init__(
self.bundle = []
# Path
self.path = []
# times: List of time in seconds to each task in the path
self.times = []
# Maximum task capacity
if capacity is None:
raise Exception("Error: agent capacity cannot be None")
Expand All @@ -70,8 +85,17 @@ def __init__(
# socre function parameters
self.Lambda = 0.95

self.availability_time = 0

self.removal_list = np.zeros(self.task_num, dtype=np.int8)
self.removal_threshold = 15
self.removal_threshold = 5

def update_bundle_result(self, state: BundleResult):
if self.id == state.id:
self.bundle = state.bundle
self.path = state.path
self.winning_agents = state.winning_agents
self.winning_bids = state.winning_bids

def add_tasks(self, tasks):
self.tasks.extend(tasks)
Expand All @@ -90,33 +114,40 @@ def getCij(self):
Returns the cost list c_ij for agent i where the position n results in the greatest reward
"""
# Calculate Sp_i
S_p = Agent.calculatePathReward(self.state, self.getPathTasks(), self.environment, self.Lambda)
S_p = Agent.calculatePathReward(self.state, self.getPathTasks(), None, self.Lambda)
# init
best_pos = np.zeros(self.task_num, dtype=int)
c = np.zeros(self.task_num)
reverse = np.zeros(self.task_num)

best_time = 0
# Collect the tasks which should be considered for planning
ignore_tasks = [key for key, value in enumerate(self.removal_list) if value > self.removal_threshold]
tasks_to_check = set(range(len(self.tasks))).difference(self.bundle).difference(ignore_tasks)

for n, j in itertools.product(range(len(self.path) + 1), tasks_to_check):
S_pj, should_be_reversed = Agent.calculatePathRewardWithNewTask(
j, n, self.state, self.tasks, self.path, self.environment, self.use_single_point_estimation
S_pj, should_be_reversed, best_time = Agent.calculatePathRewardWithNewTask(
j, n, self.state, self.tasks, self.path, None, self.use_single_point_estimation
)
c_ijn = S_pj - S_p
if c[j] <= c_ijn:
c[j] = c_ijn # Store the cost
best_pos[j] = n
reverse[j] = should_be_reversed

return (best_pos, c, reverse)
return (best_pos, c, reverse, best_time)

def build_bundle(self):
while Agent.getTotalTravelCost(self.state, self.getPathTasks(), self.environment) <= self.capacity:
best_pos, c, reverse = self.getCij()
h = c > self.winning_bids
def update_time(self, index, time):
self.times.insert(index, time)
# Correct the times after the insertion
for i in range(index + 1, len(self.times)):
self.times[i] += time

def build_bundle(self, queue: multiprocessing.Queue):
while Agent.getTotalTravelCost(self.state, self.getPathTasks(), self.environment) <= self.capacity:
best_pos, c, reverse, best_time = self.getCij()
D1 = c - self.winning_bids > EPSILON
D2 = abs(c - self.winning_bids) <= EPSILON
h = D1 | (D2 & (self.id < self.winning_agents))
if sum(h) == 0: # No valid task
break

Expand All @@ -130,10 +161,13 @@ def build_bundle(self):

self.bundle.append(J_i)
self.path.insert(n_J, J_i)
self.update_time(n_J, best_time)

self.winning_bids[J_i] = c[J_i]
self.winning_agents[J_i] = self.id

queue.put(BundleResult(self))

def update_task(self):
id_list = list(self.Y.keys())
id_list.insert(0, self.id)
Expand Down Expand Up @@ -168,7 +202,7 @@ def update_task(self):
if z_ij == self.id:
if y_kj > y_ij:
self.__update(j, y_kj, z_kj)
elif abs(y_kj - y_ij) < np.finfo(float).eps: # Tie Breaker
elif abs(y_kj - y_ij) < EPSILON: # Tie Breaker
if k < self.id:
self.__update(j, y_kj, z_kj)
else:
Expand All @@ -181,7 +215,7 @@ def update_task(self):
m = z_ij
if (s_k[m] > self.timestamps[m]) or (y_kj > y_ij):
self.__update(j, y_kj, z_kj)
elif abs(y_kj - y_ij) < np.finfo(float).eps and k < self.id: # Tie Breaker
elif abs(y_kj - y_ij) < EPSILON and k < self.id: # Tie Breaker
self.__update(j, y_kj, z_kj)
# Rule 4
elif z_ij == -1:
Expand Down Expand Up @@ -214,7 +248,7 @@ def update_task(self):
if (s_k[m] >= self.timestamps[m]) and (y_kj > y_ij):
self.__update(j, y_kj, z_kj)
# Tie Breaker
elif (s_k[m] >= self.timestamps[m]) and (abs(y_kj - y_ij) < np.finfo(float).eps and m < self.id):
elif (s_k[m] >= self.timestamps[m]) and (abs(y_kj - y_ij) < EPSILON and m < self.id):
self.__update(j, y_kj, z_kj)
# Rule 10
elif z_ij == k:
Expand All @@ -234,7 +268,7 @@ def update_task(self):
elif (s_k[m] > self.timestamps[m]) and (y_kj > y_ij):
self.__update(j, y_kj, z_kj)
# Tie Breaker
elif (s_k[m] > self.timestamps[m]) and (abs(y_kj - y_ij) < np.finfo(float).eps):
elif (s_k[m] > self.timestamps[m]) and (abs(y_kj - y_ij) < EPSILON):
if m < n:
self.__update(j, y_kj, z_kj)
elif (s_k[n] > self.timestamps[n]) and (self.timestamps[m] > s_k[m]):
Expand Down Expand Up @@ -266,52 +300,39 @@ def update_task(self):
else:
raise Exception("Error while updating")

n_bar = len(self.bundle)
# Get n_bar
# TODO this can be done in each bundle start instead of here
for n in range(len(self.bundle)):
b_n = self.bundle[n]
if self.winning_agents[b_n] != self.id and n_bar > n:
n_bar = n # Find the minimum n in the agents bundle

b_idx1 = copy.deepcopy(self.bundle[n_bar + 1 :])

if len(b_idx1) > 0:
self.winning_bids[b_idx1] = 0
self.winning_agents[b_idx1] = -1

tasks_to_delete = self.bundle[n_bar:]

# Keep track of how many times this particular task has been removed
if len(tasks_to_delete) != 0:
self.removal_list[self.bundle[n_bar]] = self.removal_list[self.bundle[n_bar]] + 1

del self.bundle[n_bar:]

self.path = [ele for ele in self.path if ele not in tasks_to_delete]

self.time_step += 1

converged = False
# The agent has converged to a solution of no conflicts has been detected
if len(tasks_to_delete) == 0:
converged = True

return converged

def __update_path(self, task):
if task not in self.bundle:
return
index = self.bundle.index(task)
b_retry = self.bundle[index + 1 :]
for idx in b_retry:
self.winning_bids[idx] = 0
self.winning_agents[idx] = -1

self.removal_list[task] = self.removal_list[task] + 1
self.path = [num for num in self.path if num not in self.bundle[index:]]
self.bundle = self.bundle[:index]

def __update(self, j, y_kj, z_kj):
"""
Update values
"""
self.winning_bids[j] = y_kj
self.winning_agents[j] = z_kj
self.__update_path(j)

def __reset(self, j):
"""
Reset values
"""
self.winning_bids[j] = 0
self.winning_agents[j] = -1 # -1 means "none"
self.__update_path(j)

def __leave(self):
"""
Expand Down
Loading

0 comments on commit 272271a

Please # to comment.