Feature #4696
SegmentPublisher: segmentation utility
Status: open, 0% done
Description
Develop a SegmentPublisher class that implements the recommended segmentation procedure.
It should support both a finite object and an infinite stream.
A few existing implementations:
Updated by Junxiao Shi about 6 years ago
- Subject changed from "Provide a segment publisher to complement segment fetcher" to "SegmentPublisher: segmentation utility"
- Description updated (diff)
- Target version set to v0.7
- Estimated time set to 6.00 h
Updated by Junxiao Shi about 6 years ago
- Blocks Feature #4699: Dispatcher: use SegmentPublisher in StatusDatasetContext added
Updated by Davide Pesavento about 6 years ago
It should support both a finite object and an infinite stream.
Can you elaborate on the "infinite stream"? Are you simply talking about the case when the content is read from a std::istream? Or are you referring to continuous data production? I think you're mixing two things here: (i) whether the ADU is a stream or a quantized piece of content with well-defined boundaries, and (ii) whether data production is continuous or instantaneous; these two dimensions are orthogonal to each other.
Updated by Davide Pesavento about 6 years ago
- Blocks Task #4702: ndnputchunks: use Segmenter class added
Updated by Ashlesh Gawande about 6 years ago
- Blocks Feature #4760: Use SegmentPublisher utility from PSync to publish LSA data added
Updated by Ashlesh Gawande about 6 years ago
- Blocks Feature #4662: Segment hello and sync data in partial sync added
Updated by Ashlesh Gawande about 6 years ago
- Blocks Task #4716: Segment sync data in full sync added
Updated by Ashlesh Gawande about 6 years ago
- Blocks deleted (Feature #4662: Segment hello and sync data in partial sync)
Updated by Ashlesh Gawande about 6 years ago
- Blocks deleted (Task #4716: Segment sync data in full sync)
Updated by Ashlesh Gawande almost 6 years ago
- Blocks deleted (Feature #4760: Use SegmentPublisher utility from PSync to publish LSA data)
Updated by Ashlesh Gawande almost 6 years ago
- Blocks Feature #4791: Use SegmentPublisher utility from ndn-cxx to publish LSA data added
Updated by Junxiao Shi over 5 years ago
- Has duplicate Feature #1354: SegmentedEncodingBuffer added
Updated by Junxiao Shi about 5 years ago
- Blocks Feature #5004: CertificateBundle encoding and decoding added
Updated by Junxiao Shi about 5 years ago
Can you elaborate on the "infinite stream"? Are you simply talking about the case when the content is read from a std::istream? Or are you referring to continuous data production?
I'm referring to continuous data production.
For reference:
I'm currently attempting to implement SegmentPublisher in NDNts. It has two parts: chunker slices the input into segment payloads no larger than a specified size, and serve turns the chunks into Data packets and makes them available.
The basic logic is pull-based: when the producer receives an Interest for a segment not yet generated, it attempts to read more input via the chunker, until reaching the requested segment number.
A difficulty I'm running into is that, since the input stream is non-seekable, the producer has to keep all generated segments in memory, because it should be able to satisfy an Interest asking for an older segment. ndnputchunks has the same problem (#4866).
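The pull-based logic and its memory cost can be sketched in plain C++ without ndn-cxx. This is a minimal illustration under assumptions: `LazyStreamProducer`, `getSegment`, and `nGenerated` are hypothetical names, payloads are strings rather than Data packets, and an Interest is reduced to the segment number it asks for.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <optional>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical pull-based producer over a non-seekable std::istream.
class LazyStreamProducer
{
public:
  LazyStreamProducer(std::istream& is, std::size_t chunkSize)
    : m_is(is), m_chunkSize(chunkSize)
  {
    assert(chunkSize > 0);
  }

  // Payload of segment segNo; more input is read only if that segment was not generated yet.
  // std::nullopt means the input ended before that segment.
  std::optional<std::string>
  getSegment(std::size_t segNo)
  {
    while (m_segments.size() <= segNo && !m_eof) {
      std::string chunk(m_chunkSize, '\0');
      m_is.read(&chunk[0], static_cast<std::streamsize>(m_chunkSize));
      chunk.resize(static_cast<std::size_t>(m_is.gcount()));
      if (!m_is) {
        m_eof = true; // short read: no more input after this chunk
      }
      if (!chunk.empty()) {
        // The stream cannot be rewound, so every generated segment is kept
        // in case a later Interest asks for it again.
        m_segments.push_back(std::move(chunk));
      }
    }
    if (segNo < m_segments.size()) {
      return m_segments[segNo];
    }
    return std::nullopt;
  }

  std::size_t nGenerated() const { return m_segments.size(); }

private:
  std::istream& m_is;
  std::size_t m_chunkSize;
  std::vector<std::string> m_segments;
  bool m_eof = false;
};
```

Since `m_segments` only grows, memory use is proportional to everything read so far, which is the difficulty described above.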
Updated by Ashlesh Gawande over 4 years ago
Junxiao Shi wrote:
Can you elaborate on the "infinite stream"? Are you simply talking about the case when the content is read from a std::istream? Or are you referring to continuous data production?
I'm referring to continuous data production.
For reference:
I'm currently attempting to implement SegmentPublisher in NDNts. It has two parts: chunker slices the input into segment payloads no larger than a specified size, and serve turns the chunks into Data packets and makes them available.
The basic logic is pull-based: when the producer receives an Interest for a segment not yet generated, it attempts to read more input via the chunker, until reaching the requested segment number.
A difficulty I'm running into is that, since the input stream is non-seekable, the producer has to keep all generated segments in memory, because it should be able to satisfy an Interest asking for an older segment. ndnputchunks has the same problem (#4866).
Can supporting a continuous data stream be a separate issue? It seems more complicated, and I can't find time to research it further. This issue could then be just a simple utility that segments and stores a given buffer (instantaneous production), i.e., a more generalized version of PSync's.
Updated by Junxiao Shi over 4 years ago
You don't have to solve the infinite buffering issue. Other than that, keep reading from a stream and generate a segment when you have a pre-configured amount of payload. It's simple enough to fit in this issue.
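One reading of "generate a segment when you have a pre-configured amount of payload" is a push-style accumulator that buffers incoming bytes and emits a segment each time enough have arrived. The sketch is hypothetical (`SegmentAccumulator`, `append`, and `finish` are illustrative names) and deliberately leaves out the infinite-buffering question: emitted segments go to a callback and are not retained.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical accumulator: input arrives in pieces of arbitrary size
// (e.g. successive reads from a pipe); a segment is emitted whenever
// segmentSize bytes are available.
class SegmentAccumulator
{
public:
  using EmitCallback = std::function<void(std::size_t segNo, const std::string& payload)>;

  SegmentAccumulator(std::size_t segmentSize, EmitCallback emit)
    : m_segmentSize(segmentSize), m_emit(std::move(emit))
  {
    assert(segmentSize > 0);
  }

  // Append newly read bytes and emit every complete segment.
  void
  append(const std::string& bytes)
  {
    m_pending += bytes;
    while (m_pending.size() >= m_segmentSize) {
      m_emit(m_nextSeg++, m_pending.substr(0, m_segmentSize));
      m_pending.erase(0, m_segmentSize);
    }
  }

  // Called at end of input (if it ever comes): emit the final, shorter segment.
  void
  finish()
  {
    if (!m_pending.empty()) {
      m_emit(m_nextSeg++, m_pending);
      m_pending.clear();
    }
  }

private:
  std::size_t m_segmentSize;
  EmitCallback m_emit;
  std::string m_pending;
  std::size_t m_nextSeg = 0;
};
```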
Updated by Jeremy Clark over 4 years ago
For the fetcher side of this change, should the current behavior where the fetcher retrieves all segments up to the last block, writes them to a buffer, and passes the buffer to the calling process still be supported?
If the stream is continuous, the m_receivedSegments hashmap can't keep storing new segments.
Or could the fetcher determine whether the object is finite or infinite, e.g., through the presence of a final block ID in the first segment, and react accordingly?
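A sketch of the fetcher-side policy raised in the last question, under simplifying assumptions: `Segment` and `ReassemblyPolicy` are hypothetical types, segments are assumed to arrive in order (a real fetcher must also handle reordering and retransmissions), and the final block ID, which in NDN is a name component, is modeled as a plain segment number.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a received Data segment.
struct Segment
{
  uint64_t segNo;
  std::string payload;
  std::optional<uint64_t> finalBlockId; // absent for a continuous stream
};

// If segment 0 carries a final block ID, the object is treated as finite:
// segments are buffered and the whole object is delivered once.
// Otherwise each segment is handed to the app immediately and not retained.
class ReassemblyPolicy
{
public:
  ReassemblyPolicy(std::function<void(const std::string&)> onObject,
                   std::function<void(const std::string&)> onChunk)
    : m_onObject(std::move(onObject)), m_onChunk(std::move(onChunk))
  {
  }

  // Assumes in-order arrival: segNo 0, 1, 2, ...
  void
  receive(const Segment& seg)
  {
    if (seg.segNo == 0) {
      m_isFinite = seg.finalBlockId.has_value();
    }
    if (!m_isFinite) {
      m_onChunk(seg.payload); // stream mode: deliver and forget
      return;
    }
    m_buffer += seg.payload;
    if (seg.finalBlockId && seg.segNo == *seg.finalBlockId) {
      m_onObject(m_buffer);
      m_buffer.clear();
    }
  }

private:
  std::function<void(const std::string&)> m_onObject;
  std::function<void(const std::string&)> m_onChunk;
  bool m_isFinite = false;
  std::string m_buffer;
};
```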
Updated by Ashlesh Gawande over 4 years ago
Okay, so I think this SegmentPublisher should basically be a Segmenter, responsible only for segmenting given data and storing it in the desired storage type. Publishing would then be the responsibility of the app.
#include <ndn-cxx/face.hpp>
#include <ndn-cxx/security/v2/key-chain.hpp>
#include <ndn-cxx/ims/in-memory-storage.hpp>
#include <ndn-cxx/ims/in-memory-storage-fifo.hpp>
#include <ndn-cxx/ims/in-memory-storage-persistent.hpp>
#include <ndn-cxx/ims/in-memory-storage-lru.hpp>
#include <ndn-cxx/ims/in-memory-storage-lfu.hpp>

#include <istream>
#include <limits>
#include <memory>
#include <type_traits>

namespace ndn {
namespace util {

template<typename IMS>
class Segmenter
{
  // A message is required for static_assert before C++17
  static_assert(std::is_base_of<InMemoryStorage, IMS>::value,
                "IMS must derive from ndn::InMemoryStorage");

public:
  struct Options
  {
    security::SigningInfo signingInfo;
    time::milliseconds freshnessPeriod{10000};
    size_t maxSegmentSize = MAX_NDN_PACKET_SIZE >> 1;
    // Optionally return the corresponding segment to the application so that it can answer the Interest
    optional<uint64_t> seg;
  };

  /*
   * Always pass the io_service to the IMS: it seems that without it, data is never marked stale
   * (this could be causing some of the problems we have seen in PSync).
   * I think apps can use a longer freshness period instead of never marking the data stale?
   */

  // InMemoryStoragePersistent has no limit, hence no limit parameter in this constructor
  template<typename T = IMS,
           typename = typename std::enable_if<std::is_same<T, InMemoryStoragePersistent>::value>::type>
  Segmenter(boost::asio::io_service& ioService, KeyChain& keyChain)
    : ims(std::make_unique<IMS>(ioService))
    , m_keyChain(keyChain)
  {
  }

  template<typename T = IMS,
           typename = typename std::enable_if<!std::is_same<T, InMemoryStoragePersistent>::value>::type>
  Segmenter(boost::asio::io_service& ioService, KeyChain& keyChain,
            size_t limit = std::numeric_limits<size_t>::max())
    : ims(std::make_unique<IMS>(ioService, limit))
    , m_keyChain(keyChain)
  {
  }

  /*
   * Not sure whether Options should go here, in the constructor, or both?
   * It makes more sense here, as applications may be serving different kinds of data.
   */
  const std::shared_ptr<Data>&
  store(const Name& dataName, const uint8_t* buffer, size_t bufferSize, const Options& options);

  const std::shared_ptr<Data>&
  store(const Name& dataName, const Block& content, const Options& options);

  // Not sure if this needs to be asynchronous? The ndnputchunks equivalent is synchronous.
  const std::shared_ptr<Data>&
  store(const Name& dataName, std::istream& is, const Options& options);

public:
  std::unique_ptr<IMS> ims;

private:
  KeyChain& m_keyChain;
};

} // namespace util
} // namespace ndn
Applications should have the Segmenter as a member.
Applications can then segment and store that data using Segmenter and serve it by directly accessing the public ims.
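The proposed division of labor (the Segmenter fills an IMS, the application answers Interests by looking them up in it) can be illustrated without ndn-cxx. In this hypothetical sketch, `FakeData` and a `std::map` stand in for Data and InMemoryStorage, `segmentAndStore` and `lookup` are illustrative helpers, and the `/seg=<n>` suffix is only a readable placeholder for a real segment name component.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-ins; the real ndn-cxx types differ.
struct FakeData
{
  std::string name;
  std::string content;
  bool isFinal; // stands in for FinalBlockId
};
using FakeIms = std::map<std::string, std::shared_ptr<FakeData>>;

// Segment `buffer` into pieces of at most maxSegmentSize bytes and insert them
// as <prefix>/seg=<n>. An empty buffer still yields one (empty) segment.
// Returns the number of segments inserted.
std::size_t
segmentAndStore(FakeIms& ims, const std::string& prefix,
                const std::string& buffer, std::size_t maxSegmentSize)
{
  assert(maxSegmentSize > 0);
  std::size_t nSegments = buffer.empty() ? 1 : (buffer.size() + maxSegmentSize - 1) / maxSegmentSize;
  for (std::size_t i = 0; i < nSegments; ++i) {
    auto data = std::make_shared<FakeData>();
    data->name = prefix + "/seg=" + std::to_string(i);
    data->content = buffer.substr(i * maxSegmentSize, maxSegmentSize);
    data->isFinal = (i + 1 == nSegments);
    ims[data->name] = data;
  }
  return nSegments;
}

// What an application's Interest handler might do: answer directly from the IMS.
std::shared_ptr<FakeData>
lookup(const FakeIms& ims, const std::string& interestName)
{
  auto it = ims.find(interestName);
  return it == ims.end() ? nullptr : it->second;
}
```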
Updated by Junxiao Shi over 4 years ago
Okay, so I think this SegmentPublisher should basically be a Segmenter, responsible only for segmenting given data and storing it in the desired storage type.
Yes.
const std::shared_ptr<Data>& store(const Name& dataName, const uint8_t* buffer, size_t bufferSize, const Options& options);
const std::shared_ptr<Data>& store(const Name& dataName, Block, const Options& options);
const std::shared_ptr<Data>& store(const Name& dataName, std::istream& is, const Options& options);
What's the return value of these functions? Data packets have been inserted into the IMS, and do not need to be returned.
const std::shared_ptr<Data>& store(const Name& dataName, std::istream& is, const Options& options);
This function needs to be async. ndnputchunks assumes the stream is finite, which is not necessarily the case. It breaks if you pipe an infinite stream (e.g., video from a webcam) into ndnputchunks.
Publishing would then be the responsibility of the app.
Applications should have the Segmenter as a member.
Applications can then segment and store that data using Segmenter and serve it by directly accessing the public ims.
This functionality should be provided in another class, also under this Feature.
Updated by Junxiao Shi over 4 years ago
For reference, I just put a similar feature in the @ndn/segmented-object package. Its main components are:
- ChunkSource takes an input that is either a buffer, a (potentially infinite) stream, or an iterable of buffers, and returns an iterable of chunks that are sized properly for segments.
- DataProducer turns these chunks into Data packets. It handles Data naming, setting ContentType and FreshnessPeriod, etc. In the future it will also handle encryption.
- SequentialDataProducer adds Data buffering functionality, which is needed only if the underlying ChunkSource is not "seekable": a buffer is seekable because you can read an earlier chunk again, while a stream is not. It has a bufferBehind option that determines how many earlier segments (relative to the latest arriving Interest) are kept in the buffer. The caveat is that, if bufferBehind is not infinity, some Interests requesting early segments may not be answered when there are multiple consumers.
- serve function handles prefix registration.
- serveVersioned function can automatically insert a version component, enabling basic version discovery through CanBePrefix. In the future it will also publish RDR metadata packet.
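A minimal sketch of the bufferBehind idea, assuming segments are identified by number and the window is measured from the segment requested by the most recent Interest, taking "latest arriving Interest" literally. `BehindBuffer` is a hypothetical name; this is not the NDNts code.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical bufferBehind policy: after answering an Interest for segment k,
// segments numbered below k - bufferBehind are evicted.
class BehindBuffer
{
public:
  explicit BehindBuffer(std::size_t bufferBehind)
    : m_bufferBehind(bufferBehind)
  {
  }

  void
  insert(std::size_t segNo, std::string payload)
  {
    m_buffer[segNo] = std::move(payload);
  }

  // Answer an Interest for segNo (if still buffered), then trim the buffer.
  std::optional<std::string>
  onInterest(std::size_t segNo)
  {
    std::optional<std::string> result;
    auto it = m_buffer.find(segNo);
    if (it != m_buffer.end()) {
      result = it->second;
    }
    if (segNo >= m_bufferBehind) {
      m_buffer.erase(m_buffer.begin(), m_buffer.lower_bound(segNo - m_bufferBehind));
    }
    return result;
  }

  std::size_t size() const { return m_buffer.size(); }

private:
  std::size_t m_bufferBehind;
  std::map<std::size_t, std::string> m_buffer;
};
```

In the test, the final lookup shows the caveat: once one consumer has asked for segment 4, a slower consumer's Interest for segment 1 can no longer be answered. A FIFO IMS with a size limit, by contrast, evicts in insertion order regardless of which segments consumers are currently requesting.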
Updated by Ashlesh Gawande over 4 years ago
Thank you for the comments.
The return value is the data corresponding to the given segment (named seg in Options for now) after segmentation.
If an app (or the new Publisher class) receives an interest, and wants to segment data for it, it can tell the Segmenter to return the segment the interest wants. Or is it better to let it do the segmentation and then let the app (or Publisher class) find corresponding data using ims in such an on-demand segmentation?
For the Publisher class - is there any functionality overlap with the Dispatcher?
Also, a question on the NDNts feature: is there a major difference between having a bufferBehind option and, say, using a FIFO IMS with a reasonable limit (as dictated by the app), so that old data is evicted on its own as the stream continues?
Updated by Ashlesh Gawande over 4 years ago
Updated class (I'm not sure what's the best way to make the stream function async):
#include <ndn-cxx/face.hpp>
#include <ndn-cxx/security/v2/key-chain.hpp>
#include <ndn-cxx/ims/in-memory-storage.hpp>
#include <ndn-cxx/ims/in-memory-storage-fifo.hpp>
#include <ndn-cxx/ims/in-memory-storage-persistent.hpp>
#include <ndn-cxx/ims/in-memory-storage-lru.hpp>
#include <ndn-cxx/ims/in-memory-storage-lfu.hpp>
#include <ndn-cxx/util/scheduler.hpp>

#include <istream>
#include <limits>
#include <memory>
#include <type_traits>

namespace ndn {
namespace util {

template<typename IMS>
class Segmenter
{
  static_assert(std::is_base_of<InMemoryStorage, IMS>::value,
                "IMS must derive from ndn::InMemoryStorage");

public:
  struct Options
  {
    security::SigningInfo signingInfo;
    time::milliseconds freshnessPeriod{10000};
    size_t maxSegmentSize = MAX_NDN_PACKET_SIZE >> 1;
    optional<uint64_t> segment; // return the requested segment to the app so that it can answer the Interest
    bool removeStale = false;   // if true, schedule removal of data from the IMS after freshnessPeriod
  };

  /*
   * Always pass the io_service to the IMS: it seems that without it, data is never marked stale.
   * I think apps can use a longer freshness period instead of never marking the data stale?
   */

  // InMemoryStoragePersistent does not have a limit
  template<typename T = IMS,
           typename = typename std::enable_if<std::is_same<T, InMemoryStoragePersistent>::value>::type>
  Segmenter(boost::asio::io_service& ioService, KeyChain& keyChain)
    : ims(std::make_unique<IMS>(ioService))
    , m_scheduler(ioService)
    , m_keyChain(keyChain)
  {
  }

  template<typename T = IMS,
           typename = typename std::enable_if<!std::is_same<T, InMemoryStoragePersistent>::value>::type>
  Segmenter(boost::asio::io_service& ioService, KeyChain& keyChain,
            size_t limit = std::numeric_limits<size_t>::max())
    : ims(std::make_unique<IMS>(ioService, limit))
    , m_scheduler(ioService)
    , m_keyChain(keyChain)
  {
  }

  const std::shared_ptr<Data>&
  store(const Name& dataName, const uint8_t* buffer, size_t bufferSize, const Options& options);

  const std::shared_ptr<Data>&
  store(const Name& dataName, const Block& content, const Options& options);

  /*
   * If infinite, I was thinking of spawning a detached std::thread to insert from the stream continuously.
   * -> I guess no locks are needed, assuming that the application only uses this function,
   *    uses it once (maybe we could enforce this), and does not insert anything into the IMS directly.
   *    Or is it better to take a lock when inserting?
   * -> Are there any alternatives to std::thread that should be considered?
   */
  const std::shared_ptr<Data>&
  store(const Name& dataName, std::istream& is, const Options& options, bool infinite = false);

public:
  std::unique_ptr<IMS> ims;

private:
  Scheduler m_scheduler;
  KeyChain& m_keyChain;
};

} // namespace util
} // namespace ndn
Updated by Junxiao Shi over 4 years ago
To store a (potentially infinite) stream, using a secondary thread would open up a lot of thread safety issues. Instead:
- Maintain a list of in-progress stream store operations in the segmenter.
- Triggered by the associated boost::asio::io_context, read from the stream and save into the IMS.
  - In Arduino this would be called in the loop() function. You'll have to find the Asio equivalent.
- The store function should return a handle of the in-progress stream store operation, which has:
  - Function to cancel the operation: stop storing more segments and close the stream.
  - Function to know progress, i.e., how many segments have been stored.
  - Signals to know progress: triggered after storing each segment, and triggered when the operation has completed and the stream is closed.
  - Signal to receive errors.
- An in-progress stream store operation should be removed from the aforementioned list when it's completed, canceled, or errored.
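The handle-based design above can be sketched single-threaded and without Asio. Assumptions: `StreamStoreOperation`, `PollingSegmenter`, and their members are hypothetical; `poll()` plays the role of Arduino's loop() (how this maps onto Asio was left open in the thread); `std::function` callbacks stand in for ndn-cxx signals; and a vector of strings stands in for the IMS.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <istream>
#include <list>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

// Handle for one in-progress stream store operation.
class StreamStoreOperation
{
public:
  std::function<void(std::size_t segNo, const std::string& payload)> onSegmentStored;
  std::function<void()> onComplete;
  std::function<void(const std::string& reason)> onError;

  StreamStoreOperation(std::istream& is, std::size_t segmentSize)
    : m_is(is), m_segmentSize(segmentSize)
  {
    assert(segmentSize > 0);
  }

  // Stop storing more segments (a std::istream cannot be closed from here).
  void cancel() { m_done = true; }
  std::size_t getNumSegmentsStored() const { return m_nStored; }

  // Store at most one segment; returns false once the operation has finished.
  bool
  step(std::vector<std::string>& storage)
  {
    if (m_done) return false;
    std::string chunk(m_segmentSize, '\0');
    m_is.read(&chunk[0], static_cast<std::streamsize>(m_segmentSize));
    if (m_is.bad() || (m_is.fail() && !m_is.eof())) {
      m_done = true;
      if (onError) onError("read error");
      return false;
    }
    chunk.resize(static_cast<std::size_t>(m_is.gcount()));
    if (!chunk.empty()) {
      storage.push_back(chunk);
      if (onSegmentStored) onSegmentStored(m_nStored, chunk);
      ++m_nStored;
    }
    if (m_is.eof()) {
      m_done = true;
      if (onComplete) onComplete();
      return false;
    }
    return true;
  }

private:
  std::istream& m_is;
  std::size_t m_segmentSize;
  std::size_t m_nStored = 0;
  bool m_done = false;
};

class PollingSegmenter
{
public:
  std::shared_ptr<StreamStoreOperation>
  store(std::istream& is, std::size_t segmentSize)
  {
    auto op = std::make_shared<StreamStoreOperation>(is, segmentSize);
    m_ops.push_back(op);
    return op;
  }

  // One loop() iteration: advance each operation; drop completed, canceled, or failed ones.
  void
  poll()
  {
    for (auto it = m_ops.begin(); it != m_ops.end();) {
      if ((*it)->step(storage)) ++it;
      else it = m_ops.erase(it);
    }
  }

  std::size_t nInProgress() const { return m_ops.size(); }

  std::vector<std::string> storage; // stand-in for the IMS

private:
  std::list<std::shared_ptr<StreamStoreOperation>> m_ops;
};
```

Everything runs on the caller's thread, so no locks are needed; the trade-off is that a blocking read inside `step()` stalls the whole loop.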
Updated by Ashlesh Gawande over 4 years ago
Thanks. I'm not sure about two points:
- How to use std::istream with posix::stream_descriptor (currently passing a file descriptor instead)?
- Should we use async_read_some or async_read?
Updated:
#include <ndn-cxx/face.hpp>
#include <ndn-cxx/security/v2/key-chain.hpp>
#include <ndn-cxx/ims/in-memory-storage.hpp>
#include <ndn-cxx/ims/in-memory-storage-fifo.hpp>
#include <ndn-cxx/ims/in-memory-storage-persistent.hpp>
#include <ndn-cxx/ims/in-memory-storage-lru.hpp>
#include <ndn-cxx/ims/in-memory-storage-lfu.hpp>
#include <ndn-cxx/util/scheduler.hpp>
#include <ndn-cxx/util/signal.hpp>
#include <ndn-cxx/detail/cancel-handle.hpp>

#include <boost/asio/posix/stream_descriptor.hpp>

#include <limits>
#include <memory>
#include <set>
#include <type_traits>

namespace ndn {
namespace util {

template<typename IMS>
class Segmenter
{
  static_assert(std::is_base_of<InMemoryStorage, IMS>::value,
                "IMS must derive from ndn::InMemoryStorage");

public:
  struct Options
  {
    security::SigningInfo signingInfo;
    time::milliseconds freshnessPeriod{10000};
    size_t maxSegmentSize = MAX_NDN_PACKET_SIZE >> 1;
    optional<uint64_t> segment; // return the requested segment to the app so that it can answer the Interest
    bool removeStale = false;   // if true, schedule removal of data from the IMS after freshnessPeriod
  };

  class StreamHandle : public ndn::detail::CancelHandle
  {
  public:
    StreamHandle(boost::asio::io_service& ioService, int fd, const Options& options)
      : CancelHandle([this] { stop(); })
      , m_descriptor(ioService, fd)
      , m_options(options)
    {
      start();
    }

    /*
     * Start async_read_some to read up to maxSegmentSize bytes:
     * -> If the callback gets an error, emit onError and stop.
     * -> If there are no more bytes to read, emit onComplete and stop.
     * -> If bytes were read, insert them into the IMS, increment m_numSegmentsStored,
     *    emit onSegmentInserted, and call start() again.
     */
    void
    start();

    /*
     * Emit onError, cancel pending async operations, then close and release the fd.
     */
    void
    stop();

    size_t
    getNumSegmentsStored() const
    {
      return m_numSegmentsStored;
    }

  public:
    // Signal lives in namespace ndn::util::signal
    signal::Signal<StreamHandle, Data> onSegmentInserted;
    signal::Signal<StreamHandle, boost::system::error_code> onError;
    signal::Signal<StreamHandle> onComplete;

  private:
    boost::asio::posix::stream_descriptor m_descriptor;
    Options m_options; // a copy: a non-const Options& cannot bind to the const argument
    size_t m_numSegmentsStored = 0;
  };

  // InMemoryStoragePersistent does not have a limit
  template<typename T = IMS,
           typename = typename std::enable_if<std::is_same<T, InMemoryStoragePersistent>::value>::type>
  Segmenter(boost::asio::io_service& ioService, KeyChain& keyChain)
    : ims(std::make_unique<IMS>(ioService))
    , m_scheduler(ioService)
    , m_keyChain(keyChain)
    , m_ioService(ioService)
  {
  }

  template<typename T = IMS,
           typename = typename std::enable_if<!std::is_same<T, InMemoryStoragePersistent>::value>::type>
  Segmenter(boost::asio::io_service& ioService, KeyChain& keyChain,
            size_t limit = std::numeric_limits<size_t>::max())
    : ims(std::make_unique<IMS>(ioService, limit))
    , m_scheduler(ioService)
    , m_keyChain(keyChain)
    , m_ioService(ioService)
  {
  }

  /*
   * Return the Data corresponding to options.segment
   */
  const std::shared_ptr<Data>&
  store(const Name& dataName, const uint8_t* buffer, size_t bufferSize, const Options& options);

  const std::shared_ptr<Data>&
  store(const Name& dataName, const Block& content, const Options& options);

  /*
   * Create a StreamHandle for the stream:
   * -> Insert it into m_streams, capture the iterator
   * -> On onError or onComplete, remove the handle from m_streams
   * -> Return the handle (by value, so it stays valid after removal from m_streams)
   *
   * Takes a file descriptor to pass to boost::asio::posix::stream_descriptor
   * (changed because I am not sure how to pass a std::istream to stream_descriptor)
   */
  std::shared_ptr<StreamHandle>
  store(const Name& dataName, int fd, const Options& options);

public:
  std::unique_ptr<IMS> ims;

private:
  Scheduler m_scheduler;
  KeyChain& m_keyChain;
  boost::asio::io_service& m_ioService;
  std::set<std::shared_ptr<StreamHandle>> m_streams;
};

} // namespace util
} // namespace ndn
Updated by Ashlesh Gawande over 4 years ago
After discussion with Jeremy, it seems easier to just take a reference to an IMS instead.
Updated by Davide Pesavento over 4 years ago
- Status changed from New to In Progress
- Target version set to 0.8.0
Updated by Davide Pesavento over 4 years ago
Junxiao Shi wrote:
To store a (potentially infinite) stream, using a secondary thread would open up a lot of thread safety issues. Instead:
- Maintain a list of in-progress stream store operations in the segmenter.
- Triggered by the associated boost::asio::io_context, read from stream and save into IMS. [...]
- The store function should return a handle of the in-progress stream store operation, which has [...]
- An in-progress stream store operation should be removed from the aforementioned list when it's completed, canceled, or errored.
This makes no sense at all, on various levels.
First of all, as usual, you're needlessly complicating the issue and vastly overengineering the solution. We can easily solve 80% of the current use cases with 20% of the work. Let's do that instead of conjecturing and trying to solve theoretical problems on which we don't have enough application-driven experience.
Secondly, this approach is technically flawed. Asio does not guarantee that async operations will behave correctly on file descriptors that refer to regular files (as opposed to sockets). And even if it does happen to work on a particular platform, (1) it's not portable and (2) file I/O is actually performed immediately, not asynchronously. Asio is a networking library, don't try to use it in other use cases.
I strongly recommend keeping it simple and using a std::istream. Optionally, you can support an eager mode, where the entire stream until EOF is consumed and Data packets are produced immediately, and a lazy mode, where no Data packets are initially produced and the app has to explicitly call another function to generate the next n segments up to a given segment number.
The istream interface is obviously in addition to the interface(s) taking a buffer (or array of buffers) that have already been proposed.
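The eager and lazy modes could look roughly like this on top of a plain std::istream. `IstreamSegmenter`, `produceAll`, `produceNext`, and `produceUpTo` are hypothetical names, and segments are strings rather than signed Data packets.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical std::istream segmenter supporting both modes.
class IstreamSegmenter
{
public:
  IstreamSegmenter(std::istream& is, std::size_t segmentSize)
    : m_is(is), m_segmentSize(segmentSize)
  {
    assert(segmentSize > 0);
  }

  // Eager mode: read until EOF; returns the total number of segments.
  std::size_t
  produceAll()
  {
    while (produceOne()) {
    }
    return m_segments.size();
  }

  // Lazy mode: produce up to n more segments; returns how many were produced.
  std::size_t
  produceNext(std::size_t n)
  {
    std::size_t produced = 0;
    while (produced < n && produceOne()) {
      ++produced;
    }
    return produced;
  }

  // Lazy mode: produce until segment segNo exists (or EOF); true if it exists.
  bool
  produceUpTo(std::size_t segNo)
  {
    while (m_segments.size() <= segNo && produceOne()) {
    }
    return segNo < m_segments.size();
  }

  const std::vector<std::string>& segments() const { return m_segments; }
  bool isEof() const { return m_eof; }

private:
  bool
  produceOne()
  {
    if (m_eof) return false;
    std::string chunk(m_segmentSize, '\0');
    m_is.read(&chunk[0], static_cast<std::streamsize>(m_segmentSize));
    chunk.resize(static_cast<std::size_t>(m_is.gcount()));
    if (!m_is) m_eof = true; // short read: end of input (or error) reached
    if (chunk.empty()) return false;
    m_segments.push_back(std::move(chunk));
    return true;
  }

  std::istream& m_is;
  std::size_t m_segmentSize;
  std::vector<std::string> m_segments;
  bool m_eof = false;
};
```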
Updated by Junxiao Shi over 4 years ago
I never said the stream is a socket. It could be (a wrapper of) std::istream.
I don't know enough about Asio. It's not meant to be operated like a socket, but loop() is simply a function that is invoked frequently via a timer.
The equivalent functionality in NDNts is here:
https://github.com/yoursunny/NDNts/tree/b431e05dcff54b21b272c9b8a637cc74bdca81ad/packages/segmented-object/src/serve/chunk-source
Start by looking at BufferChunkSource (input from a byte array), StreamChunkSource (input from an infinite stream), and FileChunkSource (input from a file with known size), as well as their base classes.
They all implement the ChunkSource interface. If getChunk is implemented, it's lazy; otherwise, listChunks will be used, and it's eager.
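A rough C++ analogue of the ChunkSource split, assuming a synchronous interface (NDNts uses asynchronous iteration, so this is a simplification). `ChunkSource`, `BufferChunkSource`, `StreamChunkSource`, and `ChunkProducer` here are hypothetical C++ classes modeled on the description above, not ports of the NDNts code.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <optional>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical synchronous analogue of ChunkSource.
class ChunkSource
{
public:
  virtual ~ChunkSource() = default;
  // Eager path: enumerate all chunks in order (terminates only for finite input).
  virtual std::vector<std::string> listChunks() = 0;
  // Lazy path: random access to chunk i, available only if hasGetChunk() is true.
  virtual bool hasGetChunk() const { return false; }
  virtual std::optional<std::string> getChunk(std::size_t) { return std::nullopt; }
};

// A byte buffer is seekable: any chunk can be cut on demand.
class BufferChunkSource : public ChunkSource
{
public:
  BufferChunkSource(std::string buffer, std::size_t chunkSize)
    : m_buffer(std::move(buffer)), m_chunkSize(chunkSize)
  {
    assert(chunkSize > 0);
  }

  bool hasGetChunk() const override { return true; }

  std::optional<std::string> getChunk(std::size_t i) override
  {
    if (i * m_chunkSize >= m_buffer.size()) return std::nullopt;
    return m_buffer.substr(i * m_chunkSize, m_chunkSize);
  }

  std::vector<std::string> listChunks() override
  {
    std::vector<std::string> chunks;
    for (std::size_t i = 0; i * m_chunkSize < m_buffer.size(); ++i)
      chunks.push_back(m_buffer.substr(i * m_chunkSize, m_chunkSize));
    return chunks;
  }

private:
  std::string m_buffer;
  std::size_t m_chunkSize;
};

// A stream is not seekable: only the eager listChunks() path is offered.
class StreamChunkSource : public ChunkSource
{
public:
  StreamChunkSource(std::istream& is, std::size_t chunkSize)
    : m_is(is), m_chunkSize(chunkSize)
  {
    assert(chunkSize > 0);
  }

  std::vector<std::string> listChunks() override
  {
    std::vector<std::string> chunks;
    std::string buf(m_chunkSize, '\0');
    // A short final read fails the stream but may still have extracted bytes.
    while (m_is.read(&buf[0], static_cast<std::streamsize>(m_chunkSize)) || m_is.gcount() > 0)
      chunks.push_back(buf.substr(0, static_cast<std::size_t>(m_is.gcount())));
    return chunks;
  }

private:
  std::istream& m_is;
  std::size_t m_chunkSize;
};

// Producer: lazy when getChunk() exists, otherwise list all chunks once (eager).
class ChunkProducer
{
public:
  explicit ChunkProducer(ChunkSource& src) : m_src(src) {}

  std::optional<std::string> segment(std::size_t i)
  {
    if (m_src.hasGetChunk()) return m_src.getChunk(i);
    if (!m_listed) {
      m_chunks = m_src.listChunks();
      m_listed = true;
    }
    if (i < m_chunks.size()) return m_chunks[i];
    return std::nullopt;
  }

  bool usedEagerListing() const { return m_listed; }

private:
  ChunkSource& m_src;
  std::vector<std::string> m_chunks;
  bool m_listed = false;
};
```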
Updated by Ashlesh Gawande over 4 years ago
Which ndn-cxx applications require SegmentPublisher to be implemented at this point in time?
- For #4699, StatusDatasetContext can be implemented with the Segmenter instead of being replaced.
- NLSR and PSync would use the Segmenter directly.
- ndnputchunks can also just replace its custom in-memory storage solution with the Segmenter.
Updated by Junxiao Shi over 4 years ago
All three applications should use SegmentPublisher to replace their custom buffering behavior, not just Segmenter. More importantly, this allows new developers to more easily create new applications.
Updated by Davide Pesavento over 3 years ago
- Target version changed from 0.8.0 to 0.9.0
Updated by Davide Pesavento over 3 years ago
- Description updated (diff)
- Assignee deleted (Ashlesh Gawande)
Updated by Davide Pesavento about 2 years ago
- Related to Feature #5247: Segmenter: simple content segmentation helper added
Updated by Davide Pesavento about 2 years ago
- Blocks deleted (Task #4702: ndnputchunks: use Segmenter class)
Updated by Davide Pesavento about 2 years ago
- Status changed from In Progress to New
- Priority changed from Normal to High
- Target version deleted (0.9.0)
I'm splitting this task and implementing the simpler cases (finite object, eager creation of all Data packets in one shot) in #5247.
Updated by Junxiao Shi about 1 year ago
- Blocks deleted (Feature #4791: Use SegmentPublisher utility from ndn-cxx to publish LSA data)