diff --git a/aeon/synthesis_grammar/grammar.py b/aeon/synthesis_grammar/grammar.py index e0d3724b..7625a8d5 100644 --- a/aeon/synthesis_grammar/grammar.py +++ b/aeon/synthesis_grammar/grammar.py @@ -40,16 +40,6 @@ def get_core(self): ... classType = TypingType[HasGetCore] -def mk_method_core_literal(cls: classType, ty: Type) -> classType: - - def get_core(self): - value = getattr(self, "value", None) - return Literal(value, type=ty) - - setattr(cls, "get_core", get_core) - return cls - - def is_valid_class_name(class_name: str) -> bool: return class_name not in prelude_ops and not class_name.startswith(("_anf_", "target")) @@ -104,7 +94,13 @@ def create_literal_class( [("value", value_type)], bases=(parent_class,), ) - return mk_method_core_literal(new_class, aeon_type) + + def get_core(self): + value = getattr(self, "value", None) + return Literal(value, type=aeon_type) + + setattr(new_class, "get_core", get_core) + return new_class def create_literals_nodes(type_info: dict[Type, TypingType], types: Optional[list[Type]] = None) -> list[TypingType]: @@ -149,6 +145,7 @@ def get_core(_self): # Note: We cannot use the variable inside Abstraction. setattr(dc, "get_core", get_core) + return dc diff --git a/aeon/synthesis_grammar/synthesizer.py b/aeon/synthesis_grammar/synthesizer.py index e8077acf..c7b35bab 100644 --- a/aeon/synthesis_grammar/synthesizer.py +++ b/aeon/synthesis_grammar/synthesizer.py @@ -10,12 +10,14 @@ from typing import Callable import configparser +from geneticengine.representations.tree.initializations import MaxDepthDecider import multiprocess as mp from geneticengine.algorithms.gp.operators.combinators import ParallelStep, SequenceStep from geneticengine.algorithms.gp.operators.crossover import GenericCrossoverStep from geneticengine.algorithms.gp.operators.elitism import ElitismStep from geneticengine.algorithms.gp.operators.initializers import ( - StandardInitializer, ) + StandardInitializer, +) from geneticengine.algorithms.gp.operators.mutation import GenericMutationStep from geneticengine.algorithms.gp.operators.novelty import NoveltyStep from geneticengine.algorithms.gp.operators.selection import LexicaseSelection @@ -33,10 +35,12 @@ from geneticengine.problems import MultiObjectiveProblem, Problem, SingleObjectiveProblem from geneticengine.random.sources import RandomSource from geneticengine.representations.grammatical_evolution.dynamic_structured_ge import ( - DynamicStructuredGrammaticalEvolutionRepresentation, ) + DynamicStructuredGrammaticalEvolutionRepresentation, +) from geneticengine.representations.grammatical_evolution.ge import GrammaticalEvolutionRepresentation from geneticengine.representations.grammatical_evolution.structured_ge import ( - StructuredGrammaticalEvolutionRepresentation, ) + StructuredGrammaticalEvolutionRepresentation, +) from geneticengine.representations.tree.treebased import TreeBasedRepresentation from geneticengine.solutions import Individual from loguru import logger @@ -109,8 +113,7 @@ def __init__(self, target_fitness: float): def is_done(self, tracker: ProgressTracker): assert isinstance(tracker, MultiObjectiveProgressTracker) - comps = tracker.get_best_individuals()[0].get_fitness( - tracker.get_problem()).fitness_components + comps = tracker.get_best_individuals()[0].get_fitness(tracker.get_problem()).fitness_components return all(abs(c - self.target_fitness) < 0.001 for c in comps) @@ -136,38 +139,27 @@ def __init__( if fields is not None: self.fields = fields - def register(self, - tracker: Any, - individual: Individual, - problem: Problem, - is_best=True): + def register(self, tracker: Any, individual: Individual, problem: Problem, is_best=True): if self.csv_file is None: self.csv_file = open(self.csv_file_path, "w", newline="") self.csv_writer = csv.writer(self.csv_file) if self.fields is None: self.fields = { - "Execution Time": - lambda t, i, _: - (time.monotonic_ns() - t.start_time) * 0.000000001, - "Fitness Aggregated": - lambda t, i, p: i.get_fitness(p).maximizing_aggregate, - "Phenotype": - lambda t, i, _: i.get_phenotype(), + "Execution Time": lambda t, i, _: (time.monotonic_ns() - t.start_time) * 0.000000001, + "Fitness Aggregated": lambda t, i, p: i.get_fitness(p).maximizing_aggregate, + "Phenotype": lambda t, i, _: i.get_phenotype(), } for comp in range(problem.number_of_objectives()): - self.fields[ - f"Fitness{comp}"] = lambda t, i, p: i.get_fitness( - p).fitness_components[comp] + self.fields[f"Fitness{comp}"] = lambda t, i, p: i.get_fitness(p).fitness_components[comp] if self.extra_fields is not None: for name in self.extra_fields: self.fields[name] = self.extra_fields[name] self.csv_writer.writerow([name for name in self.fields]) self.csv_file.flush() if not self.only_record_best_individuals or is_best: - self.csv_writer.writerow([ - self.fields[name](tracker, individual, problem) - for name in self.fields - ], ) + self.csv_writer.writerow( + [self.fields[name](tracker, individual, problem) for name in self.fields], + ) self.csv_file.flush() @@ -187,13 +179,15 @@ def parse_config(config_file: str, section: str) -> dict[str, Any]: def is_valid_term_literal(term_literal: Term) -> bool: - return (isinstance(term_literal, Literal) - and term_literal.type == BaseType("Int") - and isinstance(term_literal.value, int) and term_literal.value > 0) + return ( + isinstance(term_literal, Literal) + and term_literal.type == BaseType("Int") + and isinstance(term_literal.value, int) + and term_literal.value > 0 + ) -def get_csv_file_path(file_path: str, representation: type, seed: int, - hole_name: str, config_name: str) -> str | None: +def get_csv_file_path(file_path: str, representation: type, seed: int, hole_name: str, config_name: str) -> str | None: """Generate a CSV file path based on the provided file_path, representation, and seed. @@ -204,8 +198,7 @@ def get_csv_file_path(file_path: str, representation: type, seed: int, file_name = os.path.basename(file_path) name_without_extension, _ = os.path.splitext(file_name) - directory = os.path.join("csv", name_without_extension, - representation.__class__.__name__) + directory = os.path.join("csv", name_without_extension, representation.__class__.__name__) os.makedirs(directory, exist_ok=True) hole_suffix = f"_{hole_name}" if hole_name else "" @@ -241,38 +234,26 @@ def create_evaluator( holes: list[str], ) -> Callable[[classType], Any]: """Creates the fitness function for a given synthesis context.""" - fitness_decorators = [ - "minimize_int", "minimize_float", "multi_minimize_float" - ] - used_decorators = [ - decorator for decorator in fitness_decorators - if decorator in metadata[fun_name] - ] + fitness_decorators = ["minimize_int", "minimize_float", "multi_minimize_float"] + used_decorators = [decorator for decorator in fitness_decorators if decorator in metadata[fun_name]] assert used_decorators, "No fitness decorators used in metadata for function." objectives_list: list[Definition] = [ - objective for decorator in used_decorators - for objective in metadata[fun_name][decorator] + objective for decorator in used_decorators for objective in metadata[fun_name][decorator] ] programs_to_evaluate: list[Term] = [ - substitution(program, Var(objective.name), "main") - for objective in objectives_list + substitution(program, Var(objective.name), "main") for objective in objectives_list ] - def evaluate_individual(individual: classType, - result_queue: mp.Queue) -> Any: + def evaluate_individual(individual: classType, result_queue: mp.Queue) -> Any: """Function to run in a separate process and places the result in a Queue.""" try: start = time.time() first_hole_name = holes[0] individual_term = individual.get_core() # type: ignore individual_term = ensure_anf(individual_term, 10000000) - individual_type_check(ctx, program, first_hole_name, - individual_term) - results = [ - eval(substitution(p, individual_term, first_hole_name), ectx) - for p in programs_to_evaluate - ] + individual_type_check(ctx, program, first_hole_name, individual_term) + results = [eval(substitution(p, individual_term, first_hole_name), ectx) for p in programs_to_evaluate] result = results if len(results) > 1 else results[0] result = filter_nan_values(result) result_queue.put(result) @@ -282,16 +263,14 @@ def evaluate_individual(individual: classType, except Exception as e: # import traceback # traceback.print_exc() - logger.log("SYNTHESIZER", - f"Failed in the fitness function: {e}, {type(e)}") + logger.log("SYNTHESIZER", f"Failed in the fitness function: {e}, {type(e)}") result_queue.put(ERROR_FITNESS) def evaluator(individual: classType) -> Any: """Evaluates an individual with a timeout.""" assert len(holes) == 1, "Only 1 hole per function is supported now" result_queue = mp.Queue() - eval_process = mp.Process(target=evaluate_individual, - args=(individual, result_queue)) + eval_process = mp.Process(target=evaluate_individual, args=(individual, result_queue)) eval_process.start() eval_process.join(timeout=TIMEOUT_DURATION) @@ -319,68 +298,49 @@ def problem_for_fitness_function( ) -> Tuple[Problem, float | list[float]]: """Creates a problem for a particular function, based on the name and type of its fitness function.""" - fitness_decorators = [ - "minimize_int", "minimize_float", "multi_minimize_float" - ] + fitness_decorators = ["minimize_int", "minimize_float", "multi_minimize_float"] if fun_name in metadata: - used_decorators = [ - decorator for decorator in fitness_decorators - if decorator in metadata[fun_name].keys() - ] + used_decorators = [decorator for decorator in fitness_decorators if decorator in metadata[fun_name].keys()] assert used_decorators, "No valid fitness decorators found." set_error_fitness(used_decorators) - fitness_function = create_evaluator(ctx, ectx, term, fun_name, - metadata, hole_names) - problem_type = MultiObjectiveProblem if is_multiobjective( - used_decorators) else SingleObjectiveProblem + fitness_function = create_evaluator(ctx, ectx, term, fun_name, metadata, hole_names) + problem_type = MultiObjectiveProblem if is_multiobjective(used_decorators) else SingleObjectiveProblem target_fitness: float | list[float] = ( 0 if isinstance(problem_type, SingleObjectiveProblem) else 0 ) # TODO: add support to maximize decorators - return problem_type(fitness_function=fitness_function, - minimize=MINIMIZE_OBJECTIVE), target_fitness + return problem_type(fitness_function=fitness_function, minimize=MINIMIZE_OBJECTIVE), target_fitness else: - return SingleObjectiveProblem(fitness_function=lambda x: 0, - minimize=True), 0 + return SingleObjectiveProblem(fitness_function=lambda x: 0, minimize=True), 0 -def get_grammar_components(ctx: TypingContext, fun_type: Type, fun_name: str, - metadata: Metadata): - grammar_nodes, starting_node = gen_grammar_nodes(ctx, fun_type, fun_name, - metadata, []) +def get_grammar_components(ctx: TypingContext, fun_type: Type, fun_name: str, metadata: Metadata): + grammar_nodes, starting_node = gen_grammar_nodes(ctx, fun_type, fun_name, metadata, []) assert len(grammar_nodes) > 0 assert starting_node is not None, "Starting Node is None" return grammar_nodes, starting_node -def create_grammar(holes: dict[str, tuple[Type, TypingContext]], fun_name: str, - metadata: dict[str, Any]): - assert len( - holes - ) == 1, "More than one hole per function is not supported at the moment." +def create_grammar(holes: dict[str, tuple[Type, TypingContext]], fun_name: str, metadata: dict[str, Any]): + assert len(holes) == 1, "More than one hole per function is not supported at the moment." hole_name = list(holes.keys())[0] ty, ctx = holes[hole_name] - grammar_nodes, starting_node = get_grammar_components( - ctx, ty, fun_name, metadata) + grammar_nodes, starting_node = get_grammar_components(ctx, ty, fun_name, metadata) return extract_grammar(grammar_nodes, starting_node) -def random_search_synthesis(grammar: Grammar, - problem: Problem, - budget: int = 1000) -> Term: +def random_search_synthesis(grammar: Grammar, problem: Problem, budget: int = 1000) -> Term: """Performs a synthesis procedure with Random Search.""" max_depth = 5 rep = TreeBasedRepresentation(grammar, max_depth) r = RandomSource(42) population = [rep.create_individual(r, max_depth) for _ in range(budget)] - population_with_score = [(problem.evaluate(phenotype), - phenotype.get_core()) - for phenotype in population] + population_with_score = [(problem.evaluate(phenotype), phenotype.get_core()) for phenotype in population] return min(population_with_score, key=lambda x: x[0])[1] @@ -405,20 +365,19 @@ def create_gp_step(problem: Problem, gp_params: dict[str, Any]): weights=[ gp_params["n_elites"], gp_params["novelty"], - gp_params["population_size"] - gp_params["n_elites"] - - gp_params["novelty"], + gp_params["population_size"] - gp_params["n_elites"] - gp_params["novelty"], ], ) def geneticengine_synthesis( - grammar: Grammar, - problem: Problem, - filename: str | None, - hole_name: str, - target_fitness: float | list[float], - gp_params: dict[str, Any] | None = None, - ui: SynthesisUI = SilentSynthesisUI(), + grammar: Grammar, + problem: Problem, + filename: str | None, + hole_name: str, + target_fitness: float | list[float], + gp_params: dict[str, Any] | None = None, + ui: SynthesisUI = SilentSynthesisUI(), ) -> Term: """Performs a synthesis procedure with GeneticEngine.""" # gp_params = gp_params or parse_config("aeon/synthesis_grammar/gpconfig.gengy", "DEFAULT") # TODO @@ -431,34 +390,25 @@ def geneticengine_synthesis( assert isinstance(config_name, str) assert isinstance(seed, int) representation: type = representations[representation_name]( - grammar, max_depth=gp_params["max_depth"]) + grammar, decider=MaxDepthDecider(NativeRandomSource(seed), grammar, gp_params["max_depth"]) + ) tracker: ProgressTracker recorders = [] if filename: - csv_file_path = get_csv_file_path(filename, representation, seed, - hole_name, config_name) + csv_file_path = get_csv_file_path(filename, representation, seed, hole_name, config_name) recorders.append( - LazyCSVRecorder( - csv_file_path, - problem, - only_record_best_individuals=gp_params["only_record_best_inds"] - ), ) + LazyCSVRecorder(csv_file_path, problem, only_record_best_individuals=gp_params["only_record_best_inds"]), + ) if isinstance(problem, SingleObjectiveProblem): - tracker = SingleObjectiveProgressTracker( - problem, evaluator=SequentialEvaluator(), recorders=recorders) + tracker = SingleObjectiveProgressTracker(problem, evaluator=SequentialEvaluator(), recorders=recorders) else: - tracker = MultiObjectiveProgressTracker( - problem, evaluator=SequentialEvaluator(), recorders=recorders) + tracker = MultiObjectiveProgressTracker(problem, evaluator=SequentialEvaluator(), recorders=recorders) class UIBackendRecorder(SearchRecorder): - def register(self, - tracker: Any, - individual: Individual, - problem: Problem, - is_best=False): + def register(self, tracker: Any, individual: Individual, problem: Problem, is_best=False): ui.register( individual.get_phenotype().get_core(), individual.get_fitness(problem), @@ -472,11 +422,9 @@ def register(self, if target_fitness is not None: if isinstance(tracker, SingleObjectiveProgressTracker): search_budget = TargetFitness(target_fitness) - elif isinstance(tracker, MultiObjectiveProgressTracker) and isinstance( - target_fitness, list): + elif isinstance(tracker, MultiObjectiveProgressTracker) and isinstance(target_fitness, list): search_budget = TargetMultiFitness(target_fitness) - elif isinstance(tracker, MultiObjectiveProgressTracker) and isinstance( - target_fitness, (float, int)): + elif isinstance(tracker, MultiObjectiveProgressTracker) and isinstance(target_fitness, (float, int)): search_budget = TargetMultiSameFitness(target_fitness) else: assert False @@ -492,6 +440,14 @@ def register(self, step=create_gp_step(problem=problem, gp_params=gp_params), ) + # alg = RandomSearch( + # problem=problem, + # budget=budget, + # representation=representation, + # random=NativeRandomSource(seed), + # recorder=tracker, + # ) + ui.start( typing_ctx=None, evaluation_ctx=None, @@ -518,15 +474,15 @@ def set_error_fitness(decorators): def synthesize_single_function( - ctx: TypingContext, - ectx: EvaluationContext, - term: Term, - fun_name: str, - holes: dict[str, tuple[Type, TypingContext]], - metadata: Metadata, - filename: str | None, - synth_config: dict[str, Any] | None = None, - ui: SynthesisUI = SynthesisUI(), + ctx: TypingContext, + ectx: EvaluationContext, + term: Term, + fun_name: str, + holes: dict[str, tuple[Type, TypingContext]], + metadata: Metadata, + filename: str | None, + synth_config: dict[str, Any] | None = None, + ui: SynthesisUI = SynthesisUI(), ) -> Tuple[Term, dict[str, Term]]: # Step 1 Create a Single or Multi-Objective Problem instance. @@ -541,39 +497,37 @@ def synthesize_single_function( # Step 2 Create grammar object. grammar = create_grammar(holes, fun_name, metadata) + assert len(holes) == 1 hole_name = list(holes.keys())[0] # TODO Synthesis: This function (and its parent) should be parameterized with the type of search procedure # to use (e.g., Random Search, Genetic Programming, others...) # Step 3 Synthesize an element - synthesized_element = geneticengine_synthesis(grammar, problem, filename, - hole_name, target_fitness, - synth_config, ui) + synthesized_element = geneticengine_synthesis( + grammar, problem, filename, hole_name, target_fitness, synth_config, ui + ) # synthesized_element = random_search_synthesis(grammar, problem) # Step 4 Substitute the synthesized element in the original program and return it. - return substitution(term, synthesized_element, hole_name), { - hole_name: synthesized_element - } + return substitution(term, synthesized_element, hole_name), {hole_name: synthesized_element} def synthesize( - ctx: TypingContext, - ectx: EvaluationContext, - term: Term, - targets: list[tuple[str, list[str]]], - metadata: Metadata, - filename: str | None = None, - synth_config: dict[str, Any] | None = None, - refined_grammar: bool = False, - ui: SynthesisUI = SynthesisUI(), + ctx: TypingContext, + ectx: EvaluationContext, + term: Term, + targets: list[tuple[str, list[str]]], + metadata: Metadata, + filename: str | None = None, + synth_config: dict[str, Any] | None = None, + refined_grammar: bool = False, + ui: SynthesisUI = SynthesisUI(), ) -> Tuple[Term, dict[str, Term]]: """Synthesizes code for multiple functions, each with multiple holes.""" program_holes = get_holes_info(ctx, term, top, targets, refined_grammar) - assert len(program_holes) == len( - targets), "No support for function with more than one hole" + assert len(program_holes) == len(targets), "No support for function with more than one hole" results = {} @@ -583,10 +537,7 @@ def synthesize( ectx, term, name, - { - h: v - for h, v in program_holes.items() if h in holes_names - }, + {h: v for h, v in program_holes.items() if h in holes_names}, metadata, filename, synth_config, diff --git a/requirements.pip b/requirements.pip index 30e51795..847dceb3 100644 --- a/requirements.pip +++ b/requirements.pip @@ -6,7 +6,7 @@ psb2 zstandard==0.15.2 zss z3-solver -geneticengine @ https://github.com/alcides/GeneticEngine/archive/3feb0b48494f6b4352a3ed87a0fef29a98e4b4a1.zip +geneticengine @ https://github.com/alcides/GeneticEngine/archive/d5c2d3347e3052146ac45414737fa33974cb6bd2.zip textdistance loguru numpy diff --git a/tests/synth_fitness_test.py b/tests/synth_fitness_test.py index cc308751..19d6e96c 100644 --- a/tests/synth_fitness_test.py +++ b/tests/synth_fitness_test.py @@ -1,6 +1,5 @@ from __future__ import annotations -from abc import ABC from aeon.core.terms import Term, Application, Literal, Var from aeon.core.types import top, BaseType @@ -9,7 +8,6 @@ from aeon.sugar.desugar import desugar from aeon.sugar.parser import parse_program from aeon.sugar.program import Definition -from aeon.synthesis_grammar.grammar import mk_method_core_literal from aeon.synthesis_grammar.synthesizer import synthesize, gengy_default_config from aeon.typechecking.typeinfer import check_type_errors @@ -19,22 +17,6 @@ synth_config["timer_limit"] = 0.25 -def mock_literal_individual(value: int): - - class t_Int(ABC): - pass - - class literal_Int(t_Int): - value: int - - def __init__(self, value: int): - self.value = value - - literal_int_instance = mk_method_core_literal(literal_Int) # type: ignore - - return literal_int_instance(value) # type: ignore - - def test_fitness(): code = """def year : Int = 2023;