diff --git a/python/mspasspy/db/normalize.py b/python/mspasspy/db/normalize.py
index d0bfd85b..e21928e5 100755
--- a/python/mspasspy/db/normalize.py
+++ b/python/mspasspy/db/normalize.py
@@ -221,54 +221,54 @@ def find_one(self, mspass_object, *args, **kwargs) -> tuple:
         """
         pass
-    def find_doc(self,doc)->Metadata:
+
+    def find_doc(self, doc) -> Metadata:
         """
         find a unique match using a python dictionary as input.
-
-        The bulk_normalize function requires an implementation of a
-        method with this name.  It is conceptually similar to find_one
-        but it uses a python dictionary (the doc argument) as input
-        instead of a mspass seismic data object.  It also returns only
-        a Metadata container on success or None if it fails to find
-        a match.
-
-        This method is little more than a thin wrapper around
-        an implementation of the find_one method.  It checkes
-        the elog for entries marked Invalid and if
-        so returns None.  Otherwise it converts the Metdata container
-        to a python dictionary it return.  It is part of the
+
+        The bulk_normalize function requires an implementation of a
+        method with this name.  It is conceptually similar to find_one,
+        but it uses a python dictionary (the doc argument) as input
+        instead of a MsPASS seismic data object.  It also returns only
+        a Metadata container on success or None if it fails to find
+        a match.
+
+        This method is little more than a thin wrapper around
+        an implementation of the find_one method.  It checks
+        the elog for entries marked Invalid and, if any are found,
+        returns None.  Otherwise it converts the Metadata container
+        to a python dictionary and returns it.  It is part of the
         base class because it depends only on the near equivalence of
-        a python dictionary and the MsPASS Metadata containers.
-
-        When find_one returns a ErrorLogger object the contents are
-        inspected.  Errors less severe than "Invalid" are ignored
+        a python dictionary and the MsPASS Metadata containers.
+
+        When find_one returns an ErrorLogger object the contents are
+        inspected.  Errors less severe than "Invalid" are ignored
         and dropped.  If the log contains a message tagged "Invalid"
-        this function will silently return None.  That could be
-        problematic as it is indistinguishable from the return when
-        there is no match, but is useful to simply the api.  If an
-        entry is tagged "Fatal" a MsPASSError exception will be
+        this function will silently return None.  That could be
+        problematic as it is indistinguishable from the return when
+        there is no match, but it simplifies the api.  If an
+        entry is tagged "Fatal" a MsPASSError exception will be
         thrown with the message posted to the MsPASSError container.
-
-        Subclasses may wish to override this method if the approach
-        used here is inappropriate.  i.e. if this were C++ this
+
+        Subclasses may wish to override this method if the approach
+        used here is inappropriate, i.e. if this were C++ this
         method would be declared virtual.
         """
         md2test = Metadata(doc)
-        [md,elog] = self.find_one(md2test)
+        [md, elog] = self.find_one(md2test)
         if elog:
             elist = elog.worst_errors()
             # Return no success if Invalid
-            worst=elist[0].badness
+            worst = elist[0].badness
             if worst == ErrorSeverity.Invalid:
                 return None
-            elif worst==ErrorSeverity.Fatal:
-                message="find_doc method failed.  Messages posted:\n"
+            elif worst == ErrorSeverity.Fatal:
+                message = "find_doc method failed.  Messages posted:\n"
                 for e in elist:
                     message += e.message + "\n"
-                raise MsPASSError(message,ErrorSeverity.Fatal)
-
-        return dict(md)
+                raise MsPASSError(message, ErrorSeverity.Fatal)
+        return dict(md)
 
 
 class DatabaseMatcher(BasicMatcher):
@@ -365,11 +365,11 @@ def find(self, mspass_object):
         is there is no place to put it and something else has gone
         really wrong.
         """
-        if not isinstance(mspass_object,Metadata):
+        if not isinstance(mspass_object, Metadata):
             elog = PyErrorLogger()
             message = "received invalid data. Arg0 must be a valid MsPASS data object"
             elog.log_error(message, ErrorSeverity.Invalid)
-        if hasattr(mspass_object,"dead"):
+        if hasattr(mspass_object, "dead"):
             if mspass_object.dead():
                 return [None, None]
         query = self.query_generator(mspass_object)
@@ -682,14 +682,14 @@ def find(self, mspass_object) -> tuple:
         containing attributes_to_load and load_if_defined (if appropriate)
         in each component.
         """
-        if not isinstance(mspass_object,Metadata):
+        if not isinstance(mspass_object, Metadata):
             elog = PyErrorLogger()
             elog.log_error(
                 "Received datum that was not a valid MsPASS data object",
                 ErrorSeverity.Invalid,
             )
             return [None, elog]
-        if hasattr(mspass_object,"dead"):
+        if hasattr(mspass_object, "dead"):
             if mspass_object.dead():
                 return [None, None]
         thisid = self.cache_id(mspass_object)
@@ -790,7 +790,7 @@ def _df_load_normalization_cache(self, df, collection):
         This function does the same thing as _db_load_normalization_cache,
         the only difference is that this current function takes one argument,
         which is a dataframe.
-        
+
         :param df: a pandas/dask dataframe where we load data from
         """
         query_result = df
@@ -962,7 +962,7 @@ def find(self, mspass_object) -> tuple:
         null load_if_defined into one Metadata container for each row
         of the returned DataFrame.
         """
-        if not isinstance(mspass_object,Metadata):
+        if not isinstance(mspass_object, Metadata):
             elog = PyErrorLogger(
                 "DataFrameCacheMatcher.find",
                 "Received datum that was not a valid MsPASS data object",
@@ -1072,8 +1072,8 @@ def find_one(self, mspass_object) -> tuple:
             # This is a safety purely for code maintenance.
             # currently find either returns None or a list with data in it
             # we enter this safety ONLY if find returns a zero length list
-            # we raise an exception if that happens because it is
-            # not expeted.
+            # we raise an exception if that happens because it is
+            # not expected.
             raise MsPASSError(
                 "DataFrameCacheMatchter.find_one: find returned an empty list. Can only happen if custom matcher has overridden find. Find should return None if the match fails",
                 ErrorSeverity.Fatal,
@@ -1956,9 +1956,9 @@ def find_doc(self, doc, wfdoc_starttime_key="starttime"):
         find_one above where a linear search is used to handle the
         time interval matching.  Here, however, the time field is
         extracted from doc with the key defined by starttime.
-
-        This method overrides the generic version in BasicMatcher due
-        to some special peculiarities of miniseed.
+
+        This method overrides the generic version in BasicMatcher due
+        to some special peculiarities of miniseed.
 
         :param doc: document (pretty much assumed to be from wf_miniseed)
           to be matched with channel or site.
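The find_doc contract described in the docstring above reduces to: pass in a wf document (a plain python dict), get back a dict of the attributes_to_load values on success or None on any failure. A minimal sketch of that usage, not part of this patch; the Database handle and the concrete matcher instance are assumed to exist already:

```python
# Minimal sketch of the find_doc contract.  `db` and `matcher` are assumed
# to be a mspasspy Database handle and a concrete BasicMatcher subclass;
# neither is defined by this patch.
def normalize_one_document(db, matcher):
    doc = db.wf_miniseed.find_one({})  # any wf document (a plain python dict)
    result = matcher.find_doc(doc)
    if result is None:
        # no unique match, or find_one logged an Invalid-severity error
        return None
    # on success result is a plain dict holding the attributes_to_load values
    return result
```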
""" - if hasattr(mspass_object,"dead"): + if hasattr(mspass_object, "dead"): if mspass_object.dead(): return pd.DataFrame() # I don't think this can cause a memory problem as in python @@ -2482,9 +2482,9 @@ def query_generator(self, mspass_object) -> dict: # badly, but it makes the code more stable - otherwise # a parallel job could, for example, abort if one of the # components in a bag/rdd got set to None - if not isinstance(mspass_object,Metadata): + if not isinstance(mspass_object, Metadata): return None - if hasattr(mspass_object,"dead"): + if hasattr(mspass_object, "dead"): if mspass_object.dead(): return None @@ -2580,10 +2580,10 @@ class OriginTimeMatcher(DataFrameCacheMatcher): this list will need to be changed to remove _id as it in that context no ObjectID would normally be defined. Be warned, however, that if used with a normalize function the _id may be required to match a - "source_id" cross reference in a seismic data object. Also note - that the list must contain the key defined by the related - argument "source_time_key" as that is used to match times in - the source data with data start times. + "source_id" cross reference in a seismic data object. Also note + that the list must contain the key defined by the related + argument "source_time_key" as that is used to match times in + the source data with data start times. :type attributes_to_load: list of string defining keys in collection documents @@ -2623,10 +2623,10 @@ class OriginTimeMatcher(DataFrameCacheMatcher): :type data_time_key: string :param source_time_key: dataframe column name to use as source - origin time field. Default is "time". This key must match - a key in the attributes_to_load list or the constructor will - throw an exception. Note this should match the key definingn - origin time in the collection not the common actual value + origin time field. Default is "time". This key must match + a key in the attributes_to_load list or the constructor will + throw an exception. Note this should match the key definingn + origin time in the collection not the common actual value stored with data. I.e. normal usage is "time" not "source_time" :type source_time_key: string Can also be a None type which is causes the internal value to be set to "time" @@ -2669,23 +2669,22 @@ def __init__( message += "key for fetching origin time=" + self.source_time_key message += " is not in attributes_to_load list\n" message += "Required for matching with waveform start times" - raise MsPASSError(message,ErrorSeverity.Fatal) - + raise MsPASSError(message, ErrorSeverity.Fatal) def subset(self, mspass_object) -> pd.DataFrame: - """ - Implementation of subset method requried by inheritance from - DataframeCacheMatcher. Returns a subset of the cache - Dataframe with source origin times matching the definition - of this object. i.e. a time interval relative to the - start time defined by mspass_object. Note that if a key is - given the time will be extrated from the Metadata container of + """ + Implementation of subset method requried by inheritance from + DataframeCacheMatcher. Returns a subset of the cache + Dataframe with source origin times matching the definition + of this object. i.e. a time interval relative to the + start time defined by mspass_object. Note that if a key is + given the time will be extrated from the Metadata container of mspass_object. If no key is defined (self.data_time_key == None) - the t0 attribute of mspass_object will be used. + the t0 attribute of mspass_object will be used. 
""" - if not isinstance(mspass_object,Metadata): + if not isinstance(mspass_object, Metadata): return pd.DataFrame() - if hasattr(mspass_object,"dead"): + if hasattr(mspass_object, "dead"): if mspass_object.dead(): return pd.DataFrame() @@ -2714,29 +2713,30 @@ def subset(self, mspass_object) -> pd.DataFrame: dfret = self.cache.query(dfquery) return dfret + def find_one(self, mspass_object) -> tuple: """ - Override of find_one method of DataframeMatcher. The override is - necessary to handle the ambiguity of a timer interval match for - source origin times. That is, there is a finite probability - that tow earthquakes can occur with the interval of this matcher - defined by the time projected from the waveform start time - (starttime - self.t0offset) + or - self.tolerance. When - multiple matches are found this method handles that ambiguity by - finding the source where the origin time is closest to the - waveform start time corrected by self.t0offset. - - Note this method normally expects input to be an atomic - seismic object. It also, however, accepts any object that - is a subclass of Metadata. The most important example of that - is `TimeSeriesEnsemble` and `SeismogramEnsemble` objects. - For that to work, however, you MUST define a key to use to + Override of find_one method of DataframeMatcher. The override is + necessary to handle the ambiguity of a timer interval match for + source origin times. That is, there is a finite probability + that tow earthquakes can occur with the interval of this matcher + defined by the time projected from the waveform start time + (starttime - self.t0offset) + or - self.tolerance. When + multiple matches are found this method handles that ambiguity by + finding the source where the origin time is closest to the + waveform start time corrected by self.t0offset. + + Note this method normally expects input to be an atomic + seismic object. It also, however, accepts any object that + is a subclass of Metadata. The most important example of that + is `TimeSeriesEnsemble` and `SeismogramEnsemble` objects. + For that to work, however, you MUST define a key to use to fetch a reference time in the constructor to this object - via the `data_time_key` argument. If you then load the + via the `data_time_key` argument. If you then load the appropriate reference time in the ensemble's Metadata container - you can normalize a common source gather's ensemble container + you can normalize a common source gather's ensemble container with a workflow. Here is a code fragment illustrating the idea: - + ``` source_matcher = OriginTimeMatcher(db,data_time_key="origin_time") e = db.read_data(cursor, ... read args...) # read ensemle e @@ -2744,21 +2744,21 @@ def find_one(self, mspass_object) -> tuple: e['origin_time'] = otime e = normalize(e,source_matcher) ``` - If the match suceeds the attributes defined in te Dataframe - cache will be loaded into the Metadata contaienr of e. - That is the defiition of a common source gather. - + If the match suceeds the attributes defined in te Dataframe + cache will be loaded into the Metadata contaienr of e. + That is the defiition of a common source gather. + :param mspass_object: atomic seismic data object to be matched. - The match is normally made against the datum's t0 value so - there is an implict assumption the datum is a UTC epoch time. - If a data set is passed through this operator and the data - are relative time all will fail. The function intentionaly - avoids that test for efficiency. 
A plain Metadata container - can be passed through mspass_object if and only if it - contains a value associated with the key defined by the - starttime_key attibute. - - :return: a tuple consistent with the BasicMatcher API definition. + The match is normally made against the datum's t0 value so + there is an implict assumption the datum is a UTC epoch time. + If a data set is passed through this operator and the data + are relative time all will fail. The function intentionaly + avoids that test for efficiency. A plain Metadata container + can be passed through mspass_object if and only if it + contains a value associated with the key defined by the + starttime_key attibute. + + :return: a tuple consistent with the BasicMatcher API definition. (i.e. pair [Metadata,ErrorLogger]) """ findreturn = self.find(mspass_object) @@ -2769,19 +2769,19 @@ def find_one(self, mspass_object) -> tuple: md2use = mdlist[0] elif len(mdlist) > 1: md2use = self._nearest_time_source(mspass_object, mdlist) - return [md2use,findreturn[1]] + return [md2use, findreturn[1]] - def _nearest_time_source(self,mspass_object,mdlist): + def _nearest_time_source(self, mspass_object, mdlist): """ - Private method to define the algorithm used to resolve - an ambiguity when multipe sources are returned by find. - This returns the Metadata container for the source - most whose offset origin time most closely matches te - content defined my mspass_object. + Private method to define the algorithm used to resolve + an ambiguity when multipe sources are returned by find. + This returns the Metadata container for the source + most whose offset origin time most closely matches te + content defined my mspass_object. """ if self.prepend_collection_name: - # the find method returns modified names if - # prepend_collection_names is True. Note the + # the find method returns modified names if + # prepend_collection_names is True. Note the # actual DAtaframe uses the names without thae prepend string # This is needed to handle that property of find time_key = self.collection + "_" + self.source_time_key @@ -2789,18 +2789,18 @@ def _nearest_time_source(self,mspass_object,mdlist): time_key = self.source_time_key N_matches = len(mdlist) # find component of list with the minimum projected time offset - dt=np.zeros(N_matches) - i=0 + dt = np.zeros(N_matches) + i = 0 for md in mdlist: dt[i] = md[time_key] i += 1 # always use t0 if possile. # this logic, however, allows mspass_object to be a # plain Metadata container or a python dictionary - # intentinally let this throw an exception for Metadata if the - # required key is missing. If t0 is not defined it tries to + # intentinally let this throw an exception for Metadata if the + # required key is missing. If t0 is not defined it tries to # use self.data_time_key (normaly "startttme") - if hasattr(mspass_object,"t0"): + if hasattr(mspass_object, "t0"): test_time = mspass_object.t0 else: test_time = mspass_object[self.data_time_key] @@ -2810,55 +2810,55 @@ def _nearest_time_source(self,mspass_object,mdlist): component_to_use = np.argmin(dt) return mdlist[component_to_use] - def find_doc(self,doc,starttime_key="starttime")->dict: - """ - Override of the find_doc method of BasicMatcher. This method - acts lke find_one but the inputs and outputs are different. - The input to this method is a python dictionary that is - expected to normally be a MongoDB document. 
The output is - also a python dictionary without (normally) a reduced set of - attributes defined by self.attributes_to_load and - self.load_if_defined. We need to override the base class - version of ths method because the base class version by - default requires an atomic seismic data object - (TimeSEries or Seismogram). The algorithm used is a variant of - that in the subset method of this class. - - This method also differs from find_one it that it has no - mechanism to log errors. find_one returns a Metadata container - and an ErrorLogger container used to post messages. This method - will return a None if there are errors that cause it to fail. - That can be ambiguous because a None return also is used to - indicate failure to match anything. The primary use of this - method is normalizing an entire data set with the ObjetIds of - source documnts with the `bulk_normaize` function. In that case - additional forensic work is possible with MongoDB to uncover why - a given document match failed. - - Because the interval match relative to a waveform start time can - be ambiguous from global events (Although rare earthquakes can - easily occur with + or - self.tolerance time) when multiple - rows of the dataframe match the interval test the one returned - is the one for which the time projected from the waveform - start time (uses self.t0offset value) is defined as the match + def find_doc(self, doc, starttime_key="starttime") -> dict: + """ + Override of the find_doc method of BasicMatcher. This method + acts lke find_one but the inputs and outputs are different. + The input to this method is a python dictionary that is + expected to normally be a MongoDB document. The output is + also a python dictionary without (normally) a reduced set of + attributes defined by self.attributes_to_load and + self.load_if_defined. We need to override the base class + version of ths method because the base class version by + default requires an atomic seismic data object + (TimeSEries or Seismogram). The algorithm used is a variant of + that in the subset method of this class. + + This method also differs from find_one it that it has no + mechanism to log errors. find_one returns a Metadata container + and an ErrorLogger container used to post messages. This method + will return a None if there are errors that cause it to fail. + That can be ambiguous because a None return also is used to + indicate failure to match anything. The primary use of this + method is normalizing an entire data set with the ObjetIds of + source documnts with the `bulk_normaize` function. In that case + additional forensic work is possible with MongoDB to uncover why + a given document match failed. + + Because the interval match relative to a waveform start time can + be ambiguous from global events (Although rare earthquakes can + easily occur with + or - self.tolerance time) when multiple + rows of the dataframe match the interval test the one returned + is the one for which the time projected from the waveform + start time (uses self.t0offset value) is defined as the match that is returned. - - :param doc: wf document (i.e. a document used to construct - an atomic datum) to be matched with content of this object - (assued the source collection or a variant that contains - source origin times). + + :param doc: wf document (i.e. a document used to construct + an atomic datum) to be matched with content of this object + (assued the source collection or a variant that contains + source origin times). 
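The tie-breaking rule that find_one and _nearest_time_source implement above reduces to projecting the waveform start time back by t0offset and keeping the candidate origin time with the smallest absolute offset. A standalone sketch of that arithmetic, with invented numbers, not taken from the package:

```python
import numpy as np

# Standalone illustration of the selection arithmetic; all values are invented.
t0offset = 522.0
starttime = 1652300522.3              # waveform start time (UTC epoch seconds)
test_time = starttime - t0offset      # projected source origin time

candidate_origin_times = np.array([1652300000.3, 1652300004.8])
dt = np.abs(candidate_origin_times - test_time)
best = int(np.argmin(dt))             # index of the closest candidate
print("closest candidate index:", best, "offset (s):", dt[best])
```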
@@ -2810,55 +2810,55 @@ def _nearest_time_source(self, mspass_object, mdlist):
         component_to_use = np.argmin(dt)
         return mdlist[component_to_use]
 
-    def find_doc(self,doc,starttime_key="starttime")->dict:
-        """
-        Override of the find_doc method of BasicMatcher.  This method
-        acts lke find_one but the inputs and outputs are different.
-        The input to this method is a python dictionary that is
-        expected to normally be a MongoDB document.  The output is
-        also a python dictionary without (normally) a reduced set of
-        attributes defined by self.attributes_to_load and
-        self.load_if_defined.  We need to override the base class
-        version of ths method because the base class version by
-        default requires an atomic seismic data object
-        (TimeSEries or Seismogram).  The algorithm used is a variant of
-        that in the subset method of this class.
-
-        This method also differs from find_one it that it has no
-        mechanism to log errors.  find_one returns a Metadata container
-        and an ErrorLogger container used to post messages.  This method
-        will return a None if there are errors that cause it to fail.
-        That can be ambiguous because a None return also is used to
-        indicate failure to match anything.  The primary use of this
-        method is normalizing an entire data set with the ObjetIds of
-        source documnts with the `bulk_normaize` function.  In that case
-        additional forensic work is possible with MongoDB to uncover why
-        a given document match failed.
-
-        Because the interval match relative to a waveform start time can
-        be ambiguous from global events (Although rare earthquakes can
-        easily occur with + or - self.tolerance time) when multiple
-        rows of the dataframe match the interval test the one returned
-        is the one for which the time projected from the waveform
-        start time (uses self.t0offset value) is defined as the match
+    def find_doc(self, doc, starttime_key="starttime") -> dict:
+        """
+        Override of the find_doc method of BasicMatcher.  This method
+        acts like find_one but the inputs and outputs are different.
+        The input to this method is a python dictionary that is
+        expected to normally be a MongoDB document.  The output is
+        also a python dictionary with (normally) a reduced set of
+        attributes defined by self.attributes_to_load and
+        self.load_if_defined.  We need to override the base class
+        version of this method because the base class version by
+        default requires an atomic seismic data object
+        (TimeSeries or Seismogram).  The algorithm used is a variant of
+        that in the subset method of this class.
+
+        This method also differs from find_one in that it has no
+        mechanism to log errors.  find_one returns a Metadata container
+        and an ErrorLogger container used to post messages.  This method
+        will return a None if there are errors that cause it to fail.
+        That can be ambiguous because a None return also is used to
+        indicate failure to match anything.  The primary use of this
+        method is normalizing an entire data set with the ObjectIds of
+        source documents with the `bulk_normalize` function.  In that case
+        additional forensic work is possible with MongoDB to uncover why
+        a given document match failed.
+
+        Because the interval match relative to a waveform start time can
+        be ambiguous for global events (rare earthquakes can
+        easily occur within + or - self.tolerance time), when multiple
+        rows of the dataframe match the interval test the one returned
+        is the one whose origin time is closest to the time projected from
+        the waveform start time (using the self.t0offset value)
         that is returned.
-
-        :param doc: wf document (i.e. a document used to construct
-        an atomic datum) to be matched with content of this object
-        (assued the source collection or a variant that contains
-        source origin times).
+
+        :param doc: wf document (i.e. a document used to construct
+        an atomic datum) to be matched with content of this object
+        (assumed the source collection or a variant that contains
+        source origin times).
         :type doc: python dictionary
-        :param starttime_key: key that can be used fetch the waveform
-        segment start time that is to be used to match against
-        origin times loaded in the object's cache. '
-        :type starttime_key: str (default "starttime")
-        :return: python dictionary of the best match or None if there is
-        no match or in nonfatal error conditions.
+        :param starttime_key: key that can be used to fetch the waveform
+        segment start time that is to be used to match against
+        origin times loaded in the object's cache.
+        :type starttime_key: str (default "starttime")
+        :return: python dictionary of the best match or None if there is
+        no match or in nonfatal error conditions.
         """
         if starttime_key in doc:
             test_time = doc[starttime_key]
             test_time -= self.t0offset
-            # copied from subset method
+            # copied from subset method
             tmin = test_time - self.tolerance
             tmax = test_time + self.tolerance
             # For this matcher we dogmatically use <= equivalent in the between
@@ -2871,23 +2871,23 @@ def find_doc(self, doc, starttime_key="starttime") -> dict:
                 )
             )
             subset_df = self.cache.query(dfquery)
-            N_matches=len(subset_df)
+            N_matches = len(subset_df)
             if N_matches <= 0:
                 # no match return
                 return None
-            elif N_matches>1:
-                # first find the row with source origin time most closely
+            elif N_matches > 1:
+                # first find the row with source origin time most closely
                 # matching the doc starrtime value
-                dt=np.zeros(N_matches)
-                i=0
+                dt = np.zeros(N_matches)
+                i = 0
                 for index, row in subset_df.iterrows():
                     # this key has to exist or we wouldn't get here
-                    dt[i]= row[self.source_time_key]
+                    dt[i] = row[self.source_time_key]
                 dt -= test_time
                 dt = np.abs(dt)
-                row_index_to_use = np.argmin(dt)
+                row_index_to_use = np.argmin(dt)
             else:
-                row_index_to_use=0
+                row_index_to_use = 0
             row = subset_df.iloc[row_index_to_use]
             doc_out = dict()
             notnulltest = row.notnull()
@@ -2907,14 +2907,14 @@ def find_doc(self, doc, starttime_key="starttime") -> dict:
                     doc_out[mdkey] = row[key]
                 else:
                     # land here if a required attribute is missing
-                    # from the dataframe cache.  find logs
-                    # an error but all we can do here is flag
-                    # failure returning None.  There is a rare
-                    # possibilit of this failing with multiple
-                    # source documents where one is bad and the other
+                    # from the dataframe cache.  find logs
+                    # an error but all we can do here is flag
+                    # failure returning None.  There is a rare
+                    # possibility of this failing with multiple
+                    # source documents where one is bad and the other
                     # is not
                     return None
-
+
         for k in self.load_if_defined:
             if notnulltest[k]:
                 if k in self.aliases:
@@ -3234,7 +3234,7 @@ def subset(self, mspass_object) -> pd.DataFrame:
        DataFramematcher
 
        """
-        if isinstance(mspass_object,Metadata):
+        if isinstance(mspass_object, Metadata):
            if mspass_object.live:
                if _input_is_atomic(mspass_object):
                    stime = mspass_object.t0
@@ -3309,7 +3309,7 @@ def normalize(mspass_object, matcher, kill_on_failure=True):
    :return: copy of mspass_object.  dead data are returned immediately.
      if kill_on_failure is true the result may be killed on return.
    """
-    if hasattr(mspass_object,"dead"):
+    if hasattr(mspass_object, "dead"):
        if mspass_object.dead():
            return mspass_object
    find_output = matcher.find_one(mspass_object)
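The normalize function patched above is the usual single-datum entry point for any of these matchers. A hedged usage sketch; `ts` is assumed to be a live TimeSeries already read from the database and `matcher` any BasicMatcher subclass (for example the OriginTimeMatcher sketched earlier), neither defined by this patch:

```python
from mspasspy.db.normalize import normalize

# Sketch only: ts (a TimeSeries) and matcher are assumed to already exist.
ts = normalize(ts, matcher, kill_on_failure=True)
if ts.dead():
    # with kill_on_failure=True an unmatched datum may come back killed,
    # with the reason posted to ts.elog
    pass
```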
diff --git a/python/tests/db/test_normalize.py b/python/tests/db/test_normalize.py
index 7c2f6b54..1dd61859 100644
--- a/python/tests/db/test_normalize.py
+++ b/python/tests/db/test_normalize.py
@@ -667,11 +667,12 @@ def test_EqualityMatcher_normalize_df(self):
 
 class TestOriginTimeMatcher(TestNormalize):
     """
-    tester class for OriginTimeMatcher and OriginTimeDBMatcher.
-    Note that the tests require a database to be running and and the
-    right data loaded by the setup methods for the TestNormalize
-    class it inherits.
+    tester class for OriginTimeMatcher and OriginTimeDBMatcher.
+    Note that the tests require a database to be running and the
+    right data loaded by the setup methods for the TestNormalize
+    class it inherits.
     """
+
     def setup_method(self):
         super().setup_method()
         self.df = pd.DataFrame(list(self.db["source"].find()))
@@ -682,27 +683,27 @@ def setup_method(self):
 
     def test_OriginTimeMatcher_find_one(self):
         """
-        These tests of the OriginTimeMatcher and OriginTimeDBMatcher
-        center on matching a single datum either in the form of an
-        original wf_miniseed document or the TimeSeries object created
-        by read_data using that document.  The database is loaded with the
-        setup methods for this module.  Be aware these test are very very
-        heavily dependent on magic properties of that import db.
-        If the dump files of that database were lost it will be a serious
-        pain to reconstruct this set of tests.
-
-
-        Actually the key thing for these tests is a magic number of 522
-        which is the origin time offset of the starttime of the one and only
+        These tests of the OriginTimeMatcher and OriginTimeDBMatcher
+        center on matching a single datum either in the form of an
+        original wf_miniseed document or the TimeSeries object created
+        by read_data using that document.  The database is loaded with the
+        setup methods for this module.  Be aware these tests are very
+        heavily dependent on magic properties of that imported db.
+        If the dump files of that database were lost it would be a serious
+        pain to reconstruct this set of tests.
+
+
+        Actually the key thing for these tests is a magic number of 522,
+        which is the origin time offset of the starttime of the one and only
         one datum used in these tests.  522 is a rounding of 522.28499
-        determined in testing outside this file.  That minor difference
-        is appropriate and ok since this opoerator has a range test.
-        I (glp) have no idea where that number came from but suspect it is
-        a P wave arrival time and the data were cut relative to that time.
-        A key point is you should not expect the data in the waveform to
+        determined in testing outside this file.  That minor difference
+        is appropriate and ok since this operator has a range test.
+        I (glp) have no idea where that number came from but suspect it is
+        a P wave arrival time and the data were cut relative to that time.
+        A key point is you should not expect the data in the waveform to
         have any relationship to reality.  We test here only the starttime
-        relative to the contents of the source collection stored in the
-        test database loaded by the class setup method.
+        relative to the contents of the source collection stored in the
+        test database loaded by the class setup method.
         """
         cached_matcher = OriginTimeMatcher(
             self.db, source_time_key="time", t0offset=522.0
@@ -750,15 +751,15 @@ def test_OriginTimeMatcher_find_one(self):
         ts["testtime"] = 9999.99
         db_retdoc = db_matcher.find_one(ts)
         assert db_retdoc[0] is None
-
+
     def test_OriginTimeMatcher_find_doc(self):
         """
-        Nearly identical code to "find_one" version immediately above but
-        for the find_doc method that is independently implemented.
-        There is only a dataframe version of that method though.
-
-        TODO:  this test does not validate multiple match algorithm
-        returning minimum time offset as unique match.  find_one test needs
+        Nearly identical code to the "find_one" version immediately above, but
+        for the find_doc method, which is independently implemented.
+        There is only a dataframe version of that method though.
+
+        TODO:  this test does not validate the multiple match algorithm
+        returning minimum time offset as unique match.  The find_one test needs
         a similar test.
         """
         cached_matcher = OriginTimeMatcher(
@@ -768,19 +769,18 @@ def test_OriginTimeMatcher_find_doc(self):
             self.db, source_time_key="time", t0offset=522.0
         )
         wfdoc = self.db.wf_miniseed.find_one(
             {"_id": ObjectId("627fc20559a116ff99f38243")}
         )
-        test_time=wfdoc["starttime"]
+        test_time = wfdoc["starttime"]
         retdoc = cached_matcher.find_doc(wfdoc)
         # Failed find returns a none in component 0 so catch that
         assert retdoc
-        assert isinstance(retdoc,dict)
-
+        assert isinstance(retdoc, dict)
+
         # test failure with unmatched time - should silenetly return None
         wfdoc["starttime"] = 99999.99
         retdoc = cached_matcher.find_doc(wfdoc)
         assert retdoc is None
-
 
     def test_OriginTimeMatcher_normalize(self):
         # t0offset value needed to work with test data set.  See above
         # if future pytest runs fail here
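Assuming the running MongoDB instance and preloaded test database that the TestOriginTimeMatcher docstring calls for are available, the tests touched by this patch can be run in isolation with a selection filter; a hypothetical invocation:

```python
# Hypothetical invocation; requires the running database and test data
# described in the TestOriginTimeMatcher docstring.
import pytest

pytest.main(["python/tests/db/test_normalize.py", "-k", "TestOriginTimeMatcher"])
```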