23 #include <boost/enable_shared_from_this.hpp>
33 namespace ph = std::placeholders;
38 const size_t BUF_SIZE = 32768;
46 class Connection :
public boost::enable_shared_from_this<Connection> {
67 const boost::shared_ptr<UnixDomainSocket>& socket,
68 ConnectionPool& connection_pool,
70 : socket_(socket), timeout_timer_(*io_service), timeout_(timeout),
71 buf_(), response_(), connection_pool_(connection_pool), feed_(),
72 response_in_progress_(false), watch_socket_(new util::WatchSocket()) {
75 .arg(socket_->getNative());
93 timeout_timer_.cancel();
97 void scheduleTimer() {
98 timeout_timer_.setup(std::bind(&Connection::timeoutHandler,
this),
109 if (!response_in_progress_) {
111 .arg(socket_->getNative());
117 std::string watch_error;
118 if (!watch_socket_->closeSocket(watch_error)) {
124 timeout_timer_.cancel();
140 socket_->asyncReceive(&buf_[0],
sizeof(buf_),
141 std::bind(&Connection::receiveHandler,
142 shared_from_this(), ph::_1, ph::_2));
153 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
154 socket_->asyncSend(&response_[0], chunk_size,
155 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
161 watch_socket_->markReady();
163 }
catch (
const std::exception& ex) {
181 void receiveHandler(
const boost::system::error_code& ec,
182 size_t bytes_transferred);
193 void sendHandler(
const boost::system::error_code& ec,
194 size_t bytes_transferred);
200 void timeoutHandler();
205 boost::shared_ptr<UnixDomainSocket> socket_;
214 std::array<char, BUF_SIZE> buf_;
217 std::string response_;
220 ConnectionPool& connection_pool_;
228 bool response_in_progress_;
236 typedef boost::shared_ptr<Connection> ConnectionPtr;
239 class ConnectionPool {
245 void start(
const ConnectionPtr& connection) {
246 connection->doReceive();
247 connections_.insert(connection);
253 void stop(
const ConnectionPtr& connection) {
256 connections_.erase(connection);
257 }
catch (
const std::exception& ex) {
265 for (
auto conn = connections_.begin(); conn != connections_.end();
269 connections_.clear();
275 std::set<ConnectionPtr> connections_;
280 Connection::terminate() {
284 }
catch (
const std::exception& ex) {
291 Connection::receiveHandler(
const boost::system::error_code& ec,
292 size_t bytes_transferred) {
294 if (ec.value() == boost::asio::error::eof) {
295 std::stringstream os;
296 if (feed_.getProcessedText().empty()) {
297 os <<
"no input data to discard";
300 os <<
"discarding partial command of "
301 << feed_.getProcessedText().size() <<
" bytes";
307 .arg(socket_->getNative()).arg(os.str());
308 }
else if (ec.value() != boost::asio::error::operation_aborted) {
310 .arg(ec.value()).arg(socket_->getNative());
313 connection_pool_.stop(shared_from_this());
316 }
else if (bytes_transferred == 0) {
318 connection_pool_.stop(shared_from_this());
323 .arg(bytes_transferred).arg(socket_->getNative());
333 feed_.postBuffer(&buf_[0], bytes_transferred);
336 if (feed_.needData()) {
342 if (feed_.feedOk()) {
344 response_in_progress_ =
true;
348 timeout_timer_.cancel();
351 rsp = CommandMgr::instance().processCommand(cmd);
353 response_in_progress_ =
false;
371 "internal server error: no response generated");
381 response_ = rsp->str();
388 connection_pool_.stop(shared_from_this());
392 Connection::sendHandler(
const boost::system::error_code& ec,
393 size_t bytes_transferred) {
397 watch_socket_->clearReady();
399 }
catch (
const std::exception& ex) {
406 if (ec.value() != boost::asio::error::operation_aborted) {
408 .arg(socket_->getNative()).arg(ec.message());
419 response_.erase(0, bytes_transferred);
422 .arg(bytes_transferred).arg(response_.size())
423 .arg(socket_->getNative());
426 if (!response_.empty()) {
437 connection_pool_.stop(shared_from_this());
441 Connection::timeoutHandler() {
443 .arg(socket_->getNative());
448 }
catch (
const std::exception& ex) {
450 .arg(socket_->getNative())
454 std::stringstream os;
455 os <<
"Connection over control channel timed out";
456 if (!feed_.getProcessedText().empty()) {
457 os <<
", discarded partial command of "
458 << feed_.getProcessedText().size() <<
" bytes";
462 response_ = rsp->str();
478 : io_service_(), acceptor_(), socket_(), socket_name_(),
494 return (std::string(socket_name_ +
".lock"));
521 socket_name_.clear();
533 if (type->stringValue() !=
"unix") {
535 << type->stringValue());
545 if (name->getType() != Element::string) {
549 socket_name_ = name->stringValue();
552 std::string lock_name = getLockName();
553 int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
555 std::string errmsg = strerror(errno);
557 << lock_name <<
", : " << errmsg);
562 int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
564 std::string errmsg = strerror(errno);
566 << lock_name <<
", : " << errmsg);
571 static_cast<void>(::remove(socket_name_.c_str()));
580 acceptor_->open(endpoint);
581 acceptor_->bind(endpoint);
588 }
catch (
const std::exception& ex) {
594 CommandMgrImpl::doAccept() {
597 acceptor_->asyncAccept(*socket_, [
this](
const boost::system::error_code& ec) {
600 ConnectionPtr connection(
new Connection(io_service_, socket_,
603 connection_pool_.start(connection);
605 }
else if (ec.value() != boost::asio::error::operation_aborted) {
607 .arg(acceptor_->getNative()).arg(ec.message());
611 if (ec.value() != boost::asio::error::operation_aborted) {
617 CommandMgr::CommandMgr()
623 impl_->openCommandSocket(socket_info);
628 if (impl_->acceptor_ && impl_->acceptor_->isOpen()) {
630 impl_->acceptor_->close();
631 static_cast<void>(::remove(impl_->socket_name_.c_str()));
632 static_cast<void>(::remove(impl_->getLockName().c_str()));
639 impl_->connection_pool_.stopAll();
644 return (impl_->acceptor_ ? impl_->acceptor_->getNative() : -1);
656 impl_->io_service_ = io_service;
661 impl_->timeout_ = timeout;
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
std::string socket_name_
Path to the unix domain socket descriptor.
An exception indicating a problem with socket operation.
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConnectionPool connection_pool_
Pool of connections.
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
ConstElementPtr createAnswer(const int status_code, const std::string &text, const ConstElementPtr &arg)
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
const isc::log::MessageID COMMAND_SOCKET_READ
const isc::log::MessageID COMMAND_RESPONSE_ERROR
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
boost::shared_ptr< IOService > IOServicePtr
Defines a smart pointer to an IOService instance.
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
const isc::log::MessageID COMMAND_SOCKET_ACCEPT_FAIL
An exception indicating that specified socket parameters are invalid.
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
Defines the class, WatchSocket.
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
Command Manager which can delegate commands to a hook library.
Implementation of the CommandMgr.
std::string getLockName()
Returns the lock file name.
long timeout_
Connection timeout.
boost::shared_ptr< UnixDomainSocketAcceptor > acceptor_
Pointer to the acceptor service.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
A generic exception that is thrown if a parameter given to a method is considered invalid in that context.
CommandMgrImpl()
Constructor.
The IntervalTimer class is a wrapper for the ASIO boost::asio::deadline_timer class.
isc::log::Logger command_logger("commands")
Command processing Logger.
void closeCommandSocket()
Shuts down any open control sockets.
void deleteExternalSocket(int socketfd)
Deletes external socket.
Represents unix domain socket implemented in terms of boost asio.
Endpoint for UnixDomainSocket.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
boost::shared_ptr< const Element > ConstElementPtr
void setConnectionTimeout(const long timeout)
Override default connection timeout.
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the command manager.
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
State model for asynchronous read of data in JSON format.
This is a base class for exceptions thrown from the DNS library module.
void openCommandSocket(const isc::data::ConstElementPtr &socket_info)
Opens control socket with parameters specified in socket_info.
Defines the logger used by the top-level component of kea-dhcp-ddns.
boost::shared_ptr< UnixDomainSocket > socket_
Pointer to the socket into which the new connection is accepted.
Implements acceptor service for UnixDomainSocket.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
Commands Manager implementation for the Kea servers.
int getControlSocketFD()
Returns control socket descriptor.
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
This file contains several functions and constants that are used for handling commands and responses sent over the control channel.
static IfaceMgr & instance()
IfaceMgr is a singleton class.
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_PROCESS_ERROR1
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
static CommandMgr & instance()
CommandMgr is a singleton class.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL