Commit cff4ab33 authored by Jakub Klinkovský's avatar Jakub Klinkovský
Browse files

Implemented asynchronous operations for ByteArraySynchronizer

parent b9d08707
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -210,7 +210,7 @@ if( ${WITH_CUDA} )
               set( CUDA_HOST_COMPILER ${CMAKE_CXX_COMPILER} )
            endif()
        endif()
        set(CUDA_NVCC_FLAGS ${CUDA_NVCC_FLAGS} ; -DHAVE_CUDA --expt-relaxed-constexpr --expt-extended-lambda)
        set(CUDA_NVCC_FLAGS ${CUDA_NVCC_FLAGS} ; -DHAVE_CUDA --expt-relaxed-constexpr --expt-extended-lambda --default-stream per-thread)
        # disable false compiler warnings
        #   reference for the -Xcudafe --diag_suppress and --display_error_number flags: https://stackoverflow.com/a/54142937
        #   incomplete list of tokens: http://www.ssl.berkeley.edu/~jimm/grizzly_docs/SSL/opt/intel/cc/9.0/lib/locale/en_US/mcpcom.msg
+532 −0

File added.

Preview size limit exceeded, changes collapsed.

+342 −0
Original line number Diff line number Diff line
/////////////////////////////////////////////////////////////////////
//          Copyright Yibo Zhu 2017
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)
/////////////////////////////////////////////////////////////////////

#pragma once

#include "utility.h"
#include <atomic>
#include <cassert>
#include <limits>

namespace async {

struct bounded_traits {
  static constexpr bool NOEXCEPT_CHECK = false; // exception handling flag
  static constexpr std::size_t CachelineSize = 64;
  static constexpr std::size_t CachelineAlignment = 16; // must not be larger than alignof(std::max_align_t), see issue #1
  using sequence_type = std::uint64_t;
};

template <typename T, typename TRAITS = bounded_traits> class bounded_queue {
private:
  static_assert(std::is_nothrow_destructible<T>::value,
                "T must be nothrow destructible");

public:
  static constexpr std::size_t cacheline_size = TRAITS::CachelineSize;
  static constexpr std::size_t cacheline_alignment = TRAITS::CachelineAlignment;
  using seq_t = typename TRAITS::sequence_type;
  explicit bounded_queue(std::size_t size)
      : fastmodulo((size > 0 && ((size & (size - 1)) == 0))),
        bitshift(fastmodulo ? getShiftBitsCount(size) : 0),
        elements(new element[size]), mask(fastmodulo ? size - 1 : 0),
        qsize(size), enqueueIx(0), dequeueIx(0) {
    assert(qsize > 0); // any size <= 0 is illegal
  }
  bounded_queue(bounded_queue const &) = delete;
  bounded_queue(bounded_queue &&) = delete;
  bounded_queue &operator=(bounded_queue const &) = delete;
  bounded_queue &operator=(bounded_queue &&) = delete;
  ~bounded_queue() { delete[] elements; }
  std::size_t size() { return qsize; }

  template <typename... Args, // NON-SAFE
            typename = typename std::enable_if<
                !TRAITS::NOEXCEPT_CHECK ||
                std::is_nothrow_constructible<T, Args &&...>::value>::type>
  inline void blocking_enqueue(Args &&... args) noexcept {
    auto enqidx = enqueueIx.fetch_add(1, std::memory_order_acq_rel);
    auto &ele = elements[index(enqidx)];
    auto enq_tkt = ticket(enqidx);
    while (enq_tkt != ele.tkt.load(std::memory_order_acquire))
      continue;
    ele.construct(std::forward<Args>(args)...);
    ele.tkt.store(enq_tkt + 1, std::memory_order_release);
  }

  template <typename... Args, // SAFE-IMPL
            typename = typename std::enable_if<
                TRAITS::NOEXCEPT_CHECK &&
                !std::is_nothrow_constructible<T, Args &&...>::value>::type>
  inline bool blocking_enqueue(Args &&... args) noexcept {
    auto enqidx = enqueueIx.fetch_add(1, std::memory_order_acq_rel);
    auto &ele = elements[index(enqidx)];
    auto enq_tkt = ticket(enqidx);
    while (enq_tkt != ele.tkt.load(std::memory_order_acquire))
      continue;
    if (ele.construct(std::forward<Args>(args)...)) {
      ele.hasdata.store(true, std::memory_order_release);
      ele.tkt.store(enq_tkt + 1, std::memory_order_release);
      return true;
    } else {
      ele.hasdata.store(false, std::memory_order_release);
      ele.tkt.store(enq_tkt + 1, std::memory_order_release);
      return false;
    }
  }

  template <typename... Args, // NON-SAFE
            typename std::enable_if<
                !TRAITS::NOEXCEPT_CHECK ||
                    std::is_nothrow_constructible<T, Args &&...>::value,
                int>::type = 0>
  inline bool enqueue(Args &&... args) noexcept {
    auto enqidx = enqueueIx.load(std::memory_order_acquire);
    for (;;) {
      auto &ele = elements[index(enqidx)];
      seq_t tkt = ele.tkt.load(std::memory_order_acquire);
      seq_t enq_tkt = ticket(enqidx);
      seq_t diff = tkt - enq_tkt;
      if (diff == 0) {
        if (enqueueIx.compare_exchange_strong(enqidx, enqidx + 1,
                                              std::memory_order_release,
                                              std::memory_order_relaxed)) {
          ele.construct(std::forward<Args>(args)...);
          ele.tkt.store(enq_tkt + 1, std::memory_order_release);
          return true;
        }
      } else if (diff >= std::numeric_limits<seq_t>::max() / 2)
        return false; // queue is full
      else
        enqidx = enqueueIx.load(std::memory_order_acquire);
    }
  }

  template <typename... Args, // SAFE-IMPL
            typename std::enable_if<
                TRAITS::NOEXCEPT_CHECK &&
                    !std::is_nothrow_constructible<T, Args &&...>::value,
                int>::type = 0>
  inline bool enqueue(Args &&... args) noexcept {
    auto enqidx = enqueueIx.load(std::memory_order_relaxed);
    for (;;) {
      auto &ele = elements[index(enqidx)];
      seq_t tkt = ele.tkt.load(std::memory_order_acquire);
      seq_t enq_tkt = ticket(enqidx);
      seq_t diff = tkt - enq_tkt;
      if (diff == 0) {
        if (enqueueIx.compare_exchange_strong(enqidx, enqidx + 1,
                                              std::memory_order_release,
                                              std::memory_order_relaxed)) {
          if (ele.construct(std::forward<Args>(args)...)) {
            ele.hasdata.store(true, std::memory_order_release);
            ele.tkt.store(enq_tkt + 1, std::memory_order_release);
            return true;
          } else {
            ele.hasdata.store(false, std::memory_order_release);
            ele.tkt.store(enq_tkt + 1, std::memory_order_release);
            return false;
          }
        }
      } else if (diff >= std::numeric_limits<seq_t>::max() / 2)
        return false; // queue is full
      else
        enqidx = enqueueIx.load(std::memory_order_acquire);
    }
  }

  template <typename U = T, // NON-SAFE
            typename = typename std::enable_if<
                !TRAITS::NOEXCEPT_CHECK ||
                std::is_nothrow_constructible<U>::value>::type>
  inline void blocking_dequeue(U &data) noexcept {
    auto deqidx = dequeueIx.fetch_add(1, std::memory_order_acq_rel);
    auto &ele = elements[index(deqidx)];
    seq_t deq_tkt = ticket(deqidx) + 1;
    while (deq_tkt != ele.tkt.load(std::memory_order_acquire))
      continue;
    ele.move(data);
    ele.tkt.store(deq_tkt + 1, std::memory_order_release);
  }

  template <typename U = T, // SAFE-IMPL
            typename = typename std::enable_if<
                TRAITS::NOEXCEPT_CHECK &&
                !std::is_nothrow_constructible<U>::value>::type>
  inline bool blocking_dequeue(U &data) noexcept {
    auto deqidx = dequeueIx.fetch_add(1, std::memory_order_acq_rel);
    auto &ele = elements[index(deqidx)];
    seq_t deq_tkt = ticket(deqidx) + 1;
    while (deq_tkt != ele.tkt.load(std::memory_order_acquire))
      continue;
    if (ele.hasdata.load(std::memory_order_acquire)) {
      ele.move(data);
      ele.tkt.store(deq_tkt + 1, std::memory_order_release);
      return true;
    } else {
      ele.tkt.store(deq_tkt + 1, std::memory_order_release);
      return false;
    }
  }

  template <typename U = T, // NON-SAFE
            typename std::enable_if<!TRAITS::NOEXCEPT_CHECK ||
                                        std::is_nothrow_constructible<U>::value,
                                    int>::type = 0>
  inline bool dequeue(U &data) {

    auto deqidx = dequeueIx.load(std::memory_order_acquire);
    for (;;) {
      auto &ele = elements[index(deqidx)];
      seq_t tkt = ele.tkt.load(std::memory_order_acquire);
      seq_t deq_tkt = ticket(deqidx) + 1;
      seq_t diff = tkt - deq_tkt;
      if (diff == 0) {
        if (dequeueIx.compare_exchange_strong(deqidx, deqidx + 1,
                                              std::memory_order_acq_rel,
                                              std::memory_order_relaxed)) {
          ele.move(data);
          ele.tkt.store(deq_tkt + 1, std::memory_order_release);
          return true;
        }
      } else if (diff >= std::numeric_limits<seq_t>::max() / 2)
        return false; // queue is empty
      else {

        deqidx = dequeueIx.load(std::memory_order_acquire);
      }
    }
  }

  template <
      typename U = T, // SAFE-IMPL
      typename std::enable_if<TRAITS::NOEXCEPT_CHECK &&
                                  !std::is_nothrow_constructible<U>::value,
                              int>::type = 0>
  inline bool
  dequeue(U &data) // false could be queue is empty, or skip an invalid element
  {

    auto deqidx = dequeueIx.load(std::memory_order_acquire);
    for (;;) {
      auto &ele = elements[index(deqidx)];
      seq_t tkt = ele.tkt.load(std::memory_order_acquire);
      seq_t deq_tkt = ticket(deqidx) + 1;
      seq_t diff = tkt - deq_tkt;
      if (diff == 0) {
        if (dequeueIx.compare_exchange_strong(deqidx, deqidx + 1,
                                              std::memory_order_acq_rel,
                                              std::memory_order_relaxed)) {
          if (ele.hasdata.load(std::memory_order_acquire)) {
            ele.move(data);
            ele.tkt.store(deq_tkt + 1, std::memory_order_release);
            return true;
          } else {
            ele.tkt.store(deq_tkt + 1, std::memory_order_release);
            return false;
          }
        }
      } else if (diff >= std::numeric_limits<seq_t>::max() / 2)
        return false; // queue is empty
      else {
        deqidx = dequeueIx.load(std::memory_order_acquire);
      }
    }
  }

private:
  inline seq_t index(seq_t const seq) {
    if (fastmodulo)
      return seq & mask;
    else
      return seq >= qsize ? seq % qsize : seq;
  }

  inline seq_t ticket(seq_t const seq) {
    if (fastmodulo)
      return (seq >> bitshift) << 1;
    else
      return (seq / static_cast<seq_t>(qsize)) << 1;
  }
  //TODO& Review: replace the following with c++ concepts
  template <typename U = T, typename Enable = void> struct checkdata {};

  template <typename U>
  struct checkdata<U, typename std::enable_if<
                          !TRAITS::NOEXCEPT_CHECK ||
                          std::is_nothrow_constructible<U>::value>::type> {};

  template <typename U>
  struct checkdata<U, typename std::enable_if<
                          TRAITS::NOEXCEPT_CHECK &&
                          !std::is_nothrow_constructible<U>::value>::type> {
    checkdata() : hasdata(false) {}
    std::atomic<bool> hasdata;
  };

  struct element : public checkdata<T> {
    element() : tkt(0) {}
    ~element() {
      if (tkt & 1) // enqueue op visited
        destruct();
    }

    template <typename... Args, // NON-SAFE
              typename = typename std::enable_if<
                  !TRAITS::NOEXCEPT_CHECK ||
                  std::is_nothrow_constructible<T, Args &&...>::value>::type>
    inline void construct(Args &&... args) noexcept {
      new (&storage) T(std::forward<Args>(args)...);
    }

    template <typename... Args, // SAFE-IMPL
              typename = typename std::enable_if<
                  TRAITS::NOEXCEPT_CHECK &&
                  !std::is_nothrow_constructible<T, Args &&...>::value>::type>
    inline bool construct(Args &&... args) noexcept {
      try {
        new (&storage) T(std::forward<Args>(args)...);
      } catch (...) {
        return false;
      }
      return true;
    }

    inline void destruct() noexcept { reinterpret_cast<T *>(&storage)->~T(); }

    inline T *getptr() { return reinterpret_cast<T *>(&storage); }

    template <
        typename U = T, // NON-SAFE
        typename std::enable_if<!TRAITS::NOEXCEPT_CHECK ||
                                    std::is_nothrow_move_assignable<U>::value,
                                int>::type = 0>
    inline void move(U &data) {
      data = std::move(*getptr());
      destruct();
    }

    template <
        typename U = T, // SAFE-IMPL
        typename std::enable_if<TRAITS::NOEXCEPT_CHECK &&
                                    !std::is_nothrow_move_assignable<U>::value,
                                int>::type = 0>
    inline void move(U &data) {
      try {
        data = std::move(*getptr());
      } catch (...) {
      }
      destruct();
    }

    std::atomic<seq_t> tkt;
    typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;
    std::atomic<bool> hasdata;
  };

  bool const fastmodulo;   // true if qsize is power of 2
  int const bitshift;      // used if fastmodulo is true
  element *const elements; // pointer to buffer
  std::size_t const mask;       // used if fastmodulo is true
  std::size_t const qsize;      // queue size
  alignas(cacheline_alignment) char cacheline_padding1[cacheline_size];
  alignas(cacheline_alignment) std::atomic<seq_t> enqueueIx;
  alignas(cacheline_alignment) char cacheline_padding2[cacheline_size];
  alignas(cacheline_alignment) std::atomic<seq_t> dequeueIx;
  alignas(cacheline_alignment) char cacheline_padding3[cacheline_size];
};
} // namespace async
+429 −0

File added.

Preview size limit exceeded, changes collapsed.

+192 −0
Original line number Diff line number Diff line
/////////////////////////////////////////////////////////////////////
//          Copyright Yibo Zhu 2017
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)
/////////////////////////////////////////////////////////////////////
#pragma once
#include "queue.h"
#include <atomic>
#include <functional>
#include <future>
#include <iterator>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
namespace async {
// thread pool to execute functions, functors, lamdas asynchronously,
// default poolsize = machine's logical CPU cores/threads
class threadpool final {
public:
  static int defaultpoolsize() { return std::thread::hardware_concurrency(); }

  threadpool(int poolsize = defaultpoolsize())
      : idlecount(0), conflag(false) {
    configurepool(poolsize);
  }

  threadpool(const threadpool &) = delete;
  threadpool(threadpool &&) = delete;
  threadpool &operator=(const threadpool &) = delete;
  threadpool &operator=(threadpool &&) = delete;

  ~threadpool() { cleanup(); }

  inline std::size_t size() {
    std::lock_guard<std::mutex> lg(poolmux);
    return threads.size();
  }

  inline int idlesize() { return idlecount; }

  // can be called to resize the pool at any time after construction and before
  // destruction, recommand to be called from main thread or manager thread even
  // though it is thread-safe
  void configurepool(std::size_t poolsize) {
    std::unique_lock<std::mutex> veclk(poolmux);
    auto currentsize = threads.size();
    if (currentsize < poolsize) { // expand the pool
      for (std::size_t i = currentsize; i < poolsize; i++) {
        tpstops.emplace_back(addthread());
      }
    } else if (currentsize > poolsize) { // shrink the pool
      std::vector<std::unique_ptr<std::thread>> dumpthreads;
      std::vector<std::atomic<bool> *> dumpthreadstops;
      std::move(threads.begin() + poolsize, threads.end(),
                std::back_inserter(dumpthreads));
      std::move(tpstops.begin() + poolsize, tpstops.end(),
                std::back_inserter(dumpthreadstops));
      tpstops.resize(poolsize);
      threads.resize(poolsize);
      veclk.unlock();
      for (auto &a : dumpthreadstops) {
        *a = true;
      }
      for (auto &t : dumpthreads) {
        t->detach();
      }
      {
        std::unique_lock<std::mutex> lk(qcvmux); // suspended threads to quit
        qcv.notify_all();
      }
    }
  }

  template <typename Func, typename... Args>
  inline auto post(Func &&func, Args &&... args)
#if ((defined(__clang__) || defined(__GNUC__)) && __cplusplus <= 201103L) ||   \
    (defined(_MSC_VER) && _MSC_VER <= 1800)
      -> std::future<typename std::result_of<Func(Args...)>::type>
#endif
  { // TODO: replace result_of with invoke_result_t when migrate to c++17
    auto taskptr = std::make_shared<
        std::packaged_task<typename std::result_of<Func(Args...)>::type()>>(
        std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
    taskqueue.enqueue([taskptr]() { (*taskptr)(); });
    {
      std::lock_guard<std::mutex> lg(qcvmux);
      conflag = true;
    }
    qcv.notify_one();
    return taskptr->get_future();
  }

  template <typename Func>
  inline auto post(Func &&func)
#if ((defined(__clang__) || defined(__GNUC__)) && __cplusplus <= 201103L) ||   \
    (defined(_MSC_VER) && _MSC_VER <= 1800)
      -> std::future<typename std::result_of<Func()>::type>
#endif
  { // a special case for func() type without any parameters, might be
    // removed later
    auto taskptr = std::make_shared<
        std::packaged_task<typename std::result_of<Func()>::type()>>(
        std::forward<Func>(func));
    taskqueue.enqueue([taskptr]() { (*taskptr)(); });
    {
      std::lock_guard<std::mutex> lg(qcvmux);
      conflag = true;
    }
    qcv.notify_one();
    return taskptr->get_future();
  }

private:
  struct executor {
    executor(std::unique_ptr<std::atomic<bool>> &&ptr, threadpool &pool)
        : stop(std::move(ptr)), thpool(pool) {}
    void operator()() {
      while (!*stop) {
        if (!thpool.executetask_in_loop(*stop)) {
          return; // signaled to quit
        }
        thpool.wait_for_task(*stop); // wait for new task
      }
    }

  private:
    std::unique_ptr<std::atomic<bool>> stop;
    threadpool &thpool;
  };

  std::atomic<bool> *addthread() {
    auto stopuniptr = std::make_unique<std::atomic<bool>>(false);
    auto stoprawptr = stopuniptr.get();
    threads.emplace_back(
        std::make_unique<std::thread>(executor(std::move(stopuniptr), *this)));
    return stoprawptr;
  }

  void cleanup() { // make sure no more tasks being pushed to the taskqueue
    {
      std::lock_guard<std::mutex> lk(qcvmux);
      qcv.notify_all(); // let running thread drain the task queue? no need,
                        // should be removed
    }
    for (auto &stop : tpstops) {
      *stop = true; // stop signaled
    }
    {
      std::lock_guard<std::mutex> lk(qcvmux);
      qcv.notify_all(); // notify again
    }
    for (auto &thread : threads) {
      if (thread->joinable())
        thread->join();
    }
    threads.clear();
    tpstops.clear();
  }

  inline void wait_for_task(std::atomic<bool> const &stop) {
    idlecount.fetch_add(1, std::memory_order_relaxed);
    {
      std::unique_lock<std::mutex> lk(qcvmux);
      qcv.wait(lk, [&]() {
        return conflag || stop.load(std::memory_order_acquire);
      }); //memory_oder can be removed
      conflag = false;
    }
    idlecount.fetch_sub(1, std::memory_order_relaxed);
  }

  inline bool executetask_in_loop(std::atomic<bool> const &stop) {
    std::function<void()> func;
    for (; taskqueue.dequeue(func);) {
      func();
      if (stop) // stop is signaled
        return false;
    }
    return true;
  }

  std::vector<std::unique_ptr<std::thread>> threads;
  std::vector<std::atomic<bool> *> tpstops; // threads terminate flags
  async::queue<std::function<void()>> taskqueue;
  std::atomic<int> idlecount; // idle thread count
  std::mutex qcvmux, poolmux;
  std::condition_variable qcv;
  bool conflag; // continue flag for cv
};
} // namespace async
Loading