"""**Provide NodeProcessFunction for machine learning work flows.**
Example:
.. highlight:: python
.. code-block:: python
from utensil.loopflow.functions import dataflow
from utensil.loopflow.loopflow import register_node_process_functions
register_node_process_functions(dataflow)
"""
from __future__ import annotations
import os.path
import tempfile
from dataclasses import dataclass
from typing import Any, Dict, List, Union
import requests
import urllib3
from utensil import get_logger
from utensil.general import warn_left_keys
from utensil.loopflow.functions.basic import MISSING
from utensil.loopflow.loopflow import NodeProcessFunction
try:
    import numpy as np
    import pandas as pd
except ImportError as e:
    raise ImportError(
        "numpy and pandas are required to use "
        "utensil.loopflow.functions.dataflow") from e
logger = get_logger(__name__)
class Feature(pd.Series):
"""A feature of a dataset.
:class:`.Feature` is an individual measurable property or characteristic
    of a phenomenon. It can be a list of numbers or strings, with or without
    missing values. The length of a feature (missing values included) should
    be the number of instances in a dataset.
"""
class Features(pd.DataFrame):
"""A list of features.
:class:`.Features` is a list of :class:`.Feature`. It can be represented as
a matrix of numbers, strings, missing values, etc.
"""
class Target(pd.Series):
"""The target of a dataset.
    :class:`.Target` is the output corresponding to the input variables.
    Typically, it is the variable a supervised model is trying to learn to
    predict, either numerical or categorical.
"""
@dataclass
class Dataset:
"""A dataset used to train a model or to let a model predict its target.
    A pair of :class:`.Target` and :class:`.Features`. In the supervised
    case, both the target and the features are used to train or score a
    model; to predict only, only the features are needed. The length of the
    target should be identical to the length of every feature in the
    features, i.e., the number of instances.
>>> dataset = Dataset(
... Target(np.random.randint(2, size=3)),
... Features(np.random.random(size=(3, 4)))
... )
>>> dataset.nrows
3
>>> dataset.ncols
4
>>> bad_dataset = Dataset(
... Target(np.random.randint(2, size=2)),
... Features(np.random.random(size=(3, 4)))
... )
>>> bad_dataset.nrows
Traceback (most recent call last):
...
ValueError: rows of target and that of features should be the same
"""
target: Target
"""The target of the dataset."""
features: Features
"""The features of the dataset."""
@property
def nrows(self):
"""Number of rows/instances."""
if self.target.shape[0] != self.features.shape[0]:
raise ValueError(
"rows of target and that of features should be the same")
return self.target.shape[0]
@property
def ncols(self):
"""Number of columns/features."""
return self.features.shape[1]
class Model:
"""A base model class to be trained and to predict target based on a
dataset.
Before calling :meth:`.Model.train`, the model is untrained and should
not be used to predict. After that, :meth:`.Model.predict` can be called to
predict the :class:`.Target` of :class:`.Features`.
"""
    def train(self, dataset: Dataset) -> Model:
"""Train a model.
Use ``self`` as a base model to train on ``dataset`` for a trained
model.
*Should be overridden by subclass for implementation.*
>>> Model().train(Dataset(
... Target(np.random.randint(2, size=3)),
... Features(np.random.random(size=(3, 4)))
... ))
Traceback (most recent call last):
...
NotImplementedError
Args:
dataset (:class:`.Dataset`): dataset to be trained on.
Returns:
A trained :class:`.Model`.
"""
raise NotImplementedError
    def predict(self, features: Features) -> Target:
"""Predict the target.
Model returned from :meth:`.Model.train` can predict for
:class:`.Target` on a given :class:`.Features`.
*Should be overridden by subclass for implementation.*
>>> Model().predict(Features(np.random.random(size=(3, 4))))
Traceback (most recent call last):
...
NotImplementedError
Args:
            features (:class:`.Features`): used to predict :class:`.Target`.
Returns:
The prediction of :class:`.Target`.
"""
raise NotImplementedError
class SklearnModel(Model):
"""A wrapper for ``sklearn`` models.
>>> from sklearn.linear_model import LinearRegression
>>> model = SklearnModel(LinearRegression())
>>> target = Target([1, 2, 3])
>>> features = Features([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]])
>>> model = model.train(Dataset(target, features))
>>> model.predict(features + 1)
0 1.25
1 2.25
2 3.25
dtype: float64
"""
def __init__(self, model):
"""
Args:
model (``sklearn`` model):
"""
self._model = model
    def train(self, dataset: Dataset) -> Model:
"""Train a model.
Use ``self`` as a base model to train on ``dataset`` for a trained
model. Typically the ``fit`` method of the ``sklearn`` model is used.
Args:
dataset (:class:`.Dataset`): dataset to be trained on.
Returns:
A trained :class:`.Model`.
"""
model = self._model.fit(dataset.features, dataset.target)
return SklearnModel(model)
    def predict(self, features: Features) -> Target:
"""Predict the target.
Model returned from :meth:`.Model.train` can
predict for :class:`.Target` on a given :class:`.Features`.
Typically the ``predict`` method of the ``sklearn`` model is used.
Args:
            features (:class:`.Features`): used to predict :class:`.Target`.
Returns:
The prediction of :class:`.Target`.
"""
return Target(self._model.predict(features))
class LoadData(NodeProcessFunction):
"""Load a dataset from an URL.
URL can be a path. Data format can be ``SVMLIGHT``.
Attributes:
dformat (str): Data format. Valid options are ``SVMLIGHT``.
.. todo::
                More formats are needed
#. CSV
#. HDF5
        url (str): URL of the dataset. Should be a path, or a URL with the
            scheme ``http``, ``https`` or ``file``.
.. todo::
More types are needed
#. sklearn data.
target (str): The column of the dataset treated as a target.
        features (dict[int, str]):
            A mapping from 0-based column index to column name. This is
            useful when the dataset itself does not contain its own column
            names, for example, the ``svmlight`` format. See the sketch
            below.
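
    A minimal usage sketch (the file path and column names below are
    illustrative, not part of the library):

    .. code-block:: python

        loader = LoadData(
            dformat="SVMLIGHT",
            url="data/train.svmlight",
            target="label",
            features={0: "age", 1: "height"},
        )
        dataset = loader.main()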
"""
def __init__(self, dformat: str, url: str, target: str,
features: Dict[int, str]):
super().__init__()
self.dformat = dformat
self.url = url
self.target = target
self.features = features
    def main(self) -> Dataset:
if self.dformat == "SVMLIGHT":
parsed_url = urllib3.util.parse_url(self.url)
from sklearn import datasets
if parsed_url.scheme in ('http', 'https'):
r = requests.get(self.url)
with tempfile.NamedTemporaryFile('wb',
suffix=os.path.basename(
parsed_url.path)) as tmp:
tmp.write(r.content)
tmp.flush()
features, target, *_ = datasets.load_svmlight_file(tmp.name)
elif parsed_url.scheme in (None, 'file'):
features, target, *_ = datasets.load_svmlight_file(
parsed_url.path)
else:
raise ValueError(f'scheme "{parsed_url.scheme}" cannot be '
f'recognized in {self.url}')
else:
raise ValueError(
f'data format "{self.dformat}" cannot be recognized')
        features = pd.DataFrame.sparse.from_spmatrix(features)
        features = features.loc[:, list(self.features.keys())].rename(
            columns=self.features)
target = pd.Series(target, name=self.target)
return Dataset(Target(target), Features(features))
class FilterRows(NodeProcessFunction):
"""Filter rows of :class:`.Dataset`.
Filter rows of dataset by the value of its :class:`.Target`.
Attributes:
        filter_by (dict[str, Any]): Specifies which column to filter by and
            which values to keep. Typical usage is to filter ``TARGET`` with
            a list of values; for example, ``filter_by={"TARGET": [1, 2]}``
            keeps only the rows whose target is 1 or 2. See the sketch below.
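
    A minimal sketch (the ``dataset`` is assumed to come from another node,
    e.g. :class:`.LoadData`):

    .. code-block:: python

        keep_classes = FilterRows(filter_by={"TARGET": [1, 2]})
        filtered = keep_classes.main(dataset)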
"""
def __init__(self, filter_by: Dict[str, Any]):
super().__init__()
self.filter_by = filter_by
    def main(self, dataset: Dataset) -> Dataset:
"""
Args:
dataset (:class:`.Dataset`): the dataset to be filtered.
Returns:
A filtered dataset.
"""
idx = dataset.target.index
for filter_key, filter_val in self.filter_by.items():
if filter_key == "TARGET":
idx = idx.intersection(
dataset.target.index[dataset.target.isin(filter_val)])
else:
raise ValueError(
f'filter key "{filter_key}" cannot be recognized')
dataset.target = dataset.target.loc[idx]
dataset.features = dataset.features.loc[idx]
return dataset
class SamplingRows(NodeProcessFunction):
    """Sample rows of a dataset.

    This method samples a dataset down to a specific number of rows or to a
    ratio of its original size; see the sketch at the end of this docstring.
Attributes:
        number (`int`) :
            The sampled dataset will have this many rows. Overridden by
            `ratio` if both are given.
        ratio (`float`, default `1.0` if `number` is not set) :
            The sampled dataset will have `ratio * dataset.nrows` rows.
            Overrides `number`.
        stratified (`bool`, default `False`) :
            If `True`, the dataset will be sampled in a stratified manner.
            That is, there will be the same number of rows for each category
            of the dataset target, if possible.
        replace (`bool`, default `False`) :
            If `True`, the dataset will be sampled with replacement, so a row
            may be selected multiple times. An exception is raised if
            `replace` is set to `False` and `number` is larger than
            `dataset.nrows` or `ratio` is larger than `1`.
random_seed (`None` or `int`, default `None`) :
Random seed used to sample the dataset. It is used to set
`numpy.random.BitGenerator`. See
`Numpy Documentation
<https://numpy.org/doc/stable/reference/random/bit_generators/generated/numpy.random.BitGenerator.html#numpy.random.BitGenerator>`_
for more information.
return_rest (`bool`, default `False`) :
If `False`, only the sampled dataset is returned.
If `True`, this method will return a dictionary of two datasets,
.. highlight:: python
.. code-block:: python
{
'sampled': sampled_dataset,
'rest': rest_dataset,
}
`rest_dataset` contains all rows not in `sampled_dataset`.
.. note::
Even if `sampled_dataset` is sampled with replacement,
`rest_dataset` does not contain duplicated rows.
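
    A minimal sketch of a stratified train/validation split (the dataset
    below is illustrative):

    .. code-block:: python

        dataset = Dataset(
            Target(np.random.randint(2, size=10)),
            Features(np.random.random(size=(10, 4))),
        )
        split = SamplingRows(ratio=0.7, stratified=True,
                             return_rest=True).main(dataset)
        train, valid = split["sampled"], split["rest"]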
"""
def __init__(
self,
number: int = MISSING,
ratio: float = MISSING,
stratified: bool = False,
replace: bool = False,
random_seed: Union[None, int] = None,
return_rest: bool = False,
):
super().__init__()
self.number = number
self.ratio = ratio
self.stratified = stratified
self.replace = replace
self._rng = np.random.default_rng(random_seed)
self.return_rest = return_rest
if self.ratio is MISSING and self.number is MISSING:
self.ratio = 1.0
def _get_number_each(self, value_counts: pd.Series, ttl_number: int):
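        # Distribute ttl_number samples as evenly as possible over the target
        # categories.  Without replacement a category can contribute at most
        # its own count, so the smallest category is capped at its count and
        # the remainder is redistributed recursively over the other
        # categories.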
if self.replace or ttl_number // value_counts.shape[
0] <= value_counts.min():
number_each = (pd.Series(0, index=value_counts.index) +
ttl_number // value_counts.shape[0])
categories = number_each.index.to_numpy(copy=True)
self._rng.shuffle(categories)
residual = ttl_number - number_each.sum()
number_each += pd.Series({
cat: 1 if i < residual else 0
for i, cat in enumerate(categories)
})
else:
number_each = pd.Series(value_counts.min(),
index=value_counts.index)
residual = self._get_number_each(
value_counts.drop(labels=value_counts.idxmin()) -
value_counts.min(),
ttl_number - np.sum(number_each),
)
number_each += (
residual +
pd.Series(0, index=number_each.index)).fillna(0).astype(int)
return number_each
    def main(self, dataset: Dataset) -> Union[Dataset, Dict[str, Dataset]]:
"""
Args:
dataset (:class:`.Dataset`): the dataset to be sampled.
Returns:
A sampled dataset or a dictionary of the `sampled` dataset and
the `rest` dataset.
"""
ttl_number = (int(self.ratio * dataset.nrows)
if self.ratio is not MISSING else self.number)
if not self.replace and ttl_number > dataset.nrows:
            raise ValueError(
                'sampling number should be at most the size of the dataset '
                'when "replace" is set to False')
if self.stratified:
value_counts = dataset.target.value_counts()
number_each = self._get_number_each(value_counts, ttl_number)
selected_idx = []
for cat, idx in dataset.target.groupby(
dataset.target).groups.items():
selected_idx.append(
self._rng.choice(idx,
number_each[cat],
replace=self.replace))
selected_idx = np.concatenate(selected_idx)
imap = {idx: i for i, idx in enumerate(dataset.target.index)}
selected_idx = np.array(sorted(selected_idx, key=imap.__getitem__))
new_target = dataset.target.loc[selected_idx]
else:
new_target = dataset.target.sample(
n=ttl_number,
replace=self.replace,
random_state=self._rng.bit_generator)
new_features = dataset.features.loc[new_target.index]
if self.return_rest:
rest_index = dataset.target.index.difference(new_target.index)
rest_target = dataset.target.loc[rest_index]
rest_features = dataset.features.loc[rest_index]
return {
"sampled": Dataset(Target(new_target), Features(new_features)),
"rest": Dataset(Target(rest_target), Features(rest_features)),
}
return Dataset(Target(new_target), Features(new_features))
class MakeDataset(NodeProcessFunction):
"""Make a dataset using `target` and `features`."""
    def main(self, target: Target, features: Features) -> Dataset:
"""
Args:
target (:class:`.Target`): the input target.
features (:class:`.Features`): the input features.
Returns:
            A `dataset` consisting of `target` and `features`.
"""
return Dataset(target, features)
class GetTarget(NodeProcessFunction):
"""Get `target` from a `dataset`."""
    def main(self, dataset: Dataset) -> Target:
"""
Args:
dataset (:class:`.Dataset`): get `target` from this `dataset`.
Returns:
The `target` of the `dataset`.
"""
return dataset.target
class GetFeature(NodeProcessFunction):
"""Get `feature` from a `dataset` with a given name.
Attributes:
feature (str):
This `feature` will be retrieved from the `dataset`.
"""
def __init__(self, feature: str):
super().__init__()
self.feature = feature
    def main(self, dataset: Dataset) -> Feature:
"""
Args:
dataset (:class:`.Dataset`): get `feature` from this `dataset`.
Returns:
The `feature` with the given name of the `dataset`.
"""
return Feature(dataset.features.loc[:, self.feature])
class MergeFeatures(NodeProcessFunction):
    """Merge a list of :class:`.Feature` into a :class:`.Features`."""
    def main(self, *features: Feature) -> Features:
"""
Args:
            *features (list of :class:`.Feature`): the features to be merged.
Returns:
            A :class:`.Features` object containing the given `features`.
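
        A minimal sketch (illustrative values):

        .. code-block:: python

            height = Feature([1.0, 2.0], name="height")
            width = Feature([3.0, 4.0], name="width")
            features = MergeFeatures().main(height, width)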
"""
return Features(pd.concat([pd.Series(f) for f in features], axis=1))
def _is_number(n):
return pd.api.types.is_number(n)
def _get_max(arr):
if pd.api.types.is_sparse(arr):
if arr.sparse.sp_values.shape[0] < arr.shape[0]:
return max(arr.sparse.fill_value, np.max(arr.sparse.sp_values))
return np.max(arr.sparse.sp_values)
return np.max(arr)
def _get_min(arr):
if pd.api.types.is_sparse(arr):
if arr.sparse.sp_values.shape[0] < arr.shape[0]:
return min(arr.sparse.fill_value, np.min(arr.sparse.sp_values))
return np.min(arr.sparse.sp_values)
return np.min(arr)
class LinearNormalize(NodeProcessFunction):
"""Perform linear normalization of a 1d array.
    Linearly maps the given array from the range ``(l1, u1)`` to the range
    ``(l2, u2)``; see the sketch below.
Attributes:
upper (`dict` of ``FROM`` and ``TO``, default both ``MAX``):
Sets ``u1=upper["FROM"]`` and ``u2=upper["TO"]``. ``u*`` should
be a number or a string, ``MAX`` or ``MIN``. ``MAX`` means the
maximum of the array, and ``MIN`` means the minimum of the array.
lower (`dict` of ``FROM`` and ``TO``, default both ``MIN``):
Sets ``l1=lower["FROM"]`` and ``l2=lower["TO"]``. ``l*`` should
be a number or a string, ``MAX`` or ``MIN``. ``MAX`` means the
maximum of the array, and ``MIN`` means the minimum of the array.
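
    A minimal sketch that rescales an array so that its minimum maps to 0.0
    and its maximum maps to 1.0 (illustrative values):

    .. code-block:: python

        normalize = LinearNormalize(upper={"FROM": "MAX", "TO": 1.0},
                                    lower={"FROM": "MIN", "TO": 0.0})
        arr01 = normalize.main(np.array([2.0, 4.0, 6.0]))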
"""
def __init__(self,
upper: Dict[str, Any] = None,
lower: Dict[str, Any] = None):
super().__init__()
self.upper = {"FROM": "MAX", "TO": "MAX"}
self.upper.update({} if upper is None else upper)
self.lower = {"FROM": "MIN", "TO": "MIN"}
self.lower.update({} if lower is None else lower)
@staticmethod
def _compile_limit(cmd, arr1d):
if isinstance(cmd, str):
if cmd == "MAX":
return _get_max(arr1d)
if cmd == "MIN":
return _get_min(arr1d)
raise ValueError(f'command "{cmd}" cannot be recognized')
if _is_number(cmd):
return cmd
raise ValueError(f'Expecting str or number, got {type(cmd).__name__}')
    def main(self, arr1d: np.ndarray) -> np.ndarray:
        hifrom = self._compile_limit(self.upper["FROM"], arr1d)
        hito = self._compile_limit(self.upper["TO"], arr1d)
        lofrom = self._compile_limit(self.lower["FROM"], arr1d)
        loto = self._compile_limit(self.lower["TO"], arr1d)
        if hifrom == lofrom:
            if hito == loto:
                return arr1d
            raise ValueError(f'Cannot map a single value ({hifrom}) to '
                             f'different values ({hito}, {loto})')
        return (arr1d - lofrom) * (hito - loto) / (hifrom - lofrom) + loto
class MakeModel(NodeProcessFunction):
"""Make an untrained model.
Attributes:
        method (str) : the method used to build the model. Options are
            ``XGBOOST_REGRESSOR``, ``XGBOOST_CLASSIFIER`` and
            ``SKLEARN_GRADIENT_BOOSTING_CLASSIFIER``. See the sketch below.
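
    A minimal sketch (assumes ``xgboost`` is installed; the parameter values
    are illustrative):

    .. code-block:: python

        make_model = MakeModel("XGBOOST_CLASSIFIER")
        model = make_model.main({"LEARNING_RATE": 0.1, "MAX_DEPTH": 3,
                                 "N_ESTIMATORS": 100})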
"""
def __init__(self, method):
super().__init__()
self.method = method
@staticmethod
def _after_assign_params_routine(_from, _to):
for k, v in list(_to.items()):
if v is MISSING:
del _to[k]
warn_left_keys(_from)
    def main(self, model_params: Dict[str, Any]) -> Model:
"""
Args:
            model_params (dict):
                The parameters to create the model, given with upper-case
                keys such as ``LEARNING_RATE``. Based on the `method`,
                different parameters can be set.
* ``XGBOOST_REGRESSOR``:
See more details in `XGBoost documentation
<https://xgboost.readthedocs.io/en/latest/parameter.html>`_
* ``learning_rate``
* ``max_depth``
* ``n_estimators``
* ``XGBOOST_CLASSIFIER``:
See more details in `XGBoost documentation
<https://xgboost.readthedocs.io/en/latest/parameter.html>`_
* ``learning_rate``
* ``max_depth``
* ``n_estimators``
* ``SKLEARN_GRADIENT_BOOSTING_CLASSIFIER``:
See more details in `Scikit Learn documentation
<https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingClassifier.html>`_
* ``learning_rate``
* ``max_depth``
* ``n_estimators``
Returns:
An untrained :class:`.Model`.
"""
if self.method == "XGBOOST_REGRESSOR":
import xgboost
_model_params = {
"learning_rate": model_params.pop("LEARNING_RATE", MISSING),
"max_depth": model_params.pop("MAX_DEPTH", MISSING),
"n_estimators": model_params.pop("N_ESTIMATORS", MISSING),
}
self._after_assign_params_routine(model_params, _model_params)
return SklearnModel(
xgboost.XGBRegressor(**_model_params, use_label_encoder=True))
if self.method == "XGBOOST_CLASSIFIER":
import xgboost
_model_params = {
"learning_rate": model_params.pop("LEARNING_RATE", MISSING),
"max_depth": model_params.pop("MAX_DEPTH", MISSING),
"n_estimators": model_params.pop("N_ESTIMATORS", MISSING),
}
self._after_assign_params_routine(model_params, _model_params)
return SklearnModel(
xgboost.XGBClassifier(**_model_params, use_label_encoder=True))
if self.method == "SKLEARN_GRADIENT_BOOSTING_CLASSIFIER":
from sklearn import ensemble
_model_params = {
"learning_rate": model_params.pop("LEARNING_RATE", MISSING),
"max_depth": model_params.pop("MAX_DEPTH", MISSING),
"n_estimators": model_params.pop("N_ESTIMATORS", MISSING),
}
self._after_assign_params_routine(model_params, _model_params)
return SklearnModel(
ensemble.GradientBoostingClassifier(**_model_params))
raise ValueError(f'method "{self.method}" cannot be recognized')
class Train(NodeProcessFunction):
"""Train a model."""
    def main(self, model: Model, dataset: Dataset) -> Model:
"""
Args:
model (:class:`.Model`):
The model to be trained.
dataset (:class:`.Dataset`):
The dataset to be trained on.
Returns:
A trained :class:`.Model`.
"""
model = model.train(dataset)
return model
class Predict(NodeProcessFunction):
"""Predict a target."""
    def main(self, model: Model, features: Features) -> Target:
"""
Args:
model (:class:`.Model`):
The prediction is from this model.
features (:class:`.Features`):
The features used for prediction.
Returns:
A :class:`.Target` based on the `model` and `features`. The
length of the `target` is identical to the number of rows of the
`features`.
"""
return model.predict(features)
class ParameterSearch(NodeProcessFunction):
    """Randomly search the model parameters.

    See :mod:`utensil.random_search` for more details.

    Attributes:
        init_state (int, default 0): the state the search starts from.
        seed (int, default 0): seed of the underlying random number
            generator.
        search_map (dict, default `None`): maps a parameter name either to a
            fixed value or to a single-item dict describing how the parameter
            is randomized (handled by :mod:`utensil.random_search`). See the
            sketch below.
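
    A minimal sketch using only fixed (non-randomized) values, which are
    passed through unchanged; randomized entries are single-item dicts whose
    format is defined by :mod:`utensil.random_search`:

    .. code-block:: python

        search = ParameterSearch(search_map={"MAX_DEPTH": 3,
                                             "N_ESTIMATORS": 100})
        params = search.main()  # {"MAX_DEPTH": 3, "N_ESTIMATORS": 100}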
"""
def __init__(self, init_state=0, seed: int = 0, search_map: Dict = None):
super().__init__()
from utensil import random_search
self.state = init_state
self._nr_randomized_params = 0
self._rng = np.random.default_rng(seed)
self._seed_gen = self._generate_seed()
self._search_map = {}
search_map = {} if search_map is None else search_map
for param_name, search_method in search_map.items():
if isinstance(search_method, dict):
                if len(search_method) != 1:
                    raise ValueError(
                        "a randomized parameter should be specified as a "
                        "single-item dict")
search_type, search_option = search_method.popitem()
self._search_map[param_name] = (
random_search.RandomizedParam.create_randomized_param(
search_type, search_option))
self._nr_randomized_params += 1
else:
self._search_map[param_name] = search_method
def _random_between(self, a, b, **kwargs):
return self._rng.random(**kwargs) * (b - a) + a
def _generate_seed(self):
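        # Lazily yields, for each state, a tuple of quasi-random numbers in
        # [0, 1), one per randomized parameter.  The unit interval is split
        # into progressively finer bins (1, 2, 4, ... as the state grows) and
        # the bins are shuffled independently per parameter, so successive
        # states cover the search space roughly evenly.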
rand_space = []
while True:
base = 2**int(np.log2(self.state + 1))
offset = self.state + 1 - base
if offset == 0 or len(rand_space) == 0:
linspace = np.linspace(0, 1, base + 1)
rand_space = np.array([
self._random_between(
linspace[i],
linspace[i + 1],
size=self._nr_randomized_params,
) for i in range(base)
])
for i in range(self._nr_randomized_params):
self._rng.shuffle(rand_space[:, i])
model_r = tuple(rand_space[offset])
yield self.state, model_r
self.state += 1
    def main(self):
"""
Returns:
Next randomly generated parameters.
"""
from utensil import random_search
state, r_list = next(self._seed_gen)
        if len(r_list) != self._nr_randomized_params:
            raise ValueError("number of generated random values does not "
                             "match the number of randomized parameters")
r = iter(r_list)
params = {}
for k, v in self._search_map.items():
if isinstance(v, random_search.RandomizedParam):
params[k] = v.from_random(next(r))
else:
params[k] = v
return params
class Score(NodeProcessFunction):
"""Calculate scores of a model, based on its prediction and a ground truth.
Attributes:
dataset (str):
The name of the dataset. It is used to generate an informative
output.
        methods (str or list of str):
            The method, or a list of methods, used to score the model.
            Currently the only option is ``ACCURACY``. See the sketch below.
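
    A minimal sketch (`test_dataset` and `trained_model` are assumed to come
    from other nodes):

    .. code-block:: python

        score = Score(dataset="TEST", methods="ACCURACY")
        results = score.main(prediction=test_dataset.features,
                             ground_truth=test_dataset, model=trained_model)
        # e.g. [("ACCURACY", "TEST", 0.87)]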
"""
def __init__(self,
dataset: str = MISSING,
methods: Union[str, List[str]] = None):
super().__init__()
self.dataset_name = dataset
self.methods = [] if methods is None else methods
if isinstance(self.methods, str):
self.methods = [self.methods]
    def main(
self,
prediction: Union[Target, Features, Dataset],
ground_truth: Union[Target, Dataset],
model: Model,
):
"""
Args:
prediction (`target`, `features` or `dataset`):
If `prediction` is :class:`.Target`, it will be directly used
to calculate the score without using the `model`.
If it is :class:`.Features`, `model` will make a prediction
based on it.
If it is :class:`.Dataset`, `model` will make a prediction
based on its `features`.
ground_truth (`target` or `dataset`):
If `ground_truth` is a :class:`.Target`, it is directly
compared to `prediction`.
If `ground_truth` is a :class:`.Dataset`, its `target` is
compared to `prediction`.
model (:class:`.Model`):
The `model` to be scored.
.. note::
If `prediction` is :class:`.Target`, then the `model` is
not used.
Returns:
            A list of scoring results. A scoring result consists of two or
            three elements: the scoring method name, the dataset name (if
            provided), and the score.
For example:
.. highlight:: python
.. code-block:: python
# if dataset name is 'MNIST'
[
('ACCURACY', 'MNIST', 0.812641),
('FSCORE', 'MNIST', 0.713278),
]
# if dataset name is not provided
[
('ACCURACY', 0.812641),
('FSCORE', 0.713278),
]
"""
if isinstance(prediction, Target):
pass
elif isinstance(prediction, Features):
prediction = model.predict(prediction)
elif isinstance(prediction, Dataset):
prediction = model.predict(prediction.features)
else:
raise TypeError
if isinstance(ground_truth, Target):
pass
elif isinstance(ground_truth, Dataset):
ground_truth = ground_truth.target
else:
raise TypeError
def wrap_output(_method, _score):
if self.dataset_name is MISSING:
return _method, _score
return _method, self.dataset_name, _score
ret = []
for method in self.methods:
if method == "ACCURACY":
ret.append(
wrap_output(
method,
np.sum(prediction.values == ground_truth.values) /
prediction.shape[0],
))
return ret
class ChangeTypeTo(NodeProcessFunction):
"""Change the type of a given `arr`.
Attributes:
        to_type (str):
            The `arr` will be converted to this type. Options are
            ``INTEGER`` and ``FLOAT``. See the sketch below.
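
    A minimal sketch (illustrative values):

    .. code-block:: python

        to_int = ChangeTypeTo("INTEGER")
        int_target = to_int.main(Target([1.0, 0.0, 1.0]))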
"""
def __init__(self, to_type: str):
super().__init__()
if to_type == "INTEGER":
self.to_type = int
elif to_type == "FLOAT":
self.to_type = float
        else:
            raise ValueError(f'type "{to_type}" cannot be recognized')
    def main(self, arr: Union[Feature, Target]):
"""
Args:
arr (:class:`.Feature` or :class:`.Target`):
The type of this will be changed.
Returns:
The `arr` with type changed to `to_type`.
"""
return arr.astype(self.to_type)