Kea  1.9.9-git
multi_threading_mgr.cc
Go to the documentation of this file.
1 // Copyright (C) 2019-2021 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
10 
11 namespace isc {
12 namespace util {
13 
15  : enabled_(false), critical_section_count_(0), thread_pool_size_(0) {
16 }
17 
19 }
20 
23  static MultiThreadingMgr manager;
24  return (manager);
25 }
26 
27 bool
29  return (enabled_);
30 }
31 
32 void
34  enabled_ = enabled;
35 }
36 
37 void
39  stopProcessing();
40  ++critical_section_count_;
41 }
42 
43 void
45  if (critical_section_count_ == 0) {
46  isc_throw(InvalidOperation, "invalid negative value for override");
47  }
48  --critical_section_count_;
49  startProcessing();
50 }
51 
52 bool
54  return (critical_section_count_ != 0);
55 }
56 
59  return thread_pool_;
60 }
61 
62 uint32_t
64  return (thread_pool_size_);
65 }
66 
67 void
69  thread_pool_size_ = size;
70 }
71 
72 uint32_t
74  return (thread_pool_.getMaxQueueSize());
75 }
76 
77 void
79  thread_pool_.setMaxQueueSize(size);
80 }
81 
82 uint32_t
84  return (std::thread::hardware_concurrency());
85 }
86 
87 void
88 MultiThreadingMgr::apply(bool enabled, uint32_t thread_count, uint32_t queue_size) {
89  // check the enabled flag
90  if (enabled) {
91  // check for auto scaling (enabled flag true but thread_count 0)
92  if (!thread_count) {
93  // might also return 0
94  thread_count = MultiThreadingMgr::detectThreadCount();
95  }
96  } else {
97  thread_count = 0;
98  queue_size = 0;
99  }
100  // check enabled flag and explicit number of threads or system supports
101  // hardware concurrency
102  if (thread_count) {
103  if (thread_pool_.size()) {
104  thread_pool_.stop();
105  }
106  setThreadPoolSize(thread_count);
107  setPacketQueueSize(queue_size);
108  setMode(true);
109  if (!isInCriticalSection()) {
110  thread_pool_.start(thread_count);
111  }
112  } else {
114  thread_pool_.reset();
115  setMode(false);
116  setThreadPoolSize(thread_count);
117  setPacketQueueSize(queue_size);
118  }
119 }
120 
121 void
122 MultiThreadingMgr::stopProcessing() {
123  if (getMode() && !isInCriticalSection()) {
124  if (getThreadPoolSize()) {
125  thread_pool_.stop();
126  }
127 
128  for (const auto& cb : cs_callbacks_.getCallbackPairs()) {
129  try {
130  (cb.entry_cb_)();
131  } catch (...) {
132  // We can't log it and throwing could be chaos.
133  // We'll swallow it and tell people their callbacks
134  // must be exception-proof
135  }
136  }
137  }
138 }
139 
140 void
141 MultiThreadingMgr::startProcessing() {
142  if (getMode() && !isInCriticalSection()) {
143  if (getThreadPoolSize()) {
144  thread_pool_.start(getThreadPoolSize());
145  }
146 
147  for (const auto& cb : cs_callbacks_.getCallbackPairs()) {
148  try {
149  (cb.exit_cb_)();
150  } catch (...) {
151  // We can't log it and throwing could be chaos.
152  // We'll swallow it and tell people their callbacks
153  // must be exception-proof
154  }
155  }
156  }
157 }
158 
159 void
161  const CSCallbackPair::Callback& entry_cb,
162  const CSCallbackPair::Callback& exit_cb) {
163  cs_callbacks_.addCallbackPair(name, entry_cb, exit_cb);
164 }
165 
166 void
168  cs_callbacks_.removeCallbackPair(name);
169 }
170 
171 void
173  cs_callbacks_.removeAll();
174 }
175 
178 }
179 
182 }
183 
184 void
185 CSCallbackPairList::addCallbackPair(const std::string& name,
186  const CSCallbackPair::Callback& entry_cb,
187  const CSCallbackPair::Callback& exit_cb) {
188  if (name.empty()) {
189  isc_throw(BadValue, "CSCallbackPairList - name cannot be empty");
190  }
191 
192  if (!entry_cb) {
193  isc_throw(BadValue, "CSCallbackPairList - entry callback for " << name
194  << " cannot be empty");
195  }
196 
197  if (!exit_cb) {
198  isc_throw(BadValue, "CSCallbackPairList - exit callback for " << name
199  << " cannot be empty");
200  }
201 
202  for (auto const& callback : cb_pairs_) {
203  if (callback.name_ == name) {
204  isc_throw(BadValue, "CSCallbackPairList - callbacks for " << name
205  << " already exist");
206  }
207  }
208 
209  cb_pairs_.push_back(CSCallbackPair(name, entry_cb, exit_cb));
210 }
211 
212 void
213 CSCallbackPairList::removeCallbackPair(const std::string& name) {
214  for (auto it = cb_pairs_.begin(); it != cb_pairs_.end(); ++it) {
215  if ((*it).name_ == name) {
216  cb_pairs_.erase(it);
217  break;
218  }
219  }
220 }
221 
222 void
224  cb_pairs_.clear();
225 }
226 
227 const std::list<CSCallbackPair>&
229  return (cb_pairs_);
230 }
231 
232 } // namespace util
233 } // namespace isc
static MultiThreadingMgr & instance()
Returns a single instance of Multi Threading Manager.
uint32_t getThreadPoolSize() const
Get the configured dhcp thread pool size.
void removeAllCriticalSectionCallbacks()
Removes all callbacks in the list of CriticalSection callbacks.
void enterCriticalSection()
Enter critical section.
void exitCriticalSection()
Exit critical section.
void setThreadPoolSize(uint32_t size)
Set the configured dhcp thread pool size.
Multi Threading Manager.
void start(uint32_t thread_count)
start all the threads
Definition: thread_pool.h:72
void setMaxQueueSize(size_t max_queue_size)
set maximum number of work items in the queue
Definition: thread_pool.h:146
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
std::function< void()> Callback
Defines a callback as a simple void() functor.
size_t size()
size number of thread pool threads
Definition: thread_pool.h:160
void setMode(bool enabled)
Set the multi-threading mode.
bool getMode() const
Get the multi-threading mode.
void removeAll()
Removes all callbacks from the list.
const std::list< CSCallbackPair > & getCallbackPairs()
Fetches the list of callback pairs.
void addCallbackPair(const std::string &name, const CSCallbackPair::Callback &entry_cb, const CSCallbackPair::Callback &exit_cb)
Adds a callback pair to the list.
void addCriticalSectionCallbacks(const std::string &name, const CSCallbackPair::Callback &entry_cb, const CSCallbackPair::Callback &exit_cb)
Adds a pair of callbacks to the list of CriticalSection callbacks.
virtual ~MultiThreadingMgr()
Destructor.
void setPacketQueueSize(uint32_t size)
Set the configured dhcp packet queue size.
Defines the logger used by the top-level component of kea-dhcp-ddns.
ThreadPool< std::function< void()> > & getThreadPool()
Get the dhcp thread pool.
void removeCriticalSectionCallbacks(const std::string &name)
Removes the set of callbacks associated with a given name from the list of CriticalSection callbacks...
size_t getMaxQueueSize()
get maximum number of work items in the queue
Definition: thread_pool.h:153
A generic exception that is thrown if a function is called in a prohibited way.
void stop()
stop all the threads
Definition: thread_pool.h:85
void removeCallbackPair(const std::string &name)
Removes a callback pair from the list.
void reset()
reset the thread pool stopping threads and clearing the internal queue
Definition: thread_pool.h:60
bool isInCriticalSection()
Is in critical section flag.
Defines a thread pool which uses a thread pool queue for managing work items.
Definition: thread_pool.h:34
static uint32_t detectThreadCount()
The system current detected hardware concurrency thread count.
uint32_t getPacketQueueSize()
Get the configured dhcp packet queue size.
void apply(bool enabled, uint32_t thread_count, uint32_t queue_size)
Apply the multi-threading related settings.
Embodies a named pair of CriticalSection callbacks.