Skip to content

Commit

Permalink
Handle PR#12575 comments. Lock _supported_extensions access, return i…
Browse files Browse the repository at this point in the history
…terator from create_archive
  • Loading branch information
OhadMeir committed Jan 18, 2024
1 parent 9c9ab4c commit a4deba1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 20 deletions.
33 changes: 17 additions & 16 deletions src/source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <src/core/enum-helpers.h>

#include <rsutils/string/from.h>

#include <src/core/stream-profile-interface.h>

namespace librealsense
{
Expand Down Expand Up @@ -54,6 +54,8 @@ namespace librealsense

void frame_source::init(std::shared_ptr<metadata_parser_map> metadata_parsers)
{
std::lock_guard< std::recursive_mutex > lock( _mutex );

_supported_extensions = { RS2_EXTENSION_VIDEO_FRAME,
RS2_EXTENSION_COMPOSITE_FRAME,
RS2_EXTENSION_POINTS,
Expand All @@ -65,35 +67,36 @@ namespace librealsense
_metadata_parsers = metadata_parsers;
}

void frame_source::create_archive( archive_id id )
std::map< frame_source::archive_id, std::shared_ptr< archive_interface > >::iterator
frame_source::create_archive( archive_id id )
{
std::lock_guard< std::recursive_mutex > lock( _mutex );

auto it = std::find( _supported_extensions.begin(), _supported_extensions.end(), id.second );
if( it == _supported_extensions.end() )
throw wrong_api_call_sequence_exception( "Requested frame type is not supported!" );

std::lock_guard< std::recursive_mutex > lock( _mutex );

_archive[id] = make_archive( id.second, &_max_publish_list_size, _metadata_parsers );
if( ! _archive[id] )
auto ret = _archive.insert( { id, make_archive( id.second, &_max_publish_list_size, _metadata_parsers ) } );
if( ! ret.second || ! ret.first->second ) // Check insertion success and allocation success
throw std::runtime_error( rsutils::string::from()
<< "Failed to create archive of type " << get_string( id.second ) );

_archive[id]->set_sensor( _sensor );
ret.first->second->set_sensor( _sensor );

return ret.first;
}

callback_invocation_holder frame_source::begin_callback( archive_id id )
{
// We use a special index for extensions, like GPU accelerated frames. See add_extension.
if( id.second >= RS2_EXTENSION_COUNT )
id.first = RS2_STREAM_COUNT; // For added extensions like GPU accelerated frames
id.first = RS2_STREAM_COUNT;

std::lock_guard< std::recursive_mutex > lock( _mutex );

auto it = _archive.find( id );
if( it == _archive.end() )
{
create_archive( id );
it = _archive.find( id ); // Now will successfully find
}
it = create_archive( id );

return it->second->begin_callback();
}
Expand All @@ -112,17 +115,15 @@ namespace librealsense
frame_additional_data && additional_data,
bool requires_memory )
{
// We use a special index for extensions, like GPU accelerated frames. See add_extension.
if( id.second >= RS2_EXTENSION_COUNT )
id.first = RS2_STREAM_COUNT; // For added extensions like GPU accelerated frames

std::lock_guard< std::recursive_mutex > lock( _mutex );

auto it = _archive.find( id );
if( it == _archive.end() )
{
create_archive( id );
it = _archive.find( id ); // Now will successfully find
}
it = create_archive( id );

return it->second->alloc_and_track( size, std::move( additional_data ), requires_memory );
}
Expand Down
14 changes: 10 additions & 4 deletions src/source.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ namespace librealsense
class LRS_EXTENSION_API frame_source
{
public:
//using stream_uid = std::pair< rs2_stream, int >; // Stream type and index
using archive_id = std::pair< rs2_stream, rs2_extension >;

frame_source( uint32_t max_publish_list_size = 16 );
Expand Down Expand Up @@ -49,10 +48,17 @@ namespace librealsense
template<class T>
void add_extension( rs2_extension ex )
{
archive_id special_index = { RS2_STREAM_COUNT, ex };

std::lock_guard< std::recursive_mutex > lock( _mutex );

auto it = std::find( _supported_extensions.begin(), _supported_extensions.end(), ex );
if( it == _supported_extensions.end() )
{
_supported_extensions.push_back( ex );
}

// We use a special index for extensions since we don't know the stream type here.
// We can't wait with the allocation because we need the type T in the creation.
archive_id special_index = { RS2_STREAM_COUNT, ex };
_archive[special_index] = std::make_shared< frame_archive< T > >( &_max_publish_list_size, _metadata_parsers );
}

Expand All @@ -63,7 +69,7 @@ namespace librealsense
private:
friend class syncer_process_unit;

void create_archive( archive_id id );
std::map< archive_id, std::shared_ptr< archive_interface > >::iterator create_archive( archive_id id );

mutable std::recursive_mutex _mutex;

Expand Down

0 comments on commit a4deba1

Please sign in to comment.