Skip to content

Integrate method to anticipate future agent location #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: strategies
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

def scenario(world):
defense_size = 30
offense_size = 60
offense_size = 90

world.add_location(tuple(defense_center), [0, 0, 255, 1])

Expand Down
176 changes: 159 additions & 17 deletions gym-swarm-sim/envs/swarmsimmaster/components/solution/svs_test_sol.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
defense_center = [-10, 0, 0]
offense_center = [10, 0, 0]

k_max_concurrent_targets = 1
k_target_rad = 0.25
k_max_defender_kills = 3

def solution(world, stats=None):
global terminated

Expand Down Expand Up @@ -44,7 +48,7 @@ def solution(world, stats=None):
stats.increment_losses()
else:
# defense policy applied
def_p4(world, defense_drones, offense_drones)
def_p4_intercept(world, defense_drones, offense_drones)

# check deaths in offense after defense moves
proximity_death_check(world, defense_drones, offense_drones, lambda d : 1 - 3*d)
Expand Down Expand Up @@ -74,7 +78,7 @@ def oob_check(world, defense_drones, offense_drones):
death_routine(o)
else:
break

for d in sorted_defense:
dist = np.linalg.norm(np.array(d.coordinates) - np.array(defense_center))
if (dist > 40):
Expand Down Expand Up @@ -130,7 +134,7 @@ def proximity_death_check(world, defense_drones, offense_drones, chance_func):
# death check for offense
for a in offense_drones:
_, near_defense = get_local_agents(world, a, offense_drones, defense_drones)

# check if dead
for d in near_defense:
distance = np.linalg.norm(np.array(d.coordinates) - np.array(a.coordinates))
Expand All @@ -149,9 +153,13 @@ def proximity_death_check(world, defense_drones, offense_drones, chance_func):
# defense death check 1: defenders die after eliminating `n` attackers
def def_death_check1(world, defense_drones, offense_drones):
for a in defense_drones:
if (a.kills) >= 3:
if (a.kills) >= k_max_defender_kills:
death_routine(a)

def off_p0(world, defense_drones, offense_drones):
for a in offense_drones:
move_toward(a, (0, 10, 0))

# off policy 1: go to center directly
def off_p1(world, defense_drones, offense_drones):
for a in offense_drones:
Expand Down Expand Up @@ -212,6 +220,40 @@ def off_p3(world, defense_drones, offense_drones):
else:
move_toward(a, defense_center)

def off_p4(world, defense_drones, offense_drones):
"""Offense Policy 4: Make half of the defense drones be equidistant from two closest defense drones. Other half should go towards the target"""
obj_vec = np.array(defense_center)
dist_to_defense_limit = 2
counter = 0
dodge_angle = lambda x: min(90, max(80, 3 * 90 / x))
crit_dist = 1
dive_dist = 4
crit_locked_on = 1
dodge_dist = 4
num_closest_drones = 5

for offense in offense_drones:
closest_drones_arr = []
offense_vec = np.array(offense.coordinates)
sorted_defense = sorted(defense_drones, key=lambda defense: np.linalg.norm(np.array(defense.coordinates) - offense_vec))
for i in range(min(num_closest_drones, len(defense_drones))):
closest_drones_arr.append(np.array(sorted_defense[i].coordinates))
closest_def_vec = np.array(sorted_defense[0].coordinates)
second_closest_def_vec = np.array(sorted_defense[1].coordinates)
obj_dist = np.linalg.norm(obj_vec - offense_vec)
# if close enough to objective or at least closer to objective than closest defense drone, move toward objective
if obj_dist < dive_dist or obj_dist < np.linalg.norm(obj_vec - closest_def_vec):
move_toward(offense, defense_center)
# if there are more than CRIT_LOCKED_ON defense drones within a CRIT_DIST distance, start moving away
elif num_in_range(offense, sorted_defense, crit_dist) >= crit_locked_on:
move_toward(offense, 2 * offense_vec - closest_def_vec)
#make the offense go to the midpoint of closest_def_vec and second_closest_def_vec
elif np.linalg.norm(closest_def_vec - offense_vec) <= dodge_dist:
mid_loc_between_defense_drones = sum(closest_drones_arr) / len(closest_drones_arr)
move_toward(offense, mid_loc_between_defense_drones)
else:
move_toward(offense, defense_center)

#######
def dodge_vec(agent, sorted_agents, angle=90):
"""Returns a vector to dodge towards.
Expand Down Expand Up @@ -291,11 +333,6 @@ def def_p3(world, defense_drones, offense_drones):
# with at most `n` other defenders targeting
# if all targeted, go to the closest attacker from center
def def_p3point5(world, defense_drones, offense_drones):
# Tweakable parameters
n = 1
target_rad = 0.25
###

targetted = {}

for a in defense_drones:
Expand All @@ -306,7 +343,7 @@ def def_p3point5(world, defense_drones, offense_drones):
flag = False
for x in self_sorted_offense:
off_dist = np.linalg.norm(np.array(x.coordinates) - np.array(defense_center))
if (targetted.setdefault(x.coordinates, 0) <= n) and off_dist >= self_dist - target_rad:
if (targetted.setdefault(x.coordinates, 0) <= k_max_concurrent_targets-1) and off_dist >= self_dist - k_target_rad:
flag = True
targetted[x.coordinates] += 1
move_toward(a, x.coordinates)
Expand All @@ -317,31 +354,52 @@ def def_p3point5(world, defense_drones, offense_drones):
closest_offense = sorted_offense[0]
move_toward(a, closest_offense.coordinates)


def def_p3point5_intercept(world, defense_drones, offense_drones):
targetted = {}

for defense in defense_drones:
self_sorted_offense = sorted(offense_drones, key=lambda x: np.linalg.norm(
np.array(x.coordinates) - np.array(defense.coordinates)))

flag = False
for offense in self_sorted_offense:
lookahead = get_intercept(offense, defense)
if (targetted.setdefault(offense.coordinates, 0) <= k_max_concurrent_targets-1) and lookahead is not None \
and np.linalg.norm(np.array(lookahead) - np.array(defense_center)) > k_target_rad:
flag = True
targetted[offense.coordinates] += 1
move_toward(defense, lookahead)
break
if not flag:
sorted_offense = sorted(offense_drones,
key=lambda x: np.linalg.norm(np.array(x.coordinates) - np.array(defense_center)))
closest_offense = sorted_offense[0]
move_toward(defense, closest_offense.coordinates)

# def policy 4: half the drones follow def policy 3.5 as usual
# other half only attack drones within `radius` of the center (following def policy 3.5)
def def_p4(world, defense_drones, offense_drones):
# Tweakable parameters
def_radius = 3
n = 2
target_rad = 0.25
###

patrollers = []
targetted = {}

for i in range(len(defense_drones)//2):
patrollers.append(defense_drones[i])

near_offense = list(filter(lambda x: (np.linalg.norm(np.array(x.coordinates) - np.array(defense_center))) < def_radius, offense_drones))
far_offense = list(filter(lambda x: (np.linalg.norm(np.array(x.coordinates) - np.array(defense_center))) >= def_radius, offense_drones))

for a in patrollers:
sorted_near_offense = sorted(near_offense, key=lambda x: np.linalg.norm(
np.array(x.coordinates) - np.array(a.coordinates)))

flag = False
for x in sorted_near_offense:
if (targetted.setdefault(x.coordinates, 0) <= n) and np.linalg.norm(np.array(x.coordinates) - np.array(defense_center)) >= np.linalg.norm(np.array(a.coordinates) - np.array(defense_center)) - target_rad:
if (targetted.setdefault(x.coordinates, 0) <= k_max_concurrent_targets-1) and np.linalg.norm(np.array(x.coordinates) - np.array(defense_center)) >= np.linalg.norm(np.array(a.coordinates) - np.array(defense_center)) - k_target_rad:
flag = True
targetted[x.coordinates] += 1
move_toward(a, x.coordinates)
Expand All @@ -357,6 +415,46 @@ def def_p4(world, defense_drones, offense_drones):
else:
def_p3point5(world, defense_drones[len(patrollers):], near_offense)

# def policy 4: half the drones follow def policy 3.5 as usual
# other half only attack drones within `radius` of the center (following def policy 3.5)
def def_p4_intercept(world, defense_drones, offense_drones):
# Tweakable parameters
def_radius = 3
###

patrollers = []
targetted = {}

for i in range(len(defense_drones)//2):
patrollers.append(defense_drones[i])

near_offense = list(filter(lambda x: (np.linalg.norm(np.array(x.coordinates) - np.array(defense_center))) < def_radius, offense_drones))
far_offense = list(filter(lambda x: (np.linalg.norm(np.array(x.coordinates) - np.array(defense_center))) >= def_radius, offense_drones))

for patroller in patrollers:
sorted_near_offense = sorted(near_offense, key=lambda x: np.linalg.norm(
np.array(x.coordinates) - np.array(patroller.coordinates)))

flag = False
for offense in sorted_near_offense:
lookahead = get_intercept(offense, patroller)
if (targetted.setdefault(offense.coordinates, 0) <= k_max_concurrent_targets-1) and lookahead is not None \
and np.linalg.norm(np.array(lookahead) - np.array(defense_center)) > k_target_rad:
flag = True
targetted[offense.coordinates] += 1
move_toward(patroller, lookahead)
break
if not flag:
if patroller.coordinates[0] <= defense_center[0]:
move_toward(patroller, [defense_center[0] + 1, patroller.coordinates[1], patroller.coordinates[2]])
else:
move_threshold(patroller, defense_center, 0, 1)

if far_offense:
def_p3point5_intercept(world, defense_drones[len(patrollers):], far_offense)
else:
def_p3point5_intercept(world, defense_drones[len(patrollers):], near_offense)

def get_drones(world, clr):
lst = []
for a in world.get_agent_list():
Expand All @@ -377,6 +475,50 @@ def move_toward(agent, target, thres=0.001):
return True
return False

def in_range(actual, target, threshold):
return abs(actual - target) <= threshold


# anticipate moving drone's location and calculate intercept loc TODO scale by speed
# NOTE: current implementation only works if same speed
def get_intercept(offense, defense, deadband=0.0001):
"""
:param offense: offense drone to target
:param defense: defense drone targeting offense
:param deadband: threshold for ignoring value
:return: intercept location
"""
if in_range(offense.get_velocities()[0], 0, deadband) and in_range(offense.get_velocities()[1], 0, deadband):
return offense.coordinates

# m2(y - b) = m1(x - a)
x_coeff_offense = offense.get_velocities()[1]
y_coeff_offense = offense.get_velocities()[0]

x_coeff_intersect = offense.coordinates[0] - defense.coordinates[0]
y_coeff_intersect = - (offense.coordinates[1] - defense.coordinates[1])

if in_range(y_coeff_intersect/x_coeff_intersect, y_coeff_offense/x_coeff_offense, deadband):
return None

c_intersect = np.array([[(offense.coordinates[0] + defense.coordinates[0]) / 2],
[(offense.coordinates[1] + defense.coordinates[1]) / 2]], dtype="float")

dep1 = y_coeff_offense * offense.coordinates[1] - x_coeff_offense * offense.coordinates[0]
dep2 = y_coeff_intersect * c_intersect[1][0] - x_coeff_intersect * c_intersect[0][0]

mat_coeff = np.array([[-x_coeff_offense, y_coeff_offense], [-x_coeff_intersect, y_coeff_intersect]], dtype="float")
mat_dep = np.array([[dep1], [dep2]], dtype="float")

# calculate intercept between equidistant line and offense vel vector
ret = np.linalg.solve(mat_coeff, mat_dep)

# check if solution lies in +dt
if (np.sign(offense.get_velocities()[0]) == np.sign(ret[0] - offense.coordinates[0])
and np.sign(offense.get_velocities()[1]) == np.sign(ret[1] - offense.coordinates[1])):
return tuple([ret[0][0], ret[1][0], 0])
return None

# enforce speed limit
def speed_limit(speed_vec, agent):
speed = np.linalg.norm(speed_vec)
Expand Down Expand Up @@ -445,5 +587,5 @@ def get_local_agents(world, agent, offense_drones, defense_drones, disk_range=DI
dist, comm_range = communication_model(x1, y1, x2, y2, comms_model=comms_model, DISK_RANGE_M = disk_range)
if (comm_range):
local_defense.append(d)
return local_offense, local_defense

return local_offense, local_defense
2 changes: 2 additions & 0 deletions gym-swarm-sim/envs/swarmsimmaster/core/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(self, world, coordinates, color, agent_counter=0):
self.carried_item = None
self.carried_agent = None
self.steps = 0
self.velocities = (0.0,) * 3
csv_generator_module = importlib.import_module('components.generators.csv.%s' % world.config_data.csv_generator)
self.csv_agent_writer = csv_generator_module.CsvAgentData(self.get_id(), self.number)

Expand Down Expand Up @@ -75,6 +76,7 @@ def move_to(self, direction):
:param direction: The direction is defined by loaded grid class
:return: True: Success Moving; False: Non moving
"""
self.velocities = direction
direction_coord = get_coordinates_in_direction(self.coordinates, direction)
direction_coord = self.check_within_border(direction, direction_coord)
if self.world.grid.are_valid_coordinates(direction_coord) \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
class VeloAgent(agent.Agent):
def __init__(self, world, coordinates, color, agent_counter=0, velocities = None):
super().__init__(world, coordinates, color)
self.velocities = (0.0,) * 3 # self.world.grid.get_dimension_count()
self.neighbors = []

# change in time is one round
Expand Down Expand Up @@ -45,4 +44,7 @@ def set_velocities(self, new_velocities):

# adds to the velocities.
def add_velocities(self, dv):
self.velocities = tuple(np.add(self.velocities, dv))
self.velocities = tuple(np.add(self.velocities, dv))

def get_velocities(self):
return tuple(self.velocities)