Kea  1.9.9-git
http_thread_pool.cc
Go to the documentation of this file.
1 // Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
10 #include <asiolink/io_service.h>
12 #include <exceptions/exceptions.h>
13 #include <http/http_log.h>
14 #include <http/http_messages.h>
15 #include <http/http_thread_pool.h>
17 #include <util/unlock_guard.h>
18 
19 #include <boost/shared_ptr.hpp>
20 
21 #include <atomic>
22 #include <functional>
23 #include <iostream>
24 #include <list>
25 #include <mutex>
26 #include <thread>
27 
28 using namespace isc;
29 using namespace isc::asiolink;
30 using namespace isc::http;
31 using namespace isc::util;
32 
33 HttpThreadPool::HttpThreadPool(IOServicePtr io_service, size_t pool_size,
34  bool defer_start /* = false */)
35  : pool_size_(pool_size), io_service_(io_service),
36  run_state_(State::STOPPED), mutex_(), thread_cv_(),
37  main_cv_(), paused_(0), running_(0), exited_(0) {
38  if (!pool_size) {
39  isc_throw(BadValue, "HttpThreadPool::ctor pool_size must be > 0");
40  }
41 
42  // If we weren't given an IOService, create our own.
43  if (!io_service_) {
44  io_service_.reset(new IOService());
45  }
46 
47  // If we're not deferring the start, do it now.
48  if (!defer_start) {
49  run();
50  }
51 }
52 
54  stop();
55 }
56 
57 void
59  setState(State::RUNNING);
60 }
61 
62 void
64  setState(State::PAUSED);
65 }
66 
67 void
69  setState(State::STOPPED);
70 }
71 
73 HttpThreadPool::getState() {
74  std::lock_guard<std::mutex> lck(mutex_);
75  return (run_state_);
76 }
77 
78 bool
79 HttpThreadPool::validateStateChange(State new_state) const {
80  switch (run_state_) {
81  case State::STOPPED:
82  return (new_state == State::RUNNING);
83  case State::RUNNING:
84  return (new_state != State::RUNNING);
85  case State::PAUSED:
86  return (new_state != State::PAUSED);
87  }
88 
89  return (false);
90 }
91 
92 void
93 HttpThreadPool::setState(State new_state) {
94  std::unique_lock<std::mutex> main_lck(mutex_);
95 
96  // Bail if the transition is invalid.
97  if (!validateStateChange(new_state)) {
98  return;
99  }
100 
101  run_state_ = new_state;
102  // Notify threads of state change.
103  thread_cv_.notify_all();
104 
105  switch (new_state) {
106  case State::RUNNING: {
107  // Restart the IOService.
108  io_service_->restart();
109 
110  // While we have fewer threads than we should, make more.
111  while (threads_.size() < pool_size_) {
112  boost::shared_ptr<std::thread> thread(new std::thread(
113  std::bind(&HttpThreadPool::threadWork, this)));
114 
115  // Add thread to the pool.
116  threads_.push_back(thread);
117  }
118 
119  // Main thread waits here until all threads are running.
120  main_cv_.wait(main_lck,
121  [&]() {
122  return (running_ == threads_.size());
123  });
124 
125  exited_ = 0;
126  break;
127  }
128 
129  case State::PAUSED: {
130  // Stop IOService.
131  if (!io_service_->stopped()) {
132  io_service_->poll();
133  io_service_->stop();
134  }
135 
136  // Main thread waits here until all threads are paused.
137  main_cv_.wait(main_lck,
138  [&]() {
139  return (paused_ == threads_.size());
140  });
141 
142  break;
143  }
144 
145  case State::STOPPED: {
146  // Stop IOService.
147  if (!io_service_->stopped()) {
148  io_service_->poll();
149  io_service_->stop();
150  }
151 
152  // Main thread waits here until all threads have exited.
153  main_cv_.wait(main_lck,
154  [&]() {
155  return (exited_ == threads_.size());
156  });
157 
158  for (auto const& thread : threads_) {
159  thread->join();
160  }
161 
162  threads_.clear();
163  break;
164  }}
165 }
166 
167 void
168 HttpThreadPool::threadWork() {
169  bool done = false;
170  while (!done) {
171  switch (getState()) {
172  case State::RUNNING: {
173  {
174  std::unique_lock<std::mutex> lck(mutex_);
175  running_++;
176 
177  // If We're all running notify main thread.
178  if (running_ == pool_size_) {
179  main_cv_.notify_all();
180  }
181  }
182 
183  // Run the IOService.
184  io_service_->run();
185 
186  {
187  std::unique_lock<std::mutex> lck(mutex_);
188  running_--;
189  }
190 
191  break;
192  }
193 
194  case State::PAUSED: {
195  std::unique_lock<std::mutex> lck(mutex_);
196  paused_++;
197 
198  // If we're all paused notify main.
199  if (paused_ == threads_.size()) {
200  main_cv_.notify_all();
201  }
202 
203  // Wait here till I'm released.
204  thread_cv_.wait(lck,
205  [&]() {
206  return (run_state_ != State::PAUSED);
207  });
208 
209  paused_--;
210  break;
211  }
212 
213  case State::STOPPED: {
214  done = true;
215  break;
216  }}
217  }
218 
219  std::unique_lock<std::mutex> lck(mutex_);
220  exited_++;
221 
222  // If we've all exited, notify main.
223  if (exited_ == threads_.size()) {
224  main_cv_.notify_all();
225  }
226 }
227 
230  return (io_service_);
231 }
232 
233 uint16_t
235  return (pool_size_);
236 }
237 
238 uint16_t
240  return (threads_.size());
241 }
Pool is populated with running threads.
uint16_t getPoolSize() const
Fetches the maximum size of the thread pool.
void pause()
Transitions the pool from RUNNING to PAUSED.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
A generic exception that is thrown if a parameter given to a method is considered invalid in that context.
Definition: edns.h:19
uint16_t getThreadCount() const
Fetches the number of threads in the pool.
void run()
Transitions the pool from STOPPED or PAUSED to RUNNING.
Defines a State within the State Model.
Definition: state_model.h:61
Defines the logger used by the top-level component of kea-dhcp-ddns.
State
Describes the possible operational state of the thread pool.
asiolink::IOServicePtr getIOService() const
Fetches the IOService that drives the pool.
void stop()
Transitions the pool from RUNNING or PAUSED to STOPPED.