11 #include <boost/make_shared.hpp>
12 #include <boost/shared_ptr.hpp>
17 #include <condition_variable>
33 template <
typename WorkItem,
typename Container = std::deque<boost::shared_ptr<WorkItem>>>
72 void start(uint32_t thread_count) {
76 if (queue_.enabled()) {
79 startInternal(thread_count);
86 if (!queue_.enabled()) {
97 bool add(
const WorkItemPtr& item) {
98 return (queue_.pushBack(item));
106 return (queue_.pushFront(item));
113 return (queue_.count());
121 auto id = std::this_thread::get_id();
122 if (checkThreadId(
id)) {
136 auto id = std::this_thread::get_id();
137 if (checkThreadId(
id)) {
140 return (queue_.wait(seconds));
147 queue_.setMaxQueueSize(max_queue_size);
154 return (queue_.getMaxQueueSize());
161 return (threads_.size());
170 return (queue_.getQueueStat(which));
178 void startInternal(uint32_t thread_count) {
183 sigaddset(&sset, SIGCHLD);
184 sigaddset(&sset, SIGINT);
185 sigaddset(&sset, SIGHUP);
186 sigaddset(&sset, SIGTERM);
187 pthread_sigmask(SIG_BLOCK, &sset, &osset);
188 queue_.enable(thread_count);
190 for (uint32_t i = 0; i < thread_count; ++i) {
191 threads_.push_back(boost::make_shared<std::thread>(&ThreadPool::run,
this));
195 pthread_sigmask(SIG_SETMASK, &osset, 0);
199 pthread_sigmask(SIG_SETMASK, &osset, 0);
203 void stopInternal() {
204 auto id = std::this_thread::get_id();
205 if (checkThreadId(
id)) {
206 isc_throw(InvalidOperation,
"thread pool stop called by owned thread");
209 for (
auto thread : threads_) {
218 bool checkThreadId(std::thread::id
id) {
219 for (
auto thread : threads_) {
220 if (
id == thread->get_id()) {
238 template <
typename Item,
typename QueueContainer = std::queue<Item>>
239 struct ThreadPoolQueue {
244 : enabled_(false), max_queue_size_(0), working_(0),
245 stat10(0.), stat100(0.), stat1000(0.) {
260 std::lock_guard<std::mutex> lock(mutex_);
261 max_queue_size_ = max_queue_size;
268 std::lock_guard<std::mutex> lock(mutex_);
269 return (max_queue_size_);
283 bool pushBack(
const Item& item) {
289 std::lock_guard<std::mutex> lock(mutex_);
290 if (max_queue_size_ != 0) {
291 while (queue_.size() >= max_queue_size_) {
296 queue_.push_back(item);
310 bool pushFront(
const Item& item) {
315 std::lock_guard<std::mutex> lock(mutex_);
316 if ((max_queue_size_ != 0) &&
317 (queue_.size() >= max_queue_size_)) {
320 queue_.push_front(item);
339 std::unique_lock<std::mutex> lock(mutex_);
342 if (working_ == 0 && queue_.empty()) {
343 wait_cv_.notify_all();
345 cv_.wait(lock, [&]() {
return (!enabled_ || !queue_.empty());});
350 size_t length = queue_.size();
351 stat10 = stat10 * CEXP10 + (1 -
CEXP10) * length;
352 stat100 = stat100 * CEXP100 + (1 -
CEXP100) * length;
353 stat1000 = stat1000 * CEXP1000 + (1 -
CEXP1000) * length;
354 Item item = queue_.front();
365 std::lock_guard<std::mutex> lock(mutex_);
366 return (queue_.size());
374 std::unique_lock<std::mutex> lock(mutex_);
376 wait_cv_.wait(lock, [&]() {
return (working_ == 0 && queue_.empty());});
386 bool wait(uint32_t seconds) {
387 std::unique_lock<std::mutex> lock(mutex_);
389 bool ret = wait_cv_.wait_for(lock, std::chrono::seconds(seconds),
390 [&]() {
return (working_ == 0 && queue_.empty());});
400 std::lock_guard<std::mutex> lock(mutex_);
409 isc_throw(InvalidParameter,
"supported statistic for "
410 <<
"10/100/1000 only, not " << which);
418 std::lock_guard<std::mutex> lock(mutex_);
419 queue_ = QueueContainer();
421 wait_cv_.notify_all();
429 void enable(uint32_t thread_count) {
430 std::lock_guard<std::mutex> lock(mutex_);
432 working_ = thread_count;
440 std::lock_guard<std::mutex> lock(mutex_);
458 QueueContainer queue_;
464 std::condition_variable cv_;
467 std::condition_variable wait_cv_;
472 std::atomic<bool> enabled_;
476 size_t max_queue_size_;
493 while (queue_.enabled()) {
494 WorkItemPtr item = queue_.pop();
506 std::vector<boost::shared_ptr<std::thread>> threads_;
509 ThreadPoolQueue<WorkItemPtr, Container> queue_;
/// Out-of-class definition of the static constant CEXP10, set to exp(-0.1).
/// It is the weight in the "10" queue-length running average, updated as
/// stat10 = stat10 * CEXP10 + (1 - CEXP10) * length.
513 template <
typename W,
typename C>
514 const double ThreadPool<W, C>::CEXP10 = std::exp(-.1);
/// Out-of-class definition of the static constant CEXP100, set to exp(-0.01).
/// It is the weight in the "100" queue-length running average, updated as
/// stat100 = stat100 * CEXP100 + (1 - CEXP100) * length.
517 template <
typename W,
typename C>
518 const double ThreadPool<W, C>::CEXP100 = std::exp(-.01);
/// Out-of-class definition of the static constant CEXP1000, set to exp(-0.001).
/// It is the weight in the "1000" queue-length running average, updated as
/// stat1000 = stat1000 * CEXP1000 + (1 - CEXP1000) * length.
521 template <
typename W,
typename C>
522 const double ThreadPool<W, C>::CEXP1000 = std::exp(-.001);
527 #endif // THREAD_POOL_H
boost::shared_ptr< WorkItem > WorkItemPtr
Type of shared pointers to work items.
size_t count()
Returns the number of work items in the queue.
A generic exception that is thrown if a parameter given to a method or function is considered invalid...
static const double CEXP10
Rounding value for 10 packet statistic.
void start(uint32_t thread_count)
start all the threads
void setMaxQueueSize(size_t max_queue_size)
set maximum number of work items in the queue
bool addFront(const WorkItemPtr &item)
add a work item to the thread pool at front
bool wait(uint32_t seconds)
wait for items to be processed or return after timeout
double getQueueStat(size_t which)
get queue length statistic
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
static const double CEXP1000
Rounding value for 1000 packet statistic.
size_t size()
Returns the number of threads in the thread pool.
bool add(const WorkItemPtr &item)
add a work item to the thread pool
Defines the logger used by the top-level component of kea-dhcp-ddns.
static const double CEXP100
Rounding value for 100 packet statistic.
size_t getMaxQueueSize()
get maximum number of work items in the queue
A generic exception that is thrown if a function is called in a prohibited way.
void stop()
stop all the threads
void wait()
wait for current items to be processed
void reset()
Reset the thread pool, stopping its threads and clearing the internal queue.
Defines a thread pool which uses a thread pool queue for managing work items.