Kea  1.9.9-git
command_mgr.cc
Go to the documentation of this file.
1 // Copyright (C) 2015-2020 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
11 #include <asiolink/io_service.h>
15 #include <config/command_mgr.h>
16 #include <cc/data.h>
17 #include <cc/command_interpreter.h>
18 #include <cc/json_feed.h>
19 #include <dhcp/iface_mgr.h>
20 #include <config/config_log.h>
21 #include <config/timeouts.h>
22 #include <util/watch_socket.h>
23 #include <boost/enable_shared_from_this.hpp>
24 #include <array>
25 #include <functional>
26 #include <unistd.h>
27 #include <sys/file.h>
28 
29 using namespace isc;
30 using namespace isc::asiolink;
31 using namespace isc::config;
32 using namespace isc::data;
33 namespace ph = std::placeholders;
34 
35 namespace {
36 
38 const size_t BUF_SIZE = 32768;
39 
40 class ConnectionPool;
41 
46 class Connection : public boost::enable_shared_from_this<Connection> {
47 public:
48 
66  Connection(const IOServicePtr& io_service,
67  const boost::shared_ptr<UnixDomainSocket>& socket,
68  ConnectionPool& connection_pool,
69  const long timeout)
70  : socket_(socket), timeout_timer_(*io_service), timeout_(timeout),
71  buf_(), response_(), connection_pool_(connection_pool), feed_(),
72  response_in_progress_(false), watch_socket_(new util::WatchSocket()) {
73 
75  .arg(socket_->getNative());
76 
77  // Callback value of 0 is used to indicate that callback function is
78  // not installed.
79  isc::dhcp::IfaceMgr::instance().addExternalSocket(watch_socket_->getSelectFd(), 0);
80  isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
81 
82  // Initialize state model for receiving and preparsing commands.
83  feed_.initModel();
84 
85  // Start timer for detecting timeouts.
86  scheduleTimer();
87  }
88 
92  ~Connection() {
93  timeout_timer_.cancel();
94  }
95 
97  void scheduleTimer() {
98  timeout_timer_.setup(std::bind(&Connection::timeoutHandler, this),
99  timeout_, IntervalTimer::ONE_SHOT);
100  }
101 
108  void stop() {
109  if (!response_in_progress_) {
111  .arg(socket_->getNative());
112 
113  isc::dhcp::IfaceMgr::instance().deleteExternalSocket(watch_socket_->getSelectFd());
114  isc::dhcp::IfaceMgr::instance().deleteExternalSocket(socket_->getNative());
115 
116  // Close watch socket and log errors if occur.
117  std::string watch_error;
118  if (!watch_socket_->closeSocket(watch_error)) {
120  .arg(watch_error);
121  }
122 
123  socket_->close();
124  timeout_timer_.cancel();
125  }
126  }
127 
132  void terminate();
133 
139  void doReceive() {
140  socket_->asyncReceive(&buf_[0], sizeof(buf_),
141  std::bind(&Connection::receiveHandler,
142  shared_from_this(), ph::_1, ph::_2));
143  }
144 
152  void doSend() {
153  size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
154  socket_->asyncSend(&response_[0], chunk_size,
155  std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
156 
157  // Asynchronous send has been scheduled and we need to indicate this
158  // to break the synchronous select(). The handler should clear this
159  // status when invoked.
160  try {
161  watch_socket_->markReady();
162 
163  } catch (const std::exception& ex) {
165  .arg(ex.what());
166  }
167  }
168 
177  //
181  void receiveHandler(const boost::system::error_code& ec,
182  size_t bytes_transferred);
183 
184 
193  void sendHandler(const boost::system::error_code& ec,
194  size_t bytes_transferred);
195 
200  void timeoutHandler();
201 
202 private:
203 
205  boost::shared_ptr<UnixDomainSocket> socket_;
206 
208  IntervalTimer timeout_timer_;
209 
211  long timeout_;
212 
214  std::array<char, BUF_SIZE> buf_;
215 
217  std::string response_;
218 
220  ConnectionPool& connection_pool_;
221 
224  JSONFeed feed_;
225 
228  bool response_in_progress_;
229 
232  util::WatchSocketPtr watch_socket_;
233 };
234 
236 typedef boost::shared_ptr<Connection> ConnectionPtr;
237 
239 class ConnectionPool {
240 public:
241 
245  void start(const ConnectionPtr& connection) {
246  connection->doReceive();
247  connections_.insert(connection);
248  }
249 
253  void stop(const ConnectionPtr& connection) {
254  try {
255  connection->stop();
256  connections_.erase(connection);
257  } catch (const std::exception& ex) {
259  .arg(ex.what());
260  }
261  }
262 
264  void stopAll() {
265  for (auto conn = connections_.begin(); conn != connections_.end();
266  ++conn) {
267  (*conn)->stop();
268  }
269  connections_.clear();
270  }
271 
272 private:
273 
275  std::set<ConnectionPtr> connections_;
276 
277 };
278 
279 void
280 Connection::terminate() {
281  try {
282  socket_->shutdown();
283 
284  } catch (const std::exception& ex) {
286  .arg(ex.what());
287  }
288 }
289 
290 void
291 Connection::receiveHandler(const boost::system::error_code& ec,
292  size_t bytes_transferred) {
293  if (ec) {
294  if (ec.value() == boost::asio::error::eof) {
295  std::stringstream os;
296  if (feed_.getProcessedText().empty()) {
297  os << "no input data to discard";
298  }
299  else {
300  os << "discarding partial command of "
301  << feed_.getProcessedText().size() << " bytes";
302  }
303 
304  // Foreign host has closed the connection. We should remove it from the
305  // connection pool.
307  .arg(socket_->getNative()).arg(os.str());
308  } else if (ec.value() != boost::asio::error::operation_aborted) {
310  .arg(ec.value()).arg(socket_->getNative());
311  }
312 
313  connection_pool_.stop(shared_from_this());
314  return;
315 
316  } else if (bytes_transferred == 0) {
317  // Nothing received. Close the connection.
318  connection_pool_.stop(shared_from_this());
319  return;
320  }
321 
323  .arg(bytes_transferred).arg(socket_->getNative());
324 
325  // Reschedule the timer because the transaction is ongoing.
326  scheduleTimer();
327 
328  ConstElementPtr rsp;
329 
330  try {
331  // Received some data over the socket. Append them to the JSON feed
332  // to see if we have reached the end of command.
333  feed_.postBuffer(&buf_[0], bytes_transferred);
334  feed_.poll();
335  // If we haven't yet received the full command, continue receiving.
336  if (feed_.needData()) {
337  doReceive();
338  return;
339  }
340 
341  // Received entire command. Parse the command into JSON.
342  if (feed_.feedOk()) {
343  ConstElementPtr cmd = feed_.toElement();
344  response_in_progress_ = true;
345 
346  // Cancel the timer to make sure that long lasting command
347  // processing doesn't cause the timeout.
348  timeout_timer_.cancel();
349 
350  // If successful, then process it as a command.
351  rsp = CommandMgr::instance().processCommand(cmd);
352 
353  response_in_progress_ = false;
354 
355  } else {
356  // Failed to parse command as JSON or process the received command.
357  // This exception will be caught below and the error response will
358  // be sent.
359  isc_throw(BadValue, feed_.getErrorMessage());
360  }
361 
362  } catch (const Exception& ex) {
364  rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
365  }
366 
367  // No response generated. Connection will be closed.
368  if (!rsp) {
371  "internal server error: no response generated");
372 
373  } else {
374 
375  // Reschedule the timer as it may be either canceled or need to be
376  // updated to not timeout before we manage to the send the reply.
377  scheduleTimer();
378 
379  // Let's convert JSON response to text. Note that at this stage
380  // the rsp pointer is always set.
381  response_ = rsp->str();
382 
383  doSend();
384  return;
385  }
386 
387  // Close the connection if we have sent the entire response.
388  connection_pool_.stop(shared_from_this());
389 }
390 
391 void
392 Connection::sendHandler(const boost::system::error_code& ec,
393  size_t bytes_transferred) {
394  // Clear the watch socket so as the future send operation can mark it
395  // again to interrupt the synchronous select() call.
396  try {
397  watch_socket_->clearReady();
398 
399  } catch (const std::exception& ex) {
401  .arg(ex.what());
402  }
403 
404  if (ec) {
405  // If an error occurred, log this error and stop the connection.
406  if (ec.value() != boost::asio::error::operation_aborted) {
408  .arg(socket_->getNative()).arg(ec.message());
409  }
410 
411  } else {
412 
413  // Reschedule the timer because the transaction is ongoing.
414  scheduleTimer();
415 
416  // No error. We are in a process of sending a response. Need to
417  // remove the chunk that we have managed to sent with the previous
418  // attempt.
419  response_.erase(0, bytes_transferred);
420 
422  .arg(bytes_transferred).arg(response_.size())
423  .arg(socket_->getNative());
424 
425  // Check if there is any data left to be sent and sent it.
426  if (!response_.empty()) {
427  doSend();
428  return;
429  }
430 
431  // Gracefully shutdown the connection and close the socket if
432  // we have sent the whole response.
433  terminate();
434  }
435 
436  // All data sent or an error has occurred. Close the connection.
437  connection_pool_.stop(shared_from_this());
438 }
439 
440 void
441 Connection::timeoutHandler() {
443  .arg(socket_->getNative());
444 
445  try {
446  socket_->cancel();
447 
448  } catch (const std::exception& ex) {
450  .arg(socket_->getNative())
451  .arg(ex.what());
452  }
453 
454  std::stringstream os;
455  os << "Connection over control channel timed out";
456  if (!feed_.getProcessedText().empty()) {
457  os << ", discarded partial command of "
458  << feed_.getProcessedText().size() << " bytes";
459  }
460 
462  response_ = rsp->str();
463  doSend();
464 }
465 
466 
467 }
468 
469 namespace isc {
470 namespace config {
471 
474 public:
475 
478  : io_service_(), acceptor_(), socket_(), socket_name_(),
479  connection_pool_(), timeout_(TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND) {
480  }
481 
487  void openCommandSocket(const isc::data::ConstElementPtr& socket_info);
488 
490  void doAccept();
491 
493  std::string getLockName() {
494  return (std::string(socket_name_ + ".lock"));
495  }
496 
500 
502  boost::shared_ptr<UnixDomainSocketAcceptor> acceptor_;
503 
505  boost::shared_ptr<UnixDomainSocket> socket_;
506 
510  std::string socket_name_;
511 
513  ConnectionPool connection_pool_;
514 
516  long timeout_;
517 };
518 
519 void
520 CommandMgrImpl::openCommandSocket(const isc::data::ConstElementPtr& socket_info) {
521  socket_name_.clear();
522 
523  if(!socket_info) {
524  isc_throw(BadSocketInfo, "Missing socket_info parameters, can't create socket.");
525  }
526 
527  ConstElementPtr type = socket_info->get("socket-type");
528  if (!type) {
529  isc_throw(BadSocketInfo, "Mandatory 'socket-type' parameter missing");
530  }
531 
532  // Only supporting unix sockets right now.
533  if (type->stringValue() != "unix") {
534  isc_throw(BadSocketInfo, "Invalid 'socket-type' parameter value "
535  << type->stringValue());
536  }
537 
538  // UNIX socket is requested. It takes one parameter: socket-name that
539  // specifies UNIX path of the socket.
540  ConstElementPtr name = socket_info->get("socket-name");
541  if (!name) {
542  isc_throw(BadSocketInfo, "Mandatory 'socket-name' parameter missing");
543  }
544 
545  if (name->getType() != Element::string) {
546  isc_throw(BadSocketInfo, "'socket-name' parameter expected to be a string");
547  }
548 
549  socket_name_ = name->stringValue();
550 
551  // First let's open lock file.
552  std::string lock_name = getLockName();
553  int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
554  if (lock_fd == -1) {
555  std::string errmsg = strerror(errno);
556  isc_throw(SocketError, "cannot create socket lockfile, "
557  << lock_name << ", : " << errmsg);
558  }
559 
560  // Try to acquire lock. If we can't somebody else is actively
561  // using it.
562  int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
563  if (ret != 0) {
564  std::string errmsg = strerror(errno);
565  isc_throw(SocketError, "cannot lock socket lockfile, "
566  << lock_name << ", : " << errmsg);
567  }
568 
569  // We have the lock, so let's remove the pre-existing socket
570  // file if it exists.
571  static_cast<void>(::remove(socket_name_.c_str()));
572 
574  .arg(socket_name_);
575 
576  try {
577  // Start asynchronous acceptor service.
578  acceptor_.reset(new UnixDomainSocketAcceptor(*io_service_));
579  UnixDomainSocketEndpoint endpoint(socket_name_);
580  acceptor_->open(endpoint);
581  acceptor_->bind(endpoint);
582  acceptor_->listen();
583  // Install this socket in Interface Manager.
584  isc::dhcp::IfaceMgr::instance().addExternalSocket(acceptor_->getNative(), 0);
585 
586  doAccept();
587 
588  } catch (const std::exception& ex) {
589  isc_throw(SocketError, ex.what());
590  }
591 }
592 
593 void
594 CommandMgrImpl::doAccept() {
595  // Create a socket into which the acceptor will accept new connection.
596  socket_.reset(new UnixDomainSocket(*io_service_));
597  acceptor_->asyncAccept(*socket_, [this](const boost::system::error_code& ec) {
598  if (!ec) {
599  // New connection is arriving. Start asynchronous transmission.
600  ConnectionPtr connection(new Connection(io_service_, socket_,
601  connection_pool_,
602  timeout_));
603  connection_pool_.start(connection);
604 
605  } else if (ec.value() != boost::asio::error::operation_aborted) {
607  .arg(acceptor_->getNative()).arg(ec.message());
608  }
609 
610  // Unless we're stopping the service, start accepting connections again.
611  if (ec.value() != boost::asio::error::operation_aborted) {
612  doAccept();
613  }
614  });
615 }
616 
617 CommandMgr::CommandMgr()
618  : HookedCommandMgr(), impl_(new CommandMgrImpl()) {
619 }
620 
621 void
623  impl_->openCommandSocket(socket_info);
624 }
625 
627  // Close acceptor if the acceptor is open.
628  if (impl_->acceptor_ && impl_->acceptor_->isOpen()) {
629  isc::dhcp::IfaceMgr::instance().deleteExternalSocket(impl_->acceptor_->getNative());
630  impl_->acceptor_->close();
631  static_cast<void>(::remove(impl_->socket_name_.c_str()));
632  static_cast<void>(::remove(impl_->getLockName().c_str()));
633  }
634 
635  // Stop all connections which can be closed. The only connection that won't
636  // be closed is the one over which we have received a request to reconfigure
637  // the server. This connection will be held until the CommandMgr responds to
638  // such request.
639  impl_->connection_pool_.stopAll();
640 }
641 
642 int
644  return (impl_->acceptor_ ? impl_->acceptor_->getNative() : -1);
645 }
646 
647 
648 CommandMgr&
650  static CommandMgr cmd_mgr;
651  return (cmd_mgr);
652 }
653 
654 void
656  impl_->io_service_ = io_service;
657 }
658 
659 void
660 CommandMgr::setConnectionTimeout(const long timeout) {
661  impl_->timeout_ = timeout;
662 }
663 
664 
665 }; // end of isc::config
666 }; // end of isc
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
Definition: macros.h:26
std::string socket_name_
Path to the unix domain socket descriptor.
Definition: command_mgr.cc:510
An exception indicating a problem with socket operation.
Definition: command_mgr.h:28
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConnectionPool connection_pool_
Pool of connections.
Definition: command_mgr.cc:513
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
Definition: macros.h:20
ConstElementPtr createAnswer(const int status_code, const std::string &text, const ConstElementPtr &arg)
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
Definition: iface_mgr.cc:324
const isc::log::MessageID COMMAND_SOCKET_READ
const isc::log::MessageID COMMAND_RESPONSE_ERROR
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition: macros.h:32
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const int DBG_COMMAND
Definition: config_log.h:24
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Definition: watch_socket.h:138
const isc::log::MessageID COMMAND_SOCKET_ACCEPT_FAIL
An exception indicating that specified socket parameters are invalid.
Definition: command_mgr.h:21
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
Definition: timeouts.h:17
Defines the class, WatchSocket.
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
Command Manager which can delegate commands to a hook library.
Implementation of the CommandMgr.
Definition: command_mgr.cc:473
std::string getLockName()
Returns the lock file name.
Definition: command_mgr.cc:493
long timeout_
Connection timeout.
Definition: command_mgr.cc:516
boost::shared_ptr< UnixDomainSocketAcceptor > acceptor_
Pointer to the acceptor service.
Definition: command_mgr.cc:502
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
CommandMgrImpl()
Constructor.
Definition: command_mgr.cc:477
isc::log::Logger command_logger("commands")
Command processing Logger.
Definition: config_log.h:21
void closeCommandSocket()
Shuts down any open control sockets.
Definition: command_mgr.cc:626
void deleteExternalSocket(int socketfd)
Deletes external socket.
Definition: iface_mgr.cc:347
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
Definition: command_mgr.cc:499
boost::shared_ptr< const Element > ConstElementPtr
Definition: data.h:23
void setConnectionTimeout(const long timeout)
Override default connection timeout.
Definition: command_mgr.cc:660
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the command manager.
Definition: command_mgr.cc:655
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
State model for asynchronous read of data in JSON format.
Definition: json_feed.h:71
This is a base class for exceptions thrown from the DNS library module.
void openCommandSocket(const isc::data::ConstElementPtr &socket_info)
Opens control socket with parameters specified in socket_info.
Definition: command_mgr.cc:622
Defines the logger used by the top-level component of kea-dhcp-ddns.
boost::shared_ptr< UnixDomainSocket > socket_
Pointer to the socket into which the new connection is accepted.
Definition: command_mgr.cc:505
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
Commands Manager implementation for the Kea servers.
Definition: command_mgr.h:41
int getControlSocketFD()
Returns control socket descriptor.
Definition: command_mgr.cc:643
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
This file contains several functions and constants that are used for handling commands and responses ...
static IfaceMgr & instance()
IfaceMgr is a singleton class.
Definition: iface_mgr.cc:53
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition: macros.h:14
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_PROCESS_ERROR1
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
static CommandMgr & instance()
CommandMgr is a singleton class.
Definition: command_mgr.cc:649
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL