Kea  1.9.9-git
client_connection.cc
Go to the documentation of this file.
1 // Copyright (C) 2017-2020 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
12 #include <cc/json_feed.h>
14 #include <boost/enable_shared_from_this.hpp>
15 #include <array>
16 #include <functional>
17 
18 using namespace isc::asiolink;
19 
20 namespace isc {
21 namespace config {
22 
24 class ClientConnectionImpl : public boost::enable_shared_from_this<ClientConnectionImpl> {
25 public:
26 
30  explicit ClientConnectionImpl(IOService& io_service);
31 
37  void scheduleTimer(ClientConnection::Handler handler);
38 
50  void start(const ClientConnection::SocketPath& socket_path,
51  const ClientConnection::ControlCommand& command,
53  const ClientConnection::Timeout& timeout);
54 
56  void stop();
57 
67  void doSend(const void* buffer, const size_t length,
69 
79  void doReceive(ClientConnection::Handler handler);
80 
86  void terminate(const boost::system::error_code& ec,
88 
92  void timeoutCallback(ClientConnection::Handler handler);
93 
94 private:
95 
97  UnixDomainSocket socket_;
98 
102  JSONFeedPtr feed_;
103 
106  std::string current_command_;
107 
109  std::array<char, 32768> read_buf_;
110 
112  IntervalTimer timer_;
113 
115  long timeout_;
116 };
117 
118 ClientConnectionImpl::ClientConnectionImpl(IOService& io_service)
119  : socket_(io_service), feed_(), current_command_(), timer_(io_service),
120  timeout_(0) {
121 }
122 
123 void
125  if (timeout_ > 0) {
127  this, handler),
128  timeout_, IntervalTimer::ONE_SHOT);
129  }
130 }
131 
132 void
134  const ClientConnection::ControlCommand& command,
136  const ClientConnection::Timeout& timeout) {
137  // Start the timer protecting against timeouts.
138  timeout_ = timeout.timeout_;
139  scheduleTimer(handler);
140 
141  // Store the command in the class member to make sure it is valid
142  // the entire time.
143  current_command_.assign(command.control_command_);
144 
145  // Pass self to lambda to make sure that the instance of this class
146  // lives as long as the lambda is held for async connect.
147  auto self(shared_from_this());
148  // Start asynchronous connect. This will return immediately.
149  socket_.asyncConnect(socket_path.socket_path_,
150  [this, self, command, handler](const boost::system::error_code& ec) {
151  // We failed to connect so we can't proceed. Simply clean up
152  // and invoke the user callback to signal an error.
153  if (ec) {
154  // This doesn't throw.
155  terminate(ec, handler);
156 
157  } else {
158  // Connection successful. Transmit the command to the remote
159  // endpoint asynchronously.
160  doSend(current_command_.c_str(), current_command_.length(),
161  handler);
162  }
163  });
164 }
165 
166 void
167 ClientConnectionImpl::doSend(const void* buffer, const size_t length,
168  ClientConnection::Handler handler) {
169  // Pass self to lambda to make sure that the instance of this class
170  // lives as long as the lambda is held for async send.
171  auto self(shared_from_this());
172  // Start asynchronous transmission of the command. This will return
173  // immediately.
174  socket_.asyncSend(buffer, length,
175  [this, self, buffer, length, handler]
176  (const boost::system::error_code& ec, size_t bytes_transferred) {
177  // An error has occurred while sending. Close the connection and
178  // signal an error.
179  if (ec) {
180  // This doesn't throw.
181  terminate(ec, handler);
182 
183  } else {
184  // Sending is in progress, so push back the timeout.
185  scheduleTimer(handler);
186 
187  // If the number of bytes we have managed to send so far is
188  // lower than the amount of data we're trying to send, we
189  // have to schedule another send to deliver the rest of
190  // the data.
191  if (bytes_transferred < length) {
192  doSend(static_cast<const char*>(buffer) + bytes_transferred,
193  length - bytes_transferred, handler);
194 
195  } else {
196  // We have sent all the data. Start receiving a response.
197  doReceive(handler);
198  }
199  }
200  });
201 }
202 
203 void
204 ClientConnectionImpl::doReceive(ClientConnection::Handler handler) {
205  // Pass self to lambda to make sure that the instance of this class
206  // lives as long as the lambda is held for async receive.
207  auto self(shared_from_this());
208  socket_.asyncReceive(&read_buf_[0], read_buf_.size(),
209  [this, self, handler]
210  (const boost::system::error_code& ec, size_t length) {
211  // An error has occurred while receiving the data. Close the connection
212  // and signal an error.
213  if (ec) {
214  // This doesn't throw.
215  terminate(ec, handler);
216 
217  } else {
218  // Receiving is in progress, so push back the timeout.
219  scheduleTimer(handler);
220 
221  std::string x(&read_buf_[0], length);
222  // Lazy initialization of the JSONFeed. The feed will be "parsing"
223  // received JSON stream and will detect when the whole response
224  // has been received.
225  if (!feed_) {
226  feed_.reset(new JSONFeed());
227  feed_->initModel();
228  }
229  // Put everything we have received so far into the feed and process
230  // the data.
231  feed_->postBuffer(&read_buf_[0], length);
232  feed_->poll();
233  // If the feed indicates that only a part of the response has been
234  // received, schedule another receive to get more data.
235  if (feed_->needData()) {
236  doReceive(handler);
237 
238  } else {
239  // We have received the entire response, let's call the handler
240  // and indicate success.
241  terminate(ec, handler);
242  }
243  }
244  });
245 }
246 
247 void
248 ClientConnectionImpl::terminate(const boost::system::error_code& ec,
249  ClientConnection::Handler handler) {
250  try {
251  timer_.cancel();
252  socket_.close();
253  current_command_.clear();
254  handler(ec, feed_);
255 
256  } catch (...) {
257  // None of these operations should throw. In particular, the handler
258  // should not throw but if it has been misimplemented, we want to make
259  // sure we don't emit any exceptions from here.
260  }
261 }
262 
263 void
264 ClientConnectionImpl::timeoutCallback(ClientConnection::Handler handler) {
265  // Timeout has occurred. The remote server didn't provide the entire
266  // response within the given time frame. Let's close the connection
267  // and signal the timeout.
268  terminate(boost::asio::error::timed_out, handler);
269 }
270 
271 ClientConnection::ClientConnection(asiolink::IOService& io_service)
272  : impl_(new ClientConnectionImpl(io_service)) {
273 }
274 
275 void
277  const ClientConnection::ControlCommand& command,
279  const ClientConnection::Timeout& timeout) {
280  impl_->start(socket_path, command, handler, timeout);
281 }
282 
283 
284 } // end of namespace config
285 } // end of namespace isc
void timeoutCallback(ClientConnection::Handler handler)
Callback invoked when the timeout occurs.
boost::shared_ptr< JSONFeed > JSONFeedPtr
Pointer to the JSONFeed.
Definition: json_feed.h:21
Implementation of the ClientConnection.
void start(const ClientConnection::SocketPath &socket_path, const ClientConnection::ControlCommand &command, ClientConnection::Handler handler, const ClientConnection::Timeout &timeout)
Starts asynchronous transaction with a remote endpoint.
State model for asynchronous read of data in JSON format.
Definition: json_feed.h:71
Defines the logger used by the top-level component of kea-dhcp-ddns.
void scheduleTimer(ClientConnection::Handler handler)
This method schedules timer or reschedules existing timer.
Encapsulates timeout value.
void start(const SocketPath &socket_path, const ControlCommand &command, Handler handler, const Timeout &timeout=Timeout(5000))
Starts asynchronous transaction with a remote endpoint.
std::function< void(const boost::system::error_code &ec, const ConstJSONFeedPtr &feed)> Handler
Type of the callback invoked when the communication with the server is complete or an error has occurred.