Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vision + scoring state machine #143

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
174 changes: 174 additions & 0 deletions components/score_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from enum import Enum
import numpy as np
import numpy.typing as npt
import wpilib
import magicbot
from utilities.game import Node
from ntcore import NetworkTableInstance

class GridNode(Enum):
CUBE = 0
CONE = 1
HYBRID = 2

class ScoreTracker:
CUBE_MASK = np.array(
[
[0, 1, 0, 0, 1, 0, 0, 1, 0],
[0, 1, 0, 0, 1, 0, 0, 1, 0],
[1, 1, 1, 1, 1, 1, 1, 1, 1],
],
dtype=bool,
)
CONE_MASK = np.array(
[
[1, 0, 1, 1, 0, 1, 1, 0, 1],
[1, 0, 1, 1, 0, 1, 1, 0, 1],
[1, 1, 1, 1, 1, 1, 1, 1, 1],
],
dtype=bool,
)

CONF_EXP_FILTER_ALPHA = 0.8

CONF_THRESHOLD = 0.2

def __init__(self) -> None:
# -1.0 - no piece for certain, 1.0 - a piece for certain, 0.0 - unsure
self.confidences_red: np.ndarray = np.zeros((3, 9), dtype=float)
self.confidences_blue: np.ndarray = np.zeros((3, 9), dtype=float)
self.state_red = np.zeros((3, 9), dtype=bool)
self.state_blue = np.zeros((3, 9), dtype=bool)
self.did_states_change = magicbot.will_reset_to(False)
self.inst = NetworkTableInstance.getDefault()
nt = self.inst.getTable("left_cam")
self.nodes = nt.getEntry("nodes")


def execute(self) -> None:
if not self.nodes.exists():
print("skipping")
return
_data = self.nodes.getStringArray([])
data = self.nt_data_to_node_data(_data)
for node in data:
side = wpilib.DriverStation.Alliance.kBlue if node[0] >= 27 else wpilib.DriverStation.Alliance.kRed
col = node[0] % 9
row = (node[0] % 27) // 9
self.add_vision_data(side=side, pos=np.array([row, col]), confidence=(1. if node[1] else -0.5))

def nt_data_to_node_data(self, data: list[str]) -> list[tuple[int, bool]]:
nodes: list[tuple[int, bool]] = []
for node in data:
as_array = str(node)
a = (int(f"{as_array[0]}{as_array[1]}"), as_array[2] == "1")
nodes.append(a)
return nodes

def add_vision_data(
self, side: wpilib.DriverStation.Alliance, pos: npt.ArrayLike, confidence: float
) -> None:
confidences = (
self.confidences_red if side == side.kRed else self.confidences_blue
)
confidences[pos] = confidences[
pos
] * ScoreTracker.CONF_EXP_FILTER_ALPHA + confidence * (
1.0 - ScoreTracker.CONF_EXP_FILTER_ALPHA
)
if abs(confidences[pos]) > ScoreTracker.CONF_THRESHOLD:
self.did_states_change = True
state = self.state_red if side == side.kRed else self.state_blue
state[pos] = confidence > 0.0

@staticmethod
def count_links(r: npt.NDArray[bool]) -> int:
i = 0
n = 0
l = len(r)
while i < l - 2:
if r[i] and r[i + 1] and r[i + 2]:
n += 1
i += 3
continue
i += 1
return n

@staticmethod
def evaluate_state(a: npt.NDArray[bool]) -> int:
return (
sum(ScoreTracker.count_links(r) for r in a) * 5
+ a[0].sum() * 5
+ a[1].sum() * 3
+ a[2].sum() * 2
)

@staticmethod
def run_lengths_mod3(state: npt.NDArray[bool]) -> npt.NDArray[int]:
"""
Returns an array where corresponding in shape to the input, where
every value is replaced by the length of the longest uninterrupted
run of true values containing it, modulo 3
"""
run_lengths = np.zeros_like(state, dtype=int)
for y in range(3):
x = 0
while x < 9:
if not state[y, x]:
x += 1
continue
acc = 0
for xn in range(x, 9):
if not state[y, xn]:
break
acc += 1
run_lengths[y, x : x + acc] = acc % 3
x += acc
return run_lengths

@staticmethod
def get_in_row(arr: npt.NDArray, x: int, y: int, def_val):
if x < 0 or x > 8:
return def_val
else:
return arr[y, x]

@staticmethod
def get_best_moves(
state: npt.NDArray[bool],
type_to_test: GridNode,
link_preparation_score: float = 2.5,
) -> npt.NDArray:
vals = np.zeros_like(state, dtype=float)
run_lengths = ScoreTracker.run_lengths_mod3(state)
for y in range(3):
for x in range(9):
if (
state[y, x]
or (
type_to_test == GridNode.CUBE
and not ScoreTracker.CUBE_MASK[y, x]
)
or (
type_to_test == GridNode.CONE
and not ScoreTracker.CONE_MASK[y, x]
)
):
continue
val = [5.0, 3.0, 2.0][y]
# Check link completion
if (
ScoreTracker.get_in_row(run_lengths, x - 1, y, 0)
+ ScoreTracker.get_in_row(run_lengths, x + 1, y, 0)
>= 2
):
val += 5.0
# Otherwise, check link preparation (state where a link can be completed after 1 move)
else:
for o in [-2, -1, 1, 2]:
if ScoreTracker.get_in_row(run_lengths, x + o, y, 0) == 1:
val += link_preparation_score
break
vals[y, x] = val
m = vals.max()
return np.argwhere(vals == m)
76 changes: 54 additions & 22 deletions controllers/score_game_piece.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,25 @@
from controllers.movement import Movement
from controllers.recover import RecoverController

from magicbot import state, timed_state, StateMachine
from magicbot import state, StateMachine, feedback
from enum import Enum, auto
from utilities.game import Node, get_closest_node, get_score_location, Rows
from utilities.game import Node, get_closest_node, get_score_location, Rows, is_red, GamePiece, get_closest_node_in_allowed
from components.score_tracker import GridNode, ScoreTracker


class NodePickStratergy(Enum):
CLOSEST = auto()
OVERRIDE = auto()
BEST = auto()

def piece_to_node(piece: GamePiece) -> GridNode:
Pokesi marked this conversation as resolved.
Show resolved Hide resolved
if piece == GamePiece.BOTH:
return GridNode.HYBRID
if piece == GamePiece.CONE:
return GridNode.CONE
if piece == GamePiece.CUBE:
return GridNode.CUBE


class ScoreGamePieceController(StateMachine):
gripper: Gripper
Expand All @@ -23,62 +32,85 @@ class ScoreGamePieceController(StateMachine):

movement: Movement
recover: RecoverController
HARD_UP_SPEED = 0.3

score_tracker: ScoreTracker

def __init__(self) -> None:
self.node_stratergy = NodePickStratergy.CLOSEST
self.node_stratergy = NodePickStratergy.BEST
self.override_node = Node(Rows.HIGH, 0)
self.prefered_row = Rows.HIGH
self.target_node = Node(Rows.HIGH, 0)

@state(first=True, must_finish=True)
def driving_to_position(self, initial_call: bool) -> None:
def driving_to_position(self, initial_call):
if initial_call:
self.target_node = self.pick_node()
self.movement.set_goal(*get_score_location(self.target_node))
self.movement.do_autodrive()
if self.movement.is_at_goal():
self.next_state("hard_up")

@timed_state(next_state="deploying_arm", duration=0.3, must_finish=True)
def hard_up(self) -> None:
self.movement.inputs_lock = True
self.movement.set_input(-self.HARD_UP_SPEED, 0, 0, False, override=True)
self.next_state("deploying_arm")

@state(must_finish=True)
def deploying_arm(self, initial_call: bool) -> None:
def deploying_arm(self):
self.arm.go_to_setpoint(get_setpoint_from_node(self.target_node))
if self.arm.at_goal():
self.next_state("dropping")

@timed_state(duration=1, must_finish=True)
def dropping(self) -> None:
@state(must_finish=True)
def dropping(self):
self.gripper.open()
if self.gripper.get_full_open():
self.done()

def done(self) -> None:
super().done()
self.movement.inputs_lock = False
self.recover.engage()

def pick_node(self) -> Node:
cur_pos = self.movement.chassis.get_pose().translation()
if self.node_stratergy is NodePickStratergy.CLOSEST:
return get_closest_node(
cur_pos, self.gripper.get_current_piece(), self.prefered_row
cur_pos, self.gripper.get_current_piece(), self.prefered_row, []
)
elif self.node_stratergy is NodePickStratergy.OVERRIDE:
return self.override_node
elif self.node_stratergy is NodePickStratergy.BEST:
return get_closest_node(
cur_pos, self.gripper.get_current_piece(), self.prefered_row
state = self.score_tracker.state_blue if is_red() else self.score_tracker.state_red
best = self.score_tracker.get_best_moves(state, piece_to_node(self.gripper.holding))
nodes: list[Node] = []
for i in range(len(best)):
as_tuple = tuple(best[i])
node = Node(Rows(int(as_tuple[0])), as_tuple[1])
nodes.append(node)

return get_closest_node_in_allowed(
cur_pos, self.gripper.get_current_piece(), nodes
)

@feedback
def state_red(self) -> list[bool]:
state: list[bool] = []
for i in self.score_tracker.state_blue.tolist():
for j in i:
state.append(j)
return state

@feedback
def state_blue(self) -> list[bool]:
state: list[bool] = []
for i in self.score_tracker.state_blue.tolist():
for j in i:
state.append(j)
return state

@feedback
def pick_node_as_int(self) -> int:
# node = self.pick_node()
node = Node(Rows.HIGH, 0)
return (node.row.value - 1) * 3 + node.col

def prefer_high(self) -> None:
self.prefered_row = Rows.HIGH

def prefer_mid(self) -> None:
self.prefered_row = Rows.MID

def score_without_moving(self, node: Node) -> None:
self.target_node = node
self.engage("deploying_arm", force=True)
3 changes: 3 additions & 0 deletions robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from components.arm import Arm
from components.gripper import Gripper
from components.leds import StatusLights, DisplayType, LedColors
from components.score_tracker import ScoreTracker
from utilities.scalers import rescale_js
from utilities.game import is_red

Expand All @@ -40,6 +41,7 @@ class MyRobot(magicbot.MagicRobot):
intake: Intake
status_lights: StatusLights
gripper: Gripper
score_tracker: ScoreTracker

port_localizer: VisualLocalizer
starboard_localizer: VisualLocalizer
Expand Down Expand Up @@ -233,6 +235,7 @@ def testPeriodic(self) -> None:
self.port_localizer.execute()
self.starboard_localizer.execute()
self.status_lights.execute()
self.score_tracker.execute()

def cancel_controllers(self):
self.acquire_cone.done()
Expand Down
13 changes: 12 additions & 1 deletion utilities/game.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ def get_valid_piece(self) -> GamePiece:
return GamePiece.CUBE
else:
return GamePiece.CONE

def get_id(self) -> int:
return (self.row.value - 1) * 3 + self.col


def get_node_location(node: Node) -> Translation3d:
Expand Down Expand Up @@ -123,8 +126,10 @@ def get_score_location(node: Node) -> tuple[Pose2d, Rotation2d]:
return goal, approach


def get_closest_node(pos: Translation2d, piece: GamePiece, row: Rows) -> Node:
def get_closest_node(pos: Translation2d, piece: GamePiece, row: Rows, impossible: list[int]) -> Node:
Pokesi marked this conversation as resolved.
Show resolved Hide resolved
def get_node_dist(node: Node) -> float:
if node.get_id() in impossible:
return 999999
return get_score_location(node)[0].translation().distance(pos)

if piece == GamePiece.CONE:
Expand All @@ -136,6 +141,12 @@ def get_node_dist(node: Node) -> float:

return min(nodes, key=get_node_dist)

def get_closest_node_in_allowed(pos: Translation2d, piece: GamePiece, allowed: list[Node]) -> Node:
def get_node_dist(node: Node) -> float:
return get_score_location(node)[0].translation().distance(pos)

return min(allowed, key=get_node_dist)


# tag in blue loading bay, on red side of field 16=x
tag_4 = apriltag_layout.getTagPose(4)
Expand Down