[Tested, Will fail] Integrating RF model on server and more unit tests
This will fail due to the testForest.py file.
Changes here include:

1. Moved the random forest model from eval to the server.

2. Unit tests for model save and load (a minimal sketch follows this list).

3. Regression test for the RF model in testRandomForest.py.
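A minimal sketch of the save/load unit-test pattern from item 2, assuming a wrapper with the to_dict()/from_dict() methods shown in this diff; the test class and helper names are illustrative, not the committed test code:

import unittest

class TestForestModelSaveLoad(unittest.TestCase):
    def test_save_load_roundtrip(self):
        model = build_fitted_forest_model()    # hypothetical helper returning a fitted wrapper
        serialized = model.to_dict()           # bytes per attribute, as in forest_classifier.py
        restored = build_empty_forest_model()  # hypothetical helper returning an unfitted wrapper
        restored.from_dict(serialized)
        # the restored wrapper should expose the same fitted model object
        self.assertEqual(type(restored.model), type(model.model))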
humbleOldSage committed Dec 9, 2023
1 parent 87f109c commit 33cdaab
Showing 5 changed files with 471 additions and 166 deletions.
45 changes: 34 additions & 11 deletions emission/analysis/modelling/trip_model/forest_classifier.py
@@ -11,11 +11,11 @@
import emission.analysis.modelling.trip_model.config as eamtc
import emission.storage.timeseries.builtin_timeseries as estb
import emission.storage.decorations.trip_queries as esdtq
from emission.analysis.modelling.trip_model.models import ForestClassifierModel
from emission.analysis.modelling.trip_model.models import ForestClassifier

EARTH_RADIUS = 6371000

class ForestClassifier(eamuu.TripModel):
class ForestClassifierModel(eamuu.TripModel):

def __init__(self,config=None):

@@ -54,7 +54,24 @@ def __init__(self,config=None):
if config.get(k) is None:
msg = f"cluster trip model config missing expected key {k}"
raise KeyError(msg)
self.model=ForestClassifierModel(config=config)
max_depth = config['max_depth'] if config['max_depth'] != 'null' else None
self.model = ForestClassifier(loc_feature=config['loc_feature'],
radius=config['radius'],
size_thresh=config['size_thresh'],
purity_thresh=config['purity_thresh'],
gamma=config['gamma'],
C=config['C'],
n_estimators=config['n_estimators'],
criterion=config['criterion'],
max_depth=max_depth,
min_samples_split=config['min_samples_split'],
min_samples_leaf=config['min_samples_leaf'],
max_features=config['max_features'],
bootstrap=config['bootstrap'],
random_state=config['random_state'],
# drop_unclustered=False,
use_start_clusters=config['use_start_clusters'],
use_trip_clusters=config['use_trip_clusters'])


def fit(self,trips: List[ecwc.Confirmedtrip]):
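The hyperparameter names forwarded above (n_estimators, criterion, max_features, bootstrap) suggest that ForestClassifier is backed by scikit-learn's RandomForestClassifier; a minimal sketch of that assumed underlying estimator, with the JSON config's 'null' mapped to Python's None as in the constructor:

from sklearn.ensemble import RandomForestClassifier

clf = RandomForestClassifier(n_estimators=100,
                             criterion='gini',
                             max_depth=None,  # 'null' in the JSON config
                             min_samples_split=2,
                             min_samples_leaf=1,
                             max_features='sqrt',
                             bootstrap=True,
                             random_state=42)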
@@ -89,7 +106,7 @@ def predict(self, trip: List[float]) -> Tuple[List[Dict], int]:
msg = 'model.predict cannot be called with an empty list of trips'
raise Exception(msg)
# CONVERT LIST OF TRIPS TO dataFrame
test_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",[trip])
test_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",trip)
labeled_trip_df = esdtq.filter_labeled_trips(test_df)
expanded_labeled_trip_df= esdtq.expand_userinputs(labeled_trip_df)
predictions_df = self.model.predict(expanded_labeled_trip_df)
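A hedged usage sketch of the new predict path, assuming a fitted ForestClassifierModel and lists of Confirmedtrip entries (train_trips and test_trips are illustrative names):

model = ForestClassifierModel(config=forest_model_config)
model.fit(train_trips)                      # List[ecwc.Confirmedtrip]
predictions, n = model.predict(test_trips)  # Tuple[List[Dict], int] per the annotation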
@@ -128,8 +145,14 @@ def to_dict(self):
## confirm this includes all the extra encoders/models
attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper'])
for attribute_name in attr:
if not hasattr(self.model,attribute_name):
raise ValueError(f"Attribute {attribute_name} not found in the model")

buffer=BytesIO()
joblib.dump(getattr(self.model,attribute_name),buffer)
try:
joblib.dump(getattr(self.model,attribute_name),buffer)
except Exception as e:
raise RuntimeError(f"Error serializing { attribute_name}: {str(e)}")
buffer.seek(0)
data[attribute_name]=buffer.getvalue()
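A minimal sketch of the in-memory serialization pattern used by to_dict and from_dict: each fitted attribute is dumped into a BytesIO buffer with joblib, and the resulting raw bytes can be stored in a document database and loaded back later:

from io import BytesIO
import joblib

def roundtrip(obj):
    buffer = BytesIO()
    joblib.dump(obj, buffer)  # serialize into the in-memory buffer
    buffer.seek(0)            # rewind so the bytes can be read back
    data = buffer.getvalue()  # raw bytes, safe to store in a document
    return joblib.load(BytesIO(data))  # reconstruct the original object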

@@ -144,14 +167,14 @@ def from_dict(self,model: Dict):
## TODO : confirm this includes all the extra encoders/models
attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper'])
for attribute_name in attr:
if attribute_name not in model:
raise ValueError(f"Attribute {attribute_name} missing in the model")
try:
if attribute_name in model:
buffer = BytesIO(model[attribute_name])
setattr(self.model,attribute_name, joblib.load(buffer))
buffer = BytesIO(model[attribute_name])
setattr(self.model,attribute_name, joblib.load(buffer))
except Exception as e:
print(f"Error loading {attribute_name}: {str(e)}")
# If we do not wish to raise the exception after logging the error, comment the line below
raise e
raise RuntimeError(f"Error deserializing { attribute_name}: {str(e)}")
# If we do not wish to raise the exception after logging the error, comment the line above

def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]:
"""
2 changes: 1 addition & 1 deletion emission/analysis/modelling/trip_model/model_type.py
@@ -28,7 +28,7 @@ def build(self, config=None) -> eamuu.TripModel:
# Dict[ModelType, TripModel]
MODELS = {
ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning,
ModelType.RANDOM_FOREST_CLASSIFIER: eamuf.ForestClassifier
ModelType.RANDOM_FOREST_CLASSIFIER: eamuf.ForestClassifierModel
}
model = MODELS.get(self)
if model is None:
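With the registry entry renamed, a model instance is presumably obtained through the enum; a hedged sketch, assuming build() instantiates the class mapped in MODELS with the supplied config (forest_model_config is the config dict used in the tests below):

model = eamumt.ModelType.RANDOM_FOREST_CLASSIFIER.build(config=forest_model_config)
# expected to return a ForestClassifierModel, per the eamuu.TripModel annotation on build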
250 changes: 105 additions & 145 deletions emission/tests/modellingTests/TestForestModel.py
@@ -1,84 +1,66 @@
import unittest
import logging
import numpy as np
import uuid
import json
import os

import emission.analysis.modelling.trip_model.model_storage as eamums
import emission.analysis.modelling.trip_model.model_type as eamumt
import emission.analysis.modelling.trip_model.run_model as eamur
import emission.analysis.modelling.trip_model.model_type as eamumt
import emission.analysis.modelling.trip_model.model_storage as eamums
import emission.storage.json_wrappers as esj
import emission.storage.timeseries.abstract_timeseries as esta
import emission.tests.modellingTests.modellingTestAssets as etmm
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.core.get_database as edb
import emission.storage.pipeline_queries as epq
import emission.core.wrapper.pipelinestate as ecwp
import numpy as np
import emission.core.wrapper.entry as ecwe
import emission.storage.decorations.analysis_timeseries_queries as esdatq

class TestRunForestModel(unittest.TestCase):
"""these tests were copied forward during a refactor of the tour model
[https://github.com/e-mission/e-mission-server/blob/10772f892385d44e11e51e796b0780d8f6609a2c/emission/analysis/modelling/tour_model_first_only/load_predict.py#L114]
class TestForestModel(unittest.TestCase):

it's uncertain what condition they are in besides having been refactored to
use the more recent tour modeling code.
"""

def setUp(self):
"""
sets up the end-to-end run model test with Confirmedtrip data
"""
logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s',
level=logging.DEBUG)

# configuration for randomly-generated test data
self.user_id = user_id = 'TestRunForestModel-TestData'
self.origin = (-105.1705977, 39.7402654,)
self.destination = (-105.1755606, 39.7673075)
self.min_trips = 14
self.total_trips = 100
self.clustered_trips = 33 # must have at least self.min_trips similar trips by default
self.has_label_percent = 0.9 # let's make a few that don't have a label, but invariant
# $clustered_trips * $has_label_percent > self.min_trips
# must be correct or else this test could fail under some random test cases.

# for a negative test, below
self.unused_user_id = 'asdjfkl;asdfjkl;asd08234ur13fi4jhf2103mkl'

# test data can be saved between test invocations, check if data exists before generating
ts = esta.TimeSeries.get_time_series(user_id)
test_data = list(ts.find_entries(["analysis/confirmed_trip"]))
if len(test_data) == 0:
# generate test data for the database
logging.debug(f"inserting mock Confirmedtrips into database")

# generate labels with a known sample weight that we can rely on in the test
label_data = {
"mode_confirm": ['ebike', 'bike'],
"purpose_confirm": ['happy-hour', 'dog-park'],
"replaced_mode": ['walk'],
"mode_weights": [0.9, 0.1],
"purpose_weights": [0.1, 0.9]
}

train = etmm.generate_mock_trips(
user_id=user_id,
trips=self.total_trips,
origin=self.origin,
destination=self.destination,
trip_part='od',
label_data=label_data,
within_threshold=self.clustered_trips,
threshold=0.004, # ~400m
has_label_p=self.has_label_percent
)

ts.bulk_insert(train)

# confirm data write did not fail
test_data = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None)
if len(test_data) != self.total_trips:
logging.debug(f'test invariant failed after generating test data')
self.fail()
else:
logging.debug(f'found {self.total_trips} trips in database')

self.user_id = uuid.UUID('aa9fdec9-2944-446c-8ee2-50d79b3044d3')
self.ts = esta.TimeSeries.get_time_series(self.user_id)
self.new_trips_per_invocation = 3
self.model_type = eamumt.ModelType.RANDOM_FOREST_CLASSIFIER
self.model_storage = eamums.ModelStorage.DOCUMENT_DATABASE
sim_threshold = 500 # meters
self.forest_model_config= {
"loc_feature" : "coordinates",
"radius": 500,
"size_thresh":1,
"purity_thresh":1.0,
"gamma":0.05,
"C":1,
"n_estimators":100,
"criterion":"gini",
"max_depth":'null',
"min_samples_split":2,
"min_samples_leaf":1,
"max_features":"sqrt",
"bootstrap":True,
"random_state":42,
"use_start_clusters":False,
"use_trip_clusters":True
}

existing_entries_for_user = list(self.ts.find_entries([esdatq.CONFIRMED_TRIP_KEY]))
if len(existing_entries_for_user) != 0:
raise Exception(f"test invariant failed, there should be no entries for user {self.user_id}")

# load in trips from a test file source
input_file = 'emission/tests/data/real_examples/shankari_2016-06-20.expected_confirmed_trips'
with open(input_file, 'r') as f:
trips_json = json.load(f, object_hook=esj.wrapped_object_hook)
self.trips = [ecwe.Entry(r) for r in trips_json]
logging.debug(f'loaded {len(self.trips)} trips from {input_file}')

def tearDown(self):
"""
clean up database
@@ -88,86 +70,64 @@ def tearDown(self):
edb.get_pipeline_state_db().delete_many({'user_id': self.user_id})


# def test_model_consistency(self):
# """
# Test to ensure that the model's predictions on the mock data remain consistent.
# """
# # Get the mock data from the parent class's setup
# mock_data = self.mock_data

# # Predict using the model
# current_predictions = eamur.predict_labels_with_n(
# trip=mock_data,
# model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER,
# model_storage=eamums.ModelStorage.DOCUMENT_DATABASE
# ) # assuming this is how you get predictions
# ## TODO :
# # Check if there are any previously stored predictions
# stored_predictions = list(self.collection.find({}))

# if len(stored_predictions) == 0:
# # If not, store the current predictions as the ground truth
# self.collection.insert_many([{"index": i, "prediction": p} for i, p in enumerate(current_predictions)])
# logging.debug("Stored current model predictions as ground truth.")
# else:
# # If there are stored predictions, compare them with the current predictions
# for stored_pred in stored_predictions:
# index, stored_value = stored_pred["index"], stored_pred["prediction"]
# current_value = current_predictions[index]

# self.assertEqual(stored_value, current_value, f"Prediction at index {index} has changed! Expected {stored_value}, but got {current_value}.")

# logging.debug("Model predictions are consistent with previously stored predictions.")

## TODO : Fix regression Tests

# def test_regression(self):
# """
# Regression test to ensure consistent model results.
# """
# # Load the previously stored predictions (if any)
# previous_predictions = self.load_previous_predictions()
def testRandomForestRegression(self):
"""
test to ensure consistent model results. Load data for a user from json, split
into train and test. After training, we generate predictions and match them with
predictions from last time. If the code is run for the first time, the current predicitons
will be stored as ground truth.
"""
file_path= 'emission/tests/modellingTests/data.json'
split=int(0.9*len(self.trips))
train_data= self.trips[:split]

self.ts.bulk_insert(train_data)

# confirm write to database succeeded
self.initial_data = list(self.ts.find_entries([esdatq.CONFIRMED_TRIP_KEY]))
if len(self.initial_data) == 0:
logging.debug('Writing train data failed')
self.fail()

test_data=self.trips[split:]
logging.debug(f'train size: {len(train_data)}, test size: {len(test_data)}')
eamur.update_trip_model(
user_id=self.user_id,
model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER,
model_storage=eamums.ModelStorage.DOCUMENT_DATABASE,
min_trips=4,
model_config=self.forest_model_config
)
model = eamur._load_stored_trip_model(
user_id=self.user_id,
model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER,
model_storage=eamums.ModelStorage.DOCUMENT_DATABASE,
model_config=self.forest_model_config
)

# # Run the current model to get predictions
# current_predictions = self.run_current_model()

# # If there are no previous predictions, store the current predictions
# if previous_predictions is None:
# self.store_predictions(current_predictions)
# else:
# # Compare the current predictions with the previous predictions
# self.assertPredictionsMatch(previous_predictions, current_predictions)

# def load_previous_predictions(self):
# # Retrieve stored predictions from the database
# # Using get_analysis_timeseries_db as an example, replace with the correct method if needed
# db = edb.get_analysis_timeseries_db()
# predictions = db.find_one({"user_id": self.user_id, "metadata.key": "predictions"})
# return predictions

# def run_current_model(self):
# # Placeholder: Run the current model and get predictions
# # Replace this with the actual model running code
# predictions = None
# return predictions

# def store_predictions(self, predictions):
# # Store the predictions in the database
# # Using get_analysis_timeseries_db as an example, replace with the correct method if needed
# db = edb.get_analysis_timeseries_db()
# entry = {
# "user_id": self.user_id,
# "metadata": {
# "key": "predictions",
# "write_ts": pd.Timestamp.now().timestamp() # Using pandas timestamp as an example
# },
# "data": predictions
# }
# db.insert_one(entry)

# def assertPredictionsMatch(self, prev, curr):
# # Placeholder: Check if the predictions match
# # This will depend on the format and type of your predictions
# # For example, if predictions are lists or arrays, you can use numpy
# if not np.array_equal(prev, curr):
# self.fail("Current model predictions do not match previously stored predictions!")
curr_predictions_list = eamur.predict_labels_with_n(
trip_list = [test_data],
model=model
)


## Predictions take the form:
#
# {'labels': {'mode_confirm': 'ebike', 'replaced_mode': 'walk', 'purpose_confirm': 'dog-park'}, 'p': 1.0}
#
# We store these predictions in a JSON file on the first run; on every subsequent
# run, we load the stored predictions and compare them with the current ones.

try:
if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
with open(file_path, 'r') as f:
prev_predictions_list = json.load(f)
logging.debug("Loaded stored predictions, comparing with current predictions")
self.assertEqual(prev_predictions_list, curr_predictions_list, "previous predictions should match current predictions")
else:
with open(file_path, 'w') as file:
json.dump(curr_predictions_list, file, indent=4)
logging.debug("Current predictions stored as ground truth for future runs")
except json.JSONDecodeError:
self.fail("Error decoding the stored predictions JSON file")
