Source code for antinex_utils.merge_inverse_data_into_original

import json
import pandas as pd
from spylunking.log.setup_logging import build_colorized_logger
from antinex_utils.consts import SUCCESS
from antinex_utils.consts import ERR
from antinex_utils.consts import NOTRUN


log = build_colorized_logger(
    name='merge_inverse_data_into_original')


[docs]def merge_inverse_data_into_original( req, sort_on_index=None, ordered_columns=None): """merge_inverse_data_into_original :param req: managed dictionary :param sort_on_index: sort the dataframe on this column name :param ordered_columns: column list to rename the inverse transform """ label = req.get( "label", "") last_step = "not-run" status = NOTRUN org_df = None predict_df = None merge_df = None res = { "status": status, "err": last_step, "sorted_org_df": org_df, "predict_df": predict_df, "merge_df": merge_df } try: last_step = ("sorting org_records on index={}").format( sort_on_index) log.info(("{} - {}") .format( label, last_step)) org_df = pd.DataFrame(req["org_recs"]).set_index(sort_on_index) inverse_df = pd.DataFrame(req["inverse_recs"]) if ordered_columns: last_step = "getting inverse column ordering={}".format( ordered_columns) log.info(("{} - {}") .format( label, last_step)) inverse_df.columns = ordered_columns if sort_on_index: last_step = ("sorting inverse on index={}").format( sort_on_index) log.info(("{} - {}") .format( label, last_step)) inverse_df.set_index(sort_on_index) row_idx = 0 merge_rows = [] predict_rows = [] last_step = ("for org_value rows={}").format( len(org_df.index)) log.info(("{} - {}") .format( label, last_step)) predicted_index = list(org_df.index) for idx, row in org_df.iterrows(): inv_row = inverse_df.iloc[row_idx].to_json() inv_dict = json.loads(inv_row) predict_rows.append(inv_dict) cur_dict = json.loads(row.to_json()) cur_dict[sort_on_index] = predicted_index[row_idx] cur_dict["_row_idx"] = row_idx for i in inv_dict: if i in cur_dict: cur_dict["_predicted_{}".format( i)] = inv_dict[i] merge_rows.append(cur_dict) row_idx += 1 # end of for all rows to check for ordering last_step = ("building predict_df rows={}").format( len(predict_rows)) log.info(("{} - {}") .format( label, last_step)) predict_df = pd.DataFrame( predict_rows) merge_df = pd.DataFrame( merge_rows) if sort_on_index: last_step = ("sorting={} predict_df rows={}").format( sort_on_index, len(predict_rows)) log.info(("{} - {}") .format( label, last_step)) predict_df.set_index(sort_on_index) merge_df.set_index(sort_on_index) last_step = "" status = SUCCESS except Exception as e: last_step = ("failed merge_inverse_data_into_original " "with ex={} last_step='{}' " "org_recs={} inverse_recs={}").format( e, last_step, len(org_df.index), len(inverse_df.index)) log.error(("{} - {}") .format( label, last_step)) status = ERR # end of try/ex res = { "status": status, "err": last_step, "sorted_org_df": org_df, "predict_df": predict_df, "merge_df": merge_df } return res
# end of merge_inverse_data_into_original