Kea  1.9.9-git
client.cc
Go to the documentation of this file.
1 // Copyright (C) 2018-2021 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
11 #include <asiolink/tls_socket.h>
12 #include <http/client.h>
13 #include <http/http_log.h>
14 #include <http/http_messages.h>
15 #include <http/response_json.h>
16 #include <http/response_parser.h>
18 #include <util/unlock_guard.h>
19 
20 #include <boost/enable_shared_from_this.hpp>
21 #include <boost/weak_ptr.hpp>
22 
23 #include <atomic>
24 #include <array>
25 #include <functional>
26 #include <iostream>
27 #include <map>
28 #include <mutex>
29 #include <queue>
30 #include <thread>
31 
32 
33 using namespace isc;
34 using namespace isc::asiolink;
35 using namespace isc::http;
36 using namespace isc::util;
37 namespace ph = std::placeholders;
38 
39 namespace {
40 
/// @brief Size limit passed to the message formatting helpers when HTTP
/// messages are logged (used with logFormatHttpMessage() and
/// getBufferAsString() in this file).
// NOTE(review): presumably a length in bytes/characters - confirm against
// the helpers' documentation.
44 constexpr size_t MAX_LOGGED_MESSAGE_SIZE = 1024;
45 
/// @brief Signature of the completion handlers wrapped by SocketCallback:
/// an error code plus a length.
47 typedef std::function<void(boost::system::error_code ec, size_t length)>
48 SocketCallbackFunction;
49 
55 class SocketCallback {
56 public:
57 
63  SocketCallback(SocketCallbackFunction socket_callback)
64  : callback_(socket_callback) {
65  }
66 
73  void operator()(boost::system::error_code ec, size_t length = 0) {
74  if (ec.value() == boost::asio::error::operation_aborted) {
75  return;
76  }
77  callback_(ec, length);
78  }
79 
80 private:
81 
83  SocketCallbackFunction callback_;
84 
85 };
86 
/// @brief Forward declaration; Connection keeps a weak pointer to its pool.
87 class ConnectionPool;
88 
/// @brief Shared pointer to a connection pool.
90 typedef boost::shared_ptr<ConnectionPool> ConnectionPoolPtr;
91 
/// @brief Client side HTTP connection to a single URL.
///
/// A connection owns either a TCP socket or a TLS socket (selected in the
/// constructor by the presence of a TLS context), a timer and the state of
/// the transaction currently in progress. Public entry points take the
/// connection mutex when MultiThreadingMgr reports multi-threading mode and
/// then delegate to the corresponding "Internal" method.
107 class Connection : public boost::enable_shared_from_this<Connection> {
108 public:
109 
 /// @brief Constructor.
 ///
 /// @param io_service IO service used to create the socket and the timer.
 /// @param tls_context TLS context; when null a TCP socket is created,
 /// otherwise a TLS socket is created and a handshake is required.
 /// @param conn_pool Pool owning this connection (stored as weak pointer).
 /// @param url URL this connection is associated with.
117  explicit Connection(IOService& io_service,
118  const TlsContextPtr& tls_context,
119  const ConnectionPoolPtr& conn_pool,
120  const Url& url);
121 
 /// @brief Destructor; closes the connection.
123  ~Connection();
124 
 /// @brief Starts a new transaction on this connection.
 ///
 /// @param request Request to be sent.
 /// @param response Object into which the response is parsed.
 /// @param request_timeout Timeout passed to the transaction timer; the
 /// timer is only scheduled for values greater than 0.
 /// @param callback Handler invoked when the transaction terminates.
 /// @param connect_callback Optional handler invoked after connect.
 /// @param handshake_callback Optional handler invoked after handshake.
 /// @param close_callback Optional handler invoked before the socket closes.
143  void doTransaction(const HttpRequestPtr& request,
144  const HttpResponsePtr& response,
145  const long request_timeout,
146  const HttpClient::RequestHandler& callback,
147  const HttpClient::ConnectHandler& connect_callback,
148  const HttpClient::HandshakeHandler& handshake_callback,
149  const HttpClient::CloseHandler& close_callback);
150 
 /// @brief Closes the socket, cancels the timer and resets the state.
152  void close();
153 
 /// @brief Returns true when a transaction has been started and not yet
 /// reset.
157  bool isTransactionOngoing() const {
158  return (started_);
159  }
160 
 /// @brief Returns true when the connection has been marked closed.
164  bool isClosed() const {
165  return (closed_);
166  }
167 
 /// @brief For an idle, not yet closed connection checks whether the open
 /// socket is still usable and closes it when it is not.
172  void isClosedByPeer();
173 
 /// @brief Tells whether the given descriptor is this connection's socket.
 ///
 /// @param socket_fd Descriptor compared with the socket's native handle.
179  bool isMySocket(int socket_fd) const;
180 
 /// @brief Tells whether a handler belongs to a transaction that is no
 /// longer the current one.
 ///
 /// @param transid Transaction id the handler was created for.
 /// @return true when no transaction is ongoing or the ids differ.
196  bool checkPrematureTimeout(const uint64_t transid);
197 
198 private:
199 
 /// @brief Body of @c doTransaction, called with the mutex held in
 /// multi-threading mode.
220  void doTransactionInternal(const HttpRequestPtr& request,
221  const HttpResponsePtr& response,
222  const long request_timeout,
223  const HttpClient::RequestHandler& callback,
224  const HttpClient::ConnectHandler& connect_callback,
225  const HttpClient::HandshakeHandler& handshake_callback,
226  const HttpClient::CloseHandler& close_callback);
227 
 /// @brief Body of @c close, called with the mutex held in
 /// multi-threading mode.
231  void closeInternal();
232 
 /// @brief Body of @c isClosedByPeer, called with the mutex held in
 /// multi-threading mode.
239  void isClosedByPeerInternal();
240 
 /// @brief Body of @c checkPrematureTimeout, called with the mutex held
 /// in multi-threading mode.
258  bool checkPrematureTimeoutInternal(const uint64_t transid);
259 
 /// @brief Clears the started flag, request, response, parser and the
 /// request handler.
266  void resetState();
267 
 /// @brief Terminates the current transaction and reports the outcome.
 ///
 /// @param ec Error code describing the outcome.
 /// @param parsing_error Parsing error text, empty when there is none.
277  void terminate(const boost::system::error_code& ec,
278  const std::string& parsing_error = "");
279 
 /// @brief Body of @c terminate, called with the mutex held in
 /// multi-threading mode.
291  void terminateInternal(const boost::system::error_code& ec,
292  const std::string& parsing_error = "");
293 
 /// @brief Feeds received data to the response parser.
 // NOTE(review): the meaning of the return value is only partly visible in
 // this chunk; receiveCallback() schedules another receive when it is true.
300  bool runParser(const boost::system::error_code& ec, size_t length);
301 
 /// @brief Body of @c runParser, called with the mutex held in
 /// multi-threading mode.
310  bool runParserInternal(const boost::system::error_code& ec, size_t length);
311 
 /// @brief Schedules the transaction timer when the timeout is positive.
315  void scheduleTimer(const long request_timeout);
316 
 /// @brief Starts the TLS handshake, or goes straight to sending when no
 /// handshake is needed.
322  void doHandshake(const uint64_t transid);
323 
 /// @brief Asynchronously sends the content of the output buffer.
329  void doSend(const uint64_t transid);
330 
 /// @brief Asynchronously receives data into the input buffer.
336  void doReceive(const uint64_t transid);
337 
 /// @brief Completion handler for the connect operation.
348  void connectCallback(HttpClient::ConnectHandler connect_callback,
349  const uint64_t transid,
350  const boost::system::error_code& ec);
351 
 /// @brief Completion handler for the TLS handshake.
361  void handshakeCallback(HttpClient::HandshakeHandler handshake_callback,
362  const uint64_t transid,
363  const boost::system::error_code& ec);
364 
 /// @brief Completion handler for the send operation.
375  void sendCallback(const uint64_t transid, const boost::system::error_code& ec,
376  size_t length);
377 
 /// @brief Completion handler for the receive operation.
384  void receiveCallback(const uint64_t transid, const boost::system::error_code& ec,
385  size_t length);
386 
 /// @brief Handler bound to the transaction timer.
 // NOTE(review): definition is outside this chunk - presumably terminates
 // the transaction with a timeout error; confirm.
388  void timerCallback();
389 
 /// @brief Invokes the close handler, if set, with the socket's native
 /// descriptor.
 ///
 /// @param clear When true the stored close handler is reset afterwards.
399  void closeCallback(const bool clear = false);
400 
 /// @brief Weak pointer to the pool this connection belongs to.
405  boost::weak_ptr<ConnectionPool> conn_pool_;
406 
 /// @brief URL this connection is associated with.
408  Url url_;
409 
 /// @brief TLS context, null for plain TCP connections.
411  TlsContextPtr tls_context_;
412 
 /// @brief TCP socket, set only when no TLS context was given.
414  std::unique_ptr<TCPSocket<SocketCallback> > tcp_socket_;
415 
 /// @brief TLS socket, set only when a TLS context was given.
417  std::unique_ptr<TLSSocket<SocketCallback> > tls_socket_;
418 
 /// @brief Timer guarding the transaction.
420  IntervalTimer timer_;
421 
 /// @brief Request of the current transaction.
423  HttpRequestPtr current_request_;
424 
 /// @brief Response of the current transaction.
426  HttpResponsePtr current_response_;
427 
 /// @brief Parser filling in the current response.
429  HttpResponseParserPtr parser_;
430 
 /// @brief Handler invoked when the current transaction terminates.
432  HttpClient::RequestHandler current_callback_;
433 
 /// @brief Output buffer holding the not yet sent part of the request.
435  std::string buf_;
436 
 /// @brief Input buffer for received data.
438  std::array<char, 32768> input_buf_;
439 
 /// @brief Id of the current transaction, incremented per transaction.
441  uint64_t current_transid_;
442 
 /// @brief Handler passed to the handshake completion callback.
444  HttpClient::HandshakeHandler handshake_callback_;
445 
 /// @brief Handler invoked before the socket is closed.
447  HttpClient::CloseHandler close_callback_;
448 
 /// @brief Set while a transaction is in progress.
450  std::atomic<bool> started_;
451 
 /// @brief Set for TLS connections until the handshake callback has run.
453  std::atomic<bool> need_handshake_;
454 
 /// @brief Set once the connection has been closed.
456  std::atomic<bool> closed_;
457 
 /// @brief Mutex taken by the public entry points in multi-threading mode.
459  std::mutex mutex_;
460 };
461 
/// @brief Shared pointer to a client connection.
463 typedef boost::shared_ptr<Connection> ConnectionPtr;
464 
472 class ConnectionPool : public boost::enable_shared_from_this<ConnectionPool> {
473 public:
474 
481  explicit ConnectionPool(IOService& io_service, size_t max_url_connections)
482  : io_service_(io_service), destinations_(), pool_mutex_(),
483  max_url_connections_(max_url_connections) {
484  }
485 
489  ~ConnectionPool() {
490  closeAll();
491  }
492 
498  void processNextRequest(const Url& url, const TlsContextPtr& tls_context) {
499  if (MultiThreadingMgr::instance().getMode()) {
500  std::lock_guard<std::mutex> lk(pool_mutex_);
501  return (processNextRequestInternal(url, tls_context));
502  } else {
503  return (processNextRequestInternal(url, tls_context));
504  }
505  }
506 
512  void postProcessNextRequest(const Url& url,
513  const TlsContextPtr& tls_context) {
514  io_service_.post(std::bind(&ConnectionPool::processNextRequest,
515  shared_from_this(), url, tls_context));
516  }
517 
538  void queueRequest(const Url& url,
539  const TlsContextPtr& tls_context,
540  const HttpRequestPtr& request,
541  const HttpResponsePtr& response,
542  const long request_timeout,
543  const HttpClient::RequestHandler& request_callback,
544  const HttpClient::ConnectHandler& connect_callback,
545  const HttpClient::HandshakeHandler& handshake_callback,
546  const HttpClient::CloseHandler& close_callback) {
547  if (MultiThreadingMgr::instance().getMode()) {
548  std::lock_guard<std::mutex> lk(pool_mutex_);
549  return (queueRequestInternal(url, tls_context, request, response,
550  request_timeout, request_callback,
551  connect_callback, handshake_callback,
552  close_callback));
553  } else {
554  return (queueRequestInternal(url, tls_context, request, response,
555  request_timeout, request_callback,
556  connect_callback, handshake_callback,
557  close_callback));
558  }
559  }
560 
563  void closeAll() {
564  if (MultiThreadingMgr::instance().getMode()) {
565  std::lock_guard<std::mutex> lk(pool_mutex_);
566  closeAllInternal();
567  } else {
568  closeAllInternal();
569  }
570  }
571 
584  void closeIfOutOfBand(int socket_fd) {
585  if (MultiThreadingMgr::instance().getMode()) {
586  std::lock_guard<std::mutex> lk(pool_mutex_);
587  closeIfOutOfBandInternal(socket_fd);
588  } else {
589  closeIfOutOfBandInternal(socket_fd);
590  }
591  }
592 
593 private:
594 
602  void processNextRequestInternal(const Url& url,
603  const TlsContextPtr& tls_context) {
604  // Check if there is a queue for this URL. If there is no queue, there
605  // is no request queued either.
606  DestinationPtr destination = findDestination(url, tls_context);
607  if (destination) {
608  // Remove closed connections.
609  destination->garbageCollectConnections();
610  if (!destination->queueEmpty()) {
611  // We have at least one queued request. Do we have an
612  // idle connection?
613  ConnectionPtr connection = destination->getIdleConnection();
614  if (!connection) {
615  // No idle connections.
616  if (destination->connectionsFull()) {
617  return;
618  }
619  // Room to make another connection with this destination,
620  // so make one.
621  connection.reset(new Connection(io_service_, tls_context,
622  shared_from_this(), url));
623  destination->addConnection(connection);
624  }
625 
626  // Dequeue the oldest request and start a transaction for it using
627  // the idle connection.
628  RequestDescriptor desc = destination->popNextRequest();
629  connection->doTransaction(desc.request_, desc.response_,
630  desc.request_timeout_, desc.callback_,
631  desc.connect_callback_,
632  desc.handshake_callback_,
633  desc.close_callback_);
634  }
635  }
636  }
637 
660  void queueRequestInternal(const Url& url,
661  const TlsContextPtr& tls_context,
662  const HttpRequestPtr& request,
663  const HttpResponsePtr& response,
664  const long request_timeout,
665  const HttpClient::RequestHandler& request_callback,
666  const HttpClient::ConnectHandler& connect_callback,
667  const HttpClient::HandshakeHandler& handshake_callback,
668  const HttpClient::CloseHandler& close_callback) {
669  ConnectionPtr connection;
670  // Find the destination for the requested URL.
671  DestinationPtr destination = findDestination(url, tls_context);
672  if (destination) {
673  // Remove closed connections.
674  destination->garbageCollectConnections();
675  // Found it, look for an idle connection.
676  connection = destination->getIdleConnection();
677  } else {
678  // Doesn't exist yet so it's a new destination.
679  destination = addDestination(url, tls_context);
680  }
681 
682  if (!connection) {
683  if (destination->connectionsFull()) {
684  // All connections busy, queue it.
685  destination->pushRequest(RequestDescriptor(request, response,
686  request_timeout,
687  request_callback,
688  connect_callback,
689  handshake_callback,
690  close_callback));
691  return;
692  }
693 
694  // Room to make another connection with this destination, so make one.
695  connection.reset(new Connection(io_service_, tls_context,
696  shared_from_this(), url));
697  destination->addConnection(connection);
698  }
699 
700  // Use the connection to start the transaction.
701  connection->doTransaction(request, response, request_timeout, request_callback,
702  connect_callback, handshake_callback, close_callback);
703  }
704 
709  void closeAllInternal() {
710  for (auto const& destination : destinations_) {
711  destination.second->closeAllConnections();
712  }
713 
714  destinations_.clear();
715  }
716 
731  void closeIfOutOfBandInternal(int socket_fd) {
732  for (auto const& destination : destinations_) {
733  // First we look for a connection with the socket.
734  ConnectionPtr connection = destination.second->findBySocketFd(socket_fd);
735  if (connection) {
736  if (!connection->isTransactionOngoing()) {
737  // Socket has no transaction, so any ready event is
738  // out-of-band (other end probably closed), so
739  // let's close it. Note we do not remove any queued
740  // requests, as this might somehow be occurring in
741  // between them.
742  destination.second->closeConnection(connection);
743  }
744 
745  return;
746  }
747  }
748  }
749 
752  struct RequestDescriptor {
766  RequestDescriptor(const HttpRequestPtr& request,
767  const HttpResponsePtr& response,
768  const long& request_timeout,
769  const HttpClient::RequestHandler& callback,
770  const HttpClient::ConnectHandler& connect_callback,
771  const HttpClient::HandshakeHandler& handshake_callback,
772  const HttpClient::CloseHandler& close_callback)
773  : request_(request), response_(response),
774  request_timeout_(request_timeout), callback_(callback),
775  connect_callback_(connect_callback),
776  handshake_callback_(handshake_callback),
777  close_callback_(close_callback) {
778  }
779 
781  HttpRequestPtr request_;
782 
784  HttpResponsePtr response_;
785 
787  long request_timeout_;
788 
791 
793  HttpClient::ConnectHandler connect_callback_;
794 
796  HttpClient::HandshakeHandler handshake_callback_;
797 
799  HttpClient::CloseHandler close_callback_;
800  };
801 
803  typedef std::pair<Url, TlsContextPtr> DestinationDescriptor;
804 
806  class Destination {
807  public:
814  Destination(Url url, TlsContextPtr tls_context, size_t max_connections)
815  : url_(url), tls_context_(tls_context),
816  max_connections_(max_connections), connections_(), queue_() {
817  }
818 
820  ~Destination() {
821  closeAllConnections();
822  }
823 
831  void addConnection(ConnectionPtr connection) {
832  if (connectionsFull()) {
833  isc_throw(BadValue, "URL: " << url_.toText()
834  << ", already at maximum connections: "
835  << max_connections_);
836  }
837 
838  connections_.push_back(connection);
839  }
840 
845  void closeConnection(ConnectionPtr connection) {
846  for (auto it = connections_.begin(); it != connections_.end(); ++it) {
847  if (*it == connection) {
848  (*it)->close();
849  connections_.erase(it);
850  break;
851  }
852  }
853  }
854 
857  void closeAllConnections() {
858  // Flush the queue.
859  while (!queue_.empty()) {
860  queue_.pop();
861  }
862 
863  for (auto const& connection : connections_) {
864  connection->close();
865  }
866 
867  connections_.clear();
868  }
869 
893  void garbageCollectConnections() {
894  for (auto it = connections_.begin(); it != connections_.end();) {
895  (*it)->isClosedByPeer();
896  if (!(*it)->isClosed()) {
897  ++it;
898  } else {
899  it = connections_.erase(it);
900  }
901  }
902  }
903 
915  ConnectionPtr getIdleConnection() {
916  for (auto const& connection : connections_) {
917  if (!connection->isTransactionOngoing() &&
918  !connection->isClosed()) {
919  return (connection);
920  }
921  }
922 
923  return (ConnectionPtr());
924  }
925 
932  ConnectionPtr findBySocketFd(int socket_fd) {
933  for (auto const& connection : connections_) {
934  if (connection->isMySocket(socket_fd)) {
935  return (connection);
936  }
937  }
938 
939  return (ConnectionPtr());
940  }
941 
945  bool connectionsEmpty() {
946  return (connections_.empty());
947  }
948 
952  bool connectionsFull() {
953  return (connections_.size() >= max_connections_);
954  }
955 
959  size_t connectionCount() {
960  return (connections_.size());
961  }
962 
966  size_t getMaxConnections() const {
967  return (max_connections_);
968  }
969 
973  bool queueEmpty() const {
974  return (queue_.empty());
975  }
976 
980  void pushRequest(RequestDescriptor desc) {
981  queue_.push(desc);
982  }
983 
987  RequestDescriptor popNextRequest() {
988  if (queue_.empty()) {
989  isc_throw(InvalidOperation, "cannot pop, queue is empty");
990  }
991 
992  RequestDescriptor desc = queue_.front();
993  queue_.pop();
994  return (desc);
995  }
996 
997  private:
999  Url url_;
1000 
1002  TlsContextPtr tls_context_;
1003 
1005  size_t max_connections_;
1006 
1008  std::list<ConnectionPtr> connections_;
1009 
1011  std::queue<RequestDescriptor> queue_;
1012  };
1013 
1015  typedef boost::shared_ptr<Destination> DestinationPtr;
1016 
1024  DestinationPtr addDestination(const Url& url,
1025  const TlsContextPtr& tls_context) {
1026  const DestinationDescriptor& desc = std::make_pair(url, tls_context);
1027  DestinationPtr destination(new Destination(url, tls_context,
1028  max_url_connections_));
1029  destinations_[desc] = destination;
1030  return (destination);
1031  }
1032 
1041  DestinationPtr findDestination(const Url& url,
1042  const TlsContextPtr& tls_context) const {
1043  const DestinationDescriptor& desc = std::make_pair(url, tls_context);
1044  auto it = destinations_.find(desc);
1045  if (it != destinations_.end()) {
1046  return (it->second);
1047  }
1048 
1049  return (DestinationPtr());
1050  }
1051 
1063  void removeDestination(const Url& url,
1064  const TlsContextPtr& tls_context) {
1065  const DestinationDescriptor& desc = std::make_pair(url, tls_context);
1066  auto it = destinations_.find(desc);
1067  if (it != destinations_.end()) {
1068  it->second->closeAllConnections();
1069  destinations_.erase(it);
1070  }
1071  }
1072 
1074  IOService& io_service_;
1075 
1077  std::map<DestinationDescriptor, DestinationPtr> destinations_;
1078 
1080  std::mutex pool_mutex_;
1081 
1083  size_t max_url_connections_;
1084 };
1085 
1086 Connection::Connection(IOService& io_service,
1087  const TlsContextPtr& tls_context,
1088  const ConnectionPoolPtr& conn_pool,
1089  const Url& url)
1090  : conn_pool_(conn_pool), url_(url), tls_context_(tls_context),
1091  tcp_socket_(), tls_socket_(), timer_(io_service),
1092  current_request_(), current_response_(), parser_(),
1093  current_callback_(), buf_(), input_buf_(), current_transid_(0),
1094  close_callback_(), started_(false), need_handshake_(false),
1095  closed_(false) {
1096  if (!tls_context) {
1097  tcp_socket_.reset(new asiolink::TCPSocket<SocketCallback>(io_service));
1098  } else {
1099  tls_socket_.reset(new asiolink::TLSSocket<SocketCallback>(io_service,
1100  tls_context));
1101  need_handshake_ = true;
1102  }
1103 }
1104 
1105 Connection::~Connection() {
1106  close();
1107 }
1108 
1109 void
1110 Connection::resetState() {
1111  started_ = false;
1112  current_request_.reset();
1113  current_response_.reset();
1114  parser_.reset();
1115  current_callback_ = HttpClient::RequestHandler();
1116 }
1117 
/// @brief Invokes the close handler, if one is set, passing the native
/// descriptor of whichever socket (TCP or TLS) this connection owns.
/// Anything thrown inside the try block is caught here.
///
/// @param clear When true the stored close handler is reset afterwards.
1118 void
1119 Connection::closeCallback(const bool clear) {
1120  if (close_callback_) {
1121  try {
1122  if (tcp_socket_) {
1123  close_callback_(tcp_socket_->getNative());
1124  } else if (tls_socket_) {
1125  close_callback_(tls_socket_->getNative());
1126  } else {
// NOTE(review): the statement head for the next line is missing from this
// listing (presumably `isc_throw(Unexpected,` as used elsewhere in this
// file) - restore from the upstream source.
1128  "internal error: can't find a socket to close");
1129  }
1130  } catch (...) {
// NOTE(review): the body of this handler is missing from this listing
// (presumably an error log) - restore from the upstream source.
1132  }
1133  }
1134 
1135  if (clear) {
1136  close_callback_ = HttpClient::CloseHandler();
1137  }
1138 }
1139 
1140 void
1141 Connection::isClosedByPeer() {
1142  // This method applies only to idle connections.
1143  if (started_ || closed_) {
1144  return;
1145  }
1146  // This code was guarded by a lock so keep this.
1147  if (MultiThreadingMgr::instance().getMode()) {
1148  std::lock_guard<std::mutex> lk(mutex_);
1149  isClosedByPeerInternal();
1150  } else {
1151  isClosedByPeerInternal();
1152  }
1153 }
1154 
1155 void
1156 Connection::isClosedByPeerInternal() {
1157  // If the socket is open we check if it is possible to transmit
1158  // the data over this socket by reading from it with message
1159  // peeking. If the socket is not usable, we close it and then
1160  // re-open it. There is a narrow window of time between checking
1161  // the socket usability and actually transmitting the data over
1162  // this socket, when the peer may close the connection. In this
1163  // case we'll need to re-transmit but we don't handle it here.
1164  if (tcp_socket_) {
1165  if (tcp_socket_->getASIOSocket().is_open() &&
1166  !tcp_socket_->isUsable()) {
1167  closeCallback();
1168  closed_ = true;
1169  tcp_socket_->close();
1170  }
1171  } else if (tls_socket_) {
1172  if (tls_socket_->getASIOSocket().is_open() &&
1173  !tls_socket_->isUsable()) {
1174  closeCallback();
1175  closed_ = true;
1176  tls_socket_->close();
1177  }
1178  } else {
1179  isc_throw(Unexpected, "internal error: can't find the sending socket");
1180  }
1181 }
1182 
1183 void
1184 Connection::doTransaction(const HttpRequestPtr& request,
1185  const HttpResponsePtr& response,
1186  const long request_timeout,
1187  const HttpClient::RequestHandler& callback,
1188  const HttpClient::ConnectHandler& connect_callback,
1189  const HttpClient::HandshakeHandler& handshake_callback,
1190  const HttpClient::CloseHandler& close_callback) {
1191  if (MultiThreadingMgr::instance().getMode()) {
1192  std::lock_guard<std::mutex> lk(mutex_);
1193  doTransactionInternal(request, response, request_timeout,
1194  callback, connect_callback, handshake_callback,
1195  close_callback);
1196  } else {
1197  doTransactionInternal(request, response, request_timeout,
1198  callback, connect_callback, handshake_callback,
1199  close_callback);
1200  }
1201 }
1202 
/// @brief Body of @c doTransaction.
///
/// Records the transaction state (request, response, parser, handlers),
/// bumps the transaction id, serializes the request into the output
/// buffer, schedules the timer and opens the TCP or TLS socket towards the
/// URL's host and port, with @c connectCallback bound as completion handler.
///
/// @param request Request to be sent.
/// @param response Object into which the response is parsed.
/// @param request_timeout Timeout for the transaction timer.
/// @param callback Handler invoked when the transaction terminates.
/// @param connect_callback Handler passed on to @c connectCallback.
/// @param handshake_callback Handler stored for the handshake step.
/// @param close_callback Handler stored for the close step.
1203 void
1204 Connection::doTransactionInternal(const HttpRequestPtr& request,
1205  const HttpResponsePtr& response,
1206  const long request_timeout,
1207  const HttpClient::RequestHandler& callback,
1208  const HttpClient::ConnectHandler& connect_callback,
1209  const HttpClient::HandshakeHandler& handshake_callback,
1210  const HttpClient::CloseHandler& close_callback) {
1211  try {
1212  started_ = true;
1213  current_request_ = request;
1214  current_response_ = response;
1215  parser_.reset(new HttpResponseParser(*current_response_));
1216  parser_->initModel();
1217  current_callback_ = callback;
1218  handshake_callback_ = handshake_callback;
1219  close_callback_ = close_callback;
1220 
1221  // Starting new transaction. Generate new transaction id.
1222  ++current_transid_;
1223 
1224  buf_ = request->toString();
1225 
// NOTE(review): the head of this logging statement is missing from this
// listing; only its .arg() chain survived - restore from upstream.
1228  .arg(request->toBriefString())
1229  .arg(url_.toText());
1230 
// NOTE(review): same here - the head of the detailed logging statement is
// missing from this listing.
1233  .arg(url_.toText())
1234  .arg(HttpMessageParserBase::logFormatHttpMessage(request->toString(),
1235  MAX_LOGGED_MESSAGE_SIZE));
1236 
1237  // Setup request timer.
1238  scheduleTimer(request_timeout);
1239 
 // The endpoint is built from the URL's stripped hostname and port.
1243  TCPEndpoint endpoint(url_.getStrippedHostname(),
1244  static_cast<unsigned short>(url_.getPort()));
 // The handler holds a shared pointer to this connection and remembers
 // the id of the transaction it was created for.
1245  SocketCallback socket_cb(std::bind(&Connection::connectCallback, shared_from_this(),
1246  connect_callback, current_transid_,
1247  ph::_1));
1248 
1249  // Establish new connection or use existing connection.
1250  if (tcp_socket_) {
1251  tcp_socket_->open(&endpoint, socket_cb);
1252  return;
1253  }
1254  if (tls_socket_) {
1255  tls_socket_->open(&endpoint, socket_cb);
1256  return;
1257  }
1258 
1259  // Should never reach this point.
1260  isc_throw(Unexpected, "internal error: can't find a socket to open");
1261 
1262  } catch (const std::exception& ex) {
1263  // Re-throw with the expected exception type.
// NOTE(review): the re-throw statement itself is missing from this listing;
// as shown the handler swallows the exception - restore from upstream.
1265  }
1266 }
1267 
1268 void
1269 Connection::close() {
1270  if (MultiThreadingMgr::instance().getMode()) {
1271  std::lock_guard<std::mutex> lk(mutex_);
1272  return (closeInternal());
1273  } else {
1274  return (closeInternal());
1275  }
1276 }
1277 
1278 void
1279 Connection::closeInternal() {
1280  // Pass in true to discard the callback.
1281  closeCallback(true);
1282 
1283  closed_ = true;
1284  timer_.cancel();
1285  if (tcp_socket_) {
1286  tcp_socket_->close();
1287  }
1288  if (tls_socket_) {
1289  tls_socket_->close();
1290  }
1291 
1292  resetState();
1293 }
1294 
1295 bool
1296 Connection::isMySocket(int socket_fd) const {
1297  if (tcp_socket_) {
1298  return (tcp_socket_->getNative() == socket_fd);
1299  } else if (tls_socket_) {
1300  return (tls_socket_->getNative() == socket_fd);
1301  }
1302  // Should never reach this point.
1303  std::cerr << "internal error: can't find my socket\n";
1304  return (false);
1305 }
1306 
1307 bool
1308 Connection::checkPrematureTimeout(const uint64_t transid) {
1309  if (MultiThreadingMgr::instance().getMode()) {
1310  std::lock_guard<std::mutex> lk(mutex_);
1311  return (checkPrematureTimeoutInternal(transid));
1312  } else {
1313  return (checkPrematureTimeoutInternal(transid));
1314  }
1315 }
1316 
/// @brief Body of @c checkPrematureTimeout.
///
/// @param transid Transaction id the calling handler was created for.
/// @return true when no transaction is ongoing or @c transid differs from
/// the current transaction id; false otherwise.
1317 bool
1318 Connection::checkPrematureTimeoutInternal(const uint64_t transid) {
1319  // If there is no transaction but the handlers are invoked it means
1320  // that the last transaction in the queue timed out prematurely.
1321  // Also, if there is a transaction in progress but the ID of that
1322  // transaction doesn't match the one associated with the handler it,
1323  // also means that the transaction timed out prematurely.
1324  if (!isTransactionOngoing() || (transid != current_transid_)) {
// NOTE(review): the head of this logging statement is missing from this
// listing; only its .arg() chain survived - restore from upstream.
1326  .arg(isTransactionOngoing())
1327  .arg(transid)
1328  .arg(current_transid_);
1329  return (true);
1330  }
1331 
1332  return (false);
1333 }
1334 
1335 void
1336 Connection::terminate(const boost::system::error_code& ec,
1337  const std::string& parsing_error) {
1338  if (MultiThreadingMgr::instance().getMode()) {
1339  std::lock_guard<std::mutex> lk(mutex_);
1340  terminateInternal(ec, parsing_error);
1341  } else {
1342  terminateInternal(ec, parsing_error);
1343  }
1344 }
1345 
/// @brief Body of @c terminate.
///
/// When a transaction is ongoing: cancels the timer and pending socket
/// operations, invokes the request handler (passing the response only when
/// there is no error and the response is finalized), closes the connection
/// when the request is not persistent or the error is a timeout, and resets
/// the transaction state. In all cases the pool, if still alive, is asked
/// to process the next queued request for this destination.
///
/// @param ec Error code describing the outcome.
/// @param parsing_error Parsing error text, empty when there is none.
1346 void
1347 Connection::terminateInternal(const boost::system::error_code& ec,
1348  const std::string& parsing_error) {
1349  HttpResponsePtr response;
1350  if (isTransactionOngoing()) {
1351 
1352  timer_.cancel();
1353  if (tcp_socket_) {
1354  tcp_socket_->cancel();
1355  }
1356  if (tls_socket_) {
1357  tls_socket_->cancel();
1358  }
1359 
1360  if (!ec && current_response_->isFinalized()) {
1361  response = current_response_;
1362 
// NOTE(review): the heads of the two logging statements below are missing
// from this listing; only their .arg() chains survived - restore from
// upstream.
1365  .arg(url_.toText());
1366 
1369  .arg(url_.toText())
1370  .arg(parser_ ?
1371  parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) :
1372  "[HttpResponseParser is null]");
1373 
1374  } else {
 // The parsing error text takes precedence over the error code message.
1375  std::string err = parsing_error.empty() ? ec.message() :
1376  parsing_error;
1377 
// NOTE(review): head of the logging statement missing from this listing.
1380  .arg(url_.toText())
1381  .arg(err);
1382 
1383  // Only log the details if we have received anything and tried
1384  // to parse it.
1385  if (!parsing_error.empty()) {
// NOTE(review): head of the logging statement missing from this listing.
1388  .arg(url_.toText())
1389  .arg(parser_ ?
1390  parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) :
1391  "[HttpResponseParser is null]");
1392  }
1393  }
1394 
1395  try {
1396  // The callback should take care of its own exceptions but one
1397  // never knows.
1398  if (MultiThreadingMgr::instance().getMode()) {
 // NOTE(review): UnlockGuard presumably releases mutex_ for the duration
 // of the handler call and re-acquires it afterwards - confirm in
 // util/unlock_guard.h.
1399  UnlockGuard<std::mutex> lock(mutex_);
1400  current_callback_(ec, response, parsing_error);
1401  } else {
1402  current_callback_(ec, response, parsing_error);
1403  }
1404  } catch (...) {
1405  }
1406 
1407  // If we're not requesting connection persistence or the
1408  // connection has timed out, we should close the socket.
 // closed_ is tested first: once the connection is closed the state has
 // been reset and current_request_ must not be dereferenced.
1409  if (!closed_ &&
1410  (!current_request_->isPersistent() ||
1411  (ec == boost::asio::error::timed_out))) {
1412  closeInternal();
1413  }
1414 
1415  resetState();
1416  }
1417 
1418  // Check if there are any requests queued for this destination and start
1419  // another transaction if there is at least one.
1420  ConnectionPoolPtr conn_pool = conn_pool_.lock();
1421  if (conn_pool) {
1422  conn_pool->postProcessNextRequest(url_, tls_context_);
1423  }
1424 }
1425 
/// @brief Schedules the transaction timer with @c timerCallback as handler.
///
/// Nothing is scheduled when the timeout is not greater than 0.
///
/// @param request_timeout Timeout passed to the timer.
// NOTE(review): presumably milliseconds - confirm against IntervalTimer.
1426 void
1427 Connection::scheduleTimer(const long request_timeout) {
1428  if (request_timeout > 0) {
// NOTE(review): the call below is truncated in this listing - its last
// argument and closing parenthesis are missing; restore from upstream.
// The handler is bound to the raw `this` pointer.
1429  timer_.setup(std::bind(&Connection::timerCallback, this), request_timeout,
1431  }
1432 }
1433 
1434 void
1435 Connection::doHandshake(const uint64_t transid) {
1436  // Skip the handshake if it is not needed.
1437  if (!need_handshake_) {
1438  doSend(transid);
1439  return;
1440  }
1441 
1442  SocketCallback socket_cb(std::bind(&Connection::handshakeCallback,
1443  shared_from_this(),
1444  handshake_callback_,
1445  transid,
1446  ph::_1));
1447  try {
1448  tls_socket_->handshake(socket_cb);
1449 
1450  } catch (...) {
1451  terminate(boost::asio::error::not_connected);
1452  }
1453 }
1454 
1455 void
1456 Connection::doSend(const uint64_t transid) {
1457  SocketCallback socket_cb(std::bind(&Connection::sendCallback,
1458  shared_from_this(),
1459  transid,
1460  ph::_1,
1461  ph::_2));
1462  try {
1463  if (tcp_socket_) {
1464  tcp_socket_->asyncSend(&buf_[0], buf_.size(), socket_cb);
1465  return;
1466  }
1467 
1468  if (tls_socket_) {
1469  tls_socket_->asyncSend(&buf_[0], buf_.size(), socket_cb);
1470  return;
1471  }
1472 
1473  // Should never reach this point.
1474  std::cerr << "internal error: can't find a socket to send to\n";
1476  "internal error: can't find a socket to send to");
1477  } catch (...) {
1478  terminate(boost::asio::error::not_connected);
1479  }
1480 }
1481 
1482 void
1483 Connection::doReceive(const uint64_t transid) {
1484  TCPEndpoint endpoint;
1485  SocketCallback socket_cb(std::bind(&Connection::receiveCallback,
1486  shared_from_this(),
1487  transid,
1488  ph::_1,
1489  ph::_2));
1490  try {
1491  if (tcp_socket_) {
1492  tcp_socket_->asyncReceive(static_cast<void*>(input_buf_.data()),
1493  input_buf_.size(), 0,
1494  &endpoint, socket_cb);
1495  return;
1496  }
1497  if (tls_socket_) {
1498  tls_socket_->asyncReceive(static_cast<void*>(input_buf_.data()),
1499  input_buf_.size(), 0,
1500  &endpoint, socket_cb);
1501  return;
1502  }
1503  // Should never reach this point.
1504  std::cerr << "internal error: can't find a socket to receive from\n";
1506  "internal error: can't find a socket to receive from");
1507 
1508  } catch (...) {
1509  terminate(boost::asio::error::not_connected);
1510  }
1511 }
1512 
1513 void
1514 Connection::connectCallback(HttpClient::ConnectHandler connect_callback,
1515  const uint64_t transid,
1516  const boost::system::error_code& ec) {
1517  if (checkPrematureTimeout(transid)) {
1518  return;
1519  }
1520 
1521  // Run user defined connect callback if specified.
1522  if (connect_callback) {
1523  // If the user defined callback indicates that the connection
1524  // should not be continued.
1525  if (tcp_socket_) {
1526  if (!connect_callback(ec, tcp_socket_->getNative())) {
1527  return;
1528  }
1529  } else if (tls_socket_) {
1530  if (!connect_callback(ec, tls_socket_->getNative())) {
1531  return;
1532  }
1533  } else {
1534  // Should never reach this point.
1535  std::cerr << "internal error: can't find a socket to connect\n";
1536  }
1537  }
1538 
1539  if (ec && (ec.value() == boost::asio::error::operation_aborted)) {
1540  return;
1541 
1542  // In some cases the "in progress" status code may be returned. It doesn't
1543  // indicate an error. Sending the request over the socket is expected to
1544  // be successful. Getting such status appears to be highly dependent on
1545  // the operating system.
1546  } else if (ec &&
1547  (ec.value() != boost::asio::error::in_progress) &&
1548  (ec.value() != boost::asio::error::already_connected)) {
1549  terminate(ec);
1550 
1551  } else {
1552  // Start the TLS handshake asynchronously.
1553  doHandshake(transid);
1554  }
1555 }
1556 
1557 void
1558 Connection::handshakeCallback(HttpClient::ConnectHandler handshake_callback,
1559  const uint64_t transid,
1560  const boost::system::error_code& ec) {
1561  need_handshake_ = false;
1562  if (checkPrematureTimeout(transid)) {
1563  return;
1564  }
1565 
1566  // Run user defined handshake callback if specified.
1567  if (handshake_callback) {
1568  // If the user defined callback indicates that the connection
1569  // should not be continued.
1570  if (tls_socket_) {
1571  if (!handshake_callback(ec, tls_socket_->getNative())) {
1572  return;
1573  }
1574  } else {
1575  // Should never reach this point.
1576  std::cerr << "internal error: can't find TLS socket\n";
1577  }
1578  }
1579 
1580  if (ec && (ec.value() == boost::asio::error::operation_aborted)) {
1581  return;
1582  } else if (ec) {
1583  terminate(ec);
1584 
1585  } else {
1586  // Start sending the request asynchronously.
1587  doSend(transid);
1588  }
1589 }
1590 
1591 void
1592 Connection::sendCallback(const uint64_t transid,
1593  const boost::system::error_code& ec,
1594  size_t length) {
1595  if (checkPrematureTimeout(transid)) {
1596  return;
1597  }
1598 
1599  if (ec) {
1600  if (ec.value() == boost::asio::error::operation_aborted) {
1601  return;
1602 
1603  // EAGAIN and EWOULDBLOCK don't really indicate an error. The length
1604  // should be 0 in this case but let's be sure.
1605  } else if ((ec.value() == boost::asio::error::would_block) ||
1606  (ec.value() == boost::asio::error::try_again)) {
1607  length = 0;
1608 
1609  } else {
1610  // Any other error should cause the transaction to terminate.
1611  terminate(ec);
1612  return;
1613  }
1614  }
1615 
1616  // Sending is in progress, so push back the timeout.
1617  scheduleTimer(timer_.getInterval());
1618 
1619  // If any data have been sent, remove it from the buffer and only leave the
1620  // portion that still has to be sent.
1621  if (length > 0) {
1622  buf_.erase(0, length);
1623  }
1624 
1625  // If there is no more data to be sent, start receiving a response. Otherwise,
1626  // continue sending.
1627  if (buf_.empty()) {
1628  doReceive(transid);
1629 
1630  } else {
1631  doSend(transid);
1632  }
1633 }
1634 
1635 void
1636 Connection::receiveCallback(const uint64_t transid,
1637  const boost::system::error_code& ec,
1638  size_t length) {
1639  if (checkPrematureTimeout(transid)) {
1640  return;
1641  }
1642 
1643  if (ec) {
1644  if (ec.value() == boost::asio::error::operation_aborted) {
1645  return;
1646 
1647  // EAGAIN and EWOULDBLOCK don't indicate an error in this case. All
1648  // other errors should terminate the transaction.
1649  } if ((ec.value() != boost::asio::error::try_again) &&
1650  (ec.value() != boost::asio::error::would_block)) {
1651  terminate(ec);
1652  return;
1653 
1654  } else {
1655  // For EAGAIN and EWOULDBLOCK the length should be 0 anyway, but let's
1656  // make sure.
1657  length = 0;
1658  }
1659  }
1660 
1661  // Receiving is in progress, so push back the timeout.
1662  scheduleTimer(timer_.getInterval());
1663 
1664  if (runParser(ec, length)) {
1665  doReceive(transid);
1666  }
1667 }
1668 
1669 bool
1670 Connection::runParser(const boost::system::error_code& ec, size_t length) {
1671  if (MultiThreadingMgr::instance().getMode()) {
1672  std::lock_guard<std::mutex> lk(mutex_);
1673  return (runParserInternal(ec, length));
1674  } else {
1675  return (runParserInternal(ec, length));
1676  }
1677 }
1678 
1679 bool
1680 Connection::runParserInternal(const boost::system::error_code& ec,
1681  size_t length) {
1682  // If we have received any data, let's feed the parser with it.
1683  if (length != 0) {
1684  parser_->postBuffer(static_cast<void*>(input_buf_.data()), length);
1685  parser_->poll();
1686  }
1687 
1688  // If the parser still needs data, let's schedule another receive.
1689  if (parser_->needData()) {
1690  return (true);
1691 
1692  } else if (parser_->httpParseOk()) {
1693  // No more data needed and parsing has been successful so far. Let's
1694  // try to finalize the response parsing.
1695  try {
1696  current_response_->finalize();
1697  terminateInternal(ec);
1698 
1699  } catch (const std::exception& ex) {
1700  // If there is an error here, we need to return the error message.
1701  terminateInternal(ec, ex.what());
1702  }
1703 
1704  } else {
1705  // Parsing was unsuccessful. Let's pass the error message held in the
1706  // parser.
1707  terminateInternal(ec, parser_->getErrorMessage());
1708  }
1709 
1710  return (false);
1711 }
1712 
1713 void
1714 Connection::timerCallback() {
1715  // Request timeout occurred.
1716  terminate(boost::asio::error::timed_out);
1717 }
1718 
1719 }
1720 
1721 namespace isc {
1722 namespace http {
1723 
1726 public:
1750  HttpClientImpl(IOService& io_service, size_t thread_pool_size = 0,
1751  bool defer_thread_start = false)
1752  : thread_pool_size_(thread_pool_size), thread_pool_() {
1753  if (thread_pool_size_ > 0) {
1754  // Create our own private IOService.
1755  thread_io_service_.reset(new IOService());
1756 
1757  // Create the thread pool.
1758  thread_pool_.reset(new HttpThreadPool(thread_io_service_, thread_pool_size_,
1759  defer_thread_start));
1760 
1761  // Create the connection pool. Note that we use the thread_pool_size
1762  // as the maximum connections per URL value.
1763  conn_pool_.reset(new ConnectionPool(*thread_io_service_, thread_pool_size_));
1764 
1766  .arg(getThreadCount());
1767  } else {
1768  // Single-threaded mode: use the caller's IOService,
1769  // one connection per URL.
1770  conn_pool_.reset(new ConnectionPool(io_service, 1));
1771  }
1772  }
1773 
1778  stop();
1779  }
1780 
1782  void start() {
1783  if (thread_pool_) {
1784  thread_pool_->run();
1785  }
1786  }
1787 
1790  void stop() {
1791  // Close all the connections.
1792  conn_pool_->closeAll();
1793 
1794  // Stop the thread pool.
1795  if (thread_pool_) {
1796  thread_pool_->stop();
1797  }
1798  }
1799 
1804  void pause() {
1805  if (!thread_pool_) {
1806  isc_throw(InvalidOperation, "HttpClient::pause - no thread pool");
1807  }
1808 
1809  // Pause the thread pool.
1810  thread_pool_->pause();
1811  }
1812 
1817  void resume() {
1818  if (!thread_pool_) {
1819  isc_throw(InvalidOperation, "HttpClient::resume - no thread pool");
1820  }
1821 
1822  // Resume running the thread pool.
1823  thread_pool_->run();
1824  }
1825 
1830  bool isRunning() {
1831  if (thread_pool_) {
1832  return (thread_pool_->isRunning());
1833  }
1834 
1835  return (false);
1836  }
1837 
1842  bool isStopped() {
1843  if (thread_pool_) {
1844  return (thread_pool_->isStopped());
1845  }
1846 
1847  return (false);
1848  }
1849 
1854  bool isPaused() {
1855  if (thread_pool_) {
1856  return (thread_pool_->isPaused());
1857  }
1858 
1859  return (false);
1860  }
1861 
1867  return (thread_io_service_);
1868  };
1869 
1873  uint16_t getThreadPoolSize() {
1874  return (thread_pool_size_);
1875  }
1876 
1880  uint16_t getThreadCount() {
1881  if (!thread_pool_) {
1882  return (0);
1883  }
1884  return (thread_pool_->getThreadCount());
1885  }
1886 
1888  ConnectionPoolPtr conn_pool_;
1889 
1890 private:
1891 
1893  size_t thread_pool_size_;
1894 
1896  asiolink::IOServicePtr thread_io_service_;
1897 
1900  HttpThreadPoolPtr thread_pool_;
1901 };
1902 
1903 HttpClient::HttpClient(IOService& io_service, size_t thread_pool_size,
1904  bool defer_thread_start /* = false */) {
1905  if (thread_pool_size > 0) {
1906  if (!MultiThreadingMgr::instance().getMode()) {
1908  "HttpClient thread_pool_size must be zero"
1909  "when Kea core multi-threading is disabled");
1910  }
1911  }
1912 
1913  impl_.reset(new HttpClientImpl(io_service, thread_pool_size,
1914  defer_thread_start));
1915 }
1916 
1918 }
1919 
1920 void
1922  const TlsContextPtr& tls_context,
1923  const HttpRequestPtr& request,
1924  const HttpResponsePtr& response,
1925  const HttpClient::RequestHandler& request_callback,
1926  const HttpClient::RequestTimeout& request_timeout,
1927  const HttpClient::ConnectHandler& connect_callback,
1928  const HttpClient::HandshakeHandler& handshake_callback,
1929  const HttpClient::CloseHandler& close_callback) {
1930  if (!url.isValid()) {
1931  isc_throw(HttpClientError, "invalid URL specified for the HTTP client");
1932  }
1933 
1934  if ((url.getScheme() == Url::Scheme::HTTPS) && !tls_context) {
1935  isc_throw(HttpClientError, "HTTPS URL scheme but no TLS context");
1936  }
1937 
1938  if (!request) {
1939  isc_throw(HttpClientError, "HTTP request must not be null");
1940  }
1941 
1942  if (!response) {
1943  isc_throw(HttpClientError, "HTTP response must not be null");
1944  }
1945 
1946  if (!request_callback) {
1947  isc_throw(HttpClientError, "callback for HTTP transaction must not be null");
1948  }
1949 
1950  impl_->conn_pool_->queueRequest(url, tls_context, request, response,
1951  request_timeout.value_,
1952  request_callback, connect_callback,
1953  handshake_callback, close_callback);
1954 }
1955 
1956 void
1958  return (impl_->conn_pool_->closeIfOutOfBand(socket_fd));
1959 }
1960 
1961 void
1963  impl_->start();
1964 }
1965 
1966 void
1968  impl_->pause();
1969 }
1970 
1971 void
1973  impl_->resume();
1974 }
1975 
1976 void
1978  impl_->stop();
1979 }
1980 
1981 const IOServicePtr
1983  return (impl_->getThreadIOService());
1984 }
1985 
1986 uint16_t
1988  return (impl_->getThreadPoolSize());
1989 }
1990 
1991 uint16_t
1993  return (impl_->getThreadCount());
1994 }
1995 
1996 bool
1998  return (impl_->isRunning());
1999 }
2000 
2001 bool
2003  return (impl_->isStopped());
2004 }
2005 
2006 bool
2008  return (impl_->isPaused());
2009 }
2010 
2011 } // end of namespace isc::http
2012 } // end of namespace isc
void stop()
Halts client-side IO activity.
Definition: client.cc:1977
const isc::log::MessageID HTTP_BAD_SERVER_RESPONSE_RECEIVED
Definition: http_messages.h:14
bool isRunning()
Indicates if the thread pool is running.
Definition: client.cc:1830
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
Definition: macros.h:26
const isc::log::MessageID HTTP_BAD_SERVER_RESPONSE_RECEIVED_DETAILS
Definition: http_messages.h:15
std::function< bool(const boost::system::error_code &, const int)> ConnectHandler
Optional handler invoked when client connects to the server.
Definition: client.h:115
const int DBGLVL_TRACE_BASIC
Trace basic operations.
Definition: log_dbglevels.h:65
uint16_t getThreadPoolSize() const
Fetches the maximum size of the thread pool.
Definition: client.cc:1987
void asyncSendRequest(const Url &url, const asiolink::TlsContextPtr &tls_context, const HttpRequestPtr &request, const HttpResponsePtr &response, const RequestHandler &request_callback, const RequestTimeout &request_timeout=RequestTimeout(10000), const ConnectHandler &connect_callback=ConnectHandler(), const HandshakeHandler &handshake_callback=HandshakeHandler(), const CloseHandler &close_callback=CloseHandler())
Queues new asynchronous HTTP request for a given URL.
Definition: client.cc:1921
std::function< void()> callback_
The callback function.
Definition: io_service.cc:38
long value_
Timeout value specified.
Definition: client.h:97
const asiolink::IOServicePtr getThreadIOService() const
Fetches a pointer to the internal IOService used to drive the thread-pool in multi-threaded mode...
Definition: client.cc:1982
std::function< void(const boost::system::error_code &, const HttpResponsePtr &, const std::string &)> RequestHandler
Callback type used in call to HttpClient::asyncSendRequest.
Definition: client.h:103
void pause()
Pauses the client's thread pool.
Definition: client.cc:1804
HttpClient(asiolink::IOService &io_service, size_t thread_pool_size=0, bool defer_thread_start=false)
Constructor.
Definition: client.cc:1903
HTTP request/response timeout value.
Definition: client.h:90
void start()
Starts running the client's thread pool, if multi-threaded.
Definition: client.cc:1782
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition: macros.h:32
const isc::log::MessageID HTTP_SERVER_RESPONSE_RECEIVED_DETAILS
Definition: http_messages.h:34
std::function< void(const int)> CloseHandler
Optional handler invoked when client closes the connection to the server.
Definition: client.h:133
bool isValid() const
Checks if the URL is valid.
Definition: url.h:47
bool isStopped()
Indicates if the thread pool is stopped.
Definition: client.cc:2002
const isc::log::MessageID HTTP_PREMATURE_CONNECTION_TIMEOUT_OCCURRED
Definition: http_messages.h:31
const isc::log::MessageID HTTP_SERVER_RESPONSE_RECEIVED
Definition: http_messages.h:33
HttpClientImpl(IOService &io_service, size_t thread_pool_size=0, bool defer_thread_start=false)
Constructor.
Definition: client.cc:1750
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
~HttpClientImpl()
Destructor.
Definition: client.cc:1777
const isc::log::MessageID HTTP_CLIENT_REQUEST_SEND
Definition: http_messages.h:19
void resume()
Resumes running the client's thread pool.
Definition: client.cc:1972
ConnectionPoolPtr conn_pool_
Holds a pointer to the connection pool.
Definition: client.cc:1888
Represents an URL.
Definition: url.h:20
A generic parser for HTTP responses.
Definition: edns.h:19
void resume()
Resumes running the client's thread pool.
Definition: client.cc:1817
boost::shared_ptr< HttpResponse > HttpResponsePtr
Pointer to the HttpResponse object.
Definition: response.h:78
const isc::log::MessageID HTTP_CLIENT_REQUEST_SEND_DETAILS
Definition: http_messages.h:20
A generic exception that is thrown when an unexpected error condition occurs.
const int DBGLVL_TRACE_DETAIL_DATA
Trace data associated with detailed operations.
Definition: log_dbglevels.h:74
void stop()
Close all connections, and if multi-threaded, stops the client's thread pool.
Definition: client.cc:1790
void start()
Starts running the client's thread pool, if multi-threaded.
Definition: client.cc:1962
bool isPaused()
Indicates if the thread pool is paused.
Definition: client.cc:1854
~HttpClient()
Destructor.
Definition: client.cc:1917
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
const isc::log::MessageID HTTP_CLIENT_MT_STARTED
Definition: http_messages.h:16
Defines the logger used by the top-level component of kea-dhcp-ddns.
boost::shared_ptr< HttpResponseParser > HttpResponseParserPtr
Pointer to the HttpResponseParser.
std::function< bool(const boost::system::error_code &, const int)> HandshakeHandler
Optional handler invoked when client performs the TLS handshake with the server.
Definition: client.h:128
void pause()
Pauses the client's thread pool.
Definition: client.cc:1967
A generic exception that is thrown if a function is called in a prohibited way.
Unlock Guard.
Definition: unlock_guard.h:21
Scheme getScheme() const
Returns parsed scheme.
Definition: url.cc:31
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition: macros.h:14
Implements a pausable pool of IOService driven threads.
isc::log::Logger http_logger("http")
Defines the logger used within libkea-http library.
Definition: http_log.h:18
const isc::log::MessageID HTTP_CONNECTION_CLOSE_CALLBACK_FAILED
Definition: http_messages.h:22
bool isPaused()
Indicates if the thread pool is paused.
Definition: client.cc:2007
bool isRunning()
Indicates if the thread pool is running.
Definition: client.cc:1997
boost::shared_ptr< HttpRequest > HttpRequestPtr
Pointer to the HttpRequest object.
Definition: request.h:27
uint16_t getThreadPoolSize()
Fetches the maximum size of the thread pool.
Definition: client.cc:1873
boost::shared_ptr< HttpThreadPool > HttpThreadPoolPtr
Defines a pointer to a thread pool.
uint16_t getThreadCount() const
Fetches the number of threads in the pool.
Definition: client.cc:1992
void closeIfOutOfBand(int socket_fd)
Closes a connection if it has an out-of-band socket event.
Definition: client.cc:1957
const int DBGLVL_TRACE_DETAIL
Trace detailed operations.
Definition: log_dbglevels.h:71
uint16_t getThreadCount()
Fetches the number of threads in the pool.
Definition: client.cc:1880
asiolink::IOServicePtr getThreadIOService()
Fetches the internal IOService used in multi-threaded mode.
Definition: client.cc:1866
bool isStopped()
Indicates if the thread pool is stopped.
Definition: client.cc:1842
A generic error raised by the HttpClient class.
Definition: client.h:27
const int DBGLVL_TRACE_BASIC_DATA
Trace data associated with the basic operations.
Definition: log_dbglevels.h:68
HttpClient implementation.
Definition: client.cc:1725