iceoryx_doc  1.0.1
iox::popo::ChunkDistributor< ChunkDistributorDataType > Class Template Reference

The ChunkDistributor is the low-level building block for sending SharedChunks to a dynamic number of ChunkQueues. Together with the ChunkQueuePusher, the ChunkDistributor builds the infrastructure for exchanging memory chunks between data producers and consumers that may be located in different processes. Besides a modifiable container of ChunkQueues to which a SharedChunk can be delivered, it holds a configurable history of the most recently sent chunks. This makes it possible to provide a newly added queue with a number of recent chunks to start from, which is needed for the functionality known as a latched topic in ROS or a field in ara::com. A ChunkDistributor is used to build elements of higher abstraction layers that also do memory management and provide an API to the end user.

#include <chunk_distributor.hpp>

Public Types

using MemberType_t = ChunkDistributorDataType
 
using ChunkQueueData_t = typename ChunkDistributorDataType::ChunkQueueData_t
 
using ChunkQueuePusher_t = typename ChunkDistributorDataType::ChunkQueuePusher_t
 

Public Member Functions

 ChunkDistributor (cxx::not_null< MemberType_t *const > chunkDistrubutorDataPtr) noexcept
 
 ChunkDistributor (const ChunkDistributor &other)=delete
 
ChunkDistributor & operator= (const ChunkDistributor &)=delete
 
 ChunkDistributor (ChunkDistributor &&rhs)=default
 
ChunkDistributor & operator= (ChunkDistributor &&rhs)=default
 
cxx::expected< ChunkDistributorError > tryAddQueue (cxx::not_null< ChunkQueueData_t *const > queueToAdd, const uint64_t requestedHistory=0u) noexcept
 Add a queue to the internal list of chunk queues to which chunks are delivered when calling deliverToAllStoredQueues.
 
cxx::expected< ChunkDistributorError > tryRemoveQueue (cxx::not_null< ChunkQueueData_t *const > queueToRemove) noexcept
 Remove a queue from the internal list of chunk queues.
 
void removeAllQueues () noexcept
 Delete all the stored chunk queues.
 
bool hasStoredQueues () const noexcept
 Check whether there are any stored chunk queues.
 
void deliverToAllStoredQueues (mepoo::SharedChunk chunk) noexcept
 Deliver the provided shared chunk to all the stored chunk queues. The chunk will be added to the chunk history.
 
bool deliverToQueue (cxx::not_null< ChunkQueueData_t *const > queue, mepoo::SharedChunk chunk) noexcept
 Deliver the provided shared chunk to the provided chunk queue. The chunk will NOT be added to the chunk history.
 
void addToHistoryWithoutDelivery (mepoo::SharedChunk chunk) noexcept
 Update the chunk history but do not deliver the chunk to any chunk queue. One use case is updating a field in ara::com that is not offered.
 
uint64_t getHistorySize () noexcept
 Get the current size of the chunk history.
 
uint64_t getHistoryCapacity () const noexcept
 Get the capacity of the chunk history.
 
void clearHistory () noexcept
 Clears the chunk history.
 
void cleanup () noexcept
 Clean up the used shared memory chunks.
 

Protected Member Functions

const MemberType_t * getMembers () const noexcept
 
MemberType_t * getMembers () noexcept
 

Detailed Description

template<typename ChunkDistributorDataType>
class iox::popo::ChunkDistributor< ChunkDistributorDataType >

The ChunkDistributor is the low-level building block for sending SharedChunks to a dynamic number of ChunkQueues. Together with the ChunkQueuePusher, the ChunkDistributor builds the infrastructure for exchanging memory chunks between data producers and consumers that may be located in different processes. Besides a modifiable container of ChunkQueues to which a SharedChunk can be delivered, it holds a configurable history of the most recently sent chunks. This makes it possible to provide a newly added queue with a number of recent chunks to start from, which is needed for the functionality known as a latched topic in ROS or a field in ara::com. A ChunkDistributor is used to build elements of higher abstraction layers that also do memory management and provide an API to the end user.
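The interplay between delivery and history described above can be illustrated with a small toy model. This is only a sketch under stated assumptions and not the iceoryx implementation: ToyChunk, ToyQueue and ToyDistributor are hypothetical stand-ins built on the standard library, and the replay order (oldest first) is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <vector>

// Hypothetical stand-ins for illustration only; these are not iceoryx types.
using ToyChunk = std::shared_ptr<int>; // plays the role of mepoo::SharedChunk
using ToyQueue = std::deque<ToyChunk>; // plays the role of a ChunkQueue

class ToyDistributor
{
  public:
    explicit ToyDistributor(const std::size_t historyCapacity)
        : m_historyCapacity(historyCapacity)
    {
    }

    // Register a queue and replay at most requestedHistory of the most
    // recently stored chunks into it (oldest first; the order is an assumption).
    void addQueue(ToyQueue& queue, const std::size_t requestedHistory = 0U)
    {
        const std::size_t replayCount = std::min(requestedHistory, m_history.size());
        for (std::size_t i = m_history.size() - replayCount; i < m_history.size(); ++i)
        {
            queue.push_back(m_history[i]);
        }
        m_queues.push_back(&queue);
    }

    // Hand the chunk to every registered queue and remember it in the history.
    void deliverToAll(const ToyChunk& chunk)
    {
        for (ToyQueue* queue : m_queues)
        {
            queue->push_back(chunk);
        }
        addToHistory(chunk);
    }

    // Remember the chunk without delivering it; the oldest entry is dropped when full.
    void addToHistory(const ToyChunk& chunk)
    {
        if (m_historyCapacity == 0U)
        {
            return;
        }
        if (m_history.size() == m_historyCapacity)
        {
            m_history.pop_front();
        }
        m_history.push_back(chunk);
        assert(m_history.size() <= m_historyCapacity);
    }

    std::size_t historySize() const
    {
        return m_history.size();
    }

  private:
    std::size_t m_historyCapacity;
    std::deque<ToyChunk> m_history;
    std::vector<ToyQueue*> m_queues;
};
```

In this model, a queue added late with a requested history of 5 against a history capacity of 2 starts with the two most recent chunks, which is the behaviour a latched topic relies on.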

About Concurrency: This ChunkDistributor can be used with different LockingPolicies for different scenarios. When different threads operate on it (e.g. the application sends chunks and RouDi adds and removes queues), a locking policy must be used that ensures consistent data in the ChunkDistributorData.
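The idea of selecting a locking policy at compile time can be sketched as follows. All names here (ToyThreadSafePolicy, ToySingleThreadedPolicy, ToyGuardedData) are hypothetical, and the sketch uses std::mutex, which only synchronizes threads of a single process; the actual policies and the inter-process mutex mentioned in the Todo below are not reproduced.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

// Hypothetical policy: real mutual exclusion between threads of one process.
class ToyThreadSafePolicy
{
  public:
    void lock()
    {
        m_mutex.lock();
    }
    void unlock()
    {
        m_mutex.unlock();
    }

  private:
    std::mutex m_mutex;
};

// Hypothetical policy: no synchronization at all; it only checks in debug
// builds that the guarded section is not entered twice.
class ToySingleThreadedPolicy
{
  public:
    void lock()
    {
        assert(!m_entered && "single-threaded policy must not be entered twice");
        m_entered = true;
    }
    void unlock()
    {
        m_entered = false;
    }

  private:
    bool m_entered{false};
};

// Data whose accessors are guarded by whichever policy is chosen.
template <typename LockingPolicy>
class ToyGuardedData
{
  public:
    void add(const std::uint64_t value)
    {
        std::lock_guard<LockingPolicy> guard(m_lock);
        m_sum += value;
    }

    std::uint64_t sum() const
    {
        std::lock_guard<LockingPolicy> guard(m_lock);
        return m_sum;
    }

  private:
    mutable LockingPolicy m_lock;
    std::uint64_t m_sum{0U};
};
```

The calling code is identical for both instantiations; only the template argument decides whether a lock is actually taken.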

Todo:
There are currently some challenges. The containers used for the stored queues and the history are not thread-safe, so we use an inter-process mutex. This can lead to deadlocks if a user process is terminated while one of its threads is inside the ChunkDistributor and holds the lock. A simpler setup would be one in which changing the queues by a middleware thread and sending chunks by the user process do not interleave, i.e. there is no concurrent access to the containers; memory synchronization would then be sufficient. The cleanup() call is the biggest challenge. It is used to free chunks that are still held by a user application that did not terminate properly. Even if accesses from middleware and user threads do not overlap, the history container to clean up could be in an inconsistent state because the application was hard-terminated while changing it. We would need a container like the UsedChunkList that is robust against such inconsistencies. A perfect job for our future selves.

Member Function Documentation

◆ addToHistoryWithoutDelivery()

template<typename ChunkDistributorDataType >
void iox::popo::ChunkDistributor< ChunkDistributorDataType >::addToHistoryWithoutDelivery ( mepoo::SharedChunk  chunk)
inline noexcept

Update the chunk history but do not deliver the chunk to any chunk queue. One use case is updating a field in ara::com that is not offered.

Parameters
[in] chunk  shared chunk to add to the chunk history

◆ cleanup()

template<typename ChunkDistributorDataType >
void iox::popo::ChunkDistributor< ChunkDistributorDataType >::cleanup
inline noexcept

Clean up the used shared memory chunks.

Todo:
Currently we have a deadlock / mutex destroy vulnerability if the ThreadSafePolicy is used and a sending application dies while holding the lock for sending. If the RouDi daemon then wants to clean up or makes discovery changes, we get a deadlock or an exception when destroying the mutex. As long as we don't have a multi-threaded lock-free ChunkDistributor or another concept, we die here.

◆ deliverToAllStoredQueues()

template<typename ChunkDistributorDataType >
void iox::popo::ChunkDistributor< ChunkDistributorDataType >::deliverToAllStoredQueues ( mepoo::SharedChunk  chunk)
inline noexcept

Deliver the provided shared chunk to all the stored chunk queues. The chunk will be added to the chunk history.

Parameters
[in] chunk  shared chunk to be delivered

◆ deliverToQueue()

template<typename ChunkDistributorDataType >
bool iox::popo::ChunkDistributor< ChunkDistributorDataType >::deliverToQueue ( cxx::not_null< ChunkQueueData_t *const >  queue,
mepoo::SharedChunk  chunk 
)
inline noexcept

Deliver the provided shared chunk to the provided chunk queue. The chunk will NOT be added to the chunk history.

Parameters
[in] queue  chunk queue to which this chunk shall be delivered
[in] chunk  shared chunk to be delivered
Returns
false if a queue overflow occurred, otherwise true
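The meaning of the boolean return value can be illustrated with a toy bounded queue. This is a sketch only: ToyBoundedQueue and toyDeliverToQueue are hypothetical names, and discarding the oldest element on overflow is an assumption made for this example; the text above only states that an overflow is reported as false.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical bounded queue; plain ints stand in for shared chunks.
struct ToyBoundedQueue
{
    std::size_t capacity;
    std::deque<int> items;
};

// Returns false if the queue overflowed, otherwise true.
// Assumption of this sketch: on overflow the oldest item is discarded
// so that the new one still fits.
inline bool toyDeliverToQueue(ToyBoundedQueue& queue, const int chunk)
{
    assert(queue.capacity > 0U);
    bool overflowed = false;
    if (queue.items.size() == queue.capacity)
    {
        queue.items.pop_front();
        overflowed = true;
    }
    queue.items.push_back(chunk);
    return !overflowed;
}
```

A caller would typically check the result and, for example, count or log lost chunks when false is returned.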

◆ getHistoryCapacity()

template<typename ChunkDistributorDataType >
uint64_t iox::popo::ChunkDistributor< ChunkDistributorDataType >::getHistoryCapacity
inline noexcept

Get the capacity of the chunk history.

Returns
chunk history capacity

◆ getHistorySize()

template<typename ChunkDistributorDataType >
uint64_t iox::popo::ChunkDistributor< ChunkDistributorDataType >::getHistorySize
inline noexcept

Get the current size of the chunk history.

Returns
chunk history size

◆ hasStoredQueues()

template<typename ChunkDistributorDataType >
bool iox::popo::ChunkDistributor< ChunkDistributorDataType >::hasStoredQueues
inline noexcept

Check whether there are any stored chunk queues.

Returns
true if there are stored chunk queues, false if not

◆ tryAddQueue()

template<typename ChunkDistributorDataType >
cxx::expected< ChunkDistributorError > iox::popo::ChunkDistributor< ChunkDistributorDataType >::tryAddQueue ( cxx::not_null< ChunkQueueData_t *const >  queueToAdd,
const uint64_t  requestedHistory = 0u 
)
inline noexcept

Add a queue to the internal list of chunk queues to which chunks are delivered when calling deliverToAllStoredQueues.

Parameters
[in] queueToAdd  chunk queue to add to the list
[in] requestedHistory  number of most recent chunks from the history to send, if available. If the history holds fewer chunks than requested, all chunks currently in the history are provided
Returns
success if the queue could be added, otherwise a ChunkDistributorError
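The two rules above (clamping the requested history and reporting a failure instead of throwing) can be sketched as follows. The names ToyAddResult, ToyQueueList, chunksToReplay and toyTryAddQueue are hypothetical, a plain enum stands in for cxx::expected< ChunkDistributorError >, and the "container full" failure is an assumed example of an error condition.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical result type standing in for cxx::expected< ChunkDistributorError >.
enum class ToyAddResult
{
    Success,
    QueueContainerFull // assumed error condition for this sketch
};

// Hypothetical fixed-capacity list of queue identifiers.
struct ToyQueueList
{
    std::size_t maxQueues;
    std::vector<int> queueIds;
};

// Number of history chunks a new queue receives: never more than are stored.
inline std::uint64_t chunksToReplay(const std::uint64_t requestedHistory, const std::uint64_t historySize)
{
    return std::min(requestedHistory, historySize);
}

// Adds the queue if there is room, otherwise reports the failure to the caller.
inline ToyAddResult toyTryAddQueue(ToyQueueList& list, const int queueId)
{
    if (list.queueIds.size() >= list.maxQueues)
    {
        return ToyAddResult::QueueContainerFull;
    }
    list.queueIds.push_back(queueId);
    assert(list.queueIds.size() <= list.maxQueues);
    return ToyAddResult::Success;
}
```

Because the function is noexcept in the real interface, the caller is expected to inspect the returned value rather than rely on exceptions.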

◆ tryRemoveQueue()

template<typename ChunkDistributorDataType >
cxx::expected< ChunkDistributorError > iox::popo::ChunkDistributor< ChunkDistributorDataType >::tryRemoveQueue ( cxx::not_null< ChunkQueueData_t *const >  queueToRemove)
inline noexcept

Remove a queue from the internal list of chunk queues.

Parameters
[in] queueToRemove  chunk queue to remove from the list
Returns
success if the queue could be removed, otherwise a ChunkDistributorError
