Source code for antinex_utils.make_predictions

import os
import uuid
import json
import numpy
import pandas as pd
import copy
from spylunking.log.setup_logging import build_colorized_logger
from antinex_utils.consts import SUCCESS
from antinex_utils.consts import ERR
from antinex_utils.consts import FAILED
from antinex_utils.consts import NOTRUN
from antinex_utils.utils import ev
from antinex_utils.utils import ppj
from antinex_utils.build_training_request import \
    build_training_request
from antinex_utils.build_scaler_dataset_from_records import \
    build_scaler_dataset_from_records
from antinex_utils.build_scaler_train_and_test_datasets import \
    build_scaler_train_and_test_datasets
from antinex_utils.merge_inverse_data_into_original import \
    merge_inverse_data_into_original
from keras.models import Sequential
from keras.models import model_from_json
from keras.layers import Dense
from keras.layers import Dropout
from keras.wrappers.scikit_learn import KerasRegressor
from keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
import tensorflow as tf
import matplotlib
matplotlib.use("Agg")  # noqa
import matplotlib.pyplot as plt  # noqa


log = build_colorized_logger(
    name='make_predict')


def build_regression_dnn(
        num_features,
        compile_data,
        label="",
        model_json=None,
        model_desc=None):
    """build_regression_dnn

    :param num_features: input_dim for the number of features in the data
    :param compile_data: dictionary of compile options
    :param label: log label for tracking this method
    :param model_json: keras model json to build the model
    :param model_desc: optional dictionary for model
    """
    model = Sequential()
    if model_json:
        log.info(("{} building regression "
                  "dnn model_json={}").format(
                      label,
                      model_json))
        model = model_from_json(json.dumps(model_json))
    elif model_desc:
        log.info(("{} building regression "
                  "dnn num_features={} model_desc={}").format(
                      label,
                      num_features,
                      model_desc))
        num_layers = 0
        for idx, node in enumerate(model_desc["layers"]):
            layer_type = node.get(
                "layer_type",
                "dense").lower()
            if layer_type == "dense":
                if num_layers == 0:
                    model.add(
                        Dense(
                            int(node["num_neurons"]),
                            input_dim=num_features,
                            kernel_initializer=node["init"],
                            activation=node["activation"]))
                else:
                    model.add(
                        Dense(
                            int(node["num_neurons"]),
                            kernel_initializer=node["init"],
                            activation=node["activation"]))
            else:
                if layer_type == "dropout":
                    model.add(
                        Dropout(
                            float(node["rate"])))
            # end of supported model types
            num_layers += 1
        # end of all layers
    else:
        # https://machinelearningmastery.com/regression-tutorial-keras-deep-learning-library-python/ # noqa
        log.info(("{} building regression "
                  "dnn num_features={}").format(
                      label,
                      num_features))
        model.add(
            Dense(
                8,
                input_dim=num_features,
                kernel_initializer="normal",
                activation="relu"))
        model.add(
            Dense(
                6,
                kernel_initializer="normal",
                activation="relu"))
        model.add(
            Dense(
                1,
                kernel_initializer="normal"))
    # end of building a regression dnn

    # if model was defined
    if model:
        log.info(("{} - regression compiling={}").format(
            label,
            compile_data))

        # compile the model
        loss = compile_data.get(
            "loss",
            "mse")
        optimizer = compile_data.get(
            "optimizer",
            "adam")
        metrics = compile_data.get(
            "metrics",
            [
                "mse",
                "mae",
                "mape",
                "cosine"
            ])
        model.compile(
            loss=loss,
            optimizer=optimizer,
            metrics=metrics)
    else:
        log.error(("{} - failed building regression model").format(
            label))
    # if could compile model

    return model
# end of build_regression_dnn
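

# Usage sketch (illustrative addition, not part of the upstream module):
# builds a small regression model from a hypothetical model_desc whose
# layer keys ("layer_type", "num_neurons", "init", "activation", "rate")
# mirror what build_regression_dnn reads above.
def _example_build_regression_dnn():
    example_desc = {
        "layers": [
            {"num_neurons": 8, "init": "normal", "activation": "relu"},
            {"layer_type": "dropout", "rate": 0.2},
            {"num_neurons": 1, "init": "normal", "activation": "linear"}
        ]
    }
    # an empty compile_data falls back to loss="mse", optimizer="adam"
    # and the default metrics list shown above
    return build_regression_dnn(
        num_features=4,
        compile_data={},
        label="example-regression",
        model_desc=example_desc)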


def build_classification_dnn(
        num_features,
        compile_data,
        label="",
        model_json=None,
        model_desc=None):
    """build_classification_dnn

    :param num_features: input_dim for the number of features in the data
    :param compile_data: dictionary of compile options
    :param label: log label for tracking this method
    :param model_json: keras model json to build the model
    :param model_desc: optional dictionary for model
    """
    model = Sequential()
    if model_json:
        log.info(("{} building classification "
                  "dnn model_json={}").format(
                      label,
                      model_json))
        model = model_from_json(json.dumps(model_json))
    elif model_desc:
        log.info(("{} building classification "
                  "dnn num_features={} model_desc={}").format(
                      label,
                      num_features,
                      model_desc))
        num_layers = 0
        for idx, node in enumerate(model_desc["layers"]):
            layer_type = node.get(
                "layer_type",
                "dense").lower()
            if layer_type == "dense":
                if num_layers == 0:
                    model.add(
                        Dense(
                            int(node["num_neurons"]),
                            input_dim=num_features,
                            kernel_initializer=node["init"],
                            activation=node["activation"]))
                else:
                    model.add(
                        Dense(
                            int(node["num_neurons"]),
                            kernel_initializer=node["init"],
                            activation=node["activation"]))
            else:
                if layer_type == "dropout":
                    model.add(
                        Dropout(
                            float(node["rate"])))
            # end of supported model types
            num_layers += 1
        # end of all layers
    else:
        # https://machinelearningmastery.com/tutorial-first-neural-network-python-keras/ # noqa
        log.info(("{} building classification "
                  "dnn num_features={}").format(
                      label,
                      num_features))
        model.add(
            Dense(
                8,
                input_dim=num_features,
                kernel_initializer="uniform",
                activation="relu"))
        model.add(
            Dense(
                6,
                kernel_initializer="uniform",
                activation="relu"))
        model.add(
            Dense(
                1,
                kernel_initializer="uniform",
                activation="sigmoid"))
    # end of building a classification dnn

    # if model was defined
    if model:
        log.info(("{} - classification compiling={}").format(
            label,
            compile_data))

        # compile the model
        loss = compile_data.get(
            "loss",
            "binary_crossentropy")
        optimizer = compile_data.get(
            "optimizer",
            "adam")
        metrics = compile_data.get(
            "metrics",
            [
                "accuracy"
            ])
        model.compile(
            loss=loss,
            optimizer=optimizer,
            metrics=metrics)
    else:
        log.error(("{} - failed building classification model").format(
            label))
    # if could compile model

    return model
# end of build_classification_dnn
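

# Usage sketch (illustrative addition): builds the default 8-6-1 sigmoid
# binary classifier with an explicit compile_data; the num_features
# value is hypothetical.
def _example_build_classification_dnn():
    compile_data = {
        "loss": "binary_crossentropy",
        "optimizer": "adam",
        "metrics": ["accuracy"]
    }
    # with no model_json or model_desc this falls through to the
    # default network shown above
    return build_classification_dnn(
        num_features=10,
        compile_data=compile_data,
        label="example-classification")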


def check_request(
        req):
    """check_request

    :param req: dictionary to check values
    """
    label = req.get("label", "no-label-set")
    predict_rows = req.get("predict_rows", None)
    manifest = req.get("manifest", None)
    dataset = req.get("dataset", None)
    csv_file = req.get("csv_file", None)
    use_existing_model = req.get("use_existing_model", False)

    if use_existing_model:
        if not predict_rows and not dataset:
            return ("{} missing predict_rows or dataset for existing model "
                    "request={}").format(
                        label,
                        req)
        else:
            return None
    # existing models just need a couple rows or a dataset to work

    if not manifest and not dataset and not predict_rows and not csv_file:
        return ("{} missing manifest "
                "request={}").format(
                    label,
                    ppj(req))
    if manifest:
        csv_file = manifest.get(
            "csv_file",
            None)
        if not predict_rows and not csv_file and not dataset:
            return ("{} missing dataset, predict_rows or csv_file in "
                    "manifest of request={}").format(
                        label,
                        ppj(req))
    if not dataset and not predict_rows and not csv_file:
        return ("{} missing dataset or predict_rows or csv_file in "
                "request={}").format(
                    label,
                    ppj(req))
    return None
# end of check_request
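

# Usage sketch (illustrative addition): check_request returns None when
# the request can proceed and a descriptive error string otherwise; the
# keys below are hypothetical examples.
def _example_check_request():
    bad_req = {
        "label": "demo"
    }
    # missing manifest, dataset, predict_rows and csv_file
    assert check_request(bad_req) is not None
    ok_req = {
        "label": "demo",
        "predict_rows": [{"feature_a": 1.0}]
    }
    # predict_rows alone is enough to pass validation
    assert check_request(ok_req) is None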


def save_prediction_image(
        label="not-set",
        history=None,
        histories=[],
        image_file=None):
    """save_prediction_image

    :param label: log label for tracking this method
    :param history: model prediction history
    :param histories: histories to generate in the image
    :param image_file: save to file
    """
    status = FAILED

    if not history:
        log.info(("{} - no history").format(
            label))
        return status
    if not histories:
        log.info(("{} - no histories").format(
            label))
        return status
    if not image_file:
        log.info(("{} - no image_file").format(
            label))
        return status

    try:
        if history and len(histories) > 0:
            log.info(("plotting history={} "
                      "histories={}").format(
                          history,
                          histories))
            should_save = False
            for h in histories:
                if h in history.history:
                    log.info(("plotting={}").format(
                        h))
                    plt.plot(
                        history.history[h],
                        label=h)
                    should_save = True
                else:
                    log.error(("missing history={}").format(
                        h))
            # for all histories

            if should_save:
                log.info(("saving plots as image={}").format(
                    image_file))
                plt.legend(loc='best')
                plt.savefig(image_file)
                if not os.path.exists(image_file):
                    log.error(("Failed saving image={}").format(
                        image_file))
                else:
                    # report success so callers know the file was written
                    status = SUCCESS
            # end of saving file
        # end of if there are histories to plot
    except Exception as e:
        log.error(("Failed saving "
                   "image_file={} ex={}").format(
                       image_file,
                       e))
    # end of try/ex

    return status
# end of save_prediction_image


def build_train_and_test_features(
        df_columns,
        features_to_process,
        predict_feature,
        ignore_features):
    """build_train_and_test_features

    Order matters when slicing up datasets using scalers...
    if not, then something that is an int/bool can get into a float
    column and that is really bad for making predictions with new
    or pre-trained models...

    :param df_columns: columns in the dataframe
    :param features_to_process: requested features to train
    :param predict_feature: requested feature to predict
    :param ignore_features: requested non-numeric/not-wanted features
    """
    train_and_test_features = []

    # for all columns in the data
    # add columns in order if they are in the requested
    # features_to_process list and not in the ignore_features
    for c in df_columns:
        if c == predict_feature:
            train_and_test_features.append(
                c)
        else:
            add_feature = True
            for i in ignore_features:
                if i == c:
                    add_feature = False
                    break
            if add_feature:
                for f in features_to_process:
                    if f == c:
                        train_and_test_features.append(
                            c)
                        break
    # end of filtering features before scalers

    return train_and_test_features
# end of build_train_and_test_features
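

# Usage sketch (illustrative addition): the returned feature order
# follows the dataframe's column order, not the order of
# features_to_process, and ignored columns are dropped, which keeps
# scaler columns aligned between training and prediction runs.
def _example_build_train_and_test_features():
    ordered = build_train_and_test_features(
        df_columns=["idx", "feature_a", "feature_b", "label_value"],
        features_to_process=["feature_b", "feature_a"],
        predict_feature="label_value",
        ignore_features=["idx"])
    assert ordered == ["feature_a", "feature_b", "label_value"]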


def make_predictions(
        req):
    """make_predictions

    :param req: dictionary for making predictions
    """

    last_step = "not-run"
    label = "no-label-set"
    model = None
    predictions = None
    sample_predictions = []
    rounded = []
    accuracy = None
    error = None
    image_file = None
    history = None
    histories = None
    indexes = None
    scores = None
    cm = None
    scaler_res = None
    scaler_res_data = None
    scaler_train = None
    scaler_test = None
    scaled_train_dataset = None
    scaled_test_dataset = None
    inverse_predictions = None
    merge_df = None
    are_predicts_merged = False
    existing_model_dict = None

    data = {
        "predictions": predictions,
        "rounded_predictions": rounded,
        "sample_predictions": sample_predictions,
        "acc": accuracy,
        "scores": scores,
        "history": history,
        "histories": histories,
        "image_file": image_file,
        "model": model,
        "indexes": indexes,
        "confusion_matrix": cm,
        "scaler_train": scaler_train,
        "scaler_test": scaler_test,
        "scaled_train_dataset": scaled_train_dataset,
        "scaled_test_dataset": scaled_test_dataset,
        "inverse_predictions": inverse_predictions,
        "apply_scaler": False,
        "are_predicts_merged": are_predicts_merged,
        "merge_df": merge_df,
        "err": error
    }
    res = {
        "status": NOTRUN,
        "err": last_step,
        "data": None
    }

    try:
        label = req.get("label", "no-label-set")

        last_step = "validating"
        log.info("{} - {}".format(
            label,
            last_step))

        invalid_error_string = check_request(req)
        if invalid_error_string:
            last_step = ("{} - {}").format(
                label,
                invalid_error_string)
            log.info(("predictions stopping: {}").format(
                last_step))
            res["err"] = last_step
            res["status"] = ERR
            res["data"] = None
            return res
        # end of checking for bad request inputs

        predict_rows = req.get("predict_rows", None)
        verbose = int(req.get("verbose", "1"))
        manifest = req.get("manifest", {})
        model_json = req.get("model_json", None)
        model_desc = req.get("model_desc", None)
        weights_json = req.get("weights_json", None)
        weights_file = req.get("weights_file", None)
        should_predict = req.get("should_predict", True)
        dataset = req.get("dataset", None)

        new_model = True
        existing_model_dict = req.get("use_existing_model", None)
        if existing_model_dict:
            log.info(("{} - using existing model={}").format(
                label,
                existing_model_dict["model"]))
            accuracy = existing_model_dict["acc"]
            scores = existing_model_dict["scores"]
            history = existing_model_dict["history"]
            histories = existing_model_dict["histories"]
            model = existing_model_dict["model"]
            new_model = False
        else:
            log.info(("{} - new model").format(
                label))
        # end of loading the existing model

        save_weights = False
        image_file = req.get("image_file", None)
        loss = req.get(
            "loss",
            manifest.get(
                "loss",
                "mse"))
        optimizer = req.get(
            "optimizer",
            manifest.get(
                "optimizer",
                "adam"))
        metrics = req.get(
            "metrics",
            manifest.get(
                "metrics",
                [
                    "accuracy"
                ]))
        histories = req.get(
            "histories",
            manifest.get(
                "histories",
                [
                    "val_loss",
                    "val_acc",
                    "loss",
                    "acc"
                ]))
        ml_type = req.get(
            "ml_type",
            manifest.get(
                "ml_type",
                "classification"))
        features_to_process = req.get(
            "features_to_process",
            manifest.get(
                "features_to_process",
                []))
        filter_features = req.get(
            "filter_features",
            [])
        ignore_features = req.get(
            "ignore_features",
            [])
        predict_feature = req.get(
            "predict_feature",
            manifest.get(
                "predict_feature",
                None))
        epochs = int(req.get(
            "epochs",
            manifest.get(
                "epochs",
                "5")))
        batch_size = int(req.get(
            "batch_size",
            manifest.get(
                "batch_size",
                "32")))
        test_size = float(req.get(
            "test_size",
            manifest.get(
                "test_size",
                "0.2")))
        num_splits = int(req.get(
            "num_splits",
            manifest.get(
                "num_splits",
                "2")))
        verbose = int(req.get(
            "verbose",
            manifest.get(
                "verbose",
                "1")))
        seed = int(req.get(
            "seed",
            manifest.get(
                "seed",
                "9")))
        label_rules = req.get(
            "label_rules",
            manifest.get(
                "label_rules",
                {}))
        min_scaler_range = int(req.get(
            "min_scaler_range",
            "-1"))
        max_scaler_range = int(req.get(
            "max_scaler_range",
            "1"))
        apply_scaler = bool(str(req.get(
            "apply_scaler",
            "false")).lower() == "true")
        scaler_cast_to_type = req.get(
            "scaler_cast_type",
            "float32")
        sort_by = req.get(
            "sort_values",
            None)
        max_records = int(req.get(
            "max_records",
            "100000"))
        predict_type = manifest.get(
            "predict_type",
            "predict")
        manifest_headers = manifest.get(
            "headers",
            [])
        csv_file = manifest.get(
            "csv_file",
            None)
        meta_file = manifest.get(
            "meta_file",
            None)
        if not weights_file:
            weights_file = manifest.get(
                "model_weights_file",
                None)

        num_features = len(features_to_process)
        detected_headers = []
        num_samples = None
        org_df = None
        row_df = None
        filter_df = None
        sample_rows = None
        target_rows = None
        num_target_rows = None
        ml_req = None

        use_evaluate = False
        if csv_file and meta_file and predict_feature:
            if os.path.exists(csv_file) and os.path.exists(meta_file):
                use_evaluate = True
        else:
            if dataset:
                if os.path.exists(dataset):
                    use_evaluate = True
                    csv_file = dataset
            else:
                if predict_rows and not existing_model_dict:
                    use_evaluate = True
        # end of if we're building a dataset from these locations

        numpy.random.seed(seed)

        last_step = ("loading prediction "
                     "into dataframe seed={} "
                     "scaler={} range[{},{}]").format(
                         seed,
                         apply_scaler,
                         min_scaler_range,
                         max_scaler_range)
        log.info("{} - {}".format(
            label,
            last_step))

        if not weights_file:
            weights_file = manifest.get(
                "model_weights_file",
                None)

        last_step = ("loading prediction into dataframe")
        log.info("{} - {}".format(
            label,
            last_step))

        # convert json into pandas dataframe for model.predict
        try:
            if new_model and use_evaluate and not predict_rows:
                log.info(("{} - loading predictions new_model={} "
                          "evaluate={} csv={} sort={}").format(
                              label,
                              new_model,
                              use_evaluate,
                              csv_file,
                              sort_by))
                if sort_by:
                    org_df = pd.read_csv(
                        csv_file,
                        encoding="utf-8-sig").sort_values(
                            by=sort_by)
                else:
                    org_df = pd.read_csv(
                        csv_file,
                        encoding="utf-8-sig")
                predict_rows = org_df.to_json()
                detected_headers = list(org_df.columns.values)

                if apply_scaler:
                    for f in features_to_process:
                        if f not in org_df.columns:
                            log.error(("{} "
                                       "csv={} is missing column={}").format(
                                           label,
                                           csv_file,
                                           f))
                    # show columns that were supposed to be in the
                    # dataset but are not

                    train_and_test_features = \
                        build_train_and_test_features(
                            org_df.columns,
                            features_to_process,
                            predict_feature,
                            ignore_features)

                    log.info(("building csv scalers all_features={}").format(
                        train_and_test_features))

                    scaler_transform_res = \
                        build_scaler_dataset_from_records(
                            label=label,
                            record_list=org_df[
                                train_and_test_features].to_json(),
                            min_feature=min_scaler_range,
                            max_feature=max_scaler_range,
                            cast_to_type=scaler_cast_to_type)

                    if scaler_transform_res["status"] == SUCCESS:
                        log.info(("{} - scaled dataset predict_rows={} "
                                  "df={} dataset={}").format(
                                      label,
                                      len(predict_rows),
                                      len(scaler_transform_res[
                                          "org_recs"].index),
                                      len(scaler_transform_res["dataset"])))
                    else:
                        log.error(("{} - failed to scale dataset err={} "
                                   "predict_rows={} df={} dataset={}").format(
                                       label,
                                       scaler_transform_res["err"],
                                       len(predict_rows),
                                       len(scaler_transform_res[
                                           "org_recs"].index),
                                       len(scaler_transform_res["dataset"])))
                    # if scaler works on dataset

                    log.info(("{} building scaled samples and rows").format(
                        label))

                    # noqa https://stackoverflow.com/questions/21764475/scaling-numbers-column-by-column-with-pandas-python
                    row_df = pd.DataFrame(
                        scaler_transform_res["dataset"],
                        columns=org_df[
                            train_and_test_features].columns)
                    sample_rows = row_df[features_to_process]
                    target_rows = row_df[predict_feature]
                    num_samples = len(sample_rows.index)
                    num_target_rows = len(target_rows.index)
                else:
                    log.info(("{} - not applying scaler to "
                              "predict_rows").format(
                                  label))
                    row_df = org_df
                    log.info(("{} - setting samples "
                              "to features_to_process={} cols={}").format(
                                  label,
                                  ppj(features_to_process),
                                  list(org_df.columns.values)))
                    sample_rows = row_df[features_to_process]
                    target_rows = row_df[predict_feature]
                    num_samples = len(sample_rows.index)
                    num_target_rows = len(target_rows.index)
                # if applying scaler to predict rows
            # end of loading from a csv

            if dataset:
                log.info(("{} loading dataset={}").format(
                    label,
                    dataset))
                if sort_by:
                    org_df = pd.read_csv(
                        dataset,
                        encoding="utf-8-sig").sort_values(
                            by=sort_by)
                else:
                    org_df = pd.read_csv(
                        dataset,
                        encoding="utf-8-sig")
                # end of loading the df

                log.info(("{} preparing dataset={}").format(
                    label,
                    dataset))

                ml_req = {
                    "X_train": None,
                    "Y_train": None,
                    "X_test": None,
                    "Y_test": None
                }

                scaled_train_features = []
                cur_headers = list(org_df.columns.values)
                for h in cur_headers:
                    include_feature = True
                    if h == predict_feature:
                        include_feature = False
                    else:
                        for f in features_to_process:
                            if h == f:
                                scaled_train_features.append(h)
                                include_feature = False
                                break
                        for e in ignore_features:
                            if h == e:
                                include_feature = False
                                break
                    # filter out columns
                    if include_feature:
                        features_to_process.append(h)
                # end of building features

                # make sure to prune out ignored ones:
                cleaned_scaled_train = []
                for idx, h in enumerate(scaled_train_features):
                    should_include = True
                    for i in ignore_features:
                        if h == i:
                            should_include = False
                    if should_include:
                        cleaned_scaled_train.append(h)
                # end of pruning ignored ones

                # assign to cleaned list
                scaled_train_features = cleaned_scaled_train

                filter_features = copy.deepcopy(features_to_process)
                include_predict_feature = True
                for f in filter_features:
                    if f == predict_feature:
                        include_predict_feature = False
                        break
                if include_predict_feature:
                    filter_features.append(predict_feature)

                if apply_scaler:
                    num_features = len(scaled_train_features)
                    log.info(("{} scaling dataset={} "
                              "scaled_train_features={}").format(
                                  label,
                                  len(org_df.index),
                                  ppj(scaled_train_features)))
                    scaler_res = \
                        build_scaler_train_and_test_datasets(
                            label=label,
                            train_features=scaled_train_features,
                            test_feature=predict_feature,
                            df=org_df,
                            test_size=test_size,
                            seed=seed,
                            scaler_cast_to_type=scaler_cast_to_type,
                            min_feature_range=min_scaler_range,
                            max_feature_range=max_scaler_range)
                    if scaler_res["status"] != SUCCESS:
                        log.info(("{} - scaler transform failed "
                                  "error={}").format(
                                      label,
                                      scaler_res["err"]))
                        res["status"] = ERR
                        res["err"] = last_step
                        res["data"] = None
                        return res
                    else:
                        log.info(("{} - scaler transform done").format(
                            label))

                    scaler_res_data = scaler_res["data"]
                    ml_req["X_train"] = scaler_res_data["x_train"]
                    ml_req["Y_train"] = scaler_res_data["y_train"]
                    ml_req["X_test"] = scaler_res_data["x_test"]
                    ml_req["Y_test"] = scaler_res_data["y_test"]
                    scaler_train = scaler_res_data["scaler_train"]
                    scaler_test = scaler_res_data["scaler_test"]
                    scaled_train_dataset = \
                        scaler_res_data["scaled_train_dataset"]
                    scaled_test_dataset = \
                        scaler_res_data["scaled_test_dataset"]

                    log.info(("{} - building scaled row_df "
                              "filter_features={}").format(
                                  label,
                                  filter_features))

                    # noqa https://stackoverflow.com/questions/21764475/scaling-numbers-column-by-column-with-pandas-python
                    last_step = ("building row_df from scaled ds "
                                 "train_features={}").format(
                                     scaled_train_features)
                    row_df = pd.DataFrame(
                        scaler_res_data["scaled_train_dataset"],
                        columns=list(scaled_train_features))
                    last_step = ("building samples from rows_df={} "
                                 "train_features={}").format(
                                     len(row_df.index),
                                     scaled_train_features)
                    sample_rows = pd.DataFrame(
                        scaler_res_data["scaled_train_dataset"],
                        columns=list(scaled_train_features))
                    last_step = ("building targets from scaled ds "
                                 "predict_feature={}").format(
                                     predict_feature)
                    target_rows = pd.DataFrame(
                        scaler_res_data["scaled_test_dataset"],
                        columns=[predict_feature])
                    last_step = ("adding predict_feature={} to "
                                 "row_df").format(
                                     predict_feature)
                    row_df[predict_feature] = \
                        scaler_res_data["scaled_test_dataset"]
                    last_step = ("counting num_sample_rows")
                    num_samples = len(sample_rows.index)
                    last_step = ("counting num_target_rows")
                    num_target_rows = len(target_rows.index)
                    log.info(("{} row_df created features={} "
                              "num_samples={} num_targets={}").format(
                                  label,
                                  features_to_process,
                                  num_samples,
                                  num_target_rows))
                # end of setting up scaler train/test data
                else:
                    num_features = len(features_to_process)
                    log.info(("{} filtering dataset={} "
                              "filter_features={}").format(
                                  label,
                                  len(org_df.index),
                                  ppj(filter_features)))
                    filter_df = org_df[filter_features]
                    log.info(("{} splitting non-scaled "
                              "filtered_df={} predict_feature={} "
                              "test_size={} features={} "
                              "ignore_features={} csv={}").format(
                                  label,
                                  len(filter_df.index),
                                  predict_feature,
                                  test_size,
                                  ppj(features_to_process),
                                  ppj(ignore_features),
                                  dataset))

                    # split the data into training
                    (ml_req["X_train"],
                     ml_req["X_test"],
                     ml_req["Y_train"],
                     ml_req["Y_test"]) = train_test_split(
                        filter_df[features_to_process],
                        filter_df[predict_feature],
                        test_size=test_size,
                        random_state=seed)

                    row_df = org_df
                    log.info(("{} - setting samples "
                              "to features_to_process={} cols={}").format(
                                  label,
                                  ppj(features_to_process),
                                  list(org_df.columns.values)))
                    sample_rows = row_df[features_to_process]
                    target_rows = row_df[predict_feature]
                    num_samples = len(sample_rows.index)
                    num_target_rows = len(target_rows.index)
                # if applying scaler to predict rows
            else:
                if apply_scaler:
                    log.info(("{} - no dataset - scaling "
                              "predict_rows={}").format(
                                  label,
                                  len(predict_rows)))
                    org_df = pd.read_json(predict_rows)
                    row_df = org_df
                    for f in features_to_process:
                        if f not in org_df.columns:
                            log.error(("{} "
                                       "predict_rows are missing "
                                       "column={}").format(
                                           label,
                                           f))
                    # show columns that were supposed to be in the
                    # predict_rows but are not

                    train_and_test_features = \
                        build_train_and_test_features(
                            org_df.columns,
                            features_to_process,
                            predict_feature,
                            ignore_features)

                    log.info(("building predict_rows scalers "
                              "all_features={}").format(
                                  train_and_test_features))

                    scaler_transform_res = \
                        build_scaler_dataset_from_records(
                            label=label,
                            record_list=org_df[
                                train_and_test_features].to_json(),
                            min_feature=min_scaler_range,
                            max_feature=max_scaler_range,
                            cast_to_type=scaler_cast_to_type)

                    if scaler_transform_res["status"] == SUCCESS:
                        log.info(("{} - scaled predict_rows={} "
                                  "df={} dataset={}").format(
                                      label,
                                      len(predict_rows),
                                      len(scaler_transform_res[
                                          "org_recs"].index),
                                      len(scaler_transform_res["dataset"])))
                    else:
                        log.error(("{} - failed to scale predict err={} "
                                   "predict_rows={} df={} dataset={}").format(
                                       label,
                                       scaler_transform_res["err"],
                                       len(predict_rows),
                                       len(scaler_transform_res[
                                           "org_recs"].index),
                                       len(scaler_transform_res["dataset"])))
                    # if scaler works on dataset

                    log.info(("{} building predict org_df scaled "
                              "for testing all samples and "
                              "predict_rows").format(
                                  label))

                    # noqa https://stackoverflow.com/questions/21764475/scaling-numbers-column-by-column-with-pandas-python
                    row_df = pd.DataFrame(
                        scaler_transform_res["dataset"],
                        columns=org_df[
                            train_and_test_features].columns)

                    log.info(("{} casting data to floats").format(
                        label))
                    ml_req = {
                        "X_train": row_df[features_to_process].astype(
                            "float32").values,
                        "Y_train": row_df[predict_feature].astype(
                            "float32").values,
                        "X_test": row_df[features_to_process].astype(
                            "float32").values,
                        "Y_test": row_df[predict_feature].astype(
                            "float32").values
                    }
                    sample_rows = row_df[features_to_process]
                    target_rows = row_df[predict_feature]
                    num_samples = len(sample_rows.index)
                    num_target_rows = len(target_rows.index)
                else:
                    log.info(("{} - no dataset using org_df").format(
                        label))
                    org_df = pd.read_json(predict_rows)
                    row_df = org_df
                    ml_req = {
                        "X_train": org_df[features_to_process].astype(
                            "float32").values,
                        "Y_train": org_df[predict_feature].astype(
                            "float32").values,
                        "X_test": org_df[features_to_process].astype(
                            "float32").values,
                        "Y_test": org_df[predict_feature].astype(
                            "float32").values
                    }
                    log.info(("{} building predict org_df scaled "
                              "for testing all samples WITHOUT "
                              "predict_rows").format(
                                  label))
                    log.info(("{} - setting samples "
                              "to features_to_process={} cols={}").format(
                                  label,
                                  ppj(features_to_process),
                                  list(org_df.columns.values)))
                    sample_rows = row_df[features_to_process]
                    target_rows = row_df[predict_feature]
                    num_samples = len(sample_rows.index)
                    num_target_rows = len(target_rows.index)
                # end of if apply_scaler to csv + predict_rows
            # end of handling metadata-driven split vs controlled
        except Exception as f:
            last_step = ("{} - failed during '{}' json={} "
                         "with ex={}").format(
                             label,
                             last_step,
                             model_json,
                             f)
            log.error(last_step)
            res["status"] = ERR
            res["err"] = last_step
            res["data"] = None
            return res
        # end of try/ex to convert rows to pandas dataframe

        if num_samples == 0:
            last_step = ("{} - "
                         "missing predict_rows={}").format(
                             label,
                             sample_rows)
            log.error(last_step)
            res["status"] = ERR
            res["err"] = last_step
            res["data"] = None
            return res
        # stop if no rows

        # check the headers in the dataframe match what
        # the original model was trained with
        for h in manifest_headers:
            found_header = False
            for d in detected_headers:
                if h == d:
                    found_header = True
                    break
            # end of for all detected headers
            if not found_header:
                last_step = ("{} - invalid predict_rows - header={} in "
                             "detected_headers={}").format(
                                 label,
                                 h,
                                 detected_headers)
                log.error(last_step)
                res["status"] = ERR
                res["err"] = last_step
                res["data"] = None
                return res
        # end for all manifest headers - expected to be in
        # predict_rows

        # create a back up weights file if one was not targeted
        if not weights_file:
            h5_storage_dir = ev(
                "H5_DIR",
                "/tmp")
            weights_file = "{}/{}.h5".format(
                h5_storage_dir,
                str(uuid.uuid4()))
        # end of building a weights file

        last_step = "loading model"
        log.info("{} - {}".format(
            label,
            last_step))

        # load the model from the json
        try:
            if new_model:
                compile_data = {
                    "loss": loss,
                    "optimizer": optimizer,
                    "metrics": metrics
                }
                if ml_type == "standalone-classification":
                    model = build_classification_dnn(
                        num_features=num_features,
                        compile_data=compile_data,
                        label=label,
                        model_json=model_json,
                        model_desc=model_desc)
                elif ml_type == "classification":
                    def set_model():
                        return build_classification_dnn(
                            num_features=num_features,
                            compile_data=compile_data,
                            label=label,
                            model_json=model_json,
                            model_desc=model_desc)
                    model = KerasClassifier(
                        build_fn=set_model,
                        epochs=epochs,
                        batch_size=batch_size,
                        verbose=verbose)
                elif ml_type == "regression":
                    def set_model():
                        return build_regression_dnn(
                            num_features=num_features,
                            compile_data=compile_data,
                            label=label,
                            model_json=model_json,
                            model_desc=model_desc)
                    model = KerasRegressor(
                        build_fn=set_model,
                        epochs=epochs,
                        batch_size=batch_size,
                        verbose=verbose)
                else:
                    def set_model():
                        return build_regression_dnn(
                            num_features=num_features,
                            compile_data=compile_data,
                            label=label,
                            model_json=model_json,
                            model_desc=model_desc)
                    model = KerasRegressor(
                        build_fn=set_model,
                        epochs=epochs,
                        batch_size=batch_size,
                        verbose=verbose)
            else:
                log.info(("{} - using existing - not building={}").format(
                    label,
                    new_model))
            # end of if new_model or use existing
        except Exception as f:
            last_step = ("{} - failed during '{}' ml_type={} "
                         "model_json={} model_desc={} "
                         "with ex={}").format(
                             label,
                             last_step,
                             ml_type,
                             model_json,
                             model_desc,
                             f)
            log.error(last_step)
            res["status"] = ERR
            res["err"] = last_step
            res["data"] = None
            return res
        # end of try/ex to build the model

        last_step = "saving model weights={}".format(
            weights_file)
        log.info("{} - {}".format(
            label,
            last_step))

        if use_evaluate:
            last_step = ("building training ml_type={} "
                         "predict_type={} "
                         "sample_rows={} target_rows={} "
                         "manifest={}").format(
                             ml_type,
                             predict_type,
                             num_samples,
                             num_target_rows,
                             ppj(manifest))
            log.info("{} - {}".format(
                label,
                last_step))

            # build training request for new predicts
            if not dataset and csv_file:
                ml_req = build_training_request(
                    csv_file=csv_file,
                    meta_file=meta_file,
                    predict_feature=predict_feature,
                    test_size=test_size)

            # fit the model
            if new_model:
                last_step = ("fitting Xtrain={} Ytrain={} "
                             "Xtest={} Ytest={} "
                             "epochs={} batch_size={}").format(
                                 len(ml_req["X_train"]),
                                 len(ml_req["Y_train"]),
                                 len(ml_req["X_test"]),
                                 len(ml_req["Y_test"]),
                                 epochs,
                                 batch_size)
                log.info("{} - {}".format(
                    label,
                    last_step))
                history = model.fit(
                    ml_req["X_train"],
                    ml_req["Y_train"],
                    validation_data=(
                        ml_req["X_test"],
                        ml_req["Y_test"]),
                    epochs=epochs,
                    batch_size=batch_size,
                    shuffle=False,
                    verbose=verbose)
            else:
                log.info(("{} - using existing - not fitting={}").format(
                    label,
                    new_model))
            # end of if new model or using existing
        # end of building training update data - not use_evaluate

        last_step = ("predicting ml_type={} predict_type={} "
                     "rows={} manifest={}").format(
                         ml_type,
                         predict_type,
                         num_samples,
                         ppj(manifest))
        log.info("{} - {}".format(
            label,
            last_step))

        if os.path.exists(weights_file):
            last_step = ("loading weights_file={}").format(
                weights_file)
            log.info("{} - {}".format(
                label,
                last_step))

            # load the weights from the file on disk
            try:
                if ml_type == "standalone-classification":
                    model.load_weights(
                        weights_file)
                else:
                    model.model.load_weights(
                        weights_file)
            except Exception as f:
                last_step = ("{} - failed during '{}' "
                             "file={} weights={} "
                             "with ex={}").format(
                                 label,
                                 last_step,
                                 weights_file,
                                 weights_json,
                                 f)
                log.error(last_step)
                res["status"] = ERR
                res["err"] = last_step
                res["data"] = None
                return res
            # end of try/ex to load weights
        else:
            log.info(("{} did not find weights_file={}").format(
                label,
                weights_file))
        # only load weights if the file is still on disk

        # evaluating
        last_step = ("evaluating num_xtest={} num_ytest={} "
                     "metrics={} histories={} "
                     "loss={} optimizer={}").format(
                         len(ml_req["X_test"]),
                         len(ml_req["Y_test"]),
                         metrics,
                         histories,
                         loss,
                         optimizer)
        log.info("{} - {}".format(
            label,
            last_step))

        # make predictions
        try:
            if should_predict:
                if ml_type == "standalone-classification":
                    if new_model:
                        scores = model.evaluate(
                            ml_req["X_test"],
                            ml_req["Y_test"])
                        if len(scores) > 1:
                            accuracy = {
                                "accuracy": scores[1] * 100
                            }
                        else:
                            accuracy = {
                                "accuracy": 0.0
                            }
                    # no scoring on existing models

                    predictions = model.predict(
                        sample_rows.values,
                        verbose=verbose)
                    numpy.set_printoptions(threshold=numpy.nan)
                    sess = tf.InteractiveSession()  # noqa
                    indexes = tf.argmax(predictions, axis=1)
                    data["indexes"] = indexes
                    rounded = [round(x[0]) for x in predictions]

                    ridx = 0
                    should_set_labels = False
                    labels_dict = {}
                    if "labels" in label_rules \
                            and "label_values" in label_rules:
                        label_rows = label_rules["label_values"]
                        for idx, lidx in enumerate(label_rows):
                            if len(label_rules["labels"]) >= idx:
                                should_set_labels = True
                                labels_dict[str(lidx)] = \
                                    label_rules["labels"][idx]
                    # end of compiling labels dictionary

                    log.info(("{} - scores={} accuracy={} "
                              "merging predictions={} labels={}").format(
                                  label,
                                  scores,
                                  accuracy.get("accuracy", None),
                                  len(sample_rows.index),
                                  labels_dict))

                    for idx, row in row_df.iterrows():
                        if len(sample_predictions) > max_records:
                            log.info(("{} hit max={} predictions").format(
                                label,
                                max_records))
                            break
                        new_row = json.loads(row.to_json())
                        cur_value = rounded[ridx]
                        if predict_feature in row:
                            new_row["_original_{}".format(
                                predict_feature)] = \
                                    row[predict_feature]
                        else:
                            new_row["_original_{}".format(
                                predict_feature)] = \
                                    "missing-from-dataset"
                        new_row[predict_feature] = int(cur_value)
                        if should_set_labels:
                            new_row["label_name"] = \
                                labels_dict[str(int(cur_value))]
                        new_row["_row_idx"] = ridx
                        new_row["_count"] = idx
                        sample_predictions.append(new_row)
                        ridx += 1
                    # end of merging samples with predictions
                elif ml_type == "classification":
                    if new_model:
                        last_step = "building estimators"
                        estimators = []
                        estimators.append(
                            ("standardize", StandardScaler()))
                        estimators.append(
                            ("mlp", model))
                        last_step = "building pipeline"
                        pipeline = Pipeline(estimators)
                        # https://machinelearningmastery.com/multi-class-classification-tutorial-keras-deep-learning-library/ # noqa
                        last_step = ("{} - starting classification "
                                     "StratifiedKFold "
                                     "splits={} seed={}").format(
                                         label,
                                         num_splits,
                                         seed)
                        log.info(last_step)
                        kfold = StratifiedKFold(
                            n_splits=num_splits,
                            random_state=seed)
                        last_step = "cross_val_score"
                        log.info(("{} - classification "
                                  "cross_val_score:").format(
                                      label))
                        results = cross_val_score(
                            pipeline,
                            ml_req["X_train"],
                            ml_req["Y_train"],
                            cv=kfold)
                        scores = [
                            results.std(),
                            results.mean()
                        ]
                        accuracy = {
                            "accuracy": results.mean() * 100
                        }
                        log.info(("{} - classification accuracy={} "
                                  "samples={}").format(
                                      label,
                                      accuracy["accuracy"],
                                      num_samples))
                    else:
                        log.info(("{} - using existing "
                                  "accuracy={} scores={} "
                                  "predictions={}").format(
                                      label,
                                      accuracy,
                                      scores,
                                      len(sample_rows.index)))
                    # end of if use existing or new model

                    predictions = model.predict(
                        sample_rows.values,
                        verbose=verbose)

                    if new_model:
                        log.info(("{} - "
                                  "classification confusion_matrix "
                                  "samples={} "
                                  "predictions={} target_rows={}").format(
                                      label,
                                      num_samples,
                                      len(predictions),
                                      num_target_rows))
                        cm = confusion_matrix(
                            target_rows.values,
                            predictions)
                        log.info(("{} - "
                                  "classification has confusion_matrix={} "
                                  "predictions={} target_rows={}").format(
                                      label,
                                      cm,
                                      len(predictions),
                                      num_target_rows))
                    # end of confusion matrix

                    rounded = [round(x[0]) for x in predictions]

                    ridx = 0
                    should_set_labels = False
                    labels_dict = {}
                    if "labels" in label_rules \
                            and "label_values" in label_rules:
                        label_rows = label_rules["label_values"]
                        for idx, lidx in enumerate(label_rows):
                            if len(label_rules["labels"]) >= idx:
                                should_set_labels = True
                                labels_dict[str(lidx)] = \
                                    label_rules["labels"][idx]
                    # end of compiling labels dictionary

                    log.info(("{} - ml_type={} scores={} accuracy={} "
                              "merging samples={} with predictions={} "
                              "labels={}").format(
                                  label,
                                  ml_type,
                                  scores,
                                  accuracy.get("accuracy", None),
                                  len(sample_rows.index),
                                  len(rounded),
                                  labels_dict))

                    for idx, row in row_df.iterrows():
                        if len(sample_predictions) > max_records:
                            log.info(("{} hit max={} predictions").format(
                                label,
                                max_records))
                            break
                        new_row = json.loads(row.to_json())
                        cur_value = rounded[ridx]
                        if predict_feature in row:
                            new_row["_original_{}".format(
                                predict_feature)] = \
                                    row[predict_feature]
                        else:
                            new_row["_original_{}".format(
                                predict_feature)] = \
                                    "missing-from-dataset"
                        new_row[predict_feature] = int(cur_value)
                        if should_set_labels:
                            new_row["label_name"] = \
                                labels_dict[str(int(cur_value))]
                        new_row["_row_idx"] = ridx
                        new_row["_count"] = idx
                        sample_predictions.append(new_row)
                        ridx += 1
                    # end of merging samples with predictions
                elif ml_type == "regression":
                    if new_model:
                        last_step = "building new regression model"
                        estimators = []
                        estimators.append(
                            ("standardize", StandardScaler()))
                        estimators.append(
                            ("mlp", model))
                        last_step = "building pipeline"
                        pipeline = Pipeline(estimators)
                        log.info(("{} - starting regression kfolds "
                                  "splits={} seed={}").format(
                                      label,
                                      num_splits,
                                      seed))
                        last_step = "starting KFolds"
                        kfold = KFold(
                            n_splits=num_splits,
                            random_state=seed)
                        log.info(("{} - regression cross_val_score "
                                  "kfold={}").format(
                                      label,
                                      num_splits))
                        last_step = "starting cross_val_score"
                        results = cross_val_score(
                            pipeline,
                            sample_rows.values,
                            target_rows.values,
                            cv=kfold)
                        log.info(("{} - regression prediction score: "
                                  "mean={} std={}").format(
                                      label,
                                      results.mean(),
                                      results.std()))
                        last_step = "getting scores"
                        scores = [
                            results.std(),
                            results.mean()
                        ]
                        last_step = "getting accuracy"
                        accuracy = {
                            "accuracy": results.mean() * 100
                        }
                    # end of if new model or using existing

                    last_step = "making predictions on samples"
                    log.info(("{} - regression accuracy={} "
                              "samples={}").format(
                                  label,
                                  accuracy["accuracy"],
                                  num_samples))
                    org_predictions = model.predict(
                        sample_rows.values,
                        verbose=verbose)

                    if apply_scaler and scaler_test:
                        inverse_predictions = scaler_test.inverse_transform(
                            org_predictions.reshape(-1, 1)).reshape(-1)
                        predict_feature_values = \
                            pd.Series(inverse_predictions)
                        inverse_predictions_df = pd.DataFrame(
                            predict_feature_values,
                            columns=[predict_feature])
                        predictions = inverse_predictions_df.values
                        merge_req = {
                            "org_recs": org_df,
                            "inverse_recs": inverse_predictions_df
                        }
                        merge_res = merge_inverse_data_into_original(
                            req=merge_req,
                            sort_on_index=predict_feature,
                            ordered_columns=[predict_feature])
                        merge_df = merge_res["merge_df"]
                        sample_predictions = merge_df.to_json()
                        log.info(("{} - merge_df={}").format(
                            label,
                            len(sample_predictions)))
                        are_predicts_merged = True
                    else:
                        last_step = "casting predictions to float"
                        predictions = [float(x) for x in org_predictions]

                        log.info(("{} - ml_type={} scores={} accuracy={} "
                                  "merging samples={} "
                                  "with predictions={}").format(
                                      label,
                                      ml_type,
                                      scores,
                                      accuracy.get("accuracy", None),
                                      len(sample_rows.index),
                                      len(predictions)))

                        last_step = "merging predictions with org dataframe"
                        ridx = 0
                        for idx, row in row_df.iterrows():
                            if len(sample_predictions) > max_records:
                                log.info(("{} hit max={} "
                                          "predictions").format(
                                              label,
                                              max_records))
                                break
                            new_row = json.loads(row.to_json())
                            cur_value = predictions[ridx]
                            if predict_feature in row:
                                new_row["_original_{}".format(
                                    predict_feature)] = \
                                        row[predict_feature]
                            else:
                                new_row["_original_{}".format(
                                    predict_feature)] = \
                                        "missing-from-dataset"
                            new_row[predict_feature] = cur_value
                            new_row["_row_idx"] = ridx
                            new_row["_count"] = idx
                            sample_predictions.append(new_row)
                            log.debug(("predicting={} target={} "
                                       "predicted={}").format(
                                           predict_feature,
                                           target_rows[ridx],
                                           new_row[predict_feature]))
                            ridx += 1
                        # end of merging samples with predictions
                    # handle inverse transform for scaler datasets

                    last_step = "merging predictions done"
                else:
                    last_step = ("{} - invalid ml_type={} "
                                 "rows={}").format(
                                     label,
                                     ml_type,
                                     num_samples)
                    res["status"] = ERR
                    res["err"] = last_step
                    res["data"] = None
                    return res
            else:
                log.info(("{} - skipping predictions "
                          "should_predict={}").format(
                              label,
                              should_predict))
        except Exception as f:
            predictions = None
            last_step = ("{} - failed predicting '{}' "
                         "file={} weights={} "
                         "with ex={}").format(
                             label,
                             last_step,
                             weights_file,
                             weights_json,
                             f)
            log.error(last_step)
            res["status"] = ERR
            res["err"] = last_step
            res["data"] = None
            return res
        # end of try/ex to predict

        if ml_type == "classification":
            last_step = ("packaging {} predictions={} "
                         "rows={}").format(
                             ml_type,
                             len(rounded),
                             len(sample_rows.index))
        else:
            last_step = ("packaging {} predictions={} "
                         "rows={}").format(
                             ml_type,
                             len(sample_predictions),
                             len(sample_rows.index))
        log.info(("{} - {}").format(
            label,
            last_step))

        try:
            image_saved = save_prediction_image(
                label=label,
                history=history,
                histories=histories,
                image_file=image_file)
            # save_prediction_image reports SUCCESS when the file was written
            if image_saved == SUCCESS:
                log.info(("{} - created image_file={}").format(
                    label,
                    image_file))
            else:
                image_file = None
        except Exception as f:
            image_file = None
            last_step = ("{} - failed creating image "
                         "with ex={}").format(
                             label,
                             f)
            log.error(last_step)
        # end of trying to build prediction history image

        model_weights = {}
        if new_model and save_weights:
            try:
                model_weights = {}
                # disabled for https://github.com/keras-team/keras/issues/4875
                # model.save_weights(weights_file)
                # model_weights = model.get_weights()
            except Exception as m:
                log.error(("saving label={} weights_file={} "
                           "failed ex={}").format(
                               label,
                               weights_file,
                               m))
            # end of try/ex save

            log.info(("convert label={} weights_file={} to df").format(
                label,
                weights_file))
            try:
                log.info(("label={} weights_file={} to HDFStore").format(
                    label,
                    weights_file))
                hdf_store = \
                    pd.HDFStore(weights_file)
                log.info(("label={} HDFStore getting keys").format(
                    label))
                hdf_keys = list(hdf_store.keys())
                model_weights = {
                    "keys": hdf_keys,
                    "weights": []
                }
                log.info(("label={} HDFStore found weight_file={} "
                          "keys={}").format(
                              label,
                              weights_file,
                              hdf_keys))
                for cur_key in hdf_keys:
                    log.info(("label={} HDFStore converting key={} "
                              "to df").format(
                                  label,
                                  cur_key))
                    df_hdf = \
                        pd.read_hdf(weights_file, cur_key)
                    model_weights["weights"].append({
                        "key": cur_key,
                        "df": df_hdf.to_json()})
                # end of for all keys in the HDFStore
            except Exception as m:
                log.error(("convert label={} weights_file={} "
                           "failed ex={}").format(
                               label,
                               weights_file,
                               m))
            # end of try/ex convert
        # end of save_weights file for next run

        log.info(("{} - predictions done").format(
            label))

        data["acc"] = accuracy
        data["histories"] = histories
        data["image_file"] = image_file
        if new_model:
            if ml_type == "standalone-classification":
                data["model"] = model
            else:
                data["model"] = model.model
        else:
            data["model"] = model
        data["weights"] = model_weights
        data["indexes"] = indexes
        data["scores"] = scores
        data["predictions"] = predictions
        data["rounded"] = rounded
        data["sample_predictions"] = sample_predictions
        data["confusion_matrix"] = None
        data["scaler_train"] = scaler_train
        data["scaler_test"] = scaler_test
        data["scaled_train_dataset"] = scaled_train_dataset
        data["scaled_test_dataset"] = scaled_test_dataset
        data["inverse_predictions"] = inverse_predictions
        data["apply_scaler"] = apply_scaler
        data["merge_df"] = merge_df
        data["are_predicts_merged"] = are_predicts_merged

        res["status"] = SUCCESS
        res["err"] = ""
        res["data"] = data
    except Exception as e:
        res["status"] = ERR
        if existing_model_dict:
            last_step = ("failed {} existing_model request={} hit ex={} "
                         "during last_step='{}'").format(
                             label,
                             req,
                             e,
                             last_step)
        else:
            last_step = ("failed {} request={} hit ex={} "
                         "during last_step='{}'").format(
                             label,
                             ppj(req),
                             e,
                             last_step)
        res["err"] = last_step
        res["data"] = None
        log.error(last_step)
    # end of try/ex

    return res
# end of make_predictions
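

# Usage sketch (illustrative addition): a minimal
# standalone-classification request driven by in-memory predict_rows.
# The keys mirror what make_predictions reads above; the feature names
# and values are hypothetical.
def _example_make_predictions():
    rows_df = pd.DataFrame({
        "feature_a": [0.1, 0.9, 0.2, 0.8],
        "feature_b": [1.0, 0.0, 1.0, 0.0],
        "label_value": [0, 1, 0, 1]
    })
    req = {
        "label": "example-classify",
        "ml_type": "standalone-classification",
        "predict_rows": rows_df.to_json(),
        "features_to_process": ["feature_a", "feature_b"],
        "predict_feature": "label_value",
        "epochs": "2",
        "batch_size": "2",
        "verbose": "0"
    }
    res = make_predictions(req)
    if res["status"] == SUCCESS:
        log.info(("example sample_predictions={}").format(
            len(res["data"]["sample_predictions"])))
    return res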