Skip to content

Commit

Permalink
Move all APDB iteration to apdb.store
Browse files Browse the repository at this point in the history
  • Loading branch information
isullivan committed Feb 13, 2025
1 parent 555378b commit 7f97c7e
Showing 1 changed file with 6 additions and 50 deletions.
56 changes: 6 additions & 50 deletions python/lsst/ap/association/diaPipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,56 +792,12 @@ def writeToApdb(self, updatedDiaObjects, associatedDiaSources, diaForcedSources)
diaObjectStore = dropEmptyColumns(self.schema, updatedDiaObjects, tableName="DiaObject")
diaSourceStore = dropEmptyColumns(self.schema, associatedDiaSources, tableName="DiaSource")
diaForcedSourceStore = dropEmptyColumns(self.schema, diaForcedSources, tableName="DiaForcedSource")

nObject = len(diaObjectStore)
nSource = len(diaSourceStore)
nForced = len(diaForcedSourceStore)
nRecords = max(nObject, nSource, nForced)
nWritten = 0
start = 0
if self.config.maximumTableLength > 0:
maximumTableLength = self.config.maximumTableLength
else:
maximumTableLength = nRecords
end = min(nRecords, maximumTableLength)
time = DateTime.now().toAstropy()
while nWritten < nRecords:
srcEnd = min(start + maximumTableLength, nSource)
if srcEnd <= start:
finalDiaSources = None
nSourceChunk = 0
else:
finalDiaSources = diaSourceStore.iloc[start:srcEnd].copy()
nSourceChunk = srcEnd - start

fSrcEnd = min(start + maximumTableLength, nForced)
if fSrcEnd <= start:
finalForcedSources = None
nForcedChunk = 0
else:
finalForcedSources = diaForcedSourceStore.iloc[start:fSrcEnd].copy()
nForcedChunk = fSrcEnd - start
objEnd = min(start + maximumTableLength, nObject)
if objEnd <= start:
finalDiaObjects = None
nObjectChunk = 0
else:
finalDiaObjects = diaObjectStore.iloc[start:end].copy()
nObjectChunk = fSrcEnd - start

self.log.info("Writing %i/%i diaObjects, %i/%i diaSources and %i/%i diaForcedSources to the APDB",
nObjectChunk, nObject,
nSourceChunk, nSource,
nForcedChunk, nForced,
)
self.apdb.store(
time,
finalDiaObjects,
finalDiaSources,
finalForcedSources)
nWritten += end - start
start = end + 1
end = min(nRecords, start + maximumTableLength)
self.apdb.store(
DateTime.now().toAstropy(),
diaObjectStore,
diaSourceStore,
diaForcedSourceStore,
maximum_table_length=self.config.maximumTableLength)
self.log.info("APDB updated.")

def testDataFrameIndex(self, df):
Expand Down

0 comments on commit 7f97c7e

Please sign in to comment.