# -*- coding: utf-8 -*-
"""
Strategies define how the data will be fuzzed, duplicated, noised and altered.
"""
import logging
import random

from datafuzz.utils.noise_helpers import numpy_type_transform
from datafuzz.settings import HAS_NUMPY

if HAS_NUMPY:
    import numpy as np
class Strategy(object):
    """ Strategy objects apply predefined noise and fuzz to datasets.

    Parameters:
        dataset (`datafuzz.DataSet`): dataset to noise / alter

    Kwargs:
        type : optional value stored as-is on `self.type`
        percentage (int) : percentage to distort. Must be greater
                           than 0 and less than 100.
                           If none given (or a falsy value such as 0),
                           default to 30

    Attributes:
        dataset (`datafuzz.DataSet`): dataset to noise / alter
        type : value of the `type` keyword argument (None if absent)
        percentage (float) : fraction to distort, strictly
                             between 0 and 1

    Raises:
        Exception: if the resulting fraction is not strictly
                   between 0 and 1

    NOTE: each strategy type may have additional required keyword arguments

    see also: `duplicator.Duplicator`, `noise.NoiseMaker` and `fuzz.Fuzzer`
    """

    def __init__(self, dataset, **kwargs):
        self.dataset = dataset
        self.type = kwargs.get('type')

        # A missing or falsy percentage (None, 0) falls back to 30%.
        percentage = kwargs.get('percentage')
        if percentage:
            self.percentage = percentage / 100
        else:
            self.percentage = .3

        # Explicit check instead of `assert`: assertions are removed when
        # Python runs with -O, which would silently disable validation.
        if not 0 < self.percentage < 1:
            raise Exception('You must define a percentage between 1 and 100')

    @property
    def num_rows(self):
        """ return number of rows to transform in dataset
        based on given percentage.

        NOTE: this uses rounding so only whole numbers are returned.
        For pandas and numpy datasets the rounded value is returned
        as-is (it may be 0); for every other data type a result of 0
        is bumped up to 1.
        """
        if self.dataset.data_type in ['pandas', 'numpy']:
            return round(
                self.dataset.records.shape[0] * self.percentage)

        rows = round(len(self.dataset.records) * self.percentage)
        if rows == 0:
            return 1
        return rows

    def get_numeric_columns(self, columns):
        """ Ensure columns are numeric, this will get indexes
        of string column names (i.e. Pandas columns or dict keys)

        Arguments:
            columns (list of str or int): column list

        Returns:
            columns (list): if every entry is an int or a string of
                decimal digits, a list of ints. Otherwise, for pandas
                datasets containing at least one string entry, the
                result of `dataset.column_idx` for every entry.
                In any other case the input is returned unchanged.
        """
        # `isdecimal` (not `isnumeric`) matches what `int()` can parse:
        # characters such as superscripts or fractions are "numeric"
        # but make `int()` raise ValueError.
        if all(isinstance(c, int) or
               (isinstance(c, str) and c.isdecimal()) for c in columns):
            columns = [int(c) for c in columns]

        if self.dataset.data_type == 'pandas' and \
                any(isinstance(c, str) for c in columns):
            columns = [self.dataset.column_idx(col) for col in columns]
        return columns

    def apply_func_to_column(self, function, column, dataset=None):
        """
        Apply a function to a column in a given dataset.
        (this should work as uniformly as possible across data types)

        A random, non-empty subset of rows is selected and `function`
        is applied to the value in `column` for those rows only.

        Arguments:
            function (lambda or other func): function to apply
            column (int): column index

        Kwargs:
            dataset (`dataset.DataSet`): dataset to use
                                         defaults to self.dataset

        Returns:
            the dataset that was transformed (`dataset`, or
            `self.dataset` when none was passed)

        Note: pandas and numpy records are modified in place. For other
        data types `dataset.records` is rebound to a new list in which
        the selected rows are replaced by new lists.
        A dataset without rows is returned untouched.

        For numpy records, `function` is first handed the selected
        slice as a whole; on TypeError / ValueError it is retried
        element-wise through `np.vectorize`; if that raises ValueError,
        `numpy_type_transform` is called and the original function is
        tried once more. A ValueError from that last attempt is logged
        and not re-raised.
        """
        if dataset is None:
            dataset = self.dataset

        if dataset.data_type in ['pandas', 'numpy']:
            total = dataset.records.shape[0]
            if total == 0:
                # Nothing to select from; randint(1, 0) would raise.
                return dataset

            # randint(1, total) guarantees at least one selected row.
            indexes = random.sample(range(total),
                                    random.randint(1, total))

            if dataset.data_type == 'pandas':
                dataset.records.iloc[indexes, column] = \
                    dataset.records.iloc[indexes, column].map(function)
            else:
                try:
                    dataset.records[indexes, column] = np.apply_along_axis(
                        function, 0, dataset.records[indexes, column])
                except (TypeError, ValueError):
                    try:
                        func = np.vectorize(function)
                        dataset.records[indexes, column] = \
                            np.apply_along_axis(
                                func, 0, dataset.records[indexes, column])
                    except ValueError as exc:
                        # Force type change.
                        try:
                            numpy_type_transform(exc, dataset)
                            dataset.records[indexes, column] = \
                                np.apply_along_axis(
                                    function, 0,
                                    dataset.records[indexes, column])
                        except ValueError:
                            logging.exception(
                                'Could not transform numpy type')
        else:
            total = len(dataset.records)
            if total == 0:
                # Sampling from an empty list can only ever yield an
                # empty selection, so the retry loop below would spin
                # forever.
                return dataset

            # The sample size may be drawn as 0; redraw until at least
            # one row is selected.
            indexes = []
            while not indexes:
                indexes = random.sample(range(total),
                                        random.randint(0, total))

            # Set membership keeps the rebuild linear in the row count.
            selected = set(indexes)
            dataset.records = [
                [function(v) if i == column else v
                 for i, v in enumerate(row)]
                if idx in selected else row
                for idx, row in enumerate(dataset.records)]
        return dataset