Skip to content

Commit

Permalink
Merge pull request #212 from C0nsultant/topic/improve_ioqueue_utiliza…
Browse files Browse the repository at this point in the history
…tion

Minimize number of flushes in backend
  • Loading branch information
ax3l authored Jun 7, 2018
2 parents 9ec149f + 728d51f commit aca17f8
Show file tree
Hide file tree
Showing 16 changed files with 236 additions and 191 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ Bug Fixes
- SerialIOTest: ``loadChunk`` template missing for ADIOS1 #227
- prepare running serial applications linked against parallel ADIOS1 library #228

Other
"""""

- minimize number of flushes in backend #212


0.1.0-alpha
-----------
Expand Down
3 changes: 1 addition & 2 deletions include/openPMD/RecordComponent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ RecordComponent::loadChunk(Offset const& o, Extent const& e, std::shared_ptr< T
dRead.extent = e;
dRead.dtype = getDatatype();
dRead.data = std::static_pointer_cast< void >(data);
IOHandler->enqueue(IOTask(this, dRead));
IOHandler->flush();
m_chunks->push(IOTask(this, dRead));
}
}

Expand Down
1 change: 0 additions & 1 deletion include/openPMD/backend/Container.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ class Container : public Attributable
Parameter< Operation::CREATE_PATH > pCreate;
pCreate.path = path;
IOHandler->enqueue(IOTask(this, pCreate));
IOHandler->flush();
}

flushAttributes();
Expand Down
3 changes: 1 addition & 2 deletions include/openPMD/backend/PatchRecordComponent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ PatchRecordComponent::load(uint64_t idx, std::shared_ptr< T > data)
dRead.extent = {1};
dRead.dtype = getDatatype();
dRead.data = std::static_pointer_cast< void >(data);
IOHandler->enqueue(IOTask(this, dRead));
IOHandler->flush();
m_chunks->push(IOTask(this, dRead));
}

template< typename T >
Expand Down
62 changes: 31 additions & 31 deletions src/Iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,37 +92,31 @@ Iteration::flushFileBased(uint64_t i)
Parameter< Operation::CREATE_FILE > fCreate;
fCreate.name = auxiliary::replace_first(s->iterationFormat(), "%T", std::to_string(i));
IOHandler->enqueue(IOTask(s, fCreate));
IOHandler->flush();

/* create basePath */
Parameter< Operation::CREATE_PATH > pCreate;
pCreate.path = auxiliary::replace_first(s->basePath(), "%T/", "");
IOHandler->enqueue(IOTask(&s->iterations, pCreate));
IOHandler->flush();

/* create iteration path */
pCreate.path = std::to_string(i);
IOHandler->enqueue(IOTask(this, pCreate));
IOHandler->flush();
} else
{
/* open file */
Series* s = dynamic_cast<Series *>(parent->attributable->parent->attributable);
Parameter< Operation::OPEN_FILE > fOpen;
fOpen.name = auxiliary::replace_last(s->iterationFormat(), "%T", std::to_string(i));
IOHandler->enqueue(IOTask(s, fOpen));
IOHandler->flush();

/* open basePath */
Parameter< Operation::OPEN_PATH > pOpen;
pOpen.path = auxiliary::replace_first(s->basePath(), "%T/", "");
IOHandler->enqueue(IOTask(&s->iterations, pOpen));
IOHandler->flush();

/* open iteration path */
pOpen.path = std::to_string(i);
IOHandler->enqueue(IOTask(this, pOpen));
IOHandler->flush();
}

flush();
Expand All @@ -137,7 +131,6 @@ Iteration::flushGroupBased(uint64_t i)
Parameter< Operation::CREATE_PATH > pCreate;
pCreate.path = std::to_string(i);
IOHandler->enqueue(IOTask(this, pCreate));
IOHandler->flush();
}

flush();
Expand All @@ -146,34 +139,43 @@ Iteration::flushGroupBased(uint64_t i)
void
Iteration::flush()
{
/* Find the root point [Series] of this file,
* meshesPath and particlesPath are stored there */
Writable *w = this->parent;
while( w->parent )
w = w->parent;
Series* s = dynamic_cast<Series *>(w->attributable);

if( !meshes.empty() )
if( IOHandler->accessType == AccessType::READ_ONLY )
{
if( !s->containsAttribute("meshesPath") )
s->setMeshesPath("meshes/");
s->flushMeshesPath();
meshes.flush(s->meshesPath());
for( auto& m : meshes )
m.second.flush(m.first);
}

if( !particles.empty() )
{
if( !s->containsAttribute("particlesPath") )
s->setParticlesPath("particles/");
s->flushParticlesPath();
particles.flush(s->particlesPath());
for( auto& species : particles )
species.second.flush(species.first);
}
} else
{
/* Find the root point [Series] of this file,
* meshesPath and particlesPath are stored there */
Writable *w = this->parent;
while( w->parent )
w = w->parent;
Series* s = dynamic_cast<Series *>(w->attributable);

if( !meshes.empty() )
{
if( !s->containsAttribute("meshesPath") )
s->setMeshesPath("meshes/");
s->flushMeshesPath();
meshes.flush(s->meshesPath());
for( auto& m : meshes )
m.second.flush(m.first);
}

flushAttributes();
if( !particles.empty() )
{
if( !s->containsAttribute("particlesPath") )
s->setParticlesPath("particles/");
s->flushParticlesPath();
particles.flush(s->particlesPath());
for( auto& species : particles )
species.second.flush(species.first);
}

flushAttributes();
}
}

void
Expand Down Expand Up @@ -252,7 +254,6 @@ Iteration::read()
Parameter< Operation::OPEN_PATH > pOpen;
pOpen.path = s->meshesPath();
IOHandler->enqueue(IOTask(&meshes, pOpen));
IOHandler->flush();

meshes.readAttributes();

Expand Down Expand Up @@ -313,7 +314,6 @@ Iteration::read()
Parameter< Operation::OPEN_PATH > pOpen;
pOpen.path = s->particlesPath();
IOHandler->enqueue(IOTask(&particles, pOpen));
IOHandler->flush();

particles.readAttributes();

Expand Down
49 changes: 27 additions & 22 deletions src/Mesh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,32 +182,38 @@ Mesh::setTimeOffset(T to)
void
Mesh::flush(std::string const& name)
{
if( !written )
if( IOHandler->accessType == AccessType::READ_ONLY )
{
if( *m_containsScalar )
{
MeshRecordComponent& r = at(RecordComponent::SCALAR);
r.m_writable->parent = parent;
r.parent = parent;
r.flush(name);
m_writable->abstractFilePosition = r.m_writable->abstractFilePosition;
abstractFilePosition = r.abstractFilePosition;
written = true;
} else
for( auto& comp : *this )
comp.second.flush(comp.first);
} else
{
if( !written )
{
Parameter< Operation::CREATE_PATH > pCreate;
pCreate.path = name;
IOHandler->enqueue(IOTask(this, pCreate));
IOHandler->flush();
for( auto& comp : *this )
comp.second.parent = this->m_writable.get();
if( *m_containsScalar )
{
MeshRecordComponent& r = at(RecordComponent::SCALAR);
r.m_writable->parent = parent;
r.parent = parent;
r.flush(name);
m_writable->abstractFilePosition = r.m_writable->abstractFilePosition;
abstractFilePosition = r.abstractFilePosition;
written = true;
} else
{
Parameter< Operation::CREATE_PATH > pCreate;
pCreate.path = name;
IOHandler->enqueue(IOTask(this, pCreate));
for( auto& comp : *this )
comp.second.parent = this->m_writable.get();
}
}
}

for( auto& comp : *this )
comp.second.flush(comp.first);
for( auto& comp : *this )
comp.second.flush(comp.first);

flushAttributes();
flushAttributes();
}
}

void
Expand Down Expand Up @@ -315,7 +321,6 @@ Mesh::read()
MeshRecordComponent& rc = (*this)[component];
pOpen.path = component;
IOHandler->enqueue(IOTask(&rc, pOpen));
IOHandler->flush();
*rc.m_isConstant = true;
rc.read();
}
Expand Down
1 change: 0 additions & 1 deletion src/ParticlePatches.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ ParticlePatches::read()
PatchRecord& pr = (*this)[record_name];
pOpen.path = record_name;
IOHandler->enqueue(IOTask(&pr, pOpen));
IOHandler->flush();
pr.read();
}

Expand Down
28 changes: 18 additions & 10 deletions src/ParticleSpecies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ ParticleSpecies::read()
{
pOpen.path = "particlePatches";
IOHandler->enqueue(IOTask(&particlePatches, pOpen));
IOHandler->flush();
particlePatches.read();
} else
{
Expand Down Expand Up @@ -107,18 +106,27 @@ ParticleSpecies::read()
void
ParticleSpecies::flush(std::string const& path)
{
Container< Record >::flush(path);

for( auto& record : *this )
record.second.flush(record.first);

if( particlePatches.find("numParticles") != particlePatches.end()
&& particlePatches.find("numParticlesOffset") != particlePatches.end()
&& particlePatches.size() >= 3 )
if( IOHandler->accessType == AccessType::READ_ONLY )
{
particlePatches.flush("particlePatches");
for( auto& record : *this )
record.second.flush(record.first);
for( auto& patch : particlePatches )
patch.second.flush(patch.first);
} else
{
Container< Record >::flush(path);

for( auto& record : *this )
record.second.flush(record.first);

if( particlePatches.find("numParticles") != particlePatches.end()
&& particlePatches.find("numParticlesOffset") != particlePatches.end()
&& particlePatches.size() >= 3 )
{
particlePatches.flush("particlePatches");
for( auto& patch : particlePatches )
patch.second.flush(patch.first);
}
}
}
} // openPMD
49 changes: 27 additions & 22 deletions src/Record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,38 @@ Record::setUnitDimension(std::map< UnitDimension, double > const& udim)
void
Record::flush(std::string const& name)
{
if( !written )
if( IOHandler->accessType == AccessType::READ_ONLY )
{
if( *m_containsScalar )
{
RecordComponent& r = at(RecordComponent::SCALAR);
r.m_writable->parent = parent;
r.parent = parent;
r.flush(name);
m_writable->abstractFilePosition = r.m_writable->abstractFilePosition;
abstractFilePosition = r.abstractFilePosition;
written = true;
} else
for( auto& comp : *this )
comp.second.flush(comp.first);
} else
{
if( !written )
{
Parameter< Operation::CREATE_PATH > pCreate;
pCreate.path = name;
IOHandler->enqueue(IOTask(this, pCreate));
IOHandler->flush();
for( auto& comp : *this )
comp.second.parent = getWritable(this);
if( *m_containsScalar )
{
RecordComponent& r = at(RecordComponent::SCALAR);
r.m_writable->parent = parent;
r.parent = parent;
r.flush(name);
m_writable->abstractFilePosition = r.m_writable->abstractFilePosition;
abstractFilePosition = r.abstractFilePosition;
written = true;
} else
{
Parameter< Operation::CREATE_PATH > pCreate;
pCreate.path = name;
IOHandler->enqueue(IOTask(this, pCreate));
for( auto& comp : *this )
comp.second.parent = getWritable(this);
}
}
}

for( auto& comp : *this )
comp.second.flush(comp.first);
for( auto& comp : *this )
comp.second.flush(comp.first);

flushAttributes();
flushAttributes();
}
}

void
Expand All @@ -100,7 +106,6 @@ Record::read()
RecordComponent& rc = (*this)[component];
pOpen.path = component;
IOHandler->enqueue(IOTask(&rc, pOpen));
IOHandler->flush();
*rc.m_isConstant = true;
rc.read();
}
Expand Down
Loading

0 comments on commit aca17f8

Please sign in to comment.