Feature #4696

SegmentPublisher: segmentation utility

Added by Ashlesh Gawande almost 2 years ago. Updated 11 days ago.

Status: In Progress
Priority: Normal
Category: Utils
Target version:
Start date:
Due date:
% Done: 0%
Estimated time: 6.00 h

Description

Develop a SegmentPublisher class that implements the recommended segmentation procedure.
It should support both a finite object and an infinite stream.

A few existing implementations:


Related issues

Has duplicate ndn-cxx - Feature #1354: SegmentedEncodingBuffer (Duplicate)
Blocks ndn-cxx - Feature #4699: Dispatcher: use SegmentPublisher in StatusDatasetContext (New)
Blocks ndn-tools - Task #4702: ndnputchunks: use SegmentPublisher (New)
Blocks NLSR - Feature #4791: Use SegmentPublisher utility from ndn-cxx to publish LSA data (New, 12/17/2018)
Blocks ndn-cxx - Feature #5004: CertificateBundle encoding and decoding (In Progress, Jeremy Clark)

#1

Updated by Davide Pesavento almost 2 years ago

  • Description updated (diff)
#2

Updated by Junxiao Shi almost 2 years ago

  • Subject changed from Provide a segment publisher to complement segment fetcher to SegmentPublisher: segmentation utility
  • Description updated (diff)
  • Target version set to v0.7
  • Estimated time set to 6.00 h
#3

Updated by Junxiao Shi almost 2 years ago

  • Blocks Feature #4699: Dispatcher: use SegmentPublisher in StatusDatasetContext added
#4

Updated by Davide Pesavento almost 2 years ago

It should support both a finite object and an infinite stream.

Can you elaborate on the "infinite stream"? Are you simply talking about the case when the content is read from a std::istream? Or are you referring to continuous data production? I think you're mixing two things here: (i) whether the ADU is a stream or a quantized piece of content with well-defined boundaries, and (ii) whether data production is continuous or instantaneous; these two dimensions are orthogonal to each other.

#5

Updated by Davide Pesavento almost 2 years ago

  • Blocks Task #4702: ndnputchunks: use SegmentPublisher added
#6

Updated by Ashlesh Gawande over 1 year ago

  • Blocks Feature #4760: Use SegmentPublisher utility from PSync to publish LSA data added
#7

Updated by Ashlesh Gawande over 1 year ago

  • Blocks Feature #4662: Segment hello and sync data in partial sync added
#8

Updated by Ashlesh Gawande over 1 year ago

  • Blocks Task #4716: Segment sync data in full sync added
#9

Updated by Ashlesh Gawande over 1 year ago

  • Blocks deleted (Feature #4662: Segment hello and sync data in partial sync)
#10

Updated by Ashlesh Gawande over 1 year ago

  • Blocks deleted (Task #4716: Segment sync data in full sync)
#11

Updated by Ashlesh Gawande over 1 year ago

  • Blocks deleted (Feature #4760: Use SegmentPublisher utility from PSync to publish LSA data)
#12

Updated by Ashlesh Gawande over 1 year ago

  • Blocks Feature #4791: Use SegmentPublisher utility from ndn-cxx to publish LSA data added
#13

Updated by Junxiao Shi about 1 year ago

#14

Updated by Davide Pesavento 9 months ago

  • Target version deleted (v0.7)
#15

Updated by Junxiao Shi 8 months ago

  • Blocks Feature #5004: CertificateBundle encoding and decoding added
#16

Updated by Junxiao Shi 8 months ago

Can you elaborate on the "infinite stream"? Are you simply talking about the case when the content is read from a std::istream? Or are you referring to continuous data production?

I'm referring to continuous data production.

For reference:
I recently attempted to implement SegmentPublisher in NDNts. It has two parts: chunker slices the input into segment payloads no larger than a specified size, and serve turns the chunks into Data packets and makes them available.
The basic logic is pull-based: when the producer receives an Interest for a segment not yet generated, it attempts to read more input via the chunker, until reaching the requested segment number.
A difficulty I'm running into is, since the input stream is non-seekable, the producer has to keep all generated segments in memory, because it should be able to satisfy an Interest asking for an older segment. ndnputchunks has the same problem #4866.
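
The pull-based logic described above can be sketched in standalone C++ as follows. This is only an illustration under stated assumptions: `PullChunker` and its members are hypothetical names (not NDNts or ndn-cxx API), payloads are plain strings rather than Data packets, and every generated segment stays in memory because the input is assumed to be non-seekable.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <istream>
#include <sstream>
#include <string>
#include <utility>

// Hypothetical helper; not part of ndn-cxx or NDNts.
class PullChunker
{
public:
  PullChunker(std::istream& input, std::size_t maxPayloadSize)
    : m_input(input)
    , m_maxPayloadSize(maxPayloadSize)
  {
    assert(maxPayloadSize > 0);
  }

  // Called when an Interest for segment segNo arrives. Reads more input only
  // if that segment has not been generated yet. Returns nullptr if the stream
  // ends before the requested segment exists.
  const std::string*
  getSegment(std::uint64_t segNo)
  {
    while (m_segments.size() <= segNo && readNext()) {
    }
    // std::deque keeps pointers to existing elements valid across push_back.
    return segNo < m_segments.size() ? &m_segments[segNo] : nullptr;
  }

  // Number of segments generated (and retained) so far.
  std::size_t
  size() const
  {
    return m_segments.size();
  }

private:
  // Reads up to m_maxPayloadSize bytes; returns false once no bytes are left.
  bool
  readNext()
  {
    std::string payload(m_maxPayloadSize, '\0');
    m_input.read(&payload[0], static_cast<std::streamsize>(payload.size()));
    payload.resize(static_cast<std::size_t>(m_input.gcount()));
    if (payload.empty()) {
      return false;
    }
    m_segments.push_back(std::move(payload));
    return true;
  }

private:
  std::istream& m_input;
  std::size_t m_maxPayloadSize;
  std::deque<std::string> m_segments; // grows without bound on an infinite stream
};
```

Because `m_segments` is never trimmed, an Interest for an older segment can always be answered, which is exactly the memory cost mentioned above.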

#17

Updated by Ashlesh Gawande 3 months ago

Junxiao Shi wrote:

Can you elaborate on the "infinite stream"? Are you simply talking about the case when the content is read from a std::istream? Or are you referring to continuous data production?

I'm referring to continuous data production.

For reference:
I recently attempted to implement SegmentPublisher in NDNts. It has two parts: chunker slices the input into segment payloads no larger than a specified size, and serve turns the chunks into Data packets and makes them available.
The basic logic is pull-based: when the producer receives an Interest for a segment not yet generated, it attempts to read more input via the chunker, until reaching the requested segment number.
A difficulty I'm running into is, since the input stream is non-seekable, the producer has to keep all generated segments in memory, because it should be able to satisfy an Interest asking for an older segment. ndnputchunks has the same problem #4866.

Can support for a continuous data stream be a separate issue? It seems more complicated, and I can't find the time to research it further. This issue could then cover just a simple utility that segments and stores a given buffer (instantaneous production), i.e. a more generalized version of PSync's.

#18

Updated by Junxiao Shi 3 months ago

You don't have to solve the infinite buffering issue. Other than that, keep reading from a stream and generate a segment when you have a pre-configured amount of payload. It's simple enough to fit in this issue.

#19

Updated by Jeremy Clark 3 months ago

For the fetcher side of this change, should the current behavior where the fetcher retrieves all segments up to the last block, writes them to a buffer, and passes the buffer to the calling process still be supported?

If the stream is continuous, the m_receivedSegments hashmap can't keep storing new segments.

Or could the fetcher determine whether the object is finite or infinite, for example from the presence of a final block ID in the first segment, and react accordingly?

#20

Updated by Ashlesh Gawande 3 months ago

Okay, so I think this SegmentPublisher should basically be a Segmenter that is only responsible for segmenting the given data and storing it in the desired storage type. Publishing would then be the responsibility of the app.

#include <ndn-cxx/face.hpp>
#include <ndn-cxx/security/v2/key-chain.hpp>
#include <ndn-cxx/ims/in-memory-storage.hpp>
#include <ndn-cxx/ims/in-memory-storage-fifo.hpp>
#include <ndn-cxx/ims/in-memory-storage-persistent.hpp>
#include <ndn-cxx/ims/in-memory-storage-lru.hpp>
#include <ndn-cxx/ims/in-memory-storage-lfu.hpp>

namespace ndn {
namespace util {

template<typename IMS>
class Segmenter
{
  // The message argument keeps this valid before C++17.
  static_assert(std::is_base_of<ndn::InMemoryStorage, IMS>::value,
                "IMS must derive from InMemoryStorage");

public:
  struct Options
  {
    security::SigningInfo signingInfo;
    time::milliseconds freshnessPeriod{10000};
    size_t maxSegmentSize = MAX_NDN_PACKET_SIZE >> 1;
    // optionally return the corresponding segment to application so that it can answer the interest:
    optional<uint64_t> seg;
  };

  /*
    Always use ioService in IMS as it seems like without it data is not marked stale
    (this could be leading to some problems that we have seen in PSync)
    I think Apps can use longer freshness instead of never marking the data stale ?
  */

  /*
     Persistent does not have a limit and hence no limit constructor
  */
  template<typename T = IMS,
           typename = typename std::enable_if<std::is_same<T, InMemoryStoragePersistent>::value>::type>
  Segmenter(boost::asio::io_service& ioService, KeyChain& keyChain)
   : m_keyChain(keyChain)
  {
    ims = std::make_unique<IMS>(ioService);
  }

  template<typename T = IMS,
           typename = typename std::enable_if<!std::is_same<T, InMemoryStoragePersistent>::value>::type>
  Segmenter(boost::asio::io_service& ioService, KeyChain& keyChain,
            size_t limit = std::numeric_limits<size_t>::max())
   : m_keyChain(keyChain)
  {
    ims = std::make_unique<IMS>(ioService, limit);
  }

  /*
    Not sure whether Options should go here or in the constructor or both?
    It makes more sense here, as applications may be serving different kinds of data
  */
  const std::shared_ptr<Data>&
  store(const Name& dataName, const uint8_t* buffer, size_t bufferSize, const Options& options);

  const std::shared_ptr<Data>&
  store(const Name& dataName, Block, const Options& options);

  /*
    Not sure if this needs to be asynchronous? putchunk's is synchronous.
  */
  const std::shared_ptr<Data>&
  store(const Name& dataName, std::istream& is, const Options& options);

public:
  std::unique_ptr<IMS> ims;

private:
  KeyChain& m_keyChain;
};

} // namespace util
} // namespace ndn

Applications should have the Segmenter as a member.
Applications can then segment and store that data using Segmenter and serve it by directly accessing the public ims.

#21

Updated by Junxiao Shi 3 months ago

Okay, so I think this SegmentPublisher should basically be a Segmenter that is only responsible for segmenting the given data and storing it in the desired storage type.

Yes.

  const std::shared_ptr<Data>&
  store(const Name& dataName, const uint8_t* buffer, size_t bufferSize, const Options& options);

  const std::shared_ptr<Data>&
  store(const Name& dataName, Block, const Options& options);

  const std::shared_ptr<Data>&
  store(const Name& dataName, std::istream& is, const Options& options);

What's the return value of these functions? Data packets have been inserted into the IMS, and do not need to be returned.

  const std::shared_ptr<Data>&
  store(const Name& dataName, std::istream& is, const Options& options);

This function needs to be async. ndnputchunks assumes the stream is finite, which is not necessarily the case. It breaks if you pipe an infinite stream (e.g. video from a webcam) into ndnputchunks.

Publishing would then be the responsibility of the app.
Applications should have the Segmenter as a member.
Applications can then segment and store that data using Segmenter and serve it by directly accessing the public ims.

This functionality should be provided in another class, also under this Feature.

#22

Updated by Junxiao Shi 3 months ago

For reference, I just put a similar feature in @ndn/segmented-object package. Its main components are:

  • ChunkSource takes an input that is either a buffer, a (potentially infinite) stream, or an iterable of buffers, and returns an iterable of chunks that are sized properly for segments.
  • DataProducer turns these chunks into Data packets. It handles Data naming, setting ContentType and FreshnessPeriod, etc. In the future it will also handle encryption.
  • SequentialDataProducer adds Data buffering functionality, which is needed only if the underlying ChunkSource is not "seekable": a buffer is seekable because you can read an earlier chunk again, while a stream is not seekable. It has a bufferBehind option that determines how many earlier segments (relative to the latest arriving Interest) are kept in the buffer. The caveat is that, if bufferBehind is not infinity, some Interests requesting early segments may not be answered when there are multiple consumers.
  • serve function handles prefix registration.
  • serveVersioned function can automatically insert a version component, enabling basic version discovery through CanBePrefix. In the future it will also publish RDR metadata packet.
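
The bufferBehind behavior and its caveat can be illustrated with a small standalone C++ sketch. It is not the NDNts implementation: `BehindBuffer` and its members are hypothetical, payloads are plain strings, and the eviction point is assumed to be the highest segment number requested so far.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

// Hypothetical illustration of a bufferBehind-style window; not NDNts code.
class BehindBuffer
{
public:
  explicit
  BehindBuffer(std::uint64_t bufferBehind)
    : m_bufferBehind(bufferBehind)
  {
  }

  // Store a freshly generated segment payload.
  void
  insert(std::uint64_t segNo, std::string payload)
  {
    m_segments[segNo] = std::move(payload);
  }

  // Handle an Interest for segNo: advance the window, evict segments that
  // fall more than bufferBehind behind it, then look up the request.
  // Returns nullptr if the segment was evicted or never stored.
  const std::string*
  onInterest(std::uint64_t segNo)
  {
    if (segNo > m_highestRequested) {
      m_highestRequested = segNo;
    }
    if (m_highestRequested > m_bufferBehind) {
      auto firstKept = m_segments.lower_bound(m_highestRequested - m_bufferBehind);
      m_segments.erase(m_segments.begin(), firstKept);
    }
    auto it = m_segments.find(segNo);
    return it == m_segments.end() ? nullptr : &it->second;
  }

  std::size_t
  size() const
  {
    return m_segments.size();
  }

private:
  std::uint64_t m_bufferBehind;
  std::uint64_t m_highestRequested = 0;
  std::map<std::uint64_t, std::string> m_segments;
};
```

With bufferBehind set to 2, a consumer that has already requested segment 5 causes segments 0 through 2 to be dropped, so a second, slower consumer asking for segment 1 gets no answer.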
#23

Updated by Ashlesh Gawande 3 months ago

Thank you for the comments.

The return value is the data corresponding to the given segment (named seg in Options for now) after segmentation.
If an app (or the new Publisher class) receives an interest, and wants to segment data for it, it can tell the Segmenter to return the segment the interest wants. Or is it better to let it do the segmentation and then let the app (or Publisher class) find corresponding data using ims in such an on-demand segmentation?

For the Publisher class - is there any functionality overlap with the Dispatcher?

Also, a question on the NDNts feature: is there a major difference between having a bufferBehind option and, say, using a FIFO IMS with a reasonable limit (as dictated by the app), so that old data is evicted on its own as the stream continues?

#24

Updated by Ashlesh Gawande 3 months ago

Updated class, not sure what's the best way to make the function async:

#include <ndn-cxx/face.hpp>
#include <ndn-cxx/security/v2/key-chain.hpp>
#include <ndn-cxx/ims/in-memory-storage.hpp>
#include <ndn-cxx/ims/in-memory-storage-fifo.hpp>
#include <ndn-cxx/ims/in-memory-storage-persistent.hpp>
#include <ndn-cxx/ims/in-memory-storage-lru.hpp>
#include <ndn-cxx/ims/in-memory-storage-lfu.hpp>
#include <ndn-cxx/util/scheduler.hpp>

namespace ndn {
namespace util {

template<typename IMS>
class Segmenter
{
  // The message argument keeps this valid before C++17.
  static_assert(std::is_base_of<InMemoryStorage, IMS>::value,
                "IMS must derive from InMemoryStorage");

public:
  struct Options
  {
    security::SigningInfo signingInfo;
    time::milliseconds freshnessPeriod{10000};
    size_t maxSegmentSize = MAX_NDN_PACKET_SIZE >> 1;
    optional<uint64_t> segment; // return the requested segment to app so that it can answer the interest
    bool removeStale = false;   // Schedule removal of data from IMS after freshnessPeriod if true
  };

  /*
    Always use ioService in IMS as it seems like without it data is not marked stale
    I think Apps can use longer freshness instead of never marking the data stale?
  */

  /*
     Persistent does not have a limit
  */
  template<typename T = IMS,
           typename = typename std::enable_if<std::is_same<T, InMemoryStoragePersistent>::value>::type>
  Segmenter(boost::asio::io_service& ioService, KeyChain& keyChain)
   : m_scheduler(ioService)
   , m_keyChain(keyChain)
  {
    ims = std::make_unique<IMS>(ioService);
  }

  template<typename T = IMS,
           typename = typename std::enable_if<!std::is_same<T, InMemoryStoragePersistent>::value>::type>
  Segmenter(boost::asio::io_service& ioService, KeyChain& keyChain,
            size_t limit = std::numeric_limits<size_t>::max())
   : m_scheduler(ioService)
   , m_keyChain(keyChain)
  {
    ims = std::make_unique<IMS>(ioService, limit);
  }

  const std::shared_ptr<Data>&
  store(const Name& dataName, const uint8_t* buffer, size_t bufferSize, const Options& options);

  const std::shared_ptr<Data>&
  store(const Name& dataName, Block, const Options& options);

  /*
     If infinite, then I was thinking to spawn a detached std::thread to insert from the stream continuously.
      -> I guess no locks are needed assuming that application will only use
         this function, use it once (maybe we could enforce this), and not insert anything in the IMS directly.
         Or is it better to put a lock when inserting?
      -> Are there any alternatives to using std::thread to be considered?
  */
  const std::shared_ptr<Data>&
  store(const Name& dataName, std::istream& is, const Options& options, bool infinite = false);

public:
  std::unique_ptr<IMS> ims;

private:
  Scheduler m_scheduler;
  KeyChain& m_keyChain;
};

} // namespace util
} // namespace ndn

#25

Updated by Junxiao Shi 3 months ago

To store a (potentially infinite) stream, using a secondary thread would open up a lot of thread safety issues. Instead:

  • Maintain a list of in-progress stream store operations in the segmenter.
  • Triggered by the associated boost::asio::io_context, read from stream and save into IMS.
    • In Arduino this would be called in loop() function. You'll have to find the Asio equivalent.
  • The store function should return a handle of the in-progress stream store operation, which has
    • Function to cancel the operation (stop storing more segments and close the stream).
    • Function to know progress, i.e. how many segments have been stored.
    • Signals to know progress: triggered after storing each segment, and triggered when the operation has completed and stream is closed.
    • Signal to receive errors.
  • An in-progress stream store operation should be removed from the aforementioned list when it's completed, canceled, or errored.
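
A minimal single-threaded sketch of the handle and the list of in-progress operations outlined above, in standalone C++. All names (`StreamStoreOp`, `StreamStoreList`, `poll`, `pollAll`) are hypothetical, the `onSegment` callback stands in for inserting into the IMS, and `pollAll()` is assumed to be invoked repeatedly from the event loop (for example from a timer). Note that `std::istream::read` may block, so this does not make the read itself asynchronous.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <istream>
#include <iterator>
#include <list>
#include <memory>
#include <sstream>
#include <string>

// All names here are hypothetical; none of this is ndn-cxx API.
class StreamStoreOp
{
public:
  enum class State { Running, Completed, Canceled, Errored };

  StreamStoreOp(std::istream& input, std::size_t segmentSize)
    : m_input(input)
    , m_segmentSize(segmentSize)
  {
    assert(segmentSize > 0);
  }

  // Stop storing further segments; the caller keeps ownership of the stream.
  void
  cancel()
  {
    if (m_state == State::Running) {
      m_state = State::Canceled;
    }
  }

  // Store at most one segment per call.
  void
  poll()
  {
    if (m_state != State::Running) {
      return;
    }
    std::string payload(m_segmentSize, '\0');
    m_input.read(&payload[0], static_cast<std::streamsize>(payload.size()));
    payload.resize(static_cast<std::size_t>(m_input.gcount()));
    if (m_input.bad()) {
      m_state = State::Errored;
      if (onError) { onError(); }
      return;
    }
    if (!payload.empty()) {
      if (onSegment) { onSegment(m_numStored, payload); }
      ++m_numStored;
    }
    if (m_input.fail()) { // a short or empty read means end of stream
      m_state = State::Completed;
      if (onComplete) { onComplete(); }
    }
  }

  State getState() const { return m_state; }
  std::uint64_t getNumStored() const { return m_numStored; }

public: // progress, completion, and error notifications
  std::function<void(std::uint64_t segNo, const std::string& payload)> onSegment;
  std::function<void()> onComplete;
  std::function<void()> onError;

private:
  std::istream& m_input;
  std::size_t m_segmentSize;
  State m_state = State::Running;
  std::uint64_t m_numStored = 0;
};

// The list of in-progress operations that the segmenter would maintain.
class StreamStoreList
{
public:
  std::shared_ptr<StreamStoreOp>
  start(std::istream& input, std::size_t segmentSize)
  {
    m_ops.push_back(std::make_shared<StreamStoreOp>(input, segmentSize));
    return m_ops.back();
  }

  // Advance every operation by one step and drop those that have finished.
  void
  pollAll()
  {
    for (auto it = m_ops.begin(); it != m_ops.end();) {
      (*it)->poll();
      it = (*it)->getState() == StreamStoreOp::State::Running ? std::next(it) : m_ops.erase(it);
    }
  }

  std::size_t size() const { return m_ops.size(); }

private:
  std::list<std::shared_ptr<StreamStoreOp>> m_ops;
};
```

The caller keeps the returned `shared_ptr` as its handle, so progress can still be queried after the operation has been removed from the list.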
#26

Updated by Ashlesh Gawande 3 months ago

Thanks, not sure on two points:

  1. How can std::istream be used with posix::stream_descriptor? (I am currently passing a file descriptor instead.)
  2. Should we use async_read_some or async_read?

Updated:

#include <ndn-cxx/face.hpp>
#include <ndn-cxx/security/v2/key-chain.hpp>
#include <ndn-cxx/ims/in-memory-storage.hpp>
#include <ndn-cxx/ims/in-memory-storage-fifo.hpp>
#include <ndn-cxx/ims/in-memory-storage-persistent.hpp>
#include <ndn-cxx/ims/in-memory-storage-lru.hpp>
#include <ndn-cxx/ims/in-memory-storage-lfu.hpp>
#include <ndn-cxx/util/scheduler.hpp>
#include <ndn-cxx/util/signal.hpp>
#include <ndn-cxx/detail/cancel-handle.hpp>

#include <boost/asio/posix/stream_descriptor.hpp>

namespace ndn {
namespace util {

template<typename IMS>
class Segmenter
{
  // The message argument keeps this valid before C++17.
  static_assert(std::is_base_of<InMemoryStorage, IMS>::value,
                "IMS must derive from InMemoryStorage");

public:
  struct Options
  {
    security::SigningInfo signingInfo;
    time::milliseconds freshnessPeriod{10000};
    size_t maxSegmentSize = MAX_NDN_PACKET_SIZE >> 1;
    optional<uint64_t> segment; // return the requested segment to app so that it can answer the interest
    bool removeStale = false;   // Schedule removal of data from IMS after freshnessPeriod if true
  };

  class StreamHandle : public ndn::detail::CancelHandle
  {
  public:
    StreamHandle(boost::asio::io_service& ioService, int fd, const Segmenter::Options& options)
     : CancelHandle([this] () { stop(); })
     , m_socket(ioService, fd)
     , m_options(options)
    {
      start();
    }

    /*
      Start async_read_some to read up to maxSegmentSize
        -> If call back has an error, emit onError and stop.
        -> If no more bytes to read, emit onComplete and stop
        -> If bytes read, insert into IMS, increment m_numSegmentsStored,
           emit onSegmentInserted, call start again
    */
    void
    start();

    /*
      Emit onError and cancel async operations, close, and release fd
    */
    void
    stop();

    size_t
    getNumSegmentsStored();

  public:
    Signal<StreamHandle, Data> onSegmentInserted;
    Signal<StreamHandle, boost::system::error_code> onError;
    Signal<StreamHandle> onComplete;

  private:
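    boost::asio::posix::stream_descriptor m_socket;
    Options m_options;              // copy: a non-const reference cannot bind to the const argument
    size_t m_numSegmentsStored = 0; // initialized so the progress counter starts at zero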
  };

  /*
     Persistent does not have a limit
  */
  template<typename T = IMS,
           typename = typename std::enable_if<std::is_same<T, InMemoryStoragePersistent>::value>::type>
  Segmenter(boost::asio::io_service& ioService, KeyChain& keyChain)
   : m_scheduler(ioService)
   , m_keyChain(keyChain)
   , m_ioService(ioService)
  {
    ims = std::make_unique<IMS>(ioService);
  }

  template<typename T = IMS,
           typename = typename std::enable_if<!std::is_same<T, InMemoryStoragePersistent>::value>::type>
  Segmenter(boost::asio::io_service& ioService, KeyChain& keyChain,
            size_t limit = std::numeric_limits<size_t>::max())
   : m_scheduler(ioService)
   , m_keyChain(keyChain)
   , m_ioService(ioService)
  {
    ims = std::make_unique<IMS>(ioService, limit);
  }

  /*
   * Return the Data corresponding to options.segment
   */
  const std::shared_ptr<Data>&
  store(const Name& dataName, const uint8_t* buffer, size_t bufferSize, const Options& options);

  const std::shared_ptr<Data>&
  store(const Name& dataName, Block, const Options& options);

  /*
   * Create Handle for stream
   *   -> Insert into m_streams, capture iterator
   *   -> onError or onComplete callback, remove the handler from m_streams
   *   -> Return the handler
   *
   * Takes in a file descriptor to pass to boost::asio::posix::stream_descriptor
   * (Changed because I am not sure how to pass std::istream to stream_descriptor)
   */
  const std::shared_ptr<StreamHandle>&
  store(const Name& dataName, int fd, const Options& options);

public:
  std::unique_ptr<IMS> ims;

private:
  Scheduler m_scheduler;
  KeyChain& m_keyChain;

  boost::asio::io_service& m_ioService;
  std::set<std::shared_ptr<StreamHandle>> m_streams;
};

} // namespace util
} // namespace ndn
#27

Updated by Ashlesh Gawande about 1 month ago

After discussion with Jeremy, seems easier to just take in a reference to IMS instead.

#28

Updated by Davide Pesavento 19 days ago

  • Status changed from New to In Progress
  • Target version set to v0.8
#29

Updated by Davide Pesavento 19 days ago

Junxiao Shi wrote:

To store a (potentially infinite) stream, using a secondary thread would open up a lot of thread safety issues. Instead:

  • Maintain a list of in-progress stream store operations in the segmenter.
  • Triggered by the associated boost::asio::io_context, read from stream and save into IMS. [...]
  • The store function should return a handle of the in-progress stream store operation, which has [...]
  • An in-progress stream store operation should be removed from the aforementioned list when it's completed, canceled, or errored.

This makes no sense at all, on several levels.
First of all, as usual, you're needlessly complicating the issue and vastly overengineering the solution. We can easily solve 80% of the current use cases with 20% of the work. Let's do that instead of conjecturing and trying to solve theoretical problems on which we don't have enough application-driven experience.
Secondly, this approach is technically flawed. Asio does not guarantee that async operations will behave correctly on file descriptors that refer to regular files (as opposed to sockets). And even if it does happen to work on a particular platform, (1) it's not portable and (2) file I/O is actually performed immediately, not asynchronously. Asio is a networking library; don't try to use it for other purposes.

I strongly recommend keeping it simple and using a std::istream. Optionally you can support an eager mode, where the entire stream until EOF is consumed and Data packets are produced immediately, and a lazy mode, where no Data packets are initially produced and the app has to explicitly call another function to generate the next n segments up to a given segment number.

The istream interface is obviously in addition to the interface(s) taking a buffer (or array of buffers) that have already been proposed.
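
As a rough illustration of the eager and lazy modes, here is a standalone C++ sketch. The function names `readPayload`, `segmentAll`, and `segmentUpTo` are hypothetical, and the segments are plain payload strings; in a real implementation each payload would presumably be wrapped in a signed Data packet.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helpers; not ndn-cxx API.

// Reads up to maxSize bytes into out; returns false once the stream is exhausted.
bool
readPayload(std::istream& is, std::size_t maxSize, std::string& out)
{
  assert(maxSize > 0);
  out.assign(maxSize, '\0');
  is.read(&out[0], static_cast<std::streamsize>(maxSize));
  out.resize(static_cast<std::size_t>(is.gcount()));
  return !out.empty();
}

// Eager mode: consume the whole stream until EOF and produce every segment now.
std::vector<std::string>
segmentAll(std::istream& is, std::size_t maxSize)
{
  std::vector<std::string> segments;
  std::string payload;
  while (readPayload(is, maxSize, payload)) {
    segments.push_back(payload);
  }
  return segments;
}

// Lazy mode: nothing is produced until the app asks. Appends segments until
// lastSegNo exists or the stream ends; returns how many were newly produced.
std::size_t
segmentUpTo(std::istream& is, std::size_t maxSize, std::uint64_t lastSegNo,
            std::vector<std::string>& segments)
{
  std::size_t produced = 0;
  std::string payload;
  while (segments.size() <= lastSegNo && readPayload(is, maxSize, payload)) {
    segments.push_back(payload);
    ++produced;
  }
  return produced;
}
```

Eager mode only terminates on a finite stream, whereas lazy mode never reads past the requested segment number, so it also works when the stream has no end.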

#30

Updated by Junxiao Shi 19 days ago

I never said the stream is a socket. It could be (a wrapper of) std::istream.
I don't know enough about Asio. It's not meant to be operated like a socket, but loop() is simply a function that is invoked frequently via a timer.

The equivalent functionality in NDNts is here:
https://github.com/yoursunny/NDNts/tree/b431e05dcff54b21b272c9b8a637cc74bdca81ad/packages/segmented-object/src/serve/chunk-source
Start looking at BufferChunkSource (input from a byte array) and StreamChunkSource (input from an infinite stream) and FileChunkSource (input from a file with known size), as well as their base classes.
They all implement the ChunkSource interface. If getChunk is implemented, the source is lazy; otherwise listChunks is used and the source is eager.

#31

Updated by Ashlesh Gawande 11 days ago

Which ndn-cxx applications require SegmentPublisher to be implemented at this point in time?

  • For #4699, StatusDatasetContext can be implemented on top of Segmenter instead of being replaced.
  • NLSR and PSync would use the Segmenter directly.
  • ndnputchunks can also just replace its custom in-memory storage solution with the Segmenter.
#32

Updated by Junxiao Shi 11 days ago

All three applications should use SegmentPublisher to replace their custom buffering behavior, not just Segmenter. More importantly, this allows new developers to more easily create new applications.
