1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
import json
import requests
from ...constants.constants import usage_request_cookies, generic_request_headers, electric_usage_request_json, water_usage_request_json, water_request_endpoint, electric_request_endpoint, RequestMode, useless_response_keys
from datetime import date
from datetime import timedelta
# Electric
def do_electric_request(request_mode: RequestMode):
electric_usage_request_json['Mode'] = request_mode.value
if (request_mode in [RequestMode.halfHour, RequestMode.hour]):
electric_usage_request_json['strDate'] = get_yesterday_date()
return perform_electric_request()
def perform_electric_request():
electric_usage_response = requests.post(
electric_request_endpoint,
cookies=usage_request_cookies,
headers=generic_request_headers,
json=electric_usage_request_json
)
return parse_response(electric_usage_response)
def request_electric():
return {
"halfHour": do_electric_request(RequestMode.halfHour),
"hour": do_electric_request(RequestMode.hour),
"day": do_electric_request(RequestMode.day),
"month": do_electric_request(RequestMode.month)
}
# Water
def do_water_request(requestMode: RequestMode):
water_usage_request_json['Mode'] = requestMode.value
if (requestMode in [RequestMode.hour]):
water_usage_request_json['strDate'] = get_yesterday_date()
return perform_water_request()
def perform_water_request():
water_usage_response = requests.post(
water_request_endpoint,
cookies=usage_request_cookies,
headers=generic_request_headers,
json=water_usage_request_json
)
return parse_response(water_usage_response)
def request_water():
return {
"hour": do_water_request(RequestMode.hour),
"day": do_water_request(RequestMode.day),
"month": do_water_request(RequestMode.month)
}
# Utility methods
def parse_response(response):
json_response = json.loads(response.text.replace("\\\"", "\"").replace("\\\"", "\"").replace("\"{\"", "{\"").replace("}\"}", "}}"))['d']
clean_response(json_response)
return {
"usageData": json_response['objUsageGenerationResultSetTwo'], # Raw usage data for each timeframe
"tentativeData": json_response['getTentativeData'] # Accumulated usage data and predictions
}
def setup_request_params(parameters):
# Setup cookies and csrftoken to perform requests
usage_request_cookies['ApplicationGatewayAffinityCORS'] = parameters['aga']
usage_request_cookies['ApplicationGatewayAffinity'] = parameters['aga']
usage_request_cookies['ASP.NET_SessionId'] = parameters['asi']
usage_request_cookies['SCP'] = parameters['lt']
generic_request_headers['csrftoken'] = parameters['ct']
# Service calling method
def request_usage_data(request_params):
setup_request_params(request_params)
return {
"electric": request_electric(),
"water": request_water()
}
def get_yesterday_date():
yesterday = date.today() - timedelta(days = 1)
return yesterday.strftime("%x")
def clean_response(response: dict):
# Remove the response keys that provide no information
for key in useless_response_keys:
remove_key(response, key)
def remove_key(object, key_to_remove):
# If object is a dict, recursively search over data for keyBeingRemoved
if (isinstance(object, dict)):
for list_data in list(object):
if list_data == key_to_remove:
del object[key_to_remove]
else:
remove_key(object[list_data], key_to_remove)
# If the object is a list, iterate over each object in the list
elif (isinstance(object, list)):
for list_data in list(object):
remove_key(list_data, key_to_remove)
|