Script 1251: Error Alert Revenue File With High Mismatch Rates

Purpose

The script sends an alert when the revenue-processing error rate for a file exceeds a specified threshold.

To Elaborate

The Python script monitors revenue processing files and alerts users when a file's error rate surpasses a predefined threshold. It aggregates per-file error statistics from the revenue results feed and flags files whose error rate exceeds that threshold, since a high error rate can indicate problems in revenue data processing. Catching such files early helps maintain data integrity and supports accurate financial reporting. The script can run either on the server or locally, with user-configurable parameters for the error threshold and, in local mode, the path to the input pickle file.
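
In essence, the alert is a per-file error-rate computation followed by a threshold filter. The condensed sketch below illustrates the idea on an invented toy feed; the column names mirror those used in the full script further down, but the data and the exact aggregation are simplified for illustration.

import pandas as pd

# Toy revenue-processing feed: one row per processed line in a file.
feed = pd.DataFrame({
    'raw_rev_file_name': ['rev_a.csv'] * 4 + ['rev_b.csv'] * 4,
    'Result': ['OK', 'ERROR: bad date', 'OK', 'ERROR: no click',
               'OK', 'OK', 'OK', 'OK'],
})

ERROR_RATIO_THRESHOLD = 0.05

# Per-file totals: line count, plus a count of Result values starting with ERROR.
stats = feed.groupby('raw_rev_file_name')['Result'].agg(
    file_line_count='count',
    file_error_count=lambda s: s.str.startswith('ERROR').sum(),
)
stats['file_error_ratio'] = stats['file_error_count'] / stats['file_line_count']

# Keep only files whose error rate exceeds the threshold.
alerts = stats[stats['file_error_ratio'] > ERROR_RATIO_THRESHOLD]
print(alerts)  # rev_a.csv (ratio 0.5) is flagged; rev_b.csv (0.0) is not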

Walking Through the Code

  1. User Configurable Parameters: The script begins by defining a single user-configurable parameter, ERROR_RATIO_THRESHOLD (0.05, i.e. 5%, by default), which sets the maximum acceptable error rate for a revenue processing file.

  2. Local Mode Configuration:
    • The script includes numbered steps for setting up local execution, such as downloading the pickled input data and loading the required environment.
    • It detects whether it is running on the server or locally (see the first sketch after this list) and, when local, loads dataSourceDict from a pickle file.
  3. Data Processing:
    • The script loads data into a DataFrame and processes it to calculate error statistics for each revenue file.
    • It converts the click-mapping timestamps from epoch milliseconds to Pacific time (see the second sketch after this list) and aggregates rows to compute the total line count and error count for each file.
  4. Error Calculation:
    • The script calculates the error ratio and percentage for each file.
    • It filters files where the error ratio exceeds the defined threshold and prepares them for output.
  5. Output and Debugging:
    • If running locally, the script writes the output and debug information to CSV files for further analysis.
    • It also initializes an email summary prompt, although the prompt is currently left blank (emailSummaryPrompt = '') in the script.
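
Step 2's server-versus-local check relies on the Marin scripting environment injecting a dataSourceDict object into the script's namespace; referencing it locally raises NameError. A minimal standalone sketch of that pattern:

def is_executing_on_server():
    try:
        # dataSourceDict is injected only by the server environment;
        # referencing it locally raises NameError
        dataSourceDict
        return True
    except NameError:
        return False

if is_executing_on_server():
    print("Server mode: dataSourceDict provided by the platform.")
else:
    print("Local mode: load dataSourceDict from a pickle file.")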

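Step 3's timestamp handling follows the usual pandas pattern for epoch milliseconds: parse the raw values, localize to UTC, then convert to the reporting timezone. A minimal illustration (the sample value is invented):

import pandas as pd

# click_mapping_timestamp arrives as epoch milliseconds
ts = pd.Series([1719964800000])  # 2024-07-03 00:00:00 UTC

processed = (pd.to_datetime(ts, unit='ms')
               .dt.tz_localize('UTC')
               .dt.tz_convert('America/Los_Angeles'))
print(processed.iloc[0])  # 2024-07-02 17:00:00-07:00
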
Vitals

  • Script ID : 1251
  • Client ID / Customer ID: 197178269 / 13095968
  • Action Type: Email Report
  • Item Changed: None
  • Output Columns:
  • Linked Datasource: FTP/Email Feed
  • Reference Datasource: None
  • Owner: Michael Huang (mhuang@marinsoftware.com)
  • Created by Michael Huang on 2024-07-03 00:55
  • Last Updated by Chris Jetton on 2024-07-16 22:30
> See it in Action

Python Code

##
## name: Alert on high Revenue Processing Error
## description:
##  * aggregate error stats from revenue_results_hybrid
##  * include files with error rate higher than threshold
##
## author: Michael S. Huang
## created: 2024-07-03
## 

########### START - User Configurable Params ###########
ERROR_RATIO_THRESHOLD = 0.05
########### END - User Configurable Params ###########



########### START - Local Mode Config ###########
# Step 1: Uncomment download_preview_input flag and run Preview successfully with the Datasources you want
download_preview_input = False
# Step 2: In MarinOne, go to Scripts -> Preview -> Logs, download 'dataSourceDict' pickle file, and update pickle_path below
# pickle_path = ''
pickle_path = '/Users/mhuang/Downloads/pickle/amazon_music_revenue_results_20240702.pkl'
# Step 3: Copy this script into local IDE with Python virtual env loaded with pandas and numpy.
# Step 4: Run locally with below code to init dataSourceDict

# determine if code is running on server or locally
def is_executing_on_server():
    try:
        # dataSourceDict is only defined in the server environment
        dict_items = dataSourceDict.items()
        return True
    except NameError:
        # dataSourceDict is missing, so we are not running on the server
        return False

local_dev = False

if is_executing_on_server():
    print("Code is executing on server. Skip init.")
elif len(pickle_path) > 3:
    print("Code is NOT executing on server. Doing init.")
    local_dev = True
    # load dataSourceDict via pickled file
    import pickle
    dataSourceDict = pickle.load(open(pickle_path, 'rb'))

    # print shape and first 5 rows for each entry in dataSourceDict
    for key, value in dataSourceDict.items():
        print(f"Shape of dataSourceDict[{key}]: {value.shape}")
        # print(f"First 5 rows of dataSourceDict[{key}]:\n{value.head(5)}")

    # set outputDf same as inputDf
    inputDf = dataSourceDict["1"]
    outputDf = inputDf.copy()

    # setup timezone
    import datetime
    # Chicago is UTC-5 during daylight saving time (UTC-6 otherwise). Adjust as needed.
    CLIENT_TIMEZONE = datetime.timezone(datetime.timedelta(hours=-5))

    # import pandas
    import pandas as pd
    import numpy as np

    # other imports
    import re
    import urllib

    # import Marin util functions
    from marin_scripts_utils import tableize, select_changed

    # pandas settings
    pd.set_option('display.max_columns', None)  # Display all columns
    pd.set_option('display.max_colwidth', None)  # Display full content of each column

else:
    print("Running locally but no pickle path defined. dataSourceDict not loaded.")
    exit(1)
########### END - Local Mode Config ###########

today = datetime.datetime.now(CLIENT_TIMEZONE).date()

# primary data source and columns
inputDf = dataSourceDict["1"]
FEED_COL_RAW_REV_FILE_NAME = 'raw_rev_file_name'
FEED_COL_LINE_INDEX = 'line_index'
FEED_COL_RESULT = 'Result'
FEED_COL_CUST_ID = 'cust_id'
FEED_COL_CLIENT_ID = 'client_id'
FEED_COL_REVENUE_SOURCE = 'revenue_source'
FEED_COL_JOB_NOMINAL_DATE = 'job_nominal_date'
FEED_COL_CLICK_MAPPING_TIMESTAMP = 'click_mapping_timestamp'

COL_FILE_LINE_COUNT = 'file_line_count'
COL_FILE_ERROR_COUNT = 'file_error_count'
COL_FILE_ERROR_RATIO = 'file_error_ratio'
COL_FILE_ERROR_PERCENTAGE = 'file_error_pct'
COL_FILE_PROCESSED_TIME = 'file_processed_time'

#### User Code Starts Here

print("inputDf.shape", inputDf.shape)
print("inputDf head", tableize(inputDf.head()))

# convert timestamp
inputDf[COL_FILE_PROCESSED_TIME] = pd.to_datetime(inputDf[FEED_COL_CLICK_MAPPING_TIMESTAMP], unit='ms').dt.tz_localize('UTC').dt.tz_convert('America/Los_Angeles')

# Calculate the required columns for each revenue filename
agg_funcs = {
    FEED_COL_JOB_NOMINAL_DATE: 'first',
    COL_FILE_PROCESSED_TIME: 'first',
    FEED_COL_REVENUE_SOURCE: 'first',
    FEED_COL_LINE_INDEX: 'count',
    FEED_COL_RESULT: lambda x: x.str.startswith('ERROR').sum()
}

agg_df = inputDf.groupby(FEED_COL_RAW_REV_FILE_NAME) \
                .agg(agg_funcs) \
                .reset_index()

agg_df.columns = [FEED_COL_RAW_REV_FILE_NAME, FEED_COL_JOB_NOMINAL_DATE, COL_FILE_PROCESSED_TIME, FEED_COL_REVENUE_SOURCE, COL_FILE_LINE_COUNT, COL_FILE_ERROR_COUNT]

# Calculate error ratio and percentage
agg_df[COL_FILE_ERROR_RATIO] = np.round(agg_df[COL_FILE_ERROR_COUNT] / agg_df[COL_FILE_LINE_COUNT], 4)
agg_df[COL_FILE_ERROR_PERCENTAGE] = (agg_df[COL_FILE_ERROR_RATIO] * 100).round(2).astype(str) + '%'

debugDf = agg_df

# select high error rate files for output
out_cols = [FEED_COL_JOB_NOMINAL_DATE, COL_FILE_PROCESSED_TIME, FEED_COL_REVENUE_SOURCE, COL_FILE_LINE_COUNT, COL_FILE_ERROR_PERCENTAGE, FEED_COL_RAW_REV_FILE_NAME]
outputDf = agg_df.loc[agg_df[COL_FILE_ERROR_RATIO] > ERROR_RATIO_THRESHOLD, out_cols]


print("outputDf.shape", outputDf.shape)
print("outputDf head", tableize(outputDf.head()))

### Construct Complete Prompt
    
## Build Prompt

# blank out prompt for now
emailSummaryPrompt = ''

### local debug

if local_dev:
    with open('prompt.txt', 'w') as file:
        file.write(emailSummaryPrompt)
        print(f"Local Dev: Prompt written to: {file.name}")

    output_filename = 'outputDf.csv'
    outputDf.to_csv(output_filename, index=False)
    print(f"Local Dev: Output written to: {output_filename}")

    debug_filename = 'debugDf.csv'
    debugDf.to_csv(debug_filename, index=False)
    print(f"Local Dev: Debug written to: {debug_filename}")

else:
    print("====== Prompt =====")
    print(emailSummaryPrompt)
    print("===========")

