Kea  1.9.9-git
io_fetch.cc
Go to the documentation of this file.
1 // Copyright (C) 2011-2020 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
9 #include <asiolink/io_address.h>
11 #include <asiolink/io_endpoint.h>
12 #include <asiolink/io_service.h>
13 #include <asiolink/tcp_endpoint.h>
14 #include <asiolink/tcp_socket.h>
15 #include <asiolink/udp_endpoint.h>
16 #include <asiolink/udp_socket.h>
17 #include <asiodns/io_fetch.h>
18 #include <asiodns/logger.h>
19 #include <dns/messagerenderer.h>
20 #include <dns/opcode.h>
21 #include <dns/rcode.h>
22 #include <util/buffer.h>
23 #include <util/random/qid_gen.h>
24 
25 #include <boost/scoped_ptr.hpp>
26 #include <boost/date_time/posix_time/posix_time_types.hpp>
27 
28 #include <functional>
29 #include <unistd.h> // for some IPC/network system calls
30 #include <netinet/in.h>
31 #include <stdint.h>
32 #include <sys/socket.h>
33 
34 using namespace boost::asio;
35 using namespace isc::asiolink;
36 using namespace isc::dns;
37 using namespace isc::util;
38 using namespace isc::util::random;
39 using namespace isc::log;
40 using namespace std;
41 
42 namespace isc {
43 namespace asiodns {
44 
45 // Log debug verbosity
46 
49 const int DBG_ALL = DBGLVL_TRACE_DETAIL + 20;
50 
58 struct IOFetchData {
59 
60  // The first two members are shared pointers to a base class because what is
61  // actually instantiated depends on whether the fetch is over UDP or TCP,
62  // which is not known until construction of the IOFetch. Use of a shared
63  // pointer here is merely to ensure deletion when the data object is deleted.
64  boost::scoped_ptr<IOAsioSocket<IOFetch> > socket;
66  boost::scoped_ptr<IOEndpoint> remote_snd;
67  boost::scoped_ptr<IOEndpoint> remote_rcv;
71  boost::asio::deadline_timer timer;
73  size_t cumulative;
74  size_t expected;
75  size_t offset;
76  bool stopped;
77  int timeout;
78  bool packet;
79 
80  // In case we need to log an error, the origin of the last asynchronous
81  // I/O is recorded. To save time and simplify the code, this is recorded
82  // as the ID of the error message that would be generated if the I/O failed.
83  // This means that we must make sure that all possible "origins" take the
84  // same arguments in their message in the same order.
86  uint8_t staging[IOFetch::STAGING_LENGTH];
89 
108  const IOAddress& address, uint16_t port, OutputBufferPtr& buff,
109  IOFetch::Callback* cb, int wait)
110  :
111  socket((proto == IOFetch::UDP) ?
112  static_cast<IOAsioSocket<IOFetch>*>(
113  new UDPSocket<IOFetch>(service)) :
114  static_cast<IOAsioSocket<IOFetch>*>(
115  new TCPSocket<IOFetch>(service))
116  ),
117  remote_snd((proto == IOFetch::UDP) ?
118  static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
119  static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
120  ),
121  remote_rcv((proto == IOFetch::UDP) ?
122  static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
123  static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
124  ),
125  msgbuf(new OutputBuffer(512)),
126  received(buff),
127  callback(cb),
128  timer(service.get_io_service()),
129  protocol(proto),
130  cumulative(0),
131  expected(0),
132  offset(0),
133  stopped(false),
134  timeout(wait),
135  packet(false),
136  origin(ASIODNS_UNKNOWN_ORIGIN),
137  staging(),
138  qid(QidGenerator::getInstance().generateQid())
139  {}
140 
141  // Checks if the response we received was ok;
142  // - data contains the buffer we read, as well as the address
143  // we sent to and the address we received from.
144  // length is provided by the operator() in IOFetch.
145  // Addresses must match, number of octets read must be at least
146  // 2, and the first two octets must match the qid of the message
147  // we sent.
148  bool responseOK() {
149  return (*remote_snd == *remote_rcv && cumulative >= 2 &&
150  readUint16(received->getData(), received->getLength()) == qid);
151  }
152 };
153 
155 
156 IOFetch::IOFetch(Protocol protocol, IOService& service,
157  const isc::dns::Question& question, const IOAddress& address,
158  uint16_t port, OutputBufferPtr& buff, Callback* cb, int wait, bool edns)
159 {
160  MessagePtr query_msg(new Message(Message::RENDER));
161  initIOFetch(query_msg, protocol, service, question, address, port, buff,
162  cb, wait, edns);
163 }
164 
165 IOFetch::IOFetch(Protocol protocol, IOService& service,
166  OutputBufferPtr& outpkt, const IOAddress& address, uint16_t port,
167  OutputBufferPtr& buff, Callback* cb, int wait)
168  :
169  data_(new IOFetchData(protocol, service,
170  address, port, buff, cb, wait))
171 {
172  data_->msgbuf = outpkt;
173  data_->packet = true;
174 }
175 
177  ConstMessagePtr query_message, const IOAddress& address, uint16_t port,
178  OutputBufferPtr& buff, Callback* cb, int wait)
179 {
180  MessagePtr msg(new Message(Message::RENDER));
181 
182  msg->setHeaderFlag(Message::HEADERFLAG_RD,
183  query_message->getHeaderFlag(Message::HEADERFLAG_RD));
184  msg->setHeaderFlag(Message::HEADERFLAG_CD,
185  query_message->getHeaderFlag(Message::HEADERFLAG_CD));
186 
187  initIOFetch(msg, protocol, service,
188  **(query_message->beginQuestion()),
189  address, port, buff, cb, wait);
190 }
191 
192 void
193 IOFetch::initIOFetch(MessagePtr& query_msg, Protocol protocol,
194  IOService& service,
195  const isc::dns::Question& question,
196  const IOAddress& address, uint16_t port,
197  OutputBufferPtr& buff, Callback* cb, int wait, bool edns)
198 {
199  data_ = boost::shared_ptr<IOFetchData>(new IOFetchData(
200  protocol, service, address, port, buff, cb, wait));
201 
202  query_msg->setQid(data_->qid);
203  query_msg->setOpcode(Opcode::QUERY());
204  query_msg->setRcode(Rcode::NOERROR());
205  query_msg->setHeaderFlag(Message::HEADERFLAG_RD);
206  query_msg->addQuestion(question);
207 
208  if (edns) {
209  EDNSPtr edns_query(new EDNS());
210  edns_query->setUDPSize(Message::DEFAULT_MAX_EDNS0_UDPSIZE);
211  query_msg->setEDNS(edns_query);
212  }
213 
214  MessageRenderer renderer;
215  renderer.setBuffer(data_->msgbuf.get());
216  query_msg->toWire(renderer);
217  renderer.setBuffer(NULL);
218 }
219 
220 // Return protocol in use.
221 
224  return (data_->protocol);
225 }
226 
229 
230 void
231 IOFetch::operator()(boost::system::error_code ec, size_t length) {
232 
233  if (data_->stopped) {
234  return;
235 
236  // On Debian it has been often observed that boost::asio async
237  // operations result in EINPROGRESS. This doesn't necessarily
238  // indicate an issue. Thus, we continue as if no error occurred.
239  } else if (ec && (ec.value() != boost::asio::error::in_progress)) {
240  logIOFailure(ec);
241  return;
242  }
243 
244  BOOST_ASIO_CORO_REENTER (this) {
245 
249  {
250  if (data_->packet) {
251  // A packet was given, overwrite the QID (which is in the
252  // first two bytes of the packet).
253  data_->msgbuf->writeUint16At(data_->qid, 0);
254 
255  }
256  }
257 
258  // If we timeout, we stop, which will can cancel outstanding I/Os and
259  // shutdown everything.
260  if (data_->timeout != -1) {
261  data_->timer.expires_from_now(boost::posix_time::milliseconds(
262  data_->timeout));
263  data_->timer.async_wait(std::bind(&IOFetch::stop, *this,
264  TIME_OUT));
265  }
266 
267  // Open a connection to the target system. For speed, if the operation
268  // is synchronous (i.e. UDP operation) we bypass the yield.
269  data_->origin = ASIODNS_OPEN_SOCKET;
270  if (data_->socket->isOpenSynchronous()) {
271  data_->socket->open(data_->remote_snd.get(), *this);
272  } else {
273  BOOST_ASIO_CORO_YIELD data_->socket->open(data_->remote_snd.get(), *this);
274  }
275 
276  do {
277  // Begin an asynchronous send, and then yield. When the send completes,
278  // we will resume immediately after this point.
279  data_->origin = ASIODNS_SEND_DATA;
280  BOOST_ASIO_CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
281  data_->msgbuf->getLength(), data_->remote_snd.get(), *this);
282 
283  // Now receive the response. Since TCP may not receive the entire
284  // message in one operation, we need to loop until we have received
285  // it. (This can't be done within the asyncReceive() method because
286  // each I/O operation will be done asynchronously and between each one
287  // we need to yield ... and we *really* don't want to set up another
288  // coroutine within that method.) So after each receive (and yield),
289  // we check if the operation is complete and if not, loop to read again.
290  //
291  // Another concession to TCP is that the amount of is contained in the
292  // first two bytes. This leads to two problems:
293  //
294  // a) We don't want those bytes in the return buffer.
295  // b) They may not both arrive in the first I/O.
296  //
297  // So... we need to loop until we have at least two bytes, then store
298  // the expected amount of data. Then we need to loop until we have
299  // received all the data before copying it back to the user's buffer.
300  // And we want to minimize the amount of copying...
301 
302  data_->origin = ASIODNS_READ_DATA;
303  data_->cumulative = 0; // No data yet received
304  data_->offset = 0; // First data into start of buffer
305  data_->received->clear(); // Clear the receive buffer
306  do {
307  BOOST_ASIO_CORO_YIELD data_->socket->asyncReceive(data_->staging,
308  static_cast<size_t>(STAGING_LENGTH),
309  data_->offset,
310  data_->remote_rcv.get(), *this);
311  } while (!data_->socket->processReceivedData(data_->staging, length,
312  data_->cumulative, data_->offset,
313  data_->expected, data_->received));
314  } while (!data_->responseOK());
315 
316  // Finished with this socket, so close it. This will not generate an
317  // I/O error, but reset the origin to unknown in case we change this.
318  data_->origin = ASIODNS_UNKNOWN_ORIGIN;
319  data_->socket->close();
320 
322  stop(SUCCESS);
323  }
324 }
325 
326 // Function that stops the coroutine sequence. It is called either when the
327 // query finishes or when the timer times out. Either way, it sets the
328 // "stopped_" flag and cancels anything that is in progress.
329 //
330 // As the function may be entered multiple times as things wind down, it checks
331 // if the stopped_ flag is already set. If it is, the call is a no-op.
332 
333 void
335 
336  if (!data_->stopped) {
337 
338  // Mark the fetch as stopped to prevent other completion callbacks
339  // (invoked because of the calls to cancel()) from executing the
340  // cancel calls again.
341  //
342  // In a single threaded environment, the callbacks won't be invoked
343  // until this one completes. In a multi-threaded environment, they may
344  // well be, in which case the testing (and setting) of the stopped_
345  // variable should be done inside a mutex (and the stopped_ variable
346  // declared as "volatile").
347  //
348  // TODO: Update testing of stopped_ if threads are used.
349  data_->stopped = true;
350  switch (result) {
351  case TIME_OUT:
352  LOG_DEBUG(logger, DBG_COMMON, ASIODNS_READ_TIMEOUT).
353  arg(data_->remote_snd->getAddress().toText()).
354  arg(data_->remote_snd->getPort());
355  break;
356 
357  case SUCCESS:
359  arg(data_->remote_rcv->getAddress().toText()).
360  arg(data_->remote_rcv->getPort());
361  break;
362 
363  case STOPPED:
364  // Fetch has been stopped for some other reason. This is
365  // allowed but as it is unusual it is logged, but with a lower
366  // debug level than a timeout (which is totally normal).
367  LOG_DEBUG(logger, DBG_IMPORTANT, ASIODNS_FETCH_STOPPED).
368  arg(data_->remote_snd->getAddress().toText()).
369  arg(data_->remote_snd->getPort());
370  break;
371 
372  default:
374  arg(data_->remote_snd->getAddress().toText()).
375  arg(data_->remote_snd->getPort());
376  }
377 
378  // Stop requested, cancel and I/O's on the socket and shut it down,
379  // and cancel the timer.
380  data_->socket->cancel();
381  data_->socket->close();
382 
383  data_->timer.cancel();
384 
385  // Execute the I/O completion callback (if present).
386  if (data_->callback) {
387  (*(data_->callback))(result);
388  }
389  }
390 }
391 
392 // Log an error - called on I/O failure
393 
394 void IOFetch::logIOFailure(boost::system::error_code ec) {
395 
396  // Should only get here with a known error code.
397  if ((data_->origin != ASIODNS_OPEN_SOCKET) &&
398  (data_->origin != ASIODNS_SEND_DATA) &&
399  (data_->origin != ASIODNS_READ_DATA) &&
400  (data_->origin != ASIODNS_UNKNOWN_ORIGIN)) {
401  isc_throw(isc::Unexpected, "impossible error code " << data_->origin);
402  }
403 
404  LOG_ERROR(logger, data_->origin).arg(ec.value()).
405  arg((data_->remote_snd->getProtocol() == IPPROTO_TCP) ?
406  "TCP" : "UDP").
407  arg(data_->remote_snd->getAddress().toText()).
408  arg(data_->remote_snd->getPort());
409 }
410 
411 } // namespace asiodns
412 } // namespace isc {
void setBuffer(isc::util::OutputBuffer *buffer)
Set or reset a temporary output buffer.
uint16_t qid_t
Definition: message.h:75
const isc::log::MessageID ASIODNS_SEND_DATA
boost::asio::deadline_timer timer
Timer to measure timeouts.
Definition: io_fetch.cc:71
Upstream Fetch Processing.
Definition: io_fetch.h:45
isc::log::MessageID origin
Origin of last asynchronous I/O.
Definition: io_fetch.cc:85
void stop(Result reason=STOPPED)
Terminate query.
Definition: io_fetch.cc:334
Control code, fetch has been stopped.
Definition: io_fetch.h:73
Protocol
Protocol to use on the fetch.
Definition: io_fetch.h:48
const int DBGLVL_TRACE_BASIC
Trace basic operations.
Definition: log_dbglevels.h:65
The Question class encapsulates the common search key of DNS lookup, consisting of owner name...
Definition: question.h:95
const isc::log::MessageID ASIODNS_UNKNOWN_ORIGIN
const isc::log::MessageID ASIODNS_FETCH_STOPPED
size_t offset
Offset to receive data.
Definition: io_fetch.cc:75
Size of staging buffer.
Definition: io_fetch.h:82
void operator()(boost::system::error_code ec=boost::system::error_code(), size_t length=0)
Coroutine entry point.
Definition: io_fetch.cc:231
const isc::log::MessageID ASIODNS_FETCH_COMPLETED
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition: macros.h:32
STL namespace.
const int DBG_COMMON
Definition: io_fetch.cc:48
Failure, fetch timed out.
Definition: io_fetch.h:72
isc::dns::qid_t qid
The QID set in the query.
Definition: io_fetch.cc:88
The Message class encapsulates a standard DNS message.
Definition: message.h:143
Protocol getProtocol() const
Return Current Protocol.
Definition: io_fetch.cc:223
bool stopped
Have we stopped running?
Definition: io_fetch.cc:76
IOFetch(Protocol protocol, isc::asiolink::IOService &service, const isc::dns::Question &question, const isc::asiolink::IOAddress &address, uint16_t port, isc::util::OutputBufferPtr &buff, Callback *cb, int wait=-1, bool edns=true)
Constructor.
Definition: io_fetch.cc:156
boost::shared_ptr< Message > MessagePtr
Pointer-like type pointing to a Message.
Definition: message.h:662
OutputBufferPtr msgbuf
Wire buffer for question.
Definition: io_fetch.cc:68
size_t cumulative
Cumulative received amount.
Definition: io_fetch.cc:73
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
boost::scoped_ptr< IOEndpoint > remote_snd
Where the fetch is sent.
Definition: io_fetch.cc:66
Definition: edns.h:19
boost::shared_ptr< EDNS > EDNSPtr
A pointer-like type pointing to an EDNS object.
Definition: edns.h:31
IOFetch Data.
Definition: io_fetch.cc:58
A generic exception that is thrown when an unexpected error condition occurs.
The EDNS class represents the EDNS OPT RR defined in RFC2671.
Definition: edns.h:123
IOFetch::Protocol protocol
Protocol being used.
Definition: io_fetch.cc:72
const int DBG_ALL
Definition: io_fetch.cc:49
The OutputBuffer class is a buffer abstraction for manipulating mutable data.
Definition: buffer.h:294
OutputBufferPtr received
Received data put here.
Definition: io_fetch.cc:69
Defines the logger used by the top-level component of kea-dhcp-ddns.
isc::log::Logger logger("asiodns")
Use the ASIO logger.
const isc::log::MessageID ASIODNS_READ_TIMEOUT
boost::scoped_ptr< IOAsioSocket< IOFetch > > socket
Socket to use for I/O.
Definition: io_fetch.cc:64
The MessageRenderer is a concrete derived class of AbstractMessageRenderer as a general purpose imple...
boost::scoped_ptr< IOEndpoint > remote_rcv
Where the response came from.
Definition: io_fetch.cc:67
size_t expected
Expected amount of data.
Definition: io_fetch.cc:74
uint16_t readUint16(const void *buffer, size_t length)
Read Unsigned 16-Bit Integer from Buffer.
Definition: io_utilities.h:28
Result
Result of Upstream Fetch.
Definition: io_fetch.h:70
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition: macros.h:14
IOFetchData(IOFetch::Protocol proto, IOService &service, const IOAddress &address, uint16_t port, OutputBufferPtr &buff, IOFetch::Callback *cb, int wait)
Constructor.
Definition: io_fetch.cc:107
int timeout
Timeout in ms.
Definition: io_fetch.cc:77
IOFetch::Callback * callback
Called on I/O Completion.
Definition: io_fetch.cc:70
boost::shared_ptr< OutputBuffer > OutputBufferPtr
Definition: buffer.h:599
const int DBGLVL_TRACE_DETAIL
Trace detailed operations.
Definition: log_dbglevels.h:71
const isc::log::MessageID ASIODNS_UNKNOWN_RESULT
Success, fetch completed.
Definition: io_fetch.h:71
This class generates Qids for outgoing queries.
Definition: qid_gen.h:33
const isc::log::MessageID ASIODNS_READ_DATA
bool packet
true if packet was supplied
Definition: io_fetch.cc:78
boost::shared_ptr< const Message > ConstMessagePtr
Definition: message.h:663
const char * MessageID
Definition: message_types.h:15
I/O Fetch Callback.
Definition: io_fetch.h:98
const int DBG_IMPORTANT
Definition: io_fetch.cc:47
const isc::log::MessageID ASIODNS_OPEN_SOCKET