Script 533: Campaign Anomaly
Purpose
Python script to detect anomalies in campaign data and generate an anomaly report.
To Elaborate
The script identifies accounts and campaigns whose latest daily metrics deviate from predicted values. For cost, impressions, and click-through rate (CTR), it compares the actual value with a prediction, computes an anomaly score from the size of the deviation, and generates an anomaly report listing the accounts and campaigns with outlier scores for each campaign type.
The key business rules of the script are as follows:
- For each campaign type, find accounts with anomalies for the previous day.
- For each account with anomalies, identify the contributing campaigns.
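- Calculate the anomaly score as the percentage deviation multiplied by the predicted metric value (for ratio metrics such as CTR, multiplied by predicted impressions instead), and list the accounts and campaigns with outlier scores for each campaign type and account.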
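As a rough illustration of the scoring rule, the sketch below reproduces the arithmetic used for non-ratio metrics in the listing further down: the percentage deviation from the prediction, multiplied by the predicted value, taken as an absolute number. The function name `anomaly_score` and the sample numbers are hypothetical and are not part of the script.

```python
def anomaly_score(actual, predicted):
    """Return (deviation_pct, score) for a non-ratio metric such as cost or impressions."""
    # Negative when the actual value is below the prediction.
    deviation_pct = round((actual - predicted) / predicted * 100, 1)
    # The score grows with both the size of the metric and the size of the deviation.
    score = abs(round(predicted * deviation_pct, 1))
    return deviation_pct, score


# Hypothetical numbers: 1,000 impressions predicted, 700 observed.
deviation_pct, score = anomaly_score(actual=700, predicted=1000)
print(deviation_pct, score)
```

Weighting by the predicted value means a 30% drop on a large account ranks above the same percentage drop on a small one.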
The script uses the following columns/constants:
- RPT_COL_CAMPAIGN: Campaign name
- RPT_COL_DATE: Date of the data
- RPT_COL_ACCOUNT: Account name
- RPT_COL_CAMPAIGN_TYPE: Campaign type
- RPT_COL_CAMPAIGN_STATUS: Campaign status
- RPT_COL_PUB_COST: Publisher cost in dollars ('Pub. Cost $')
- RPT_COL_IMPR: Impressions
- RPT_COL_CLICKS: Clicks
- RPT_COL_CTR: Click-through rate
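To show how these columns fit together, here is a minimal sketch that sums the additive metrics per campaign type, account, and date and then recomputes CTR from the summed clicks and impressions, in the same way the listing further down does. The rows are made-up sample data; only the column names come from the script.

```python
import numpy as np
import pandas as pd

# Hypothetical rows using the report's column names.
rows = pd.DataFrame({
    "Campaign Type": ["Search", "Search"],
    "Account": ["Acct A", "Acct A"],
    "Date": ["2023-11-13", "2023-11-13"],
    "Campaign": ["Brand", "Generic"],
    "Impr.": [1500, 500],
    "Clicks": [40, 10],
    "Pub. Cost $": [12.5, 7.5],
})

# Sum the additive metrics per campaign type, account, and date.
daily = rows.groupby(["Campaign Type", "Account", "Date"])[
    ["Impr.", "Clicks", "Pub. Cost $"]
].sum()

# CTR is a ratio, so it is recomputed from the sums rather than summed itself.
daily["CTR %"] = np.round(daily["Clicks"] / daily["Impr."], 3) * 100
print(daily)
```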
The user can change the following parameters:
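- metric: The metric to check for anomalies (cost, impressions, or CTR), passed as an argument to getAnomalies
- threshold: The minimum absolute deviation, in percent, for a row to count as an anomaly (the script passes 20 at the account level and 0 at the campaign level)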
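The effect of the threshold can be sketched as a simple filter on the absolute percentage deviation. The helper name `filter_by_threshold` and the sample deviations are hypothetical; the comparison itself (absolute deviation greater than or equal to the threshold) mirrors the one in the listing further down.

```python
import pandas as pd


def filter_by_threshold(df, threshold, col="deviation_pct"):
    """Keep rows whose absolute percentage deviation is at least `threshold`."""
    return df.loc[df[col].abs() >= threshold]


# Hypothetical deviations for three accounts.
deviations = pd.DataFrame(
    {"deviation_pct": [-35.0, 12.0, 20.0]},
    index=["Acct A", "Acct B", "Acct C"],
)

flagged = filter_by_threshold(deviations, threshold=20)
print(list(flagged.index))
```

A threshold of 0 keeps every row, which is how the campaign-level calls behave: all campaigns under a flagged account remain candidates and are narrowed down later by the outlier step.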
The major structure and flow of the Python script are as follows:
Walking Through the Code
- Define column constants and import necessary libraries.
- Define a function that predicts the latest day's metrics by averaging the same weekday over the previous 5 weeks.
- Define a function to calculate anomalies based on predicted and actual data.
- Define a function to get outliers from a dataset based on an anomaly score column.
- Print the tail of the input dataframe.
- Drop unnecessary columns from the input dataframe.
- Perform data cleaning to replace special characters in the account column.
- Group the reduced dataframe by campaign type, account, and date, and aggregate metrics.
- Apply the getPredictions function to get account-level predictions.
- Calculate click-through rate (CTR) for account-level predictions.
- Reset the index of the aggregated dataframe and get the last row for each account.
- Calculate CTR for account-level actual data.
- Set the index of the campaign-level prediction dataframe and apply the getPredictions function.
- Calculate CTR for campaign-level predictions.
- Group the reduced dataframe by account, campaign type, and campaign, and get the last row for each campaign.
- Calculate CTR for campaign-level actual data.
- Get anomalies for impressions at the account level.
- Filter anomalies based on a minimum predicted impressions threshold.
- Get anomalies for impressions at the campaign level.
- Filter anomalies based on a minimum predicted impressions threshold.
- Get outliers for impressions at the campaign level, grouped by campaign type and account.
- Concatenate the campaign-level and account-level anomalies for impressions.
- Get anomalies for CTR at the account level.
- Filter anomalies based on a minimum predicted impressions threshold.
- Get anomalies for CTR at the campaign level.
- Filter anomalies based on a minimum predicted impressions threshold.
- Get outliers for CTR at the campaign level, grouped by campaign type and account.
- Concatenate the campaign-level and account-level anomalies for CTR.
- Get anomalies for cost at the account level.
- Filter anomalies based on a minimum predicted impressions threshold.
- Get anomalies for cost at the campaign level.
- Filter anomalies based on a minimum predicted impressions threshold.
- Get outliers for cost at the campaign level, grouped by campaign type and account.
- Concatenate the campaign-level and account-level anomalies for cost.
- Sort the CSV output by metric, campaign type, and account.
- Round all floats to 1 decimal place.
- Print the head of the CSV output in a table format.
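The prediction step above can be sketched in isolation. This is a simplified, single-series version under the assumption that the input has one value per day, sorted by date, with the day being evaluated last; the function name `predict_from_prior_weeks` and the sample history are hypothetical.

```python
import numpy as np
import pandas as pd


def predict_from_prior_weeks(series, weeks=5):
    """Average the same weekday from each of the previous `weeks` weeks.

    The last element of `series` is treated as the day being evaluated.
    """
    needed = weeks * 7 + 1
    if len(series) < needed:
        return None  # not enough history to look back `weeks` weeks
    # Offsets -8, -15, ..., each exactly k weeks before the last element.
    offsets = [-(7 * k + 1) for k in range(1, weeks + 1)]
    return float(np.round(series.iloc[offsets].mean(), 0))


# Hypothetical history: 36 consecutive days, flat at 100 with a spike on the latest day.
history = pd.Series([100.0] * 35 + [250.0])
print(predict_from_prior_weeks(history))
```

Comparing against the same weekday avoids flagging ordinary weekday and weekend swings as anomalies.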
Vitals
- Script ID : 533
- Client ID / Customer ID: 1306926667 / 60270093
- Action Type: Email Report
- Item Changed: None
- Output Columns:
- Linked Datasource: M1 Report
- Reference Datasource: None
- Owner: ascott@marinsoftware.com (ascott@marinsoftware.com)
- Created by ascott@marinsoftware.com on 2023-11-14 20:46
- Last Updated by Michael Huang on 2023-12-06 04:01
Python Code
### # Scripts - Campaign Anomaly Detection by Campaign Type - Fisher Investments 68428
#
# * For each Campaign Type, find Accounts with anomaly for yesterday
# * For each Account with anomaly, identify contributing Campaigns
# * Calc Anomaly Score (Deviation times Metric), and list account/campaign with outlier scores for each CampaignType/Account
# * for Ratio Metrics, use another scaling factor that scales with spend like Impressions, Clicks, Cost
#
# Initial Scope
# * Cost, Impressions, CTR
#
#
# Authors: Michael S Huang, Adam Scott
# Created: 2023-11-14
#
#
import numpy as np
import pandas as pd
RPT_COL_CAMPAIGN = 'Campaign'
RPT_COL_DATE = 'Date'
RPT_COL_ACCOUNT = 'Account'
RPT_COL_CAMPAIGN_TYPE = 'Campaign Type'
RPT_COL_CAMPAIGN_STATUS = 'Campaign Status'
RPT_COL_PUB_COST = 'Pub. Cost $'
RPT_COL_IMPR = 'Impr.'
RPT_COL_CLICKS = 'Clicks'
RPT_COL_CTR = 'CTR %'
BULK_COL_ACCOUNT = 'Account'
BULK_COL_CAMPAIGN = 'Campaign'
BULK_COL_PRODUCTTEST = 'Status'
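# Function to get prediction from previous 5 weeks
# Assumes one row per day, sorted by date: iloc[-1] is the latest day, so
# iloc[-8], iloc[-15], ... are the same weekday in each of the 5 prior weeks
def getPredictions(data):
    if len(data) >= 36:
        prediction = np.round(np.mean(data.iloc[[-8, -15, -22, -29, -36]], axis=0), 0)
        return prediction
    else:
        print("not enough data. skipping: ", data.index)
        return None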
def getAnomalies(df_predict, df_actual, metric, threshold):
    PREDICT = '_predict'
    DEVIATION = 'deviation'
    DEVIATION_PCT = 'deviation_pct'
    SCORE = 'anomaly_score'
    df_predict_renamed = df_predict.add_suffix(PREDICT)
    df_combined = pd.concat([df_predict_renamed, df_actual], axis=1)
    df_combined['metric'] = metric
    # negative deviation when less than predicted
    df_combined[DEVIATION] = np.round(df_combined[metric] - df_combined[metric+PREDICT], 1)
    df_combined[DEVIATION_PCT] = np.round((df_combined[metric] - df_combined[metric+PREDICT]) / df_combined[metric+PREDICT] * 100, 1)
    # compute anomaly score based on metric type and deviation pct
    # metric that are ratios should use another appropriate factor to scale
    if metric == RPT_COL_CTR:
        # use impressions to scale
        df_combined[SCORE] = np.abs(np.round(df_combined[RPT_COL_IMPR+PREDICT] * df_combined[DEVIATION_PCT], 1))
    else:
        # in general, anomaly score is proportional to metric value and deviation pct
        df_combined[SCORE] = np.abs(np.round(df_combined[metric+PREDICT] * df_combined[DEVIATION_PCT], 1))
    df_anomaly = df_combined.loc[ df_combined[DEVIATION_PCT].abs() >= threshold ]
    if not df_anomaly.empty:
        df_anomaly = df_anomaly.sort_values(by=SCORE, ascending=False)
    return df_anomaly
# returns outliers
# if no obvious outliers, just return top 2
def getOutlier(data, col='anomaly_score', thresh=2):
    # for short lists, just return top 1
    if len(data) < 3:
        return data.iloc[[0]]
    # to reduce noise, skip low anomaly score rows
    #data = data.loc[data[col] > 5000]
    # interquartile range of the score column
    IQR = data[col].quantile(0.75) - data[col].quantile(0.25)
    upper_bound = data[col].quantile(0.75) + (thresh * IQR)
    lower_bound = data[col].quantile(0.25) - (thresh * IQR)
    anomalies_mask_over = data[col] > upper_bound
    anomalies_mask_under = data[col] < lower_bound
    if sum(anomalies_mask_over) > 0:
        # since anomaly score is calculated using abs(), higher is worse. Only care about over upper bound
        return data.loc[ anomalies_mask_over ]
    else:
        # return top 2
        return data.iloc[[0,1]]
print("inputDf", inputDf.tail(20).to_string())
inputDf_reduced = inputDf.drop([RPT_COL_CAMPAIGN_STATUS], axis=1)
print("inputDf_reduced shape", inputDf_reduced.shape)
# Hack around FEND-17759 bug
inputDf_reduced[RPT_COL_ACCOUNT] = inputDf_reduced[RPT_COL_ACCOUNT].str.replace('���','–')
inputDf_reduced[RPT_COL_ACCOUNT] = inputDf_reduced[RPT_COL_ACCOUNT].str.replace('��','–')
inputDf_reduced[RPT_COL_ACCOUNT] = inputDf_reduced[RPT_COL_ACCOUNT].str.replace('�','–')
# ## Prepare Timeseries for each Campaign Type and Account
agg_func_selection = {
    RPT_COL_IMPR: ['sum'],
    RPT_COL_CLICKS: ['sum'],
    RPT_COL_PUB_COST: ['sum'],
}
df_agg_campaigntype_account_date = inputDf_reduced.groupby([RPT_COL_CAMPAIGN_TYPE,RPT_COL_ACCOUNT,RPT_COL_DATE]) \
    .agg(agg_func_selection) \
    .droplevel(1, axis=1)
# Account-level prediction
df_predict = df_agg_campaigntype_account_date.groupby([RPT_COL_CAMPAIGN_TYPE,RPT_COL_ACCOUNT])\
    .apply(getPredictions)
df_predict[RPT_COL_CTR] = np.round(df_predict[RPT_COL_CLICKS] / df_predict[RPT_COL_IMPR], 3) * 100
# Account-level actual
df_actual = df_agg_campaigntype_account_date.reset_index() \
    .groupby([RPT_COL_CAMPAIGN_TYPE,RPT_COL_ACCOUNT])\
    .apply(lambda x: x.iloc[-1]) \
    .reset_index(drop=True) \
    .set_index([RPT_COL_CAMPAIGN_TYPE,RPT_COL_ACCOUNT])
df_actual[RPT_COL_CTR] = np.round(df_actual[RPT_COL_CLICKS] / df_actual[RPT_COL_IMPR], 3) * 100
# Campaign-level prediction
df_predict_campaign = inputDf_reduced \
    .set_index([RPT_COL_ACCOUNT,RPT_COL_CAMPAIGN_TYPE,RPT_COL_CAMPAIGN,RPT_COL_DATE]) \
    .groupby([RPT_COL_ACCOUNT,RPT_COL_CAMPAIGN_TYPE,RPT_COL_CAMPAIGN]) \
    .apply(getPredictions)
df_predict_campaign[RPT_COL_CTR] = np.round(df_predict_campaign[RPT_COL_CLICKS] / df_predict_campaign[RPT_COL_IMPR], 3) * 100
# Campaign-level actual
df_actual_campaign = inputDf_reduced.groupby([RPT_COL_ACCOUNT,RPT_COL_CAMPAIGN_TYPE,RPT_COL_CAMPAIGN])\
    .apply(lambda x: x.iloc[-1]) \
    .reset_index(drop=True) \
    .set_index([RPT_COL_ACCOUNT,RPT_COL_CAMPAIGN_TYPE,RPT_COL_CAMPAIGN])
df_actual_campaign[RPT_COL_CTR] = np.round(df_actual_campaign[RPT_COL_CLICKS] / df_actual_campaign[RPT_COL_IMPR], 3) * 100
# ## Build the entire Anomaly Report
# IMPR
df_account_anomaly_all = getAnomalies(df_predict, df_actual, RPT_COL_IMPR, 20)
df_account_anomaly_output = df_account_anomaly_all.loc[ df_account_anomaly_all[RPT_COL_IMPR+'_predict'] > 1000]
df_campaign_anomaly_all = getAnomalies(df_predict_campaign, df_actual_campaign, RPT_COL_IMPR, 0)
df_campaign_anomaly_output = df_campaign_anomaly_all.loc[df_account_anomaly_output.index.get_level_values(1)] \
    .loc[ df_campaign_anomaly_all[RPT_COL_IMPR+'_predict'] > 100] \
    .reset_index() \
    .sort_index() \
    .groupby([RPT_COL_CAMPAIGN_TYPE, RPT_COL_ACCOUNT]) \
    .apply(getOutlier) \
    .reset_index(drop=True) \
    .set_index([RPT_COL_CAMPAIGN_TYPE, RPT_COL_ACCOUNT])
df_csv_output = pd.concat([df_campaign_anomaly_output, df_account_anomaly_output], axis=0)
# CTR
df_account_anomaly_all = getAnomalies(df_predict, df_actual, RPT_COL_CTR, 20)
df_account_anomaly_output = df_account_anomaly_all.loc[ df_account_anomaly_all[RPT_COL_IMPR+'_predict'] > 1000]
df_campaign_anomaly_all = getAnomalies(df_predict_campaign, df_actual_campaign, RPT_COL_CTR, 0)
df_campaign_anomaly_output = df_campaign_anomaly_all.loc[df_account_anomaly_output.index.get_level_values(1)] \
    .loc[ df_campaign_anomaly_all[RPT_COL_IMPR+'_predict'] > 100] \
    .reset_index() \
    .sort_index() \
    .groupby([RPT_COL_CAMPAIGN_TYPE, RPT_COL_ACCOUNT]) \
    .apply(getOutlier) \
    .reset_index(drop=True) \
    .set_index([RPT_COL_CAMPAIGN_TYPE, RPT_COL_ACCOUNT])
df_csv_output = pd.concat([df_csv_output, df_campaign_anomaly_output, df_account_anomaly_output], axis=0)
# Cost
df_account_anomaly_all = getAnomalies(df_predict, df_actual, RPT_COL_PUB_COST, 20)
df_account_anomaly_output = df_account_anomaly_all.loc[ df_account_anomaly_all[RPT_COL_IMPR+'_predict'] > 1000]
df_campaign_anomaly_all = getAnomalies(df_predict_campaign, df_actual_campaign, RPT_COL_PUB_COST, 0)
df_campaign_anomaly_output = df_campaign_anomaly_all.loc[df_account_anomaly_output.index.get_level_values(1)] \
    .loc[ df_campaign_anomaly_all[RPT_COL_IMPR+'_predict'] > 100] \
    .reset_index() \
    .sort_index() \
    .groupby([RPT_COL_CAMPAIGN_TYPE, RPT_COL_ACCOUNT]) \
    .apply(getOutlier) \
    .reset_index(drop=True) \
    .set_index([RPT_COL_CAMPAIGN_TYPE, RPT_COL_ACCOUNT])
df_csv_output = pd.concat([df_csv_output, df_campaign_anomaly_output, df_account_anomaly_output], axis=0)
# sort CSV output by metric, Campaign Type and Account
outputDf = df_csv_output.reset_index().set_index(['metric',RPT_COL_CAMPAIGN_TYPE, RPT_COL_ACCOUNT]).sort_index().reset_index()
# round all floats with 1 DP
outputDf = round(outputDf, 1)
print("Email CSV", tableize(outputDf.head()))
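The outlier rule in getOutlier above can be tried on its own with a plain series of scores. The sketch below is a simplified variant, not the script's function: it works on a Series rather than a DataFrame, uses `nlargest(2)` for the fallback instead of relying on pre-sorted rows, and omits the short-list special case. The name `top_outliers` and the sample scores are hypothetical.

```python
import pandas as pd


def top_outliers(scores, k=2.0):
    """Return scores above Q3 + k * IQR, or the two largest if none qualify."""
    q1, q3 = scores.quantile(0.25), scores.quantile(0.75)
    upper = q3 + k * (q3 - q1)
    outliers = scores[scores > upper]
    if outliers.empty:
        # No clear outlier: fall back to the two highest scores.
        return scores.nlargest(2)
    return outliers


# Hypothetical anomaly scores for five campaigns in one account.
scores = pd.Series([10.0, 12.0, 11.0, 13.0, 500.0], index=["a", "b", "c", "d", "e"])
print(top_outliers(scores))
```

Only the upper bound matters here because the scores are absolute values, so a larger score always means a larger anomaly.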
Post generated on 2024-05-15 07:44:05 GMT