Kea  1.9.9-git
ncr_io.cc
Go to the documentation of this file.
1 // Copyright (C) 2013-2020 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
10 #include <dhcp_ddns/ncr_io.h>
12 
13 #include <boost/algorithm/string/predicate.hpp>
14 
15 #include <mutex>
16 
17 namespace isc {
18 namespace dhcp_ddns {
19 
20 using namespace isc::util;
21 using namespace std;
22 
23 NameChangeProtocol stringToNcrProtocol(const std::string& protocol_str) {
24  if (boost::iequals(protocol_str, "UDP")) {
25  return (NCR_UDP);
26  }
27 
28  if (boost::iequals(protocol_str, "TCP")) {
29  return (NCR_TCP);
30  }
31 
33  "Invalid NameChangeRequest protocol: " << protocol_str);
34 }
35 
36 std::string ncrProtocolToString(NameChangeProtocol protocol) {
37  switch (protocol) {
38  case NCR_UDP:
39  return ("UDP");
40  case NCR_TCP:
41  return ("TCP");
42  default:
43  break;
44  }
45 
46  std::ostringstream stream;
47  stream << "UNKNOWN(" << protocol << ")";
48  return (stream.str());
49 }
50 
51 
52 //************************** NameChangeListener ***************************
53 
55  recv_handler)
56  : listening_(false), io_pending_(false), recv_handler_(recv_handler) {
57 };
58 
59 
60 void
62  if (amListening()) {
63  // This amounts to a programmatic error.
64  isc_throw(NcrListenerError, "NameChangeListener is already listening");
65  }
66 
67  // Call implementation dependent open.
68  try {
69  open(io_service);
70  } catch (const isc::Exception& ex) {
71  stopListening();
72  isc_throw(NcrListenerOpenError, "Open failed: " << ex.what());
73  }
74 
75  // Set our status to listening.
76  setListening(true);
77 
78  // Start the first asynchronous receive.
79  try {
80  receiveNext();
81  } catch (const isc::Exception& ex) {
82  stopListening();
83  isc_throw(NcrListenerReceiveError, "doReceive failed: " << ex.what());
84  }
85 }
86 
87 void
89  io_pending_ = true;
90  doReceive();
91 }
92 
93 void
95  try {
96  // Call implementation dependent close.
97  close();
98  } catch (const isc::Exception &ex) {
99  // Swallow exceptions. If we have some sort of error we'll log
100  // it but we won't propagate the throw.
102  .arg(ex.what());
103  }
104 
105  // Set it false, no matter what. This allows us to at least try to
106  // re-open via startListening().
107  setListening(false);
108 }
109 
110 void
112  NameChangeRequestPtr& ncr) {
113  // Call the registered application layer handler.
114  // Surround the invocation with a try-catch. The invoked handler is
115  // not supposed to throw, but in the event it does we will at least
116  // report it.
117  try {
118  io_pending_ = false;
119  recv_handler_(result, ncr);
120  } catch (const std::exception& ex) {
122  .arg(ex.what());
123  }
124 
125  // Start the next IO layer asynchronous receive.
126  // In the event the handler above intervened and decided to stop listening
127  // we need to check that first.
128  if (amListening()) {
129  try {
130  receiveNext();
131  } catch (const isc::Exception& ex) {
132  // It is possible though unlikely, for doReceive to fail without
133  // scheduling the read. While, unlikely, it does mean the callback
134  // will not get called with a failure. A throw here would surface
135  // at the IOService::run (or run variant) invocation. So we will
136  // close the window by invoking the application handler with
137  // a failed result, and let the application layer sort it out.
139  .arg(ex.what());
140 
141  // Call the registered application layer handler.
142  // Surround the invocation with a try-catch. The invoked handler is
143  // not supposed to throw, but in the event it does we will at least
144  // report it.
145  NameChangeRequestPtr empty;
146  try {
147  io_pending_ = false;
148  recv_handler_(ERROR, empty);
149  } catch (const std::exception& ex) {
152  .arg(ex.what());
153  }
154  }
155  }
156 }
157 
158 //************************* NameChangeSender ******************************
159 
161  size_t send_queue_max)
162  : sending_(false), send_handler_(send_handler),
163  send_queue_max_(send_queue_max), io_service_(NULL), mutex_(new mutex) {
164 
165  // Queue size must be big enough to hold at least 1 entry.
166  setQueueMaxSize(send_queue_max);
167 }
168 
169 void
171  if (amSending()) {
172  // This amounts to a programmatic error.
173  isc_throw(NcrSenderError, "NameChangeSender is already sending");
174  }
175 
176  // Call implementation dependent open.
177  try {
178  if (MultiThreadingMgr::instance().getMode()) {
179  lock_guard<mutex> lock(*mutex_);
180  startSendingInternal(io_service);
181  } else {
182  startSendingInternal(io_service);
183  }
184  } catch (const isc::Exception& ex) {
185  stopSending();
186  isc_throw(NcrSenderOpenError, "Open failed: " << ex.what());
187  }
188 }
189 
190 void
191 NameChangeSender::startSendingInternal(isc::asiolink::IOService& io_service) {
192  // Clear send marker.
193  ncr_to_send_.reset();
194 
195  // Remember io service we're given.
196  io_service_ = &io_service;
197  open(io_service);
198 
199  // Set our status to sending.
200  setSending(true);
201 
202  // If there's any queued already.. we'll start sending.
203  sendNext();
204 }
205 
206 void
208  // Set it send indicator to false, no matter what. This allows us to at
209  // least try to re-open via startSending(). Also, setting it false now,
210  // allows us to break sendNext() chain in invokeSendHandler.
211  setSending(false);
212 
213  // If there is an outstanding IO to complete, attempt to process it.
214  if (ioReady() && io_service_ != NULL) {
215  try {
216  runReadyIO();
217  } catch (const std::exception& ex) {
218  // Swallow exceptions. If we have some sort of error we'll log
219  // it but we won't propagate the throw.
221  DHCP_DDNS_NCR_FLUSH_IO_ERROR).arg(ex.what());
222  }
223  }
224 
225  try {
226  // Call implementation dependent close.
227  close();
228  } catch (const isc::Exception &ex) {
229  // Swallow exceptions. If we have some sort of error we'll log
230  // it but we won't propagate the throw.
233  }
234 
235  io_service_ = NULL;
236 }
237 
238 void
240  if (!amSending()) {
241  isc_throw(NcrSenderError, "sender is not ready to send");
242  }
243 
244  if (!ncr) {
245  isc_throw(NcrSenderError, "request to send is empty");
246  }
247 
248  if (MultiThreadingMgr::instance().getMode()) {
249  lock_guard<mutex> lock(*mutex_);
250  sendRequestInternal(ncr);
251  } else {
252  sendRequestInternal(ncr);
253  }
254 }
255 
256 void
257 NameChangeSender::sendRequestInternal(NameChangeRequestPtr& ncr) {
258  if (send_queue_.size() >= send_queue_max_) {
260  "send queue has reached maximum capacity: "
261  << send_queue_max_);
262  }
263 
264  // Put it on the queue.
265  send_queue_.push_back(ncr);
266 
267  // Call sendNext to schedule the next one to go.
268  sendNext();
269 }
270 
271 void
273  if (ncr_to_send_) {
274  // @todo Not sure if there is any risk of getting stuck here but
275  // an interval timer to defend would be good.
276  // In reality, the derivation should ensure they timeout themselves
277  return;
278  }
279 
280  // If queue isn't empty, then get one from the front. Note we leave
281  // it on the front of the queue until we successfully send it.
282  if (!send_queue_.empty()) {
283  ncr_to_send_ = send_queue_.front();
284 
285  // @todo start defense timer
286  // If a send were to hang and we timed it out, then timeout
287  // handler need to cycle thru open/close ?
288 
289  // Call implementation dependent send.
290  doSend(ncr_to_send_);
291  }
292 }
293 
294 void
296  if (MultiThreadingMgr::instance().getMode()) {
297  lock_guard<mutex> lock(*mutex_);
298  invokeSendHandlerInternal(result);
299  } else {
300  invokeSendHandlerInternal(result);
301  }
302 }
303 
304 void
305 NameChangeSender::invokeSendHandlerInternal(const NameChangeSender::Result result) {
306  // @todo reset defense timer
307  if (result == SUCCESS) {
308  // It shipped so pull it off the queue.
309  send_queue_.pop_front();
310  }
311 
312  // Invoke the completion handler passing in the result and a pointer
313  // the request involved.
314  // Surround the invocation with a try-catch. The invoked handler is
315  // not supposed to throw, but in the event it does we will at least
316  // report it.
317  try {
318  send_handler_(result, ncr_to_send_);
319  } catch (const std::exception& ex) {
321  .arg(ex.what());
322  }
323 
324  // Clear the pending ncr pointer.
325  ncr_to_send_.reset();
326 
327  // Set up the next send
328  try {
329  if (amSending()) {
330  sendNext();
331  }
332  } catch (const isc::Exception& ex) {
333  // It is possible though unlikely, for sendNext to fail without
334  // scheduling the send. While, unlikely, it does mean the callback
335  // will not get called with a failure. A throw here would surface
336  // at the IOService::run (or run variant) invocation. So we will
337  // close the window by invoking the application handler with
338  // a failed result, and let the application layer sort it out.
340  .arg(ex.what());
341 
342  // Invoke the completion handler passing in failed result.
343  // Surround the invocation with a try-catch. The invoked handler is
344  // not supposed to throw, but in the event it does we will at least
345  // report it.
346  try {
347  send_handler_(ERROR, ncr_to_send_);
348  } catch (const std::exception& ex) {
351  }
352  }
353 }
354 
355 void
357  if (MultiThreadingMgr::instance().getMode()) {
358  lock_guard<mutex> lock(*mutex_);
359  skipNextInternal();
360  } else {
361  skipNextInternal();
362  }
363 }
364 
365 void
366 NameChangeSender::skipNextInternal() {
367  if (!send_queue_.empty()) {
368  // Discards the request at the front of the queue.
369  send_queue_.pop_front();
370  }
371 }
372 
373 void
375  if (amSending()) {
376  isc_throw(NcrSenderError, "Cannot clear queue while sending");
377  }
378 
379  if (MultiThreadingMgr::instance().getMode()) {
380  lock_guard<mutex> lock(*mutex_);
381  send_queue_.clear();
382  } else {
383  send_queue_.clear();
384  }
385 }
386 
387 void
388 NameChangeSender::setQueueMaxSize(const size_t new_max) {
389  if (new_max == 0) {
390  isc_throw(NcrSenderError, "NameChangeSender:"
391  " queue size must be greater than zero");
392  }
393 
394  send_queue_max_ = new_max;
395 }
396 
397 size_t
399  if (MultiThreadingMgr::instance().getMode()) {
400  lock_guard<mutex> lock(*mutex_);
401  return (getQueueSizeInternal());
402  } else {
403  return (getQueueSizeInternal());
404  }
405 }
406 
407 size_t
408 NameChangeSender::getQueueSizeInternal() const {
409  return (send_queue_.size());
410 }
411 
413 NameChangeSender::peekAt(const size_t index) const {
414  if (MultiThreadingMgr::instance().getMode()) {
415  lock_guard<mutex> lock(*mutex_);
416  return (peekAtInternal(index));
417  } else {
418  return (peekAtInternal(index));
419  }
420 }
421 
423 NameChangeSender::peekAtInternal(const size_t index) const {
424  auto size = getQueueSizeInternal();
425  if (index >= size) {
427  "NameChangeSender::peekAt peek beyond end of queue attempted"
428  << " index: " << index << " queue size: " << size);
429  }
430 
431  return (send_queue_.at(index));
432 }
433 
434 bool
436  if (MultiThreadingMgr::instance().getMode()) {
437  lock_guard<mutex> lock(*mutex_);
438  return ((ncr_to_send_) ? true : false);
439  } else {
440  return ((ncr_to_send_) ? true : false);
441  }
442 }
443 
444 void
446  if (source_sender.amSending()) {
447  isc_throw(NcrSenderError, "Cannot assume queue:"
448  " source sender is actively sending");
449  }
450 
451  if (amSending()) {
452  isc_throw(NcrSenderError, "Cannot assume queue:"
453  " target sender is actively sending");
454  }
455 
456  if (getQueueMaxSize() < source_sender.getQueueSize()) {
457  isc_throw(NcrSenderError, "Cannot assume queue:"
458  " source queue count exceeds target queue max");
459  }
460 
461  if (MultiThreadingMgr::instance().getMode()) {
462  lock_guard<mutex> lock(*mutex_);
463  assumeQueueInternal(source_sender);
464  } else {
465  assumeQueueInternal(source_sender);
466  }
467 }
468 
469 void
470 NameChangeSender::assumeQueueInternal(NameChangeSender& source_sender) {
471  if (!send_queue_.empty()) {
472  isc_throw(NcrSenderError, "Cannot assume queue:"
473  " target queue is not empty");
474  }
475 
476  send_queue_.swap(source_sender.getSendQueue());
477 }
478 
479 int
481  isc_throw(NotImplemented, "NameChangeSender::getSelectFd is not supported");
482 }
483 
484 void
486  if (!io_service_) {
487  isc_throw(NcrSenderError, "NameChangeSender::runReadyIO"
488  " sender io service is null");
489  }
490 
491  // We shouldn't be here if IO isn't ready to execute.
492  // By running poll we're guaranteed not to hang.
495  io_service_->get_io_service().poll_one();
496 }
497 
498 } // namespace dhcp_ddns
499 } // namespace isc
Exception thrown if an error occurs initiating an IO receive.
Definition: ncr_io.h:107
A generic exception that is thrown when a function is not implemented.
NameChangeProtocol stringToNcrProtocol(const std::string &protocol_str)
Function which converts text labels to NameChangeProtocol enums.
Definition: ncr_io.cc:23
const isc::log::MessageID DHCP_DDNS_UNCAUGHT_NCR_RECV_HANDLER_ERROR
static MultiThreadingMgr & instance()
Returns a single instance of Multi Threading Manager.
This file defines abstract classes for exchanging NameChangeRequests.
virtual void close()=0
Abstract method which closes the IO source.
virtual bool ioReady()=0
Returns whether or not the sender has IO ready to process.
virtual int getSelectFd()=0
Returns a file descriptor suitable for use with select.
Definition: ncr_io.cc:480
const isc::log::MessageID DHCP_DDNS_NCR_FLUSH_IO_ERROR
void assumeQueue(NameChangeSender &source_sender)
Move all queued requests from a given sender into the send queue.
Definition: ncr_io.cc:445
Result
Defines the outcome of an asynchronous NCR send.
Definition: ncr_io.h:476
const isc::log::MessageID DHCP_DDNS_NCR_LISTEN_CLOSE_ERROR
size_t getQueueMaxSize() const
Returns the maximum number of entries allowed in the send queue.
Definition: ncr_io.h:748
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition: macros.h:32
STL namespace.
void skipNext()
Removes the request at the front of the send queue.
Definition: ncr_io.cc:356
boost::shared_ptr< NameChangeRequest > NameChangeRequestPtr
Defines a pointer to a NameChangeRequest.
Definition: ncr_msg.h:212
size_t getQueueSize() const
Returns the number of entries currently in the send queue.
Definition: ncr_io.cc:398
Thrown when a NameChangeSender encounters an error.
Definition: ncr_io.h:357
Exception thrown if an NcrListenerError encounters a general error.
Definition: ncr_io.h:93
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
SendQueue & getSendQueue()
Returns a reference to the send queue.
Definition: ncr_io.h:806
std::string ncrProtocolToString(NameChangeProtocol protocol)
Function which converts NameChangeProtocol enums to text labels.
Definition: ncr_io.cc:36
Definition: edns.h:19
void clearSendQueue()
Flushes all entries in the send queue.
Definition: ncr_io.cc:374
NameChangeListener(RequestReceiveHandler &recv_handler)
Constructor.
Definition: ncr_io.cc:54
isc::log::Logger dhcp_ddns_logger("libdhcp-ddns")
Defines the logger used within lib dhcp_ddns.
Definition: dhcp_ddns_log.h:18
Exception thrown if an error occurs during IO source open.
Definition: ncr_io.h:364
const isc::log::MessageID DHCP_DDNS_NCR_SEND_NEXT_ERROR
NameChangeSender(RequestSendHandler &send_handler, size_t send_queue_max=MAX_QUEUE_DEFAULT)
Constructor.
Definition: ncr_io.cc:160
Abstract class for defining application layer send callbacks.
Definition: ncr_io.h:488
void startListening(isc::asiolink::IOService &io_service)
Prepares the IO for reception and initiates the first receive.
Definition: ncr_io.cc:61
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
bool isSendInProgress() const
Returns true when a send is in progress.
Definition: ncr_io.cc:435
virtual void open(isc::asiolink::IOService &io_service)=0
Abstract method which opens the IO sink for transmission.
This is a base class for exceptions thrown from the DNS library module.
Defines the logger used by the top-level component of kea-dhcp-ddns.
void receiveNext()
Initiates an asynchronous receive.
Definition: ncr_io.cc:88
const NameChangeRequestPtr & peekAt(const size_t index) const
Returns the entry at a given position in the queue.
Definition: ncr_io.cc:413
const isc::log::MessageID DHCP_DDNS_NCR_RECV_NEXT_ERROR
virtual void doSend(NameChangeRequestPtr &ncr)=0
Initiates an IO layer asynchronous send.
const isc::log::MessageID DHCP_DDNS_NCR_SEND_CLOSE_ERROR
virtual void runReadyIO()
Processes sender IO events.
Definition: ncr_io.cc:485
void stopSending()
Closes the IO sink and stops send logic.
Definition: ncr_io.cc:207
virtual void open(isc::asiolink::IOService &io_service)=0
Abstract method which opens the IO source for reception.
void startSending(isc::asiolink::IOService &io_service)
Prepares the IO for transmission.
Definition: ncr_io.cc:170
Exception thrown if an error occurs initiating an IO send.
Definition: ncr_io.h:371
void sendNext()
Dequeues and sends the next request on the send queue in a thread safe context.
Definition: ncr_io.cc:272
NameChangeProtocol
Defines the list of socket protocols supported.
Definition: ncr_io.h:68
const isc::log::MessageID DHCP_DDNS_UNCAUGHT_NCR_SEND_HANDLER_ERROR
void setQueueMaxSize(const size_t new_max)
Sets the maximum queue size to the given value.
Definition: ncr_io.cc:388
bool amListening() const
Returns true if the listener is listening, false otherwise.
Definition: ncr_io.h:312
Abstract interface for sending NameChangeRequests.
Definition: ncr_io.h:466
void invokeRecvHandler(const Result result, NameChangeRequestPtr &ncr)
Calls the NCR receive handler registered with the listener.
Definition: ncr_io.cc:111
void sendRequest(NameChangeRequestPtr &ncr)
Queues the given request to be sent.
Definition: ncr_io.cc:239
void stopListening()
Closes the IO source and stops listen logic.
Definition: ncr_io.cc:94
void invokeSendHandler(const NameChangeSender::Result result)
Calls the NCR send completion handler registered with the sender.
Definition: ncr_io.cc:295
virtual void close()=0
Abstract method which closes the IO sink.
bool amSending() const
Returns true if the sender is in send mode, false otherwise.
Definition: ncr_io.h:733
Abstract class for defining application layer receive callbacks.
Definition: ncr_io.h:183
virtual void doReceive()=0
Initiates an IO layer asynchronous read.
Result
Defines the outcome of an asynchronous NCR receive.
Definition: ncr_io.h:171
Exception thrown if an error occurs during IO source open.
Definition: ncr_io.h:100