Kea 3.2.0-git
thread_pool.h
Go to the documentation of this file.
1// Copyright (C) 2018-2026 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#ifndef THREAD_POOL_H
8#define THREAD_POOL_H
9
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>

#include <atomic>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <deque>
#include <list>
#include <mutex>
#include <queue>
#include <thread>

#include <signal.h>
24
25namespace isc {
26namespace util {
27
33template <typename WorkItem, typename Container = std::deque<boost::shared_ptr<WorkItem>>>
34struct ThreadPool {
36 static const double CEXP10;
37
39 static const double CEXP100;
40
42 static const double CEXP1000;
43
45 typedef typename boost::shared_ptr<WorkItem> WorkItemPtr;
46
49 }
50
56 // cppcheck-suppress throwInNoexceptFunction
58 reset();
59 }
60
65 void reset() {
66 stopInternal();
67 queue_.clear();
68 }
69
77 void start(uint32_t thread_count) {
78 if (!thread_count) {
79 isc_throw(InvalidParameter, "thread count is 0");
80 }
81 if (queue_.enabled()) {
82 isc_throw(InvalidOperation, "thread pool already started");
83 }
84 startInternal(thread_count);
85 }
86
90 void stop() {
91 if (!queue_.enabled()) {
92 isc_throw(InvalidOperation, "thread pool already stopped");
93 }
94 stopInternal();
95 }
96
102 bool add(const WorkItemPtr& item) {
103 return (queue_.pushBack(item));
104 }
105
110 bool addFront(const WorkItemPtr& item) {
111 return (queue_.pushFront(item));
112 }
113
117 size_t count() {
118 return (queue_.count());
119 }
120
125 void wait() {
126 auto id = std::this_thread::get_id();
127 if (checkThreadId(id)) {
128 isc_throw(MultiThreadingInvalidOperation, "thread pool wait called by worker thread");
129 }
130 queue_.wait();
131 }
132
140 bool wait(uint32_t seconds) {
141 auto id = std::this_thread::get_id();
142 if (checkThreadId(id)) {
143 isc_throw(MultiThreadingInvalidOperation, "thread pool wait with timeout called by worker thread");
144 }
145 return (queue_.wait(seconds));
146 }
147
153 void pause(bool wait = true) {
154 queue_.pause(wait);
155 }
156
160 void resume() {
161 queue_.resume();
162 }
163
170 bool enabled() {
171 return (queue_.enabled());
172 }
173
180 bool paused() {
181 return (queue_.paused());
182 }
183
187 void setMaxQueueSize(size_t max_queue_size) {
188 queue_.setMaxQueueSize(max_queue_size);
189 }
190
195 return (queue_.getMaxQueueSize());
196 }
197
201 size_t size() {
202 return (threads_.size());
203 }
204
210 double getQueueStat(size_t which) {
211 return (queue_.getQueueStat(which));
212 }
213
214private:
219 void startInternal(uint32_t thread_count) {
220 // Protect us against signals
221 sigset_t sset;
222 sigset_t osset;
223 sigemptyset(&sset);
224 sigaddset(&sset, SIGCHLD);
225 sigaddset(&sset, SIGINT);
226 sigaddset(&sset, SIGHUP);
227 sigaddset(&sset, SIGTERM);
228 pthread_sigmask(SIG_BLOCK, &sset, &osset);
229 queue_.enable(thread_count);
230 try {
231 for (uint32_t i = 0; i < thread_count; ++i) {
232 threads_.push_back(boost::make_shared<std::thread>(&ThreadPool::run, this));
233 }
234 } catch (...) {
235 // Restore signal mask.
236 pthread_sigmask(SIG_SETMASK, &osset, 0);
237 throw;
238 }
239 // Restore signal mask.
240 pthread_sigmask(SIG_SETMASK, &osset, 0);
241 }
242
244 void stopInternal() {
245 auto id = std::this_thread::get_id();
246 if (checkThreadId(id)) {
247 isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by worker thread");
248 }
249 queue_.disable();
250 for (auto const& thread : threads_) {
251 thread->join();
252 }
253 threads_.clear();
254 }
255
259 bool checkThreadId(std::thread::id id) {
260 for (auto const& thread : threads_) {
261 if (id == thread->get_id()) {
262 return (true);
263 }
264 }
265 return (false);
266 }
267
279 template <typename Item, typename QueueContainer = std::queue<Item>>
280 struct ThreadPoolQueue {
284 ThreadPoolQueue()
285 : enabled_(false), paused_(false), max_queue_size_(0), working_(0),
286 unavailable_(0), stat10(0.), stat100(0.), stat1000(0.) {
287 }
288
292 ~ThreadPoolQueue() {
293 disable();
294 clear();
295 }
296
298 void registerThread() {
299 std::lock_guard<std::mutex> lock(mutex_);
300 ++working_;
301 --unavailable_;
302 }
303
305 void unregisterThread() {
306 std::lock_guard<std::mutex> lock(mutex_);
307 --working_;
308 ++unavailable_;
309 }
310
314 void setMaxQueueSize(size_t max_queue_size) {
315 std::lock_guard<std::mutex> lock(mutex_);
316 max_queue_size_ = max_queue_size;
317 }
318
322 size_t getMaxQueueSize() {
323 std::lock_guard<std::mutex> lock(mutex_);
324 return (max_queue_size_);
325 }
326
338 bool pushBack(const Item& item) {
339 bool ret = true;
340 if (!item) {
341 return (ret);
342 }
343 {
344 std::lock_guard<std::mutex> lock(mutex_);
345 if (max_queue_size_ != 0) {
346 while (queue_.size() >= max_queue_size_) {
347 queue_.pop_front();
348 ret = false;
349 }
350 }
351 queue_.push_back(item);
352 }
353 // Notify pop function so that it can effectively remove a work item.
354 cv_.notify_one();
355 return (ret);
356 }
357
365 bool pushFront(const Item& item) {
366 if (!item) {
367 return (true);
368 }
369 {
370 std::lock_guard<std::mutex> lock(mutex_);
371 if ((max_queue_size_ != 0) &&
372 (queue_.size() >= max_queue_size_)) {
373 return (false);
374 }
375 queue_.push_front(item);
376 }
377 // Notify pop function so that it can effectively remove a work item.
378 cv_.notify_one();
379 return (true);
380 }
381
395 Item pop() {
396 std::unique_lock<std::mutex> lock(mutex_);
397 --working_;
398 // Signal thread waiting for threads to pause.
399 if (paused_ && working_ == 0 && unavailable_ == 0) {
400 wait_threads_cv_.notify_all();
401 }
402 // Signal thread waiting for tasks to finish.
403 if (working_ == 0 && queue_.empty()) {
404 wait_cv_.notify_all();
405 }
406 // Wait for push or disable functions.
407 cv_.wait(lock, [&]() {return (!enabled_ || (!queue_.empty() && !paused_));});
408 ++working_;
409 if (!enabled_) {
410 return (Item());
411 }
412 size_t length = queue_.size();
413 stat10 = stat10 * CEXP10 + (1 - CEXP10) * length;
414 stat100 = stat100 * CEXP100 + (1 - CEXP100) * length;
415 stat1000 = stat1000 * CEXP1000 + (1 - CEXP1000) * length;
416 Item item = queue_.front();
417 queue_.pop_front();
418 return (item);
419 }
420
426 size_t count() {
427 std::lock_guard<std::mutex> lock(mutex_);
428 return (queue_.size());
429 }
430
435 void wait() {
436 std::unique_lock<std::mutex> lock(mutex_);
437 // Wait for any item or for working threads to finish.
438 wait_cv_.wait(lock, [&]() {return (working_ == 0 && queue_.empty());});
439 }
440
448 bool wait(uint32_t seconds) {
449 std::unique_lock<std::mutex> lock(mutex_);
450 // Wait for any item or for working threads to finish.
451 bool ret = wait_cv_.wait_for(lock, std::chrono::seconds(seconds),
452 [&]() {return (working_ == 0 && queue_.empty());});
453 return (ret);
454 }
455
461 void pause(bool wait) {
462 std::unique_lock<std::mutex> lock(mutex_);
463 paused_ = true;
464 if (wait) {
465 // Wait for working threads to finish.
466 wait_threads_cv_.wait(lock, [&]() {return (working_ == 0 && unavailable_ == 0);});
467 }
468 }
469
473 void resume() {
474 std::unique_lock<std::mutex> lock(mutex_);
475 paused_ = false;
476 cv_.notify_all();
477 }
478
484 double getQueueStat(size_t which) {
485 std::lock_guard<std::mutex> lock(mutex_);
486 switch (which) {
487 case 10:
488 return (stat10);
489 case 100:
490 return (stat100);
491 case 1000:
492 return (stat1000);
493 default:
494 isc_throw(InvalidParameter, "supported statistic for "
495 << "10/100/1000 only, not " << which);
496 }
497 }
498
502 void clear() {
503 std::lock_guard<std::mutex> lock(mutex_);
504 queue_ = QueueContainer();
505 }
506
512 void enable(uint32_t thread_count) {
513 std::lock_guard<std::mutex> lock(mutex_);
514 enabled_ = true;
515 unavailable_ = thread_count;
516 }
517
521 void disable() {
522 {
523 std::lock_guard<std::mutex> lock(mutex_);
524 paused_ = false;
525 enabled_ = false;
526 }
527 // Notify pop so that it can exit.
528 cv_.notify_all();
529 }
530
537 bool enabled() {
538 return (enabled_);
539 }
540
547 bool paused() {
548 return (paused_);
549 }
550
551 private:
553 QueueContainer queue_;
554
556 std::mutex mutex_;
557
559 std::condition_variable cv_;
560
562 std::condition_variable wait_cv_;
563
565 std::condition_variable wait_threads_cv_;
566
570 std::atomic<bool> enabled_;
571
575 std::atomic<bool> paused_;
576
579 size_t max_queue_size_;
580
582 uint32_t working_;
583
585 uint32_t unavailable_;
586
588 double stat10;
589
591 double stat100;
592
594 double stat1000;
595 };
596
598 void run() {
599 queue_.registerThread();
600 for (bool work = true; work; work = queue_.enabled()) {
601 WorkItemPtr item = queue_.pop();
602 if (item) {
603 try {
604 (*item)();
605 } catch (...) {
606 // catch all exceptions
607 }
608 }
609 }
610 queue_.unregisterThread();
611 }
612
614 std::vector<boost::shared_ptr<std::thread>> threads_;
615
617 ThreadPoolQueue<WorkItemPtr, Container> queue_;
618};
619
621template <typename W, typename C>
622const double ThreadPool<W, C>::CEXP10 = std::exp(-.1);
623
625template <typename W, typename C>
626const double ThreadPool<W, C>::CEXP100 = std::exp(-.01);
627
629template <typename W, typename C>
630const double ThreadPool<W, C>::CEXP1000 = std::exp(-.001);
631
632} // namespace util
633} // namespace isc
634
635#endif // THREAD_POOL_H
A generic exception that is thrown if a function is called in a prohibited way.
A generic exception that is thrown if a parameter given to a method or function is considered invalid...
Exception thrown when a worker thread is trying to stop or pause the respective thread pool (which wo...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
Defines the logger used by the top-level component of kea-lfc.
bool wait(uint32_t seconds)
wait for items to be processed or return after timeout
static const double CEXP10
Rounding value for 10 packet statistic.
Definition thread_pool.h:36
void setMaxQueueSize(size_t max_queue_size)
set maximum number of work items in the queue
void reset()
reset the thread pool stopping threads and clearing the internal queue
Definition thread_pool.h:65
void start(uint32_t thread_count)
start all the threads
Definition thread_pool.h:77
bool enabled()
return the enable state of the queue
bool paused()
return the pause state of the queue
static const double CEXP1000
Rounding value for 1000 packet statistic.
Definition thread_pool.h:42
boost::shared_ptr< WorkItem > WorkItemPtr
Type of shared pointers to work items.
Definition thread_pool.h:45
size_t getMaxQueueSize()
get maximum number of work items in the queue
double getQueueStat(size_t which)
get queue length statistic
~ThreadPool()
Destructor.
Definition thread_pool.h:57
bool add(const WorkItemPtr &item)
add a work item to the thread pool
size_t size()
return the number of threads in the thread pool
void resume()
resume threads
void stop()
stop all the threads
Definition thread_pool.h:90
void wait()
wait for current items to be processed
ThreadPool()
Constructor.
Definition thread_pool.h:48
static const double CEXP100
Rounding value for 100 packet statistic.
Definition thread_pool.h:39
void pause(bool wait=true)
pause threads
bool addFront(const WorkItemPtr &item)
add a work item to the thread pool at front
size_t count()
count number of work items in the queue