Script 1251: Error Alert Revenue File With High Mismatch Rates
Purpose:
The Python script alerts users when the error rate in revenue processing files exceeds a specified threshold.
To Elaborate
The script aggregates error statistics from the revenue_results_hybrid data source, producing one row per revenue file. A line counts as an error when its Result value starts with ERROR; a file's error ratio is its error count divided by its line count, rounded to four decimal places. Files whose ratio is strictly greater than ERROR_RATIO_THRESHOLD (0.05, or 5%, by default) are written to the output so that users can identify and address revenue processing issues promptly. On the server the data source is already available to the script; for local execution the script loads it from a pickle file downloaded from MarinOne.
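The thresholding rule can be illustrated without pandas. The following is a minimal sketch, not the script's own code: the helper name summarize_file and the sample Result values ("OK", "ERROR: no match") are hypothetical, and only the ERROR prefix, the four-decimal rounding, and the 0.05 default come from the script.

```python
ERROR_RATIO_THRESHOLD = 0.05  # same default as the script


def summarize_file(results, threshold=ERROR_RATIO_THRESHOLD):
    """Return (ratio, percentage label, flagged) for one file's Result values.

    `results` is a list of strings; a line is an error when it starts with "ERROR".
    """
    line_count = len(results)
    error_count = sum(1 for r in results if r.startswith("ERROR"))
    # Guard against empty input (an assumption; the script never sees empty groups)
    ratio = round(error_count / line_count, 4) if line_count else 0.0
    pct_label = f"{round(ratio * 100, 2)}%"
    # Strict comparison: a file sitting exactly on the threshold is not flagged
    return ratio, pct_label, ratio > threshold


# Hypothetical sample data
at_limit = ["ERROR: no match"] + ["OK"] * 19        # 1 error in 20 lines
over_limit = ["ERROR: no match"] * 2 + ["OK"] * 18  # 2 errors in 20 lines

print(summarize_file(at_limit))
print(summarize_file(over_limit))
```

Because the comparison is strict, the first sample (exactly 5%) is not flagged while the second (10%) is. If files at exactly the threshold should also alert, the comparison would need to be >= instead.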
Walking Through the Code
- User Configurable Parameters
- The script begins by defining a user-configurable parameter, ERROR_RATIO_THRESHOLD, which sets the threshold for the error rate that triggers an alert.
- Local Mode Configuration
- The script includes steps for setting up local execution, such as downloading a data source dictionary and loading it from a pickle file. It checks whether the code is running on a server or locally and initializes the data source accordingly.
- Data Processing
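- The script converts click_mapping_timestamp from epoch milliseconds to America/Los_Angeles time, then groups the input data frame by raw_rev_file_name. For each file it counts the line_index values as the line count and the Result values starting with ERROR as the error count, then computes the error ratio (rounded to four decimals) and a percentage string.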
- Output Generation
- Files with an error ratio exceeding the threshold are selected for output. The script prepares the output data frame with relevant columns and writes it to a CSV file if running locally.
- Debugging and Output
- For local development, the script writes debug information and the output data frame to CSV files, facilitating further analysis and debugging.
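The local-mode check described in the walkthrough can be sketched as a small function. This is an illustrative variant rather than the script's implementation: the function name load_data_source and its namespace argument are hypothetical, and the demo pickle stands in for the file downloaded from MarinOne. It assumes, as the script does, that dataSourceDict is already defined when running on the server.

```python
import pickle
import tempfile
from pathlib import Path


def load_data_source(pickle_path, namespace):
    """Return (dataSourceDict, local_dev) for server or local execution.

    `namespace` is the mapping to inspect for a predefined dataSourceDict,
    for example globals() in a real script (hypothetical parameter).
    """
    if "dataSourceDict" in namespace:
        # Assumed server case: the data source is already present
        return namespace["dataSourceDict"], False
    if not pickle_path:
        raise SystemExit("Running locally but no pickle path defined.")
    # The context manager closes the file handle once loading finishes
    with open(pickle_path, "rb") as fh:
        return pickle.load(fh), True


# Demo with a throwaway pickle standing in for the downloaded file
with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "demo.pkl"
    demo_path.write_bytes(pickle.dumps({"1": [{"Result": "OK"}]}))
    data, local_dev = load_data_source(str(demo_path), namespace={})
    print(local_dev, list(data))
```

Passing the namespace explicitly makes both branches easy to exercise in a test. Note that pickle.load can execute arbitrary code, so only files from a trusted source should be loaded this way.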
Vitals
- Script ID : 1251
- Client ID / Customer ID: 197178269 / 13095968
- Action Type: Email Report
- Item Changed: None
- Output Columns:
- Linked Datasource: FTP/Email Feed
- Reference Datasource: None
- Owner: Michael Huang (mhuang@marinsoftware.com)
- Created by Michael Huang on 2024-07-03 00:55
- Last Updated by Chris Jetton on 2024-07-16 22:30
Python Code
##
## name: Alert on high Revenue Processing Error
## description:
## * aggregate error stats from revenue_results_hybrid
## * include files with error rate higher than threshold
##
## author: Michael S. Huang
## created: 2024-07-03
##
########### START - User Configurable Params ###########
ERROR_RATIO_THRESHOLD = 0.05
########### END - User Configurable Params ###########
########### START - Local Mode Config ###########
# Step 1: Uncomment download_preview_input flag and run Preview successfully with the Datasources you want
download_preview_input=False
# Step 2: In MarinOne, go to Scripts -> Preview -> Logs, download 'dataSourceDict' pickle file, and update pickle_path below
# pickle_path = ''
pickle_path = '/Users/mhuang/Downloads/pickle/amazon_music_revenue_results_20240702.pkl'
# Step 3: Copy this script into local IDE with Python virtual env loaded with pandas and numpy.
# Step 4: Run locally with below code to init dataSourceDict
# determine if code is running on server or locally
def is_executing_on_server():
    try:
        # dataSourceDict is predefined on the server; referencing it locally raises NameError
        dict_items = dataSourceDict.items()
        return True
    except NameError:
        # NameError: dataSourceDict object is missing (indicating not on server)
        return False
local_dev = False
if is_executing_on_server():
    print("Code is executing on server. Skip init.")
elif len(pickle_path) > 3:
    print("Code is NOT executing on server. Doing init.")
    local_dev = True
    # load dataSourceDict via pickled file
    import pickle
    with open(pickle_path, 'rb') as pickle_file:
        dataSourceDict = pickle.load(pickle_file)
    # print shape and first 5 rows for each entry in dataSourceDict
    for key, value in dataSourceDict.items():
        print(f"Shape of dataSourceDict[{key}]: {value.shape}")
        # print(f"First 5 rows of dataSourceDict[{key}]:\n{value.head(5)}")
    # set outputDf same as inputDf
    inputDf = dataSourceDict["1"]
    outputDf = inputDf.copy()
    # setup timezone
    import datetime
    # Chicago is UTC-5 during daylight saving time (UTC-6 otherwise). Adjust as needed.
    CLIENT_TIMEZONE = datetime.timezone(datetime.timedelta(hours=-5))
    # import pandas
    import pandas as pd
    import numpy as np
    # other imports
    import re
    import urllib
    # import Marin util functions
    from marin_scripts_utils import tableize, select_changed
    # pandas settings
    pd.set_option('display.max_columns', None) # Display all columns
    pd.set_option('display.max_colwidth', None) # Display full content of each column
else:
    print("Running locally but no pickle path defined. dataSourceDict not loaded.")
    exit(1)
########### END - Local Mode Setup ###########
today = datetime.datetime.now(CLIENT_TIMEZONE).date()
# primary data source and columns
inputDf = dataSourceDict["1"]
FEED_COL_RAW_REV_FILE_NAME = 'raw_rev_file_name'
FEED_COL_LINE_INDEX = 'line_index'
FEED_COL_RESULT = 'Result'
FEED_COL_CUST_ID = 'cust_id'
FEED_COL_CLIENT_ID = 'client_id'
FEED_COL_REVENUE_SOURCE = 'revenue_source'
FEED_COL_JOB_NOMINAL_DATE = 'job_nominal_date'
FEED_COL_CLICK_MAPPING_TIMESTAMP = 'click_mapping_timestamp'
COL_FILE_LINE_COUNT = 'file_line_count'
COL_FILE_ERROR_COUNT = 'file_error_count'
COL_FILE_ERROR_RATIO = 'file_error_ratio'
COL_FILE_ERROR_PERCENTAGE = 'file_error_pct'
COL_FILE_PROCESSED_TIME = 'file_processed_time'
#### User Code Starts Here
print("inputDf.shape", inputDf.shape)
print("inputDf head", tableize(inputDf.head()))
# convert timestamp
inputDf[COL_FILE_PROCESSED_TIME] = pd.to_datetime(inputDf[FEED_COL_CLICK_MAPPING_TIMESTAMP], unit='ms').dt.tz_localize('UTC').dt.tz_convert('America/Los_Angeles')
# Calculate the required columns for each revenue filename
agg_funcs = {
    FEED_COL_JOB_NOMINAL_DATE: 'first',
    COL_FILE_PROCESSED_TIME: 'first',
    FEED_COL_REVENUE_SOURCE: 'first',
    FEED_COL_LINE_INDEX: 'count',
    FEED_COL_RESULT: lambda x: x.str.startswith('ERROR').sum()
}
agg_df = inputDf.groupby(FEED_COL_RAW_REV_FILE_NAME) \
                .agg(agg_funcs) \
                .reset_index()
agg_df.columns = [FEED_COL_RAW_REV_FILE_NAME, FEED_COL_JOB_NOMINAL_DATE, COL_FILE_PROCESSED_TIME, FEED_COL_REVENUE_SOURCE, COL_FILE_LINE_COUNT, COL_FILE_ERROR_COUNT]
# Calculate error ratio and percentage
agg_df[COL_FILE_ERROR_RATIO] = np.round(agg_df[COL_FILE_ERROR_COUNT] / agg_df[COL_FILE_LINE_COUNT], 4)
agg_df[COL_FILE_ERROR_PERCENTAGE] = (agg_df[COL_FILE_ERROR_RATIO] * 100).round(2).astype(str) + '%'
debugDf = agg_df
# select high error rate files for output
out_cols = [FEED_COL_JOB_NOMINAL_DATE, COL_FILE_PROCESSED_TIME, FEED_COL_REVENUE_SOURCE, COL_FILE_LINE_COUNT, COL_FILE_ERROR_PERCENTAGE, FEED_COL_RAW_REV_FILE_NAME]
outputDf = agg_df.loc[agg_df[COL_FILE_ERROR_RATIO] > ERROR_RATIO_THRESHOLD, out_cols]
print("outputDf.shape", outputDf.shape)
print("outputDf head", tableize(outputDf.head()))
### Construct Complete Prompt
## Build Prompt
# blank out prompt for now
emailSummaryPrompt = ''
### local debug
if local_dev:
    with open('prompt.txt', 'w') as file:
        file.write(emailSummaryPrompt)
        print(f"Local Dev: Prompt written to: {file.name}")
    output_filename = 'outputDf.csv'
    outputDf.to_csv(output_filename, index=False)
    print(f"Local Dev: Output written to: {output_filename}")
    debug_filename = 'debugDf.csv'
    debugDf.to_csv(debug_filename, index=False)
    print(f"Local Dev: Debug written to: {debug_filename}")
else:
    print("====== Prompt =====")
    print(emailSummaryPrompt)
    print("===========")
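The script renames the aggregated columns by position (agg_df.columns = [...]), which works only while the order of agg_funcs and the order of that list stay in sync. One possible alternative, sketched here on made-up rows, is pandas named aggregation, where each output column is named next to the function that produces it. The column names match the script; the sample file names and Result values are hypothetical.

```python
import pandas as pd

ERROR_RATIO_THRESHOLD = 0.05

# Hypothetical sample rows: a.csv has no errors, b.csv has 1 error in 2 lines
df = pd.DataFrame({
    "raw_rev_file_name": ["a.csv"] * 4 + ["b.csv"] * 2,
    "line_index": [1, 2, 3, 4, 1, 2],
    "Result": ["OK", "OK", "OK", "OK", "OK", "ERROR: no match"],
})

agg_df = (
    df.groupby("raw_rev_file_name")
      .agg(
          # output_name=(input_column, aggregation)
          file_line_count=("line_index", "count"),
          file_error_count=("Result", lambda s: s.str.startswith("ERROR").sum()),
      )
      .reset_index()
)

agg_df["file_error_ratio"] = (
    agg_df["file_error_count"] / agg_df["file_line_count"]
).round(4)

# Keep only files whose ratio is strictly above the threshold
high = agg_df[agg_df["file_error_ratio"] > ERROR_RATIO_THRESHOLD]
print(high)
```

With this form, adding or reordering an aggregation cannot silently mislabel a column, at the cost of spelling out each output name.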
Post generated on 2025-03-11 01:25:51 GMT