Skip to content

Commit

Permalink
Merge pull request #10993 from todor-ivanov/bugfix_MSUnmerged_MissingRecordsNonFinalRSEs_fix-10992
Browse files Browse the repository at this point in the history

Add rucioConMonErrors counter.
  • Loading branch information
amaltaro authored Feb 15, 2022
2 parents 2492e5a + 08e8436 commit 4e33708
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 66 deletions.
97 changes: 31 additions & 66 deletions src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,15 @@ def __init__(self, msConfig, logger=None):
Functor(self.filterUnmergedFiles),
Functor(self.getPfn),
Functor(self.cleanRSE),
Functor(self.updateRSECounters, pName),
Functor(self.updateServiceCounters),
Functor(self.updateRSETimestamps, start=False, end=True),
Functor(self.uploadRSEToMongoDB),
Functor(self.purgeRseObj)])
# Initialization of the deleted files counters:
self.rseCounters = {}
# Initialization service counters:
self.plineCounters = {}
self.rseTimestamps = {}

# Initialization service common data structures:
self.rseConsStats = {}
self.protectedLFNs = []

Expand Down Expand Up @@ -257,7 +258,7 @@ def _execute(self, rseList):
"""

pline = self.plineUnmerged
self.resetServiceCounters(plineName=pline.name)
self.resetServiceCounters()
self.plineCounters[pline.name]['totalNumRses'] = len(rseList)

for rseName in rseList:
Expand Down Expand Up @@ -471,27 +472,37 @@ def consRecordAge(self, rse):
msg = "RSE: %s Missing in stats records at Rucio Consistency Monitor. " % rseName
msg += "Skipping it in the current run."
self.logger.warning(msg)
rse['rucioConMonStatus'] = "Missing"
self.updateRSETimestamps(rse, start=False, end=True)
self.uploadRSEToMongoDB(rse)
raise MSUnmergedPlineExit(msg)

isConsDone = self.rseConsStats[rseName]['status'] == 'done'
isConsNewer = self.rseConsStats[rseName]['end_time'] > self.rseTimestamps[rseName]['prevStartTime']

if not isConsNewer:
msg = "RSE: %s With old consistency record in Rucio Consistency Monitor. " % rseName
if 'isClean' in rse and rse['isClean']:
msg += "And the RSE has been cleaned during the last Rucio Consistency Monitor polling cycle."
msg += "Skipping it in the current run."
self.logger.info(msg)
rse['rucioConMonStatus'] = self.rseConsStats[rseName]['status']
self.updateRSETimestamps(rse, start=False, end=True)
self.uploadRSEToMongoDB(rse)
raise MSUnmergedPlineExit(msg)
else:
msg += "But the RSE has NOT been fully cleaned during the last Rucio Consistency Monitor polling cycle."
msg += "Retrying cleanup in the current run."
self.logger.info(msg)
rse['rucioConMonStatus'] = self.rseConsStats[rseName]['status']

if not isConsDone:
msg = "RSE: %s In non-final state in Rucio Consistency Monitor. " % rseName
msg += "Skipping it in the current run."
self.logger.warning(msg)
rse['rucioConMonStatus'] = self.rseConsStats[rseName]['status']
self.updateRSETimestamps(rse, start=False, end=True)
self.uploadRSEToMongoDB(rse)
raise MSUnmergedPlineExit(msg)

if isConsNewer and isConsDone:
Expand Down Expand Up @@ -731,24 +742,15 @@ def updateRSETimestamps(self, rse, start=True, end=True):
rse['timestamps'] = self.rseTimestamps[rseName]
return rse

def updateRSECounters(self, rse, pName):
def updateServiceCounters(self, rse):
"""
Update/Upload all counters from the rse object into the MSUnmerged
service counters
:param rse: The RSE to work on
:param pName: The pipeline name whose counters to be updated
:return: rse
"""
rseName = rse['name']
self.resetServiceCounters(rseName=rseName)
self.rseCounters[rseName]['totalNumFiles'] = rse['counters']['totalNumFiles']
self.rseCounters[rseName]['totalNumDirs'] = rse['counters']['totalNumDirs']
self.rseCounters[rseName]['filesDeletedSuccess'] = rse['counters']['filesDeletedSuccess']
self.rseCounters[rseName]['filesDeletedFail'] = rse['counters']['filesDeletedFail']
self.rseCounters[rseName]['dirsDeletedSuccess'] = rse['counters']['dirsDeletedSuccess']
self.rseCounters[rseName]['dirsDeletedFail'] = rse['counters']['dirsDeletedFail']
self.rseCounters[rseName]['gfalErrors'] = rse['counters']['gfalErrors']

pName = self.plineUnmerged.name
self.plineCounters[pName]['totalNumFiles'] += rse['counters']['totalNumFiles']
self.plineCounters[pName]['totalNumDirs'] += rse['counters']['totalNumDirs']
self.plineCounters[pName]['filesDeletedSuccess'] += rse['counters']['filesDeletedSuccess']
Expand All @@ -761,61 +763,23 @@ def updateRSECounters(self, rse, pName):

return rse

def resetServiceCounters(self, rseName=None, plineName=None):
def resetServiceCounters(self):
"""
A simple function for zeroing the service counters.
:param rseName: RSE Name whose counters to be zeroed
:param plineName: The Pline Name whose counters to be zeroed
"""

# Resetting Just the RSE Counters
if rseName is not None:
if rseName not in self.rseCounters:
self.rseCounters[rseName] = {}
self.rseCounters[rseName]['totalNumFiles'] = 0
self.rseCounters[rseName]['totalNumDirs'] = 0
self.rseCounters[rseName]['filesDeletedSuccess'] = 0
self.rseCounters[rseName]['filesDeletedFail'] = 0
self.rseCounters[rseName]['dirsDeletedSuccess'] = 0
self.rseCounters[rseName]['dirsDeletedFail'] = 0
self.rseCounters[rseName]['gfalErrors'] = {}
return

# Resetting Just the pline counters
if plineName is not None:
if plineName not in self.plineCounters:
self.plineCounters[plineName] = {}
self.plineCounters[plineName]['totalNumFiles'] = 0
self.plineCounters[plineName]['totalNumDirs'] = 0
self.plineCounters[plineName]['filesDeletedSuccess'] = 0
self.plineCounters[plineName]['filesDeletedFail'] = 0
self.plineCounters[plineName]['dirsDeletedSuccess'] = 0
self.plineCounters[plineName]['dirsDeletedFail'] = 0
self.plineCounters[plineName]['totalNumRses'] = 0
self.plineCounters[plineName]['rsesProcessed'] = 0
self.plineCounters[plineName]['rsesCleaned'] = 0
return

# Resetting all counters
for rseName in self.rseCounters:
self.rseCounters[rseName]['totalNumFiles'] = 0
self.rseCounters[rseName]['totalNumDirs'] = 0
self.rseCounters[rseName]['filesDeletedSuccess'] = 0
self.rseCounters[rseName]['filesDeletedFail'] = 0
self.rseCounters[rseName]['dirsDeletedSuccess'] = 0
self.rseCounters[rseName]['dirsDeletedFail'] = 0
self.rseCounters[rseName]['dirsDeletedFail'] = 0

for plineName in self.plineCounters:
self.plineCounters[plineName]['totalNumFiles'] = 0
self.plineCounters[plineName]['totalNumDirs'] = 0
self.plineCounters[plineName]['filesDeletedSuccess'] = 0
self.plineCounters[plineName]['filesDeletedFail'] = 0
self.plineCounters[plineName]['dirsDeletedSuccess'] = 0
self.plineCounters[plineName]['dirsDeletedFail'] = 0
self.plineCounters[plineName]['totalNumRses'] = 0
self.plineCounters[plineName]['rsesProcessed'] = 0
self.plineCounters[plineName]['rsesCleaned'] = 0
# Resetting pline counters
plineName = self.plineUnmerged.name
self.plineCounters.setdefault(plineName, {})
self.plineCounters[plineName]['totalNumFiles'] = 0
self.plineCounters[plineName]['totalNumDirs'] = 0
self.plineCounters[plineName]['filesDeletedSuccess'] = 0
self.plineCounters[plineName]['filesDeletedFail'] = 0
self.plineCounters[plineName]['dirsDeletedSuccess'] = 0
self.plineCounters[plineName]['dirsDeletedFail'] = 0
self.plineCounters[plineName]['totalNumRses'] = 0
self.plineCounters[plineName]['rsesProcessed'] = 0
self.plineCounters[plineName]['rsesCleaned'] = 0
return

def getRSEFromMongoDB(self, rse):
Expand Down Expand Up @@ -859,6 +823,7 @@ def getStatsFromMongoDB(self, detail=False, **kwargs):
mongoProjection = {"_id": False,
"name": True,
"pfnPrefix": True,
"rucioConMonStatus": True,
"isClean": True,
"timestamps": True,
"counters": True}
Expand Down
2 changes: 2 additions & 0 deletions src/python/WMCore/MicroService/MSUnmerged/MSUnmergedRSE.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def defaultDoc(self):
"name": self.rseName,
"pfnPrefix": None,
"isClean": False,
"rucioConMonStatus": None,
"timestamps": {'rseConsStatTime': 0.0,
'prevStartTime': 0.0,
'startTime': 0.0,
Expand Down Expand Up @@ -79,6 +80,7 @@ def buildMongoProjection(self, fullRSEToDB=False):
"_id": False,
"name": True,
"pfnPrefix": True,
"rucioConMonStatus": True,
"isClean" : True,
"timestamps": True,
"counters": True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def setUp(self):
self.expectedRSE = {'name': 'T2_US_Wisconsin',
'pfnPrefix': None,
'isClean': False,
"rucioConMonStatus": None,
'timestamps': {'rseConsStatTime': 0.0,
'prevStartTime': 0.0,
'startTime': 0.0,
Expand Down

0 comments on commit 4e33708

Please sign in to comment.