Skip to content

Commit

Permalink
Merge pull request #472 from mspass-team/cleanup_database
Browse files Browse the repository at this point in the history
Major Revision of database and distributed
  • Loading branch information
wangyinz authored Jan 16, 2024
2 parents 5ec0bd0 + 565da0b commit c8021f6
Show file tree
Hide file tree
Showing 24 changed files with 9,450 additions and 8,313 deletions.
2 changes: 2 additions & 0 deletions cxx/include/mspass/utility/ErrorLogger.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class ErrorLogger
int log_verbose(const std::string alg, const std::string mess);
/*! Return the full list of log entries. The list is returned by value, so
the caller receives a copy of the internal message container. */
std::list<LogData> get_error_log()const{return allmessages;};
/*! Return the number of entries currently held in the log.
Note the container size is converted to int on return. */
int size()const{return allmessages.size();};
/*! Reset error log container to make it empty.
Every entry currently stored in the internal message list is discarded,
so size() returns 0 after this call. */
void clear(){allmessages.clear();};
ErrorLogger& operator=(const ErrorLogger& parent);
/*! For this object + or += means add the log data from the rhs to
the lhs. lhs defines the job_id. */
Expand Down
22 changes: 19 additions & 3 deletions cxx/python/seismic/seismic_py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ PYBIND11_MODULE(seismic, m) {
self.set_live();
else
self.kill();
},"Whether the data is valid or not")
},"True if the data is valid")
.def_property("tref",[](const BasicTimeSeries &self) {
return self.timetype();
},[](BasicTimeSeries &self, TimeReferenceType tref) {
Expand Down Expand Up @@ -692,7 +692,15 @@ PYBIND11_MODULE(seismic, m) {
.def(py::init<const Ensemble<Seismogram>&>())
.def(py::init<const LoggingEnsemble<Seismogram>&>())
.def("kill",&LoggingEnsemble<Seismogram>::kill,"Mark the entire ensemble dead")
.def("live",&LoggingEnsemble<Seismogram>::live,"Return true if the ensemble is marked live")
//.def("live",&LoggingEnsemble<Seismogram>::live,"Return true if the ensemble is marked live")
.def_property("live",[](const LoggingEnsemble<Seismogram> &self) {
return self.live();
},[](LoggingEnsemble<Seismogram> &self, bool b) {
if(b)
self.set_live();
else
self.kill();
},"True if the ensemble contains any valid data. False if empty or all invalid.")
.def("dead",&LoggingEnsemble<Seismogram>::dead,"Return true if the entire ensemble is marked dead")
.def("validate",&LoggingEnsemble<Seismogram>::validate,"Test to see if the ensemble has any live members - return true of it does")
.def("set_live",&LoggingEnsemble<Seismogram>::set_live,"Mark ensemble live but use a validate test first")
Expand Down Expand Up @@ -761,7 +769,15 @@ PYBIND11_MODULE(seismic, m) {
.def(py::init<const Ensemble<TimeSeries>&>())
.def(py::init<const LoggingEnsemble<TimeSeries>&>())
.def("kill",&LoggingEnsemble<TimeSeries>::kill,"Mark the entire ensemble dead")
.def("live",&LoggingEnsemble<TimeSeries>::live,"Return true if the ensemble is marked live")
//.def("live",&LoggingEnsemble<TimeSeries>::live,"Return true if the ensemble is marked live")
.def_property("live",[](const LoggingEnsemble<TimeSeries> &self) {
return self.live();
},[](LoggingEnsemble<TimeSeries> &self, bool b) {
if(b)
self.set_live();
else
self.kill();
},"True if the ensemble contains any valid data. False if empty or all invalid.")
.def("dead",&LoggingEnsemble<TimeSeries>::dead,"Return true if the entire ensemble is marked dead")
.def("validate",&LoggingEnsemble<TimeSeries>::validate,"Test to see if the ensemble has any live members - return true of it does")
.def("set_live",&LoggingEnsemble<TimeSeries>::set_live,"Mark ensemble live but use a validate test first")
Expand Down
11 changes: 6 additions & 5 deletions cxx/python/utility/utility_py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ static PyGetSetDef MsPASSError_getsetters[] = {

PYBIND11_MODULE(utility, m) {
m.attr("__name__") = "mspasspy.ccore.utility";
m.doc() = "A submodule for utility namespace of ccore";
m.doc() = "A submodule for utility namespace of ccore";

py::class_<SphericalCoordinate>(m,"SphericalCoordinate","Enscapsulates concept of spherical coordinates")
.def(py::init<>())
Expand Down Expand Up @@ -433,7 +433,7 @@ PYBIND11_MODULE(utility, m) {
.def_readwrite("tag",&Metadata_typedef::tag,"Name key for this metadata")
.def_readwrite("mdt",&Metadata_typedef::mdt,"Type of any value associated with this key")
;

/* We need this definition to bind dmatrix to a numpy array as described
in this section of pybind11 documentation:\
https://pybind11.readthedocs.io/en/stable/advanced/pycpp/numpy.html
Expand Down Expand Up @@ -577,7 +577,7 @@ PYBIND11_MODULE(utility, m) {
return strout;
})
;

py::enum_<ErrorSeverity>(m,"ErrorSeverity")
.value("Fatal",ErrorSeverity::Fatal)
.value("Invalid",ErrorSeverity::Invalid)
Expand All @@ -586,7 +586,7 @@ PYBIND11_MODULE(utility, m) {
.value("Debug",ErrorSeverity::Debug)
.value("Informational",ErrorSeverity::Informational)
;

/* The following magic were based on the great example from:
https://www.pierov.org/2020/03/01/python-custom-exceptions-c-extensions/
This appears to be the cleanest and easiest way to implement a custom
Expand Down Expand Up @@ -743,6 +743,7 @@ PYBIND11_MODULE(utility, m) {
.def("size",&ErrorLogger::size,"Return number of entries in this log")
.def(py::self += py::self,"Operator +=")
.def("__len__",&ErrorLogger::size,"Return number of entries in this log")
.def("clear",&ErrorLogger::clear,"Reset log container to zero length destroying any current content")
.def("worst_errors",&ErrorLogger::worst_errors,"Return a list of only the worst errors")
.def("__getitem__", [](ErrorLogger &self, size_t i) {
return py::cast(self).attr("get_error_log")().attr("__getitem__")(i);
Expand Down Expand Up @@ -934,7 +935,7 @@ PYBIND11_MODULE(utility, m) {
}
))
;

/* this pair of functions are potentially useful for interactive queries of
ProcessingHistory data */
m.def("algorithm_history",&algorithm_history,
Expand Down
187 changes: 123 additions & 64 deletions cxx/src/lib/io/fileio.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <vector>
#include <stdio.h>
#include <string>
#include <sstream>
#include "mspass/utility/MsPASSError.h"
#include "mspass/seismic/keywords.h"
#include "mspass/seismic/TimeSeries.h"
Expand All @@ -15,13 +16,14 @@ using namespace mspass::seismic;
using mspass::utility::MsPASSError;
using mspass::utility::ErrorSeverity;
using namespace std;
/* This is a file scope function to allow the overlaoded fwrite_to_file

/*This is a file scope function to allow the overloaded fwrite_to_file
functions to share this common code to write a contiguous buffer of data
to a file.
Note I considered adding an old school call to flock to assure this
Note I considered adding an old school call to lockf to assure this
function would be thread safe. However, the man page for the posix
function flockfile says that is no longer necessary and stdio is
function flockfile says that is no longer necessary and stdio is
now thread safe - fopen creates an intrinsic lock that is not released
until fclose is called. That is important for mspass as multiple threads
writing to the same file can be expected to be common. */
Expand Down Expand Up @@ -49,14 +51,19 @@ long int fwrite_sample_data(const string dir, const string dfile, double *dptr,
if( fwrite((void*)dptr,sizeof(double),nd,fp) != nd)
{
fclose(fp);
throw MsPASSError("fwrite_to_file: fwrite error to file "+fname,ErrorSeverity::Invalid);
throw MsPASSError("fwrite_to_file: fwrite error while writing to file "+fname,ErrorSeverity::Invalid);
}
fclose(fp);
return foff;
}catch(...){throw;};
}
/*! Write sample data for a TimeSeries to a file with fwrite. Always
appends and returns foff of the position where fwrite wrote these data.
Returns -1 if it receives a datum marked dead.
*/
long int fwrite_to_file(TimeSeries& d,const string dir,const string dfile)
{
if(d.dead()) return -1;
/* Using this function avoids repetitious code with Seismogram version. */
long int foff;
try{
Expand All @@ -70,8 +77,16 @@ long int fwrite_to_file(TimeSeries& d,const string dir,const string dfile)
d.put_long(SEISMICMD_foff,foff);
return(foff);
}
/*! Write sample data for a Seismogram to a file with fwrite. Always
appends and returns foff of the position where fwrite wrote these data.
Note the data are a raw dump of the contiguous block storing the 3*npts
sample matrix.
Returns -1 if it receives a datum marked dead.
*/
long int fwrite_to_file(Seismogram& d, const string dir,const string dfile)
{
if(d.dead()) return(-1);
/* Using this function avoids repetitious code with TimeSeries version.
Note use of 3*npts as the buffer size*/
long int foff;
Expand All @@ -84,7 +99,26 @@ long int fwrite_to_file(Seismogram& d, const string dir,const string dfile)

return(foff);
}
std::vector<long int> fwrite_to_file(mspass::seismic::LoggingEnsemble<mspass::seismic::TimeSeries>& d, const std::string dir,const std::string dfile)
/*! Write sample data for an Ensemble of TimeSeries to a single file.
Writing ensemble data with this function is more efficient than writing atomic
data one at a time. The reason is this function writes all the sample data for
the ensemble to a single file and only opens and closes the file specified once.
It returns a vector of foff values. Dead members have no sample data written
and will generate a -1 entry in the foff vector returned. Caller should handle
that condition in some way.
If entire ensemble is marked dead it will return an empty vector container.
\param d input ensemble to save sample data
\param dir directory name (if empty use current directory)
\param dfile file name for write
*/
std::vector<long int> fwrite_to_file(
mspass::seismic::LoggingEnsemble<mspass::seismic::TimeSeries>& d, \
const std::string dir,
const std::string dfile)
{
try{
FILE *fp;
Expand All @@ -102,34 +136,55 @@ std::vector<long int> fwrite_to_file(mspass::seismic::LoggingEnsemble<mspass::se
fname=dfile;
if((fp=fopen(fname.c_str(),"a")) == NULL)
/* use the name of the overloaded parent instead of the actual function - intentional*/
throw MsPASSError("fwrite_to_file: Open failed on file "+fname,ErrorSeverity::Invalid);
throw MsPASSError("fwrite_to_file (TimeSeriesEnsemble): Open failed on file "+fname,ErrorSeverity::Invalid);
/* This guarantees appending - not essential since we open in "a" mode but
clearer */
fseek(fp,0L,2);

for (int i = 0; i < d.member.size(); ++i) {
/* Silently skip dead data */
if(d.member[i].dead()) {
foffs.push_back(0);
// place holder
continue;
}
long int foff = ftell(fp);
foffs.push_back(foff);
TimeSeries& t = d.member[i];
if (fwrite((void *)t.s.data(), sizeof(double), t.npts(), fp) != t.npts())
if(d.member[i].dead())
foffs.push_back(-1);
else
{
fclose(fp);
throw MsPASSError("fwrite_to_file: fwrite error to file " + fname, ErrorSeverity::Invalid);
long int foff = ftell(fp);
foffs.push_back(foff);
TimeSeries& t = d.member[i];
if (fwrite((void *)t.s.data(), sizeof(double), t.npts(), fp) != t.npts())
{
fclose(fp);
stringstream ss;
ss << "fwrite_to_file (TimeSeriesEnsemble): fwrite error while writing ensemble member "
<< i << " to file="<<fname<<endl;
throw MsPASSError(ss.str(), ErrorSeverity::Invalid);
}
/* We always set these 3 attributes in Metadata so they can be properly
saved to the database after a successful write. Repetitious with Seismogram
but a function to do this would be more confusing than helpful */
t.put_string(SEISMICMD_dir, dir);
t.put_string(SEISMICMD_dfile, dfile);
t.put_long(SEISMICMD_foff, foff);
}
/* We always set these 3 attributes in Metadata so they can be properly
saved to the database after a successful write. Repetitious with Seismogram
but a function to do this would be more confusing that helpful */
t.put_string(SEISMICMD_dir, dir);
t.put_string(SEISMICMD_dfile, dfile);
t.put_long(SEISMICMD_foff, foff);
}
}
fclose(fp);

return foffs;
}catch(...){throw;};
}
/*! Write sample data for an Ensemble of Seismogram objects to a single file.
Writing ensemble data with this function is more efficient than writing atomic
data one at a time. The reason is this function writes all the sample data for
the ensemble to a single file and only opens and closes the file specified once.
It returns a vector of foff values. Dead members have no sample data written
and will generate a -1 entry in the foff vector returned. Caller should handle
that condition in some way.
If entire ensemble is marked dead it will return an empty vector container.
\param d input ensemble to save sample data
\param dir directory name (if empty use current directory)
\param dfile file name for write
*/
std::vector<long int> fwrite_to_file(mspass::seismic::LoggingEnsemble<mspass::seismic::Seismogram>& d, const std::string dir,const std::string dfile)
{
try{
Expand All @@ -146,27 +201,37 @@ std::vector<long int> fwrite_to_file(mspass::seismic::LoggingEnsemble<mspass::se
fname=dfile;
if((fp=fopen(fname.c_str(),"a")) == NULL)
/* use the name of the overloaded parent instead of the actual function - intentional*/
throw MsPASSError("fwrite_to_file: Open failed on file "+fname,ErrorSeverity::Invalid);
throw MsPASSError("fwrite_to_file (SeismogramEnsemble): Open failed on file "+fname,ErrorSeverity::Invalid);
/* This guarantees appending - not essential since we open in "a" mode but
clearer */
fseek(fp,0L,2);

for (int i = 0; i < d.member.size(); ++i) {
/* Silently skip dead data */
if(d.member[i].dead()) continue;
long int foff = ftell(fp);
foffs.push_back(foff);
Seismogram& t = d.member[i];
if (fwrite((void *)t.u.get_address(0,0), sizeof(double), 3*t.npts(), fp) != 3*t.npts())
if(d.member[i].dead())
foffs.push_back(-1);
else
{
fclose(fp);
throw MsPASSError("fwrite_to_file: fwrite error to file " + fname, ErrorSeverity::Invalid);
if(d.member[i].dead()) continue;
long int foff = ftell(fp);
foffs.push_back(foff);
Seismogram& t = d.member[i];
if (fwrite((void *)t.u.get_address(0,0), sizeof(double), 3*t.npts(), fp) != 3*t.npts())
{
fclose(fp);
stringstream ss;
ss << "fwrite_to_file (SeismogramEnsemble): fwrite error while writing ensemble member "
<< i << " to file="<<fname<<endl;
throw MsPASSError(ss.str(), ErrorSeverity::Invalid);
}
/* We always set these 3 attributes in Metadata so they can be properly
saved to the database after a successful write. Repetitious with TimeSeries
but a function to do this would be more confusing than helpful */
t.put_string(SEISMICMD_dir, dir);
t.put_string(SEISMICMD_dfile, dfile);
t.put_long(SEISMICMD_foff, foff);
}
/* We always set these 3 attributes in Metadata so they can be properly
saved to the database after a successful write. Repetitious with Seismogram
but a function to do this would be more confusing that helpful */
t.put_string(SEISMICMD_dir, dir);
t.put_string(SEISMICMD_dfile, dfile);
t.put_long(SEISMICMD_foff, foff);
}
}
fclose(fp);

return foffs;
}catch(...){throw;};
}
Expand Down Expand Up @@ -237,7 +302,7 @@ size_t fread_from_file(mspass::seismic::LoggingEnsemble<mspass::seismic::Seismog
de.elog.log_error(ss.str());
return -1;
}

for (int ind = 0; ind < n; ++ind) {
size_t ns_read;
int i = indexes[ind];
Expand All @@ -255,17 +320,14 @@ size_t fread_from_file(mspass::seismic::LoggingEnsemble<mspass::seismic::Seismog
continue;
}
try{
if(foff>0)
if(fseek(fp,foff,SEEK_SET))
{
if(fseek(fp,foff,SEEK_SET))
{
fclose(fp);
de.member[i].kill();
stringstream ss;
ss << "can not fseek in " << foff << endl;
de.member[i].elog.log_error(ss.str());
continue;
}
fclose(fp);
de.member[i].kill();
stringstream ss;
ss << "can not fseek in " << foff << endl;
de.member[i].elog.log_error(ss.str());
continue;
}
ns_read = fread((void*)de.member[i].u.get_address(0, 0), sizeof(double), 3 * de.member[i].npts(), fp);
if (ns_read != 3 * de.member[i].npts())
Expand Down Expand Up @@ -323,17 +385,14 @@ size_t fread_from_file(mspass::seismic::LoggingEnsemble<mspass::seismic::TimeSer
continue;
}
try{
if(foff>0)
if(fseek(fp,foff,SEEK_SET))
{
if(fseek(fp,foff,SEEK_SET))
{
fclose(fp);
de.member[i].kill();
stringstream ss;
ss << "can not fseek in " << foff << endl;
de.member[i].elog.log_error(ss.str());
continue;
}
fclose(fp);
de.member[i].kill();
stringstream ss;
ss << "can not fseek in " << foff << endl;
de.member[i].elog.log_error(ss.str());
continue;
}
ns_read = fread((void*)(&(de.member[i].s[0])), sizeof(double), de.member[i].npts(), fp);
if (ns_read != de.member[i].npts())
Expand Down
Loading

0 comments on commit c8021f6

Please sign in to comment.