# Source code for utensil.random_search._random_search

from __future__ import annotations

import abc
import hashlib
import os
from collections import namedtuple
from copy import deepcopy
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Tuple, Union

from utensil.general.logger import DUMMY_LOGGER

try:
    import numpy as np
    import pandas as pd
except ImportError as e:
    raise e


class RandomizedParam(abc.ABC):
    """Base class for a parameter whose concrete value is derived from a
    uniform random draw ``r`` in ``[0, 1]``."""

    @abc.abstractmethod
    def from_random(self, r):
        """Map a uniform draw ``r`` in ``[0, 1]`` to a concrete value."""
        raise NotImplementedError

    @classmethod
    def create_randomized_param(cls, param_type, options) -> RandomizedParam:
        """Factory creating a randomized parameter from config options.

        :param param_type: ``"EXPONENTIAL_BETWEEN"`` or ``"UNIFORM_BETWEEN"``.
        :param options: mapping holding ``LEFT``, ``RIGHT`` and optionally
            ``TYPE`` (default ``float``); used keys are popped (consumed).
        :raises ValueError: if ``param_type`` is not supported.
        """
        # Both supported kinds parse LEFT/RIGHT/TYPE identically; the parsing
        # used to be duplicated per branch.
        if param_type in ("EXPONENTIAL_BETWEEN", "UNIFORM_BETWEEN"):
            otype = options.pop("TYPE", float)
            # configs may spell the output type symbolically
            if otype == "INTEGER":
                otype = int
            elif otype == "FLOAT":
                otype = float
            _option = {
                "left": options.pop("LEFT"),
                "right": options.pop("RIGHT"),
                "otype": otype,
            }
            if param_type == "EXPONENTIAL_BETWEEN":
                return ExponentialBetweenParam(**_option)
            return UniformBetweenParam(**_option)
        raise ValueError(f"unsupported param_type: {param_type!r}")
@dataclass
class BooleanParam(RandomizedParam):
    """A randomized boolean: True for draws above 0.5, False otherwise."""

    def from_random(self, r):
        """Convert a uniform draw ``r`` into a boolean via a 0.5 cutoff."""
        threshold = 0.5
        return r > threshold
@dataclass
class UniformBetweenParam(RandomizedParam):
    """Uniformly distributed value between ``left`` and ``right``,
    cast to ``otype``."""

    left: Any
    right: Any
    otype: type

    def from_random(self, r):
        """Linearly interpolate a draw ``r`` in [0, 1] onto [left, right]."""
        span = self.right - self.left
        return self.otype(r * span + self.left)
@dataclass
class ExponentialBetweenParam(RandomizedParam):
    """Log-uniformly distributed value between ``left`` and ``right``,
    cast to ``otype``. Both bounds must be positive."""

    left: Any
    right: Any
    otype: type

    def from_random(self, r):
        """Interpolate a draw ``r`` in log space, then exponentiate back."""
        lo = np.log(self.left)
        hi = np.log(self.right)
        return self.otype(np.exp(r * (hi - lo) + lo))
@dataclass(init=False)
class RandomizedChoices(RandomizedParam):
    """A uniform choice among a fixed collection of enum members."""

    choice: Tuple[Enum]

    def __init__(self, *args: Enum):
        self.choice = args

    def from_random(self, r):
        """Bucket a draw ``r`` in [0, 1] into one of the choices."""
        count = len(self.choice)
        # r == 1 would index one past the end, so clamp to the last item.
        index = -1 if r == 1 else int(r * count)
        return list(self.choice)[index]
@dataclass
class RandomizedDispatcher:
    """Selects a nested :class:`RandomizedConfig` based on the value(s) of
    one or more sibling attributes of the owning config."""

    # Attribute name (or tuple of names) on the parent config whose realized
    # value(s) form the lookup key into ``dispatch``.
    key_names: Union[str, Tuple[str, ...]]
    # Maps each key value (or tuple of values) to the sub-config to realize.
    dispatch: Dict[Any, RandomizedConfig]
class RandomizedConfig(abc.ABC):
    """Base class for a config template whose attributes may be
    :class:`RandomizedParam` or :class:`RandomizedDispatcher` instances,
    realized into concrete values by :meth:`get_config`."""

    def get_config(self, model_id,
                   seed=0) -> Tuple[Dict[str, Any], RandomizedConfig]:
        """Realize this template into a concrete config for ``model_id``.

        Returns a pair ``(model_r, model_c)`` where ``model_r`` maps each
        attribute name to the raw random draw used (``None`` for plain
        attributes, a nested dict for dispatched sub-configs) and ``model_c``
        is a new instance of this class with all randomized attributes
        replaced by concrete values.
        """
        kwargs = {}
        dispatchers = {}
        params = {}
        model_r = {}
        # Partition attributes: randomized params get drawn below, dispatchers
        # are resolved after the concrete config exists, everything else is
        # passed through unchanged.
        for k, v in vars(self).items():
            if isinstance(v, RandomizedParam):
                params[k] = v
            elif isinstance(v, RandomizedDispatcher):
                dispatchers[k] = v
                kwargs[k] = v
            else:
                kwargs[k] = v
                model_r[k] = None
        # base is the largest power of two <= model_id + 1; offset is the
        # model's position within that "generation" — model ids in the same
        # generation share one stratified sample space of size `base`.
        base = 2**int(np.log2(model_id + 1))
        offset = model_id + 1 - base
        # Derive a 32-bit RNG seed from (seed + base) so each generation gets
        # its own reproducible stream.
        sd = int.from_bytes(
            hashlib.sha256(str(seed + base).encode()).digest()[:4], "big")
        rng = np.random.default_rng(sd)
        # One uniform draw per stratum [i/base, (i+1)/base) per parameter,
        # then shuffle each parameter's column independently — akin to a
        # Latin-hypercube sample over the generation.
        linspace = np.linspace(0, 1, base + 1)
        rand_space = rng.random(size=(base, len(params))) * (
            linspace[1] - linspace[0]) + linspace[:-1].reshape(-1, 1)
        for i in range(len(params)):
            rng.shuffle(rand_space[:, i])
        # This model uses the `offset`-th row of the shuffled sample space.
        for (k, v), r in zip(params.items(), rand_space[offset]):
            model_r[k] = r
            kwargs[k] = v.from_random(r)
        model_c = self.__class__(**kwargs)
        # Resolve dispatchers against the now-concrete attribute values and
        # recursively realize the selected sub-configs.
        for var_name, dispatcher in dispatchers.items():
            if isinstance(dispatcher.key_names, str):
                key = vars(model_c)[dispatcher.key_names]
            else:
                key = tuple(vars(model_c)[kn] for kn in dispatcher.key_names)
            # Writes the realized sub-config straight into the instance dict.
            r, vars(model_c)[var_name] = dispatcher.dispatch[key].get_config(
                model_id, seed=seed)
            model_r[var_name] = r
        return model_r, model_c

    def to_dict(self) -> Dict[str, Any]:
        """Return attributes as a (possibly nested) dict, recursing into
        nested :class:`RandomizedConfig` values."""
        d = {}
        for k, v in vars(self).items():
            if isinstance(v, RandomizedConfig):
                d[k] = v.to_dict()
            else:
                d[k] = v
        return d

    def to_plain_dict(self, sep=":") -> Dict[str, Any]:
        """Return attributes as a flat dict, joining nested config keys with
        ``sep`` (e.g. ``"outer:inner": value``)."""
        d = {}
        for k, v in vars(self).items():
            if isinstance(v, RandomizedConfig):
                for vk, vv in v.to_plain_dict().items():
                    d[f"{k}{sep}{vk}"] = vv
            else:
                d[k] = v
        return d
@dataclass
class SeededConfig:
    """Bundles a realized config with the identifiers that produced it.

    Attributes:
        cid: the model id the config was derived for.
        base_seed: the seed the randomization was based on.
        seed_r: the raw random draws used, keyed by attribute name.
        config: the realized configuration.
        config_temp: the template the config was realized from.
    """

    cid: int
    base_seed: int
    seed_r: Dict[str, Any]
    config: RandomizedConfig
    config_temp: RandomizedConfig

    @classmethod
    def from_config_template(cls, config_temp: RandomizedConfig,
                             model_id: int, seed: int):
        """Realize ``config_temp`` for ``model_id``/``seed`` and wrap the
        result together with its provenance."""
        drawn_r, realized = config_temp.get_config(model_id=model_id,
                                                   seed=seed)
        return cls(
            cid=model_id,
            base_seed=seed,
            seed_r=drawn_r,
            config=realized,
            config_temp=config_temp,
        )
# A trained model paired with its evaluation score.
ModelScore = namedtuple("ModelScore", ["model", "score"])
class RandomSearch(abc.ABC):
    """Abstract driver for a random hyper-parameter search: realizes configs
    from a template, trains one model per config, tracks the best scorer and
    writes a submission CSV from its predictions."""

    def __init__(self, logger=None):
        """:param logger: logger to use; defaults to a no-op logger."""
        self.logger = DUMMY_LOGGER if logger is None else logger

    @abc.abstractmethod
    def get_xy(self, tr_path, te_path) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Load and return ``(x, y)``; rows whose ``y`` is NaN are treated as
        the unlabeled test set by :meth:`train`."""
        raise NotImplementedError

    @abc.abstractmethod
    def model_scores_to_csv(self, model_scores) -> None:
        """Persist the accumulated per-model config/score records."""
        raise NotImplementedError

    @abc.abstractmethod
    def do_training(self, sd_config: SeededConfig, train_x, train_y,
                    idx) -> ModelScore:
        """Train one model for ``sd_config``; may raise ValueError for an
        invalid config. ``idx`` is a shuffled row-index array."""
        raise NotImplementedError

    def train(self, tr_path, te_path, config_temp, model_id_range=None,
              seed=0):
        """Run the search over ``model_id_range`` (default ``range(10)``).

        Returns ``(best_model, model_scores)`` where ``best_model`` is a
        ``(score, model, sd_config)`` triple (or None if every model failed)
        and ``model_scores`` maps model id to its flattened config + score.
        """
        if model_id_range is None:
            model_id_range = range(10)
        x, y = self.get_xy(tr_path, te_path)
        # Rows with a missing target are the test set to predict on.
        te_x = x[y.isna()]
        train_x = x[~y.isna()]
        train_y = y[~y.isna()]
        rng = np.random.default_rng(seed)
        idx = np.arange(train_x.shape[0])
        rng.shuffle(idx)
        if not os.path.exists("submit"):
            os.mkdir("submit")
        model_scores = {}
        best_model = None
        for mid in model_id_range:
            self.logger.info(f"model_id={mid}: initialize")
            sd_config = SeededConfig.from_config_template(
                config_temp=config_temp, model_id=mid, seed=seed)
            model, score = None, None
            try:
                model, score = self.do_training(sd_config, train_x, train_y,
                                                idx)
            except ValueError as train_error:
                # An invalid random config is expected occasionally; log and
                # continue with the next model id.
                self.logger.warning(
                    f"model_id={sd_config.cid}: "
                    f"invalid config for model_id={sd_config.cid}, seed={seed}"
                    f"({train_error})",
                    stack_info=True,
                )
            self.logger.info(f"model_id={sd_config.cid}: score={score}")
            # record model_id, model_config and score
            model_scores[sd_config.cid] = sd_config.config.to_plain_dict()
            assert "score" not in model_scores[sd_config.cid]
            model_scores[sd_config.cid]["score"] = score
            self.model_scores_to_csv(model_scores)
            # keep the best model
            if score is not None and (best_model is None or
                                      best_model[0] < score):
                best_model = (score, deepcopy(model), sd_config)
            # Use the current best model to generate a csv for submission.
            # Fix: this previously ran unconditionally and crashed with
            # TypeError when best_model was still None (first models failed)
            # or when the current score was None (filename formatting).
            if best_model is not None and score is not None:
                te_xpy = np.empty(shape=(te_x.shape[0], 2), dtype=int)
                te_xpy[:, 0] = np.arange(te_x.shape[0]) + 1
                te_xpy[:, 1] = best_model[1].predict(te_x)
                # NOTE(review): the filename embeds the *current* model's
                # score and id even when the predictions come from an older
                # best model — kept as-is for backward compatibility.
                submit_path = os.path.join(
                    "submit",
                    f"{__name__}_{int(np.round(score * 1e5))}_"
                    f"{sd_config.cid:04d}.csv",
                )
                pd.DataFrame(te_xpy,
                             columns=["ImageId", "Label"]).to_csv(submit_path,
                                                                  index=False)
            if best_model is not None:
                self.logger.info(
                    f"model_id={sd_config.cid}: "
                    f"current best model_id={best_model[2].cid}, "
                    f"score={best_model[0]}, "
                    f"config={best_model[2].config.to_plain_dict()}")
        return best_model, model_scores