diff --git a/momaland/envs/ingenious/__init__.py b/momaland/envs/ingenious/__init__.py
new file mode 100644
index 00000000..3d3cdcad
--- /dev/null
+++ b/momaland/envs/ingenious/__init__.py
@@ -0,0 +1 @@
+"""Multi-objective Ingenious environment."""
diff --git a/momaland/envs/ingenious/ingenious.py b/momaland/envs/ingenious/ingenious.py
new file mode 100644
index 00000000..998afee6
--- /dev/null
+++ b/momaland/envs/ingenious/ingenious.py
@@ -0,0 +1,206 @@
+"""Multi-objective Ingenious environment for MOMAland.
+
+This environment is based on the Ingenious game: https://boardgamegeek.com/boardgame/9674/ingenious
+Every color is a different objective. The goal in the original game is to maximize the minimum score over all colors;
+however, we leave the utility wrapper up to the users and only return the vectorial score on each color dimension.
+"""
+
+import functools
+import random
+
+# from gymnasium.utils import EzPickle
+from typing_extensions import override
+
+import numpy as np
+from gymnasium.logger import warn
+from gymnasium.spaces import Box, Dict, Discrete
+from pettingzoo.utils import wrappers
+
+from momaland.envs.ingenious.ingenious_base import ALL_COLORS, IngeniousBase
+from momaland.utils.env import MOAECEnv
+
+
+def env(**kwargs):
+    """Autowrapper for the multi-objective Ingenious game.
+
+    Args:
+        **kwargs: keyword args to forward to the raw_env function
+
+    Returns:
+        A fully wrapped env
+    """
+    env = raw_env(**kwargs)
+
+    # this wrapper helps error handling for discrete action spaces
+    env = wrappers.AssertOutOfBoundsWrapper(env)
+    return env
+
+
+def raw_env(**kwargs):
+    """Env factory function for the multi-objective Ingenious game."""
+    return MOIngenious(**kwargs)
+
+
+class MOIngenious(MOAECEnv):
+    """Environment for the multi-objective Ingenious game."""
+
+    metadata = {"render_modes": ["human"], "name": "moingenious_v0", "is_parallelizable": False}
+
+    def __init__(self, num_players=2, init_draw=6, num_colors=6, board_size=8, limitation_score=18, render_mode=None):
+        """Initializes the multi-objective Ingenious game.
+
+        Args:
+            num_players (int): The number of players in the environment. Default: 2
+            init_draw (int): The number of tiles each player draws at the beginning of the game. Default: 6
+            num_colors (int): The number of colors in the game. Default: 6
+            board_size (int): The size of the board. Default: 8
+            limitation_score (int): The score cap per color; scores are clipped at this value. Default: 18
+            render_mode (str): The rendering mode. Default: None
+        """
+        self.board_size = board_size
+        self.num_colors = num_colors
+        self.init_draw = init_draw
+        self.num_players = num_players
+        self.limitation_score = limitation_score
+
+        self.game = IngeniousBase(
+            num_players=num_players,
+            init_draw=init_draw,
+            num_colors=num_colors,
+            board_size=board_size,
+            limitation_score=limitation_score,
+        )
+
+        self.possible_agents = ["agent_" + str(r) for r in range(num_players)]
+        # initialize the list of agents
+        self.agents = self.possible_agents[:]
+
+        self.terminations = {agent: False for agent in self.agents}
+        self.truncations = {agent: False for agent in self.agents}
+        self.infos = {agent: {} for agent in self.agents}
+        self.agent_selection = self.agents[self.game.agent_selector]
+        self._cumulative_rewards = {agent: np.zeros(self.num_colors) for agent in self.agents}
+        self.refresh_cumulative_reward = True
+        self.render_mode = render_mode
+
+        # Observation space is a dict of 2 elements: action mask and game state (board, the agent's own tile rack,
+        # the agent's scores)
+        self.observation_spaces = {
+            i: Dict(
+                {
+                    "observation": Dict(
+                        {
+                            "board": Box(
+                                0, len(ALL_COLORS), shape=(2 * self.board_size - 1, 2 * self.board_size - 1), dtype=np.float32
+                            ),
+                            "tiles": Box(0, self.num_colors, shape=(self.init_draw, 2), dtype=np.int32),
+                            "scores": Box(0, self.game.limitation_score, shape=(self.num_colors,), dtype=np.int32),
+                        }
+                    ),
+                    "action_mask": Box(low=0, high=1, shape=(len(self.game.masked_action),), dtype=np.int8),
+                }
+            )
+            for i in self.agents
+        }
+
+        self.action_spaces = dict(zip(self.agents, [Discrete(len(self.game.masked_action))] * num_players))
+
+        # The reward after one move is the difference between the current and previous score.
+        self.reward_spaces = dict(
+            zip(self.agents, [Box(0, self.game.limitation_score, shape=(self.num_colors,))] * num_players)
+        )
+
+    @functools.lru_cache(maxsize=None)
+    @override
+    def observation_space(self, agent):
+        # gymnasium spaces are defined and documented here: https://gymnasium.farama.org/api/spaces/
+        return self.observation_spaces[agent]
+
+    @functools.lru_cache(maxsize=None)
+    @override
+    def action_space(self, agent):
+        return self.action_spaces[agent]
+
+    @override
+    def reward_space(self, agent):
+        """Returns the reward space for the given agent."""
+        return self.reward_spaces[agent]
+
+    @override
+    def render(self):
+        """Renders the environment.
+
+        In human mode, it can print to terminal, open
+        up a graphical window, or open up some other display that a human can see and understand.
+        """
+        if self.render_mode is None:
+            warn("You are calling render method without specifying any render mode.")
+            return
+
+    @override
+    def reset(self, seed=None, options=None):
+        """Reset needs to initialize the `agents` attribute and must set up the environment so that render()
+        and step() can be called without issues.
+ """ + if seed is not None: + np.random.seed(seed) + random.seed(seed) + self.game.reset_game(seed) + self.agents = self.possible_agents[:] + obs = {agent: self.observe(agent) for agent in self.agents} + self.terminations = {agent: False for agent in self.agents} + self.truncations = {agent: False for agent in self.agents} + self.infos = {agent: {} for agent in self.agents} + self.agent_selection = self.agents[self.game.agent_selector] + self.agent_selection = self.agents[self.game.agent_selector] + self.rewards = {agent: np.zeros(self.num_colors, dtype="float64") for agent in self.agents} + self._cumulative_rewards = {agent: np.zeros(self.num_colors, dtype="float64") for agent in self.agents} + self.refresh_cumulative_reward = True + return obs, self.infos + + @override + def step(self, action): + """Steps in the environment. + + Args: + action: action of the active agent + """ + + current_agent = self.agent_selection + + if self.terminations[current_agent] or self.truncations[current_agent]: + return self._was_dead_step(action) + self.rewards = {agent: np.zeros(self.num_colors, dtype="float64") for agent in self.agents} + if self.refresh_cumulative_reward: + self._cumulative_rewards[current_agent] = np.zeros(self.num_colors, dtype="float64") + + if not self.game.end_flag: + prev_rewards = np.array(list(self.game.score[current_agent].values())) + self.game.set_action_index(action) + current_rewards = np.array(list(self.game.score[current_agent].values())) + self.rewards[current_agent] = current_rewards - prev_rewards + + if self.game.end_flag: + self.terminations = {agent: True for agent in self.agents} + + # update accumulate_rewards + self._accumulate_rewards() + + # update to next agent + self.agent_selection = self.agents[self.game.agent_selector] + + if self.agent_selection != current_agent: + self.refresh_cumulative_reward = True + else: + self.refresh_cumulative_reward = False + + @override + def observe(self, agent): + board_vals = np.array(self.game.board_array, dtype=np.float32) + p_tiles = np.array(self.game.p_tiles[agent], dtype=np.int32) + p_score = np.array(list(self.game.score[agent].values()), dtype=np.int32) + + observation = {"board": board_vals, "tiles": p_tiles, "scores": p_score} + action_mask = np.array(self.game.return_action_list(), dtype=np.int8) + + return {"observation": observation, "action_mask": action_mask} diff --git a/momaland/envs/ingenious/ingenious_base.py b/momaland/envs/ingenious/ingenious_base.py new file mode 100644 index 00000000..3dae81ed --- /dev/null +++ b/momaland/envs/ingenious/ingenious_base.py @@ -0,0 +1,400 @@ +"""Base class for Ingenious environment. + +This class is not meant to be instantiated directly. This class supports the MOIngenious environment and provides the +board and rules. 
+""" + +import collections +import itertools +import random + +import numpy as np + + +# red 12-pointed star +# green circle +# blue 6-pointed star +# orange hexagon +# yellow 24-pointed star +# purple ring +RED = 1 +GREEN = 2 +BLUE = 3 +ORANGE = 4 +YELLOW = 5 +PURPLE = 6 +ALL_COLORS = [RED, GREEN, BLUE, ORANGE, YELLOW, PURPLE] +COLOR_NAMES = ["red", "green", "blue", "orange", "yellow", "purple"] + +NUM_TILES = 120 +Hex = collections.namedtuple("Hex", ["q", "r", "s"]) + + +def hex_coord(q, r, s): + """Create a cube-based coordinates.""" + assert not (round(q + r + s) != 0), "q + r + s must be 0" + return Hex(q, r, s) + + +def hex_add(a, b): + """Add two cube-based coordinates.""" + return hex_coord(a.q + b.q, a.r + b.r, a.s + b.s) + + +def hex_subtract(a, b): + """Subtract two cube-based coordinates.""" + return hex_coord(a.q - b.q, a.r - b.r, a.s - b.s) + + +def hex_scale(a, k): + """Scale a cube-based coordinate.""" + return hex_coord(a.q * k, a.r * k, a.s * k) + + +def Hex2ArrayLocation(hx, length): + """Convert cube-based coordinates to 2D-based coordinates.""" + return hx.q + length - 1, hx.r + length - 1 + + +def ArrayLocation2Hex(x, y, length): + """Convert 2D-based coordinates to cube-based coordinates.""" + q = x + 1 - length + r = y + 1 - length + s = -q - r + return q, r, s + + +hex_directions = [ + hex_coord(1, 0, -1), + hex_coord(1, -1, 0), + hex_coord(0, -1, 1), + hex_coord(-1, 0, 1), + hex_coord(-1, 1, 0), + hex_coord(0, 1, -1), +] + + +def hex_direction(direction): + """Return the directions of a hexagon.""" + return hex_directions[direction] + + +def hex_neighbor(hex, direction): + """Return the neighbors of a hexagon.""" + return hex_add(hex, hex_direction(direction)) + + +def generate_board(board_size): + """Generate a hexagonal board.""" + N = board_size - 1 + s = set() + for q in range(-N, +N + 1): + r1 = max(-N, -q - N) + r2 = min(N, -q + N) + for r in range(r1, r2 + 1): + location = hex_coord(q, r, -q - r) + s.add(location) + return s + + +class IngeniousBase: + """Base class for Ingenious environment.""" + + def __init__(self, num_players=2, init_draw=6, num_colors=6, board_size=8, limitation_score=18): + """Initialize the Ingenious environment. + + Args: + num_players (int): Number of players in the game. + init_draw (int): Number of tiles to draw at the beginning of the game. + num_colors (int): Number of colors in the game. + board_size (int): Size of the board. + limitation_score(int): Limitation to refresh the score board for any color. Default: 20 + """ + assert 2 <= num_players <= 5, "Number of players must be between 2 and 5." + assert 2 <= num_colors <= 6, "Number of colors must be between 2 and 6." + assert 2 <= init_draw <= 6, "Number of tiles in hand must be between 2 and 6." + assert 3 <= board_size <= 10, "Board size must be between 3 and 10." 
+
+        self.board_size = board_size
+        self.num_player = num_players
+        self.agents = [f"agent_{i}" for i in range(self.num_player)]
+        self.agent_selector = 0
+        self.limitation_score = limitation_score
+        self.colors = num_colors
+        self.corner_color = ALL_COLORS
+        self.init_draw = init_draw
+        self.board_array = np.zeros([2 * self.board_size - 1, 2 * self.board_size - 1])
+        self.board_hex = generate_board(self.board_size)  # original full board
+        self.action_map = {}
+        self.action_index_map = {}
+        self.action_size = 0
+        self.masked_action = []
+        self.legal_move = set()
+        self.score = {agent: {ALL_COLORS[i]: 0 for i in range(0, self.colors)} for agent in self.agents}
+
+        self.tiles_bag = {}
+        self.p_tiles = {agent: [] for agent in self.agents}
+        self.first_round = True
+        self.first_round_pos = set()
+        self.end_flag = False
+        self.random = random.Random()
+
+        for loc in self.board_hex:
+            for direct in range(0, len(hex_directions)):
+                neighbour = hex_neighbor(loc, direct)
+                if neighbour not in self.board_hex:
+                    continue
+                for i in range(0, self.init_draw):
+                    if (loc, neighbour, i) not in self.action_map:
+                        self.action_map[(loc, neighbour, i)] = self.action_size
+                        self.action_index_map[self.action_size] = (loc, neighbour, i)
+                        self.legal_move.add(self.action_size)
+                        self.action_size += 1
+        self.masked_action = np.ones(self.action_size, "int8")
+
+    def reset_game(self, seed=None):
+        """Reset the board, racks, scores, and tiles bag."""
+        if seed is not None:
+            np.random.seed(seed)
+            random.seed(seed)
+            self.random.seed(seed)
+        self.end_flag = False
+        self.first_round = True
+        self.first_round_pos.clear()
+        self.board_array = np.zeros([2 * self.board_size - 1, 2 * self.board_size - 1])
+        # generate hex board
+        self.board_hex = generate_board(self.board_size)
+        # generate action space
+        self.action_map.clear()
+        self.action_index_map.clear()
+        self.action_size = 0
+        for loc in self.board_hex:
+            for direct in range(0, len(hex_directions)):
+                neighbour = hex_neighbor(loc, direct)
+                if neighbour not in self.board_hex:
+                    continue
+                for i in range(0, self.init_draw):
+                    if (loc, neighbour, i) not in self.action_map:
+                        self.action_map[(loc, neighbour, i)] = self.action_size
+                        self.action_index_map[self.action_size] = (loc, neighbour, i)
+                        self.legal_move.add(self.action_size)
+                        self.action_size += 1
+        self.masked_action = np.ones(self.action_size, "int8")
+
+        # generate corner symbols
+        self.initial_corner()
+        # generate and shuffle the public tiles bag
+        self.tiles_bag_reset()
+        # initial tile draw for each agent
+        self.p_tiles = {a: self.draw_tiles_fill() for a in self.agents}
+        self.agent_selector = 0
+        self.score = {agent: {ALL_COLORS[i]: 0 for i in range(0, self.colors)} for agent in self.agents}
+
+    def draw_tiles_fill(self):
+        """Draw an initial hand of `init_draw` tiles for a single player."""
+        return [self.tiles_bag.pop(self.random.randrange(len(self.tiles_bag))) for _ in range(self.init_draw)]
+
+    def get_tile(self, a):
+        """Refill the hand of a specific player up to `init_draw` tiles."""
+        while len(self.p_tiles[a]) < self.init_draw:
+            self.p_tiles[a].append(self.tiles_bag.pop(self.random.randrange(len(self.tiles_bag))))
+        return
+
+    def initial_corner(self):
+        """Initialise the corners of the board with the 6 colors."""
+        for i in range(0, 6):
+            a = hex_scale(hex_directions[i], self.board_size - 1)
+            x, y = Hex2ArrayLocation(a, self.board_size)
+            self.board_array[x, y] = self.corner_color[i]
+            self.exclude_action(a)
+
+            # In the first round, each player has to place a tile next to a corner. Therefore,
+            # we use self.first_round_pos to maintain the set of first-round positions.
+            for k in range(0, 6):
+                hx1 = hex_neighbor(a, k)
+                for j in range(0, 6):
+                    hx2 = hex_neighbor(hx1, j)
+                    if (hx2 not in self.board_hex) or (hx1 not in self.board_hex) or (hx2 == a):
+                        continue
+                    for card in range(0, self.init_draw):
+                        c1 = self.action_map[(hx1, hx2, card)]
+                        c2 = self.action_map[(hx2, hx1, card)]
+                        self.first_round_pos.add(c1)
+                        self.first_round_pos.add(c2)
+
+    def tiles_bag_reset(self):
+        """Generate and shuffle the tiles bag."""
+        # Create a list of tuples for combinations of two different colors
+        diff_color_combinations = list(itertools.combinations(ALL_COLORS[: self.colors], 2))
+        # Create a list of tuples for combinations of the same color
+        same_color_combinations = [(color, color) for color in ALL_COLORS[: self.colors]]
+        # Create the tiles bag
+        if self.colors == len(ALL_COLORS):
+            # with 6 colors the tiles bag follows the original game setting: six tiles for each two-colour
+            # combination (e.g. red/orange) and five for each double (e.g. red/red)
+            self.tiles_bag = (diff_color_combinations * 6) + (same_color_combinations * 5)
+        else:
+            # with fewer than 6 colors (2-5), NUM_TILES (120) is divisible by the number of combinations,
+            # so every combination appears equally often
+            self.tiles_bag = int(NUM_TILES / len(diff_color_combinations + same_color_combinations)) * (
+                diff_color_combinations + same_color_combinations
+            )
+        # Shuffle the tiles bag
+        self.random.shuffle(self.tiles_bag)
+
+    def set_action_index(self, index):
+        """Apply the action corresponding to the given index on the board."""
+        # The selected action must be a legal move
+        assert self.masked_action[index] == 1, "Illegal move, choose a valid action."
+        if self.first_round:
+            assert index in self.first_round_pos, (
+                "Illegal move, in the first round tiles can only be placed next to corners."
+ ) + """Hex Coordinate: h1,h2 ; Tile to play: card""" + h1, h2, card = self.action_index_map[index] + agent_i = self.agent_selector + agent = self.agents[agent_i] + assert card < len(self.p_tiles[agent]), "Illegal move: choosing tile out of hand(happening after ingenious)" + """Extract the certain tile (color1 , color2) as (c1,c2)""" + c1, c2 = self.p_tiles[agent][card] + # Translate Hex Coordinate to Offset Coordinate(x,y) + x1, y1 = Hex2ArrayLocation(h1, self.board_size) + x2, y2 = Hex2ArrayLocation(h2, self.board_size) + flag = False + for item in self.p_tiles[agent]: + if (c1, c2) == item: + self.p_tiles[agent].remove(item) + flag = True + break + if (c2, c1) == item: + self.p_tiles[agent].remove(item) + flag = True + break + assert flag, "Illegal move: set the tile to the coordinate unsuccessfully" + """Update the mask_action list after the action""" + self.legal_move.remove(index) + self.board_array[x1][y1] = c1 + self.board_array[x2][y2] = c2 + self.exclude_action(h1) + self.exclude_action(h2) + # Flag to signal if ingenious is called + skip_flag = False + # flags to avoid calling ingenious on colour that was already maxed out + ingenious_possible = [True, True] + if self.score[agent][c1] == self.limitation_score: + ingenious_possible[0] = False + if self.score[agent][c2] == self.limitation_score: + ingenious_possible[1] = False + + """Update score through checking 5 neighboring directions for h1 and h2 independently""" + self.score[agent][c1] += self.calculate_score_for_piece(h1, h2, c1) + self.score[agent][c2] += self.calculate_score_for_piece(h2, h1, c2) + + if self.score[agent][c1] > self.limitation_score and ingenious_possible[0]: + skip_flag = True + self.score[agent][c1] = self.limitation_score + if self.score[agent][c2] > self.limitation_score and ingenious_possible[1]: + skip_flag = True + self.score[agent][c2] = self.limitation_score + + """End game if no more legal actions.""" + if len(self.legal_move) == 0: + self.end_flag = True + # Preserve the number of tiles in hand for each player to comply with observation dimensions + while len(self.p_tiles[agent]) < self.init_draw: + self.p_tiles[agent].append([0, 0]) + return True + + """All tiles in hand has been played""" + if len(self.p_tiles[agent]) == 0: + self.end_flag = True # The player should win instantly if he plays out all the tiles in hand. + # Preserve the number of tiles in hand for each player to comply with observation dimensions + while len(self.p_tiles[agent]) < self.init_draw: + self.p_tiles[agent].append([0, 0]) + return True + + """In the original rules of the game, when a player calls ingenious, they can play a bonus round without + replenishing tiles in hand. However, due to implementation constraints in our case the player replenishes its + hand in all cases (ingenious or not)""" + self.get_tile(agent) + # Rule that says if you have no tiles of a color, you can swap your tiles with the lowest score. 
+        self.refresh_hand(agent)
+        # Pass the turn to the next player if ingenious was not called
+        if not skip_flag:
+            self.next_turn()
+
+    def calculate_score_for_piece(self, start_hex, other_hex, color):
+        """Calculate the score contribution of one half of a placed tile."""
+        point = 0
+        for i in range(0, 6):
+            neighbor_hex = hex_neighbor(start_hex, i)
+            if neighbor_hex == other_hex:
+                continue
+            while neighbor_hex in self.board_hex:
+                x, y = Hex2ArrayLocation(neighbor_hex, self.board_size)
+                if self.board_array[x][y] == color:
+                    point += 1
+                else:
+                    break
+                neighbor_hex = hex_neighbor(neighbor_hex, i)
+        return point
+
+    def exclude_action(self, hx):
+        """Exclude actions that are no longer legal once a hex is occupied."""
+        for i in range(0, 6):
+            hx2 = hex_neighbor(hx, i)
+            if hx2 not in self.board_hex:
+                continue
+            for card in range(0, self.init_draw):
+                x = self.action_map[(hx, hx2, card)]
+                self.masked_action[x] = 0
+                if x in self.legal_move:
+                    self.legal_move.remove(x)
+                y = self.action_map[(hx2, hx, card)]
+                self.masked_action[y] = 0
+                if y in self.legal_move:
+                    self.legal_move.remove(y)
+
+    def next_turn(self):
+        """Move to the next turn."""
+        self.agent_selector = (self.agent_selector + 1) % self.num_player
+        if self.agent_selector == 0 and self.first_round:
+            self.first_round = False
+        return self.agent_selector
+
+    def refresh_hand(self, player):
+        """Additional rule to refresh the tiles in hand."""
+        # find the color for which the player has the lowest score
+        minval = min(self.score[player].values())
+        flag_lowest_score = False
+        for item in self.p_tiles[player]:
+            for col in item:
+                if self.score[player][col] == minval:
+                    flag_lowest_score = True
+            if flag_lowest_score:
+                break
+        if not flag_lowest_score:
+            # the hand holds no tile of the lowest-scoring color
+            # save the current unused tiles to add them back to the tiles bag
+            back_up = self.p_tiles[player].copy()
+            # clear the player's tiles
+            self.p_tiles[player].clear()
+            # draw new tiles
+            self.get_tile(player)
+            # add the unused tiles back to the tiles bag
+            self.tiles_bag.extend(back_up)
+
+    def return_action_list(self):
+        """Return the legal action mask."""
+        if self.first_round:
+            return [
+                1 if i in self.first_round_pos and self.masked_action[i] == 1 else 0 for i in range(len(self.masked_action))
+            ]
+        return self.masked_action
+
+    def log(self):
+        """Print the current status of the game."""
+        print({"board_size": self.board_size, "num_players": self.num_player})
+        print("selector", self.agent_selector)
+        print(self.board_array)
+        print(self.score)
+        print(self.p_tiles)
diff --git a/momaland/envs/ingenious/ingenious_check.py b/momaland/envs/ingenious/ingenious_check.py
new file mode 100644
index 00000000..2a1f64e3
--- /dev/null
+++ b/momaland/envs/ingenious/ingenious_check.py
@@ -0,0 +1,359 @@
+"""Temporary check file for the Ingenious environment."""
+
+import random
+
+import gymnasium
+import numpy as np
+from ingenious import MOIngenious
+from ingenious_base import Hex2ArrayLocation
+
+
+def train(ig_env):
+    """Run a random agent on the Ingenious domain."""
+    done = False
+    while not done:
+        ag = ig_env.agent_selection
+        # print("Agent: ", ag)
+        obs = ig_env.observe(ag)
+        masked_act_list = obs["action_mask"]
+        action = random_index_of_one(masked_act_list)
+        # print("Observation: ", obs)
+        # print("Action: ", action)
+        ig_env.step(action)
+        observation, reward, truncation, termination, _ = ig_env.last()
+        # print("Observations: ", observation)
+        # print("Rewards: ", reward)
+        # print("Truncation: ", truncation)
+        # print("Termination: ", termination)
+        done = truncation or termination
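+
+
+# Helper sketch (illustrative only; a hypothetical alternative, not used by the checks
+# below): the same masked sampling that random_index_of_one implements can be written
+# directly with NumPy, which is also how test_API samples actions further down.
+def random_legal_action(mask):
+    """Return a random index where mask == 1; assumes at least one legal action exists."""
+    legal = np.flatnonzero(mask)
+    assert legal.size > 0, "no legal action available"
+    return int(random.choice(legal.tolist()))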
+def random_index_of_one(lst):
+    """Return a random index whose value is 1."""
+    # Get indices where the value is 1
+    one_indices = [i for i, value in enumerate(lst) if value == 1]
+    # Check if there is at least one '1' in the list
+    if one_indices:
+        # Randomly choose an index where the value is 1
+        random_index = random.choice(one_indices)
+        return random_index
+    else:
+        # If there are no '1' values in the list, return an appropriate message or handle it as needed
+        return "No '1' values in the list"
+
+
+def random_index_of_zero(lst):
+    """Return a random index whose value is 0."""
+    zero_indices = [i for i, value in enumerate(lst) if value == 0]
+    # Check if there is at least one '0' in the list
+    if zero_indices:
+        # Randomly choose an index where the value is 0
+        random_index = random.choice(zero_indices)
+        return random_index
+    else:
+        # If there are no '0' values in the list, return an appropriate message or handle it as needed
+        return "No '0' values in the list"
+
+
+def test_move():
+    """Test that moves are applied correctly in ingenious_base.
+
+    Returns: True or False
+    """
+    ig_env = MOIngenious(num_players=2, init_draw=2, num_colors=2, board_size=8)
+    ig_env.reset()
+    flag = True
+
+    # the action map must be consistent with the action index map
+    for i in ig_env.game.action_index_map:
+        h = ig_env.game.action_map.get(ig_env.game.action_index_map[i])
+        if h is None or h != i:
+            flag = False
+            break
+    # check a legal move
+    index = random_index_of_one(ig_env.game.return_action_list())
+    h1, h2, card = ig_env.game.action_index_map[index]
+    x1, y1 = Hex2ArrayLocation(h1, ig_env.game.board_size)
+    x2, y2 = Hex2ArrayLocation(h2, ig_env.game.board_size)
+
+    if ig_env.game.board_array[x1][y1] != 0 or ig_env.game.board_array[x2][y2] != 0:
+        print("reason1")
+        flag = False
+        return flag
+
+    ag = ig_env.agent_selection
+    c1, c2 = ig_env.game.p_tiles[ag][card]
+
+    # print(c1, c2, ig_env.game.board_array[x1][y1], ig_env.game.board_array[x2][y2])
+    # print(ig_env.game.return_action_list()[index])
+    ig_env.game.set_action_index(index)
+    # ig_env.step(index)
+    # print('after', c1, c2, ig_env.game.board_array[x1][y1], ig_env.game.board_array[x2][y2])
+    ag = ig_env.agent_selection
+    if ig_env.game.board_array[x1][y1] != c1 or ig_env.game.board_array[x2][y2] != c2:
+        flag = False
+        print("reason2")
+        return flag
+
+    # check an illegal move: placing a tile somewhere not allowed
+    index = random_index_of_zero(ig_env.game.return_action_list())
+    if ig_env.game.set_action_index(index):
+        print("reason3")
+        flag = False
+        return flag
+
+    # check an illegal move: playing a tile that is not in hand
+    index = random_index_of_one(ig_env.game.return_action_list())
+
+    ag = ig_env.game.agents[ig_env.game.agent_selector]
+    # h1, h2, card = ig_env.game.action_index_map[index]
+    ig_env.game.p_tiles[ag].clear()
+
+    if ig_env.game.set_action_index(index):
+        print("reason4")
+        flag = False
+        return flag
+    return flag
+
+
+def test_step():
+    """Test that moves are applied correctly through env.step.
+
+    Returns: True or False
+    """
+    ig_env = MOIngenious(num_players=2, init_draw=2, num_colors=2, board_size=8)
+    ig_env.reset()
+    flag = True
+
+    # check a legal step
+    ag = ig_env.agent_selection
+
+    obs = ig_env.observe(ag)
+    masked_act_list = obs["action_mask"]
+    index = random_index_of_one(masked_act_list)
+    h1, h2, card = ig_env.game.action_index_map[index]
+    x1, y1 = Hex2ArrayLocation(h1, ig_env.game.board_size)
+    x2, y2 = Hex2ArrayLocation(h2, ig_env.game.board_size)
+
+    if ig_env.game.board_array[x1][y1] != 0 or ig_env.game.board_array[x2][y2] != 0:
+        print("reason1")
+        flag = False
+        return flag
+    ag = ig_env.agent_selection
+    c1, c2 = ig_env.game.p_tiles[ag][card]
+
+    ig_env.step(index)
+
+    ag = ig_env.agent_selection
+    if ig_env.game.board_array[x1][y1] != c1 or ig_env.game.board_array[x2][y2] != c2:
+        flag = False
+        print("reason2")
+        return flag
+
+    # check an illegal move: placing a tile somewhere not allowed
+    obs = ig_env.observe(ag)
+    masked_act_list = obs["action_mask"]
+    index = random_index_of_zero(masked_act_list)
+
+    remain = len(ig_env.game.tiles_bag)
+    ig_env.step(index)
+    if remain != len(ig_env.game.tiles_bag):
+        print("reason3")
+        flag = False
+        return flag
+
+    # check an illegal move: playing a tile that is not in hand
+    index = random_index_of_one(ig_env.game.masked_action)
+    ag = ig_env.agent_selection
+    ig_env.game.p_tiles[ag].clear()
+    remain = len(ig_env.game.tiles_bag)
+    ig_env.step(index)
+    if remain != len(ig_env.game.tiles_bag):
+        print("reason4")
+        flag = False
+        return flag
+
+    # check selector
+
+    return flag
+
+
+def test_reset():
+    """Use MOIngenious.reset, then check that every parameter inside ingenious_base is correct.
+
+    Returns: True or False
+    """
+    ig_env = MOIngenious(num_players=2, init_draw=2, num_colors=2, board_size=4)
+    ig_env.reset(105)
+    train(ig_env)
+    ig_env.reset(110)
+    flag = True
+    if ig_env.game.board_array.sum() != 21:
+        flag = False
+
+    if ig_env.game.end_flag:
+        flag = False
+    if not ig_env.game.first_round:
+        flag = False
+    if ig_env.game.action_size - ig_env.game.masked_action.sum() != 6 * 3 * 2 * 2:
+        flag = False
+    if sum([sum(s) for s in [sc.values() for sc in ig_env.game.score.values()]]) != 0:
+        flag = False
+    if ig_env.game.agent_selector != 0:
+        flag = False
+    if len(ig_env.game.tiles_bag) < 100:
+        flag = False
+    return flag
+
+
+def test_ingenious_rule():
+    """Ingenious rule test in a small setting; when the game ends successfully, no agent should have played 3 times in a row."""
+    ig_env = MOIngenious(num_players=2, init_draw=2, num_colors=2, board_size=8, limitation_score=10)
+    ag = -1
+    consecutive = 0
+    ig_env.reset()
+    done = False
+    if_exceed = True
+    if_ingenious = False
+    while not done:
+        if ag != ig_env.agent_selection:
+            consecutive = 0
+        else:
+            consecutive += 1
+        ag = ig_env.agent_selection
+        # obs = ig_env.observe(ag)
+        # masked_act_list = obs["action_mask"]
+        masked_act_list = ig_env.game.return_action_list()
+        action = random_index_of_one(masked_act_list)
+        ig_env.step(action)
+        observation, reward, truncation, termination, _ = ig_env.last()
+        done = truncation or termination
+        if consecutive >= 2:
+            if_exceed = False
+            break
+        if consecutive == 1:
+            if_ingenious = True
+            break
+    return if_ingenious and if_exceed
+
+
+def test_API():
+    """Test the observe interface in ingenious.py."""
+    ig_env = MOIngenious(limitation_score=10000)
+    ag = ig_env.agent_selection
+    obs = ig_env.observe(ag)
+    masked_act_list = obs["action_mask"]
+    print(sum(masked_act_list))
+    print(sum(ig_env.game.masked_action))
+    env = ig_env
+    env.reset()
+    # observation_0
+    num_cycles = 100
+
+    env.reset()
+
+    terminated = {agent: False for agent in env.agents}
+    truncated = {agent: False for agent in env.agents}
+    live_agents = set(env.agents[:])
+    has_finished = set()
+    generated_agents = set()
+    accumulated_rewards = {
+        agent: np.zeros(env.unwrapped.reward_space(agent).shape[0], dtype=np.float32) for agent in env.agents
+    }
+    for agent in env.agent_iter(env.num_agents * num_cycles):
+        generated_agents.add(agent)
+        print(agent, has_finished, generated_agents)
+        print(env.last())
+        assert agent not in has_finished, "agents cannot resurrect! Generate a new agent with a new name."
+        assert isinstance(env.infos[agent], dict), "an environment agent's info must be a dictionary"
+        prev_observe, reward, terminated, truncated, info = env.last()
+        if terminated or truncated:
+            action = None
+        elif isinstance(prev_observe, dict) and "action_mask" in prev_observe:
+            action = random.choice(np.flatnonzero(prev_observe["action_mask"]).tolist())
+        else:
+            action = env.action_space(agent).sample()
+
+        if agent not in live_agents:
+            live_agents.add(agent)
+
+        assert live_agents.issubset(set(env.agents)), "environment must delete agents as the game continues"
+
+        if terminated or truncated:
+            live_agents.remove(agent)
+            has_finished.add(agent)
+
+        assert np.all(
+            accumulated_rewards[agent] == reward
+        ), "reward returned by last is not the accumulated rewards in its rewards dict"
+        accumulated_rewards[agent] = np.zeros_like(reward, dtype=np.float32)
+
+        env.step(action)
+
+        for a, rew in env.rewards.items():
+            accumulated_rewards[a] += rew
+
+        assert env.num_agents == len(env.agents), "env.num_agents is not equal to len(env.agents)"
+        assert set(env.rewards.keys()) == (
+            set(env.agents)
+        ), "agents should not be given a reward if they were terminated or truncated last turn"
+        assert set(env.terminations.keys()) == (
+            set(env.agents)
+        ), "agents should not be given a termination if they were terminated or truncated last turn"
+        assert set(env.truncations.keys()) == (
+            set(env.agents)
+        ), "agents should not be given a truncation if they were terminated or truncated last turn"
+        assert set(env.infos.keys()) == (
+            set(env.agents)
+        ), "agents should not be given an info if they were terminated or truncated last turn"
+        if hasattr(env, "possible_agents"):
+            assert set(env.agents).issubset(
+                set(env.possible_agents)
+            ), "possible agents should always include all agents, if it exists"
+
+        if not env.agents:
+            break
+
+        if isinstance(env.observation_space(agent), gymnasium.spaces.Box):
+            assert env.observation_space(agent).dtype == prev_observe.dtype
+            assert env.observation_space(agent).contains(prev_observe), "Out of bounds observation: " + str(prev_observe)
+
+        assert env.observation_space(agent).contains(prev_observe), "Agent's observation is outside of its observation space"
+        # test_observation(prev_observe, observation_0)
+        if not isinstance(env.infos[env.agent_selection], dict):
+            print("The info of each agent should be a dict, use {} if you aren't using info")
+
+    if not env.agents:
+        assert has_finished == generated_agents, "not all agents finished, some were skipped over"
+
+
+if __name__ == "__main__":
+    # ig_env = MOIngenious(num_players=2, init_draw=2, num_colors=2, board_size=8)
+    # ag = ig_env.agent_selection
+    # ig_env.reset()
+    t1 = test_ingenious_rule()
+    # t1 = True
+    # ig_env.reset()
+    t2 = test_reset()
+    # ig_env.reset()
+    # t3 = test_move()  # no longer needed
+    t4 = test_step()
+
+    if t1:
+        print("Accepted: ingenious rule test")
+    else:
+        print("Rejected: ingenious rule test")
+    if t2:
+        print("Accepted: reset test")
+    else:
+        print("Rejected: reset test")
+    # if t3:
+    #     print("Accepted: move in ingenious_base test")
+    # else:
+    #     print("Rejected: move in ingenious_base test")
+    if t4:
+        print("Accepted: move in step test")
+    else:
+        print("Rejected: move in step test")
diff --git a/momaland/envs/ingenious/ingenious_seedtest.py b/momaland/envs/ingenious/ingenious_seedtest.py
new file mode 100644
index 00000000..a9215f36
--- /dev/null
+++ b/momaland/envs/ingenious/ingenious_seedtest.py
@@ -0,0 +1,206 @@
+"""Temporary seed-test file for the Ingenious environment."""
+
+# import random
+
+import numpy as np
+from ingenious import MOIngenious
+
+
+# from ingenious_base import Hex2ArrayLocation
+# from pettingzoo.classic import hanabi_v5
+
+
+# from pettingzoo.test import parallel_seed_test, seed_test
+
+
+def check_environment_deterministic(env1, env2, num_cycles):
+    """Check that two AEC environments execute the same way."""
+    env1.reset(seed=42)
+    env2.reset(seed=42)
+    env1.game.log()
+    env2.game.log()
+    # seed action spaces to ensure sampled actions are the same
+    seed_action_spaces(env1)
+    seed_action_spaces(env2)
+
+    # seed observation spaces to ensure the first observation is the same
+    seed_observation_spaces(env1)
+    seed_observation_spaces(env2)
+
+    iters = 0
+    max_env_iters = num_cycles * len(env1.agents)
+
+    for agent1, agent2 in zip(env1.agent_iter(), env2.agent_iter()):
+        assert data_equivalence(agent1, agent2), f"Incorrect agent: {agent1} {agent2}"
+
+        obs1, reward1, termination1, truncation1, info1 = env1.last()
+        obs2, reward2, termination2, truncation2, info2 = env2.last()
+        print(env1.agent_selection)
+        print(env2.agent_selection)
+        print("after")
+        print(obs1, obs2)
+        assert data_equivalence(obs1, obs2), "Incorrect observation"
+        assert data_equivalence(reward1, reward2), "Incorrect reward."
+        assert data_equivalence(termination1, termination2), "Incorrect termination."
+        assert data_equivalence(truncation1, truncation2), "Incorrect truncation."
+        assert data_equivalence(info1, info2), "Incorrect info."
+
+        if termination1 or truncation1:
+            break
+        print("here 1")
+        mask1 = obs1.get("action_mask") if isinstance(obs1, dict) else None
+        mask2 = obs2.get("action_mask") if isinstance(obs2, dict) else None
+        assert data_equivalence(mask1, mask2), f"Incorrect action mask: {mask1} {mask2}"
+        print("here 2")
+        # both action spaces must receive the same seed, otherwise the determinism check is meaningless
+        env1.action_space(agent1).seed(0)
+        env2.action_space(agent2).seed(0)
+        action1 = env1.action_space(agent1).sample(mask1)
+        action2 = env2.action_space(agent2).sample(mask2)
+
+        assert data_equivalence(action1, action2), f"Incorrect actions: {action1} {action2}"
+
+        print("before")
+        print(obs1)
+        print(obs2)
+
+        env1.step(action1)
+        env2.step(action2)
+        print("here 3")
+        iters += 1
+
+        if iters >= max_env_iters:
+            break
+
+    env1.close()
+    env2.close()
+
+
+def seed_action_spaces(env):
+    """Seed the action spaces."""
+    if hasattr(env, "agents"):
+        for i, agent in enumerate(env.agents):
+            env.action_space(agent).seed(42 + i)
+
+
+def seed_observation_spaces(env):
+    """Seed the observation spaces."""
+    if hasattr(env, "agents"):
+        for i, agent in enumerate(env.agents):
+            env.observation_space(agent).seed(42 + i)
+
+
+def data_equivalence(data_1, data_2) -> bool:
+    """Check equivalence between data 1 and 2, i.e. observations, actions, info.
+
+    Args:
+        data_1: data structure 1
+        data_2: data structure 2
+
+    Returns:
+        Whether data 1 and 2 are equivalent
+    """
+    if type(data_1) is type(data_2):
+        if isinstance(data_1, dict):
+            return data_1.keys() == data_2.keys() and all(data_equivalence(data_1[k], data_2[k]) for k in data_1.keys())
+        elif isinstance(data_1, (tuple, list)):
+            return len(data_1) == len(data_2) and all(data_equivalence(o_1, o_2) for o_1, o_2 in zip(data_1, data_2))
+        elif isinstance(data_1, np.ndarray):
+            # return data_1.shape == data_2.shape and np.allclose(
+            #     data_1, data_2, atol=0.00001
+            # )
+            return data_1.shape == data_2.shape and all(data_equivalence(data_1[k], data_2[k]) for k in range(0, len(data_1)))
+        else:
+            return data_1 == data_2
+    else:
+        return False
+
+
+if __name__ == "__main__":
+    ig_env = MOIngenious(num_players=4, init_draw=4, num_colors=4, board_size=8)
+
+    ig_env2 = MOIngenious(num_players=4, init_draw=4, num_colors=4, board_size=8)
+
+    env1 = ig_env
+    env2 = ig_env2
+    env1.reset(seed=40)
+    env2.reset(seed=40)
+    # env1.
+
+    env1.game.log()
+    env2.game.log()
+
+    prev_observe1, reward1, terminated1, truncated1, info1 = env1.last()
+    prev_observe2, reward2, terminated2, truncated2, info2 = env2.last()
+    # action = random.choice(np.flatnonzero(prev_observe["action_mask"]).tolist())
+    agent1 = env1.agent_selection
+    agent2 = env2.agent_selection
+
+    print(agent1, agent2)
+    """
+    print(type(env1.action_space(agent1)))
+    print(sum(prev_observe1["action_mask"]))
+    print(sum(prev_observe2["action_mask"]))
+    action1 = env1.action_space(agent1).sample(prev_observe1["action_mask"])
+    action2 = env2.action_space(agent2).sample(prev_observe2["action_mask"])
+    print(action1, action2)
+    print(prev_observe1["action_mask"][action1], prev_observe2["action_mask"][action2])
+    # check_environment_deterministic(env1, env2, 100)

+    env3 = hanabi_v5.env()
+    env3.reset(seed=30)
+    env4 = hanabi_v5.env()
+    env4.reset(seed=30)
+    agent = env3.agent_selection
+    action1 = env3.action_space(agent).sample()
+    action2 = env4.action_space(agent).sample()
+    print(action1, action2)
+    """
diff --git a/momaland/envs/ingenious/moingenious_v0.py b/momaland/envs/ingenious/moingenious_v0.py
new file mode 100644
index 00000000..7f82f262
--- /dev/null
+++ b/momaland/envs/ingenious/moingenious_v0.py
@@ -0,0 +1,5 @@
+"""Multi-objective Ingenious Game."""
+from momaland.envs.ingenious.ingenious import env, raw_env
+
+
+__all__ = ["env", "raw_env"]
diff --git a/momaland/utils/all_modules.py b/momaland/utils/all_modules.py
index 35a8185a..65b89a98 100644
--- a/momaland/utils/all_modules.py
+++ b/momaland/utils/all_modules.py
@@ -13,6 +13,7 @@
 from momaland.envs.crazyrl.escort import escort_v0
 from momaland.envs.crazyrl.surround import surround_v0
 from momaland.envs.gem_mining import mogemmining_v0
+from momaland.envs.ingenious import moingenious_v0
 from momaland.envs.item_gathering import moitemgathering_v0
 from momaland.envs.multiwalker import momultiwalker_v0
 from momaland.envs.pistonball import mopistonball_v0
@@ -26,6 +27,7 @@
     "surround_v0": surround_v0,
     "escort_v0": escort_v0,
     "moitemgathering_v0": moitemgathering_v0,
+    "moingenious_v0": moingenious_v0,
     "mopistonball_v0": mopistonball_v0,
     "mocongestion_v0": mocongestion_v0,
     "moconnect4_v0": moconnect4_v0,
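Usage sketch: a minimal random rollout against the new environment, assuming the package layout added in this patch. The reward returned by `last()` is the per-color score vector; any scalarization (e.g. the original game's max-min utility) is left to the user, as the module docstring notes.

    import numpy as np
    from momaland.envs.ingenious import moingenious_v0

    env = moingenious_v0.env(num_players=2, init_draw=6, num_colors=6, board_size=8)
    env.reset(seed=42)
    for agent in env.agent_iter():
        obs, reward, termination, truncation, info = env.last()  # reward: length-num_colors vector
        if termination or truncation:
            action = None  # dead step
        else:
            # sample uniformly among legal placements via the action mask
            action = int(np.random.choice(np.flatnonzero(obs["action_mask"])))
        env.step(action)
    env.close()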