# -*- coding: utf-8 -*-
"""
YAML and CLI parsers
"""
import argparse
import json
import logging
from datetime import datetime
import yaml
from datafuzz.parsers.helpers import generate_from_parser, fuzz_from_parser
class BaseYAMLParser:
""" Base YAML Parser class
Parameters:
file_path (str): file to parse
Attributes:
REQUIRED_FIELDS (dict): dictionary of required fields
file_path (str): file to parse
"""
REQUIRED_FIELDS = {}
def __init__(self, file_path):
self.file_path = file_path
self.parse()
def parse(self):
""" Parse the file and validate the parsed YAML
raises SyntaxError if bad YAML
"""
with open(self.file_path, 'r') as myf:
try:
self.parsed = yaml.safe_load(myf)
except yaml.YAMLError:
logging.exception('Error loading YAML file')
raise SyntaxError('Invalid YAML! Please correct your syntax.')
self.validate_yaml()
def validate_yaml(self):
""" Ensure all required fields are parsed
and exist
Note: Uses dictionary self.REQUIRED_FIELDS
raises SyntaxError if fields missing
"""
for section, fields in self.REQUIRED_FIELDS.items():
try:
values = self.parsed.get(section)
assert values is not None
if isinstance(values, dict):
assert all([values.get(f) for f in fields])
elif isinstance(values, list):
for value_row in values:
assert set(fields).issubset(set(value_row.keys()))
except AssertionError:
raise SyntaxError(
'Your YAML file does not have all required fields. ' +
'Required fields: {}'.format(self.REQUIRED_FIELDS)
)
[docs]class StrategyYAMLParser(BaseYAMLParser):
""" Strategy YAML Parser is used
to parse strategies and fuzz / transform
data using a simple YAML definition.
see also `parsers.core.BaseYAMLParser`
"""
REQUIRED_FIELDS = {
'data': ['input', 'output'],
'strategies': ['type', 'percentage']
}
@property
def strategies(self):
""" Return strategies from parsed YAML """
return self.parsed.get('strategies')
@property
def input(self):
""" Return data input from parsed YAML """
return self.parsed.get('data').get('input')
@property
def output(self):
""" Return data output from parsed YAML """
return self.parsed.get('data').get('output')
@property
def db_uri(self):
""" Return data db_uri from parsed YAML """
return self.parsed.get('data').get('db_uri')
@property
def table(self):
""" Return data table from parsed YAML """
return self.parsed.get('data').get('table')
@property
def query(self):
""" Return data query from parsed YAML """
return self.parsed.get('data').get('query')
[docs] def execute(self):
""" Execute strategies from parsed YAML """
return fuzz_from_parser(self)
[docs]class StrategyCLIParser:
""" Strategy YAML CLI is used
to parse strategies and fuzz / transform
data using a simple CLI definition.
"""
REQUIRED_FIELDS = ['output', 'input', 'strategies']
def __init__(self, **kwargs):
""" Parse arguments for fuzzing data
Attributes:
input (str): filename or "sql" for input data
output (str): filename or "sql" for output data
strategies (str): dict outlining strategies for noise,
duplication, etc
db_uri (str): if using database (input or out),
database uri to connect
query (str): if using database input, query to execute
table (str): if using database output,
table name to insert
Note: strategies should have all required fields
see `strategy.Strategy`
"""
self.input = kwargs.get('input')
self.output = kwargs.get('output')
self.strategies = kwargs.get('strategies')
self.db_uri = kwargs.get('db_uri')
self.query = kwargs.get('query')
self.table = kwargs.get('table')
self.parser = self.init_parser()
[docs] def validate_arguments(self):
""" Validate that all required fields are submitted """
for section in self.REQUIRED_FIELDS:
assert getattr(self, section) is not None
[docs] def init_parser(self):
""" Initialize parser with required and optional arguments
Returns:
argparse.ArgumentParser
"""
parser = argparse.ArgumentParser(
description='Apply datafuzz strategies to input, return output')
parser.add_argument('run', choices=['run'], default='run')
parser.add_argument('-i', '--input', type=str,
help='input string (filename or sql)')
parser.add_argument('-o', '--output', type=str,
help='input string (filename or sql)')
parser.add_argument('-s', '--strategies', type=json.loads,
help='dictionary defining the strategies to take')
parser.add_argument('--db_uri',
type=str,
help='If using database, the db URI to connect')
parser.add_argument('--query',
type=str,
help='If using db input, query to collect data')
parser.add_argument('--table', type=str,
help='If using db output, table to insert into')
return parser
[docs] def parse_args(self, argv=None):
""" Parse arguments and validate them
Kwargs:
argv (sys.argv or similar list)
"""
args = self.parser.parse_args(argv)
self.input = args.input
self.output = args.output
self.strategies = args.strategies
if not isinstance(self.strategies, list):
self.strategies = [self.strategies]
self.db_uri = args.db_uri
self.query = args.query
self.table = args.table
self.validate_arguments()
[docs] def print_help(self):
""" print parser help """
self.parser.print_help()
[docs] def execute(self):
""" execute fuzzing strategies from parser
Returns:
output
"""
return fuzz_from_parser(self)
[docs]class SchemaYAMLParser(BaseYAMLParser):
""" Schema YAML Parser is used
generate data using a simple YAML definition.
see also `parsers.core.BaseYAMLParser`
"""
REQUIRED_FIELDS = ['schema', 'output', 'num_rows']
def __init__(self, file_name):
""" Parse the schema for generating data
(see: `parser.BaseYAMLParser`)
Attributes:
start_time (datetime): start date for timeseries (or None)
end_time (datetime): end date for timeseries (or None)
increments (str): timeseries increment
'seconds', 'hours', 'days' (or None)
"""
super().__init__(file_name)
self.start_time = None
self.end_time = None
self.increments = None
if 'timeseries' in self.parsed:
self.parse_timeseries()
@property
def schema(self):
""" Return schema from parsed YAML """
return self.parsed.get('schema')
@property
def output(self):
""" Return output from parsed YAML """
return self.parsed.get('output')
@property
def timeseries(self):
""" Return timeseries from parsed YAML """
return self.parsed.get('timeseries')
@property
def num_rows(self):
""" Return num_rows from parsed YAML """
return self.parsed.get('num_rows')
[docs] def validate_yaml(self):
""" Validate that all required fields are parsed from YAML
raises SyntaxError if required field missing
"""
for section in self.REQUIRED_FIELDS:
try:
assert self.parsed.get(section) is not None
except AssertionError:
raise SyntaxError(
'Required field {} is not present!'.format(section))
[docs] def parse_timeseries(self):
""" Parse and set values related to timeseries
raises SyntaxError if start or end time were
not properly parsed
"""
self.start_time = self.timeseries.get('start_time')
self.end_time = self.timeseries.get('end_time')
self.increments = self.timeseries.get('increments')
if isinstance(self.start_time, datetime):
if self.end_time is None or isinstance(self.end_time, datetime):
return
raise SyntaxError(
'You must specify starttime in isoformat: ' +
'YYYY-MM-DDThh:mm or YYYY-MM-DDThh:mm:ss')
[docs] def execute(self):
""" generate data using parsed YAML
Returns:
output
"""
return generate_from_parser(self)
[docs]class SchemaCLIParser:
""" Schema Parser for CLI Input
This generates a argparser to parse input
and can be used to then generate the
dataset
"""
REQUIRED_FIELDS = ['output', 'num_rows', 'schema']
def __init__(self, **kwargs):
""" Parse arguments for generating data
Attributes:
start_time (datetime): start date for timeseries (or None)
end_time (datetime): end date for timeseries (or None)
increments (str): timeseries increment
'seconds', 'hours', 'days' (or None)
num_rows (int): number of rows to generate
output (str): output string (filename)
schema (dict): dictionary of schema to generate
parser (`ArgumentParser`): argument parser
Note: length of fields should match that of values
"""
self.num_rows = kwargs.get('num_rows')
self.start_time = kwargs.get('start_time')
self.end_time = kwargs.get('end_time')
self.increments = kwargs.get('increments')
self.output = kwargs.get('output')
self.schema = kwargs.get('schema') or {}
self.parser = self.init_parser()
[docs] def validate_arguments(self):
""" Validate that all required fields are submitted """
for section in self.REQUIRED_FIELDS:
try:
assert getattr(self, section) is not None
except AssertionError:
raise Exception("You must include %s in your parser" % section)
[docs] def init_parser(self):
""" Generate `argparse.ArgumentParser`
to use for parsing arguments """
parser = argparse.ArgumentParser(
description='Generate dataset: to use')
parser.add_argument('generate', choices=['generate'],
default='generate')
parser.add_argument('-f', '--fields', type=lambda x: x.split(';'),
help='semicolon-delimited string of field names')
parser.add_argument('-v', '--values', type=lambda x: x.split(';'),
help='semicolon-delimited string of values.' +
'This can be a mix of faker types and ranges')
parser.add_argument('-o', '--output', type=str,
help='what output to use')
parser.add_argument('-n', '--num_rows', type=int,
help='number of rows to generate')
parser.add_argument('--start_time',
type=lambda x:
datetime.strptime(x, '%Y-%m-%dT%H:%M:%S'),
help='start time of timeseries in isoformat:' +
'YYYY-MM-DDThh:mm:ss')
parser.add_argument('--end_time',
type=lambda x:
datetime.strptime(x, '%Y-%m-%dT%H:%M:%S'),
help='end time of timeseries in isoformat: ' +
'YYYY-MM-DDThh:mm:ss')
parser.add_argument('--increments',
choices=['hours', 'seconds', 'days', 'random'],
default='random',
help='how to increment entries')
return parser
[docs] def parse_args(self, argv=None):
""" Parse arguments and validate them
Kwargs:
argv (sys.argv or similar list)
"""
args = self.parser.parse_args(argv)
self.start_time = args.start_time
self.end_time = args.end_time
self.increments = args.increments
self.num_rows = args.num_rows
self.output = args.output
self.schema = dict((f, v) for f, v in zip(args.fields, args.values))
self.validate_arguments()
[docs] def print_help(self):
""" print parser help """
self.parser.print_help()
[docs] def execute(self):
""" Generates data from CLI parsed arguments
Returns:
output
"""
return generate_from_parser(self)