Kea  1.9.9-git
cql_connection.cc
Go to the documentation of this file.
1 // Copyright (C) 2018-2021 Internet Systems Consortium, Inc. ("ISC")
2 // Copyright (C) 2015-2018 Deutsche Telekom AG.
3 //
4 // Authors: Razvan Becheriu <razvan.becheriu@qualitance.com>
5 // Andrei Pavel <andrei.pavel@qualitance.com>
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 // http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 
19 #include <config.h>
20 
21 #include <cql/cql_connection.h>
22 #include <cql/cql_exchange.h>
23 #include <database/db_exceptions.h>
24 #include <database/db_log.h>
25 
26 #include <string>
27 
28 namespace isc {
29 namespace db {
30 
32  : DatabaseConnection(parameters), statements_(), cluster_(NULL),
33  session_(NULL), consistency_(CASS_CONSISTENCY_QUORUM),
34  serial_consistency_(CASS_CONSISTENCY_UNKNOWN), schema_meta_(NULL),
35  keyspace_meta_(NULL), force_consistency_(true) {
36 }
37 
39  // Free up the prepared statements, ignoring errors. Session and connection
40  // resources are deallocated.
41  CassError rc = CASS_OK;
42  std::string error;
43 
44  // Let's free the prepared statements.
45  for (StatementMapEntry s : statements_) {
46  CqlTaggedStatement statement = s.second;
47  if (statement.prepared_statement_) {
48  cass_prepared_free(statement.prepared_statement_);
49  }
50  }
51 
52  // If there's a session, tear it down and free the resources.
53  if (session_) {
54  cass_schema_meta_free(schema_meta_);
55  CassFuture* close_future = cass_session_close(session_);
56  cass_future_wait(close_future);
57  error = checkFutureError(
58  "CqlConnection::~CqlConnection(): cass_session_close() != CASS_OK",
59  close_future);
60  rc = cass_future_error_code(close_future);
61  cass_future_free(close_future);
62  cass_session_free(session_);
63  session_ = NULL;
64  }
65 
66  // Free the cluster if there's one.
67  if (cluster_) {
68  cass_cluster_free(cluster_);
69  cluster_ = NULL;
70  }
71 
72  if (rc != CASS_OK) {
73  // We're closing the connection anyway. Let's not throw at this stage.
75  }
76 }
77 
78 std::pair<uint32_t, uint32_t>
80  // Get a connection.
81  CqlConnection conn(parameters);
82 
83  // Open the database.
84  conn.openDatabase();
85 
86  // Prepare statement.
88 
89  std::unique_ptr<CqlVersionExchange> version_exchange(new CqlVersionExchange());
90  return version_exchange->retrieveVersion(conn);
91 }
92 
93 CassConsistency CqlConnection::parseConsistency(std::string value) {
94  static std::map<std::string, CassConsistency> consistency_map {
95  {"any", CASS_CONSISTENCY_ANY},
96  {"one", CASS_CONSISTENCY_ONE},
97  {"two", CASS_CONSISTENCY_TWO},
98  {"three", CASS_CONSISTENCY_THREE},
99  {"quorum", CASS_CONSISTENCY_QUORUM},
100  {"all", CASS_CONSISTENCY_ALL},
101  {"local-quorum", CASS_CONSISTENCY_LOCAL_QUORUM},
102  {"each-quorum", CASS_CONSISTENCY_EACH_QUORUM},
103  {"serial", CASS_CONSISTENCY_SERIAL},
104  {"local-serial", CASS_CONSISTENCY_LOCAL_SERIAL},
105  {"local-one", CASS_CONSISTENCY_LOCAL_ONE}
106  };
107  if (consistency_map.find(value) == consistency_map.end()) {
108  return CASS_CONSISTENCY_UNKNOWN;
109  }
110  return consistency_map[value];
111 }
112 
113 void
115  CassError rc;
116  // Set up the values of the parameters
117  const char* contact_points = "127.0.0.1";
118  std::string scontact_points;
119  try {
120  scontact_points = getParameter("contact-points");
121  contact_points = scontact_points.c_str();
122  } catch (...) {
123  // No host. Fine, we'll use "127.0.0.1".
124  }
125 
126  const char* port = NULL;
127  std::string sport;
128  try {
129  sport = getParameter("port");
130  port = sport.c_str();
131  } catch (...) {
132  // No port. Fine, we'll use the default "9042".
133  }
134 
135  const char* user = NULL;
136  std::string suser;
137  try {
138  suser = getParameter("user");
139  user = suser.c_str();
140  } catch (...) {
141  // No user. Fine, we'll use NULL.
142  }
143 
144  const char* password = NULL;
145  std::string spassword;
146  try {
147  spassword = getParameter("password");
148  password = spassword.c_str();
149  } catch (...) {
150  // No password. Fine, we'll use NULL.
151  }
152 
153  const char* keyspace = "keatest";
154  std::string skeyspace;
155  try {
156  skeyspace = getParameter("keyspace");
157  keyspace = skeyspace.c_str();
158  } catch (...) {
159  // No keyspace name. Fine, we'll use "keatest".
160  }
161 
162  const char* consistency = NULL;
163  std::string sconsistency;
164  try {
165  sconsistency = getParameter("consistency");
166  consistency = sconsistency.c_str();
167  } catch (...) {
168  // No consistency. Fine, we'll use "quorum".
169  }
170 
171  const char* serial_consistency = NULL;
172  std::string sserial_consistency;
173  try {
174  sserial_consistency = getParameter("serial-consistency");
175  serial_consistency = sserial_consistency.c_str();
176  } catch (...) {
177  // No serial consistency. Fine, we'll use "serial".
178  }
179 
180  const char* reconnect_wait_time = NULL;
181  std::string sreconnect_wait_time;
182  try {
183  sreconnect_wait_time = getParameter("reconnect-wait-time");
184  reconnect_wait_time = sreconnect_wait_time.c_str();
185  } catch (...) {
186  // No reconnect wait time. Fine, we'll use the default "2000".
187  }
188 
189  const char* connect_timeout = NULL;
190  std::string sconnect_timeout;
191  try {
192  sconnect_timeout = getParameter("connect-timeout");
193  connect_timeout = sconnect_timeout.c_str();
194  } catch (...) {
195  // No connect timeout. Fine, we'll use the default "5000".
196  }
197 
198  const char* request_timeout = NULL;
199  std::string srequest_timeout;
200  try {
201  srequest_timeout = getParameter("request-timeout");
202  request_timeout = srequest_timeout.c_str();
203  } catch (...) {
204  // No request timeout. Fine, we'll use the default "12000".
205  }
206 
207  const char* tcp_keepalive = NULL;
208  std::string stcp_keepalive;
209  try {
210  stcp_keepalive = getParameter("tcp-keepalive");
211  tcp_keepalive = stcp_keepalive.c_str();
212  } catch (...) {
213  // No tcp-keepalive. Fine, we'll not use TCP keepalive.
214  }
215 
216  std::string stcp_nodelay;
217  try {
218  stcp_nodelay = getParameter("tcp-nodelay");
219  } catch (...) {
220  // No tcp-nodelay. Fine, we'll use the default false.
221  }
222 
223  cluster_ = cass_cluster_new();
224  cass_cluster_set_contact_points(cluster_, contact_points);
225 
226  if (user && password) {
227  cass_cluster_set_credentials(cluster_, user, password);
228  }
229 
230  if (port) {
231  int32_t port_number;
232  try {
233  port_number = boost::lexical_cast<int32_t>(port);
234  if (port_number < 1 || port_number > 65535) {
236  "CqlConnection::openDatabase(): "
237  "port outside of range, expected "
238  "1-65535, instead got "
239  << port);
240  }
241  } catch (const boost::bad_lexical_cast& ex) {
243  "CqlConnection::openDatabase(): invalid "
244  "port, expected castable to int, instead got "
245  "\"" << port
246  << "\", " << ex.what());
247  }
248  cass_cluster_set_port(cluster_, port_number);
249  }
250 
251  if (consistency) {
252  CassConsistency desired_consistency = CqlConnection::parseConsistency(sconsistency);
253  CassConsistency desired_serial_consistency = CASS_CONSISTENCY_UNKNOWN;
254  if (serial_consistency) {
255  desired_serial_consistency = CqlConnection::parseConsistency(sserial_consistency);
256  }
257  if (desired_consistency != CASS_CONSISTENCY_UNKNOWN) {
258  setConsistency(true, desired_consistency, desired_serial_consistency);
259  }
260  }
261 
262  if (reconnect_wait_time) {
263  int32_t reconnect_wait_time_number;
264  try {
265  reconnect_wait_time_number =
266  boost::lexical_cast<int32_t>(reconnect_wait_time);
267  if (reconnect_wait_time_number < 0) {
269  "CqlConnection::openDatabase(): invalid reconnect "
270  "wait time, expected positive number, instead got "
271  << reconnect_wait_time);
272  }
273  } catch (const boost::bad_lexical_cast& ex) {
275  "CqlConnection::openDatabase(): "
276  "invalid reconnect wait time, expected "
277  "castable to int, instead got \""
278  << reconnect_wait_time << "\", " << ex.what());
279  }
280 #if (CASS_VERSION_MAJOR > 2) || \
281  ((CASS_VERSION_MAJOR == 2) && (CASS_VERSION_MINOR >= 13))
282  cass_uint64_t delay_ms =
283  static_cast<cass_uint64_t>(reconnect_wait_time_number);
284  cass_cluster_set_constant_reconnect(cluster_, delay_ms);
285 #else
286  cass_cluster_set_reconnect_wait_time(cluster_,
287  reconnect_wait_time_number);
288 #endif
289  }
290 
291  if (connect_timeout) {
292  int32_t connect_timeout_number;
293  try {
294  connect_timeout_number =
295  boost::lexical_cast<int32_t>(connect_timeout);
296  if (connect_timeout_number < 0) {
298  "CqlConnection::openDatabase(): "
299  "invalid connect timeout, expected "
300  "positive number, instead got "
301  << connect_timeout);
302  }
303  } catch (const boost::bad_lexical_cast& ex) {
305  "CqlConnection::openDatabase(): invalid connect timeout, "
306  "expected castable to int, instead got \""
307  << connect_timeout << "\", " << ex.what());
308  }
309  cass_cluster_set_connect_timeout(cluster_, connect_timeout_number);
310  }
311 
312  if (request_timeout) {
313  int32_t request_timeout_number;
314  try {
315  request_timeout_number =
316  boost::lexical_cast<int32_t>(request_timeout);
317  if (request_timeout_number < 0) {
319  "CqlConnection::openDatabase(): "
320  "invalid request timeout, expected "
321  "positive number, instead got "
322  << request_timeout);
323  }
324  } catch (const boost::bad_lexical_cast& ex) {
326  "CqlConnection::openDatabase(): invalid request timeout, "
327  "expected castable to int, instead got \""
328  << request_timeout << "\", " << ex.what());
329  }
330  cass_cluster_set_request_timeout(cluster_, request_timeout_number);
331  }
332 
333  if (tcp_keepalive) {
334  int32_t tcp_keepalive_number;
335  try {
336  tcp_keepalive_number = boost::lexical_cast<int32_t>(tcp_keepalive);
337  if (tcp_keepalive_number < 0) {
339  "CqlConnection::openDatabase(): "
340  "invalid TCP keepalive, expected "
341  "positive number, instead got "
342  << tcp_keepalive);
343  }
344  } catch (const boost::bad_lexical_cast& ex) {
346  "CqlConnection::openDatabase(): invalid TCP keepalive, "
347  "expected castable to int, instead got \""
348  << tcp_keepalive << "\", " << ex.what());
349  }
350  cass_cluster_set_tcp_keepalive(cluster_, cass_true,
351  tcp_keepalive_number);
352  }
353 
354  if (stcp_nodelay == "true") {
355  cass_cluster_set_tcp_nodelay(cluster_, cass_true);
356  }
357 
358  session_ = cass_session_new();
359 
360  CassFuture* connect_future =
361  cass_session_connect_keyspace(session_, cluster_, keyspace);
362  cass_future_wait(connect_future);
363  const std::string error =
364  checkFutureError("CqlConnection::openDatabase(): "
365  "cass_session_connect_keyspace() != CASS_OK",
366  connect_future);
367  rc = cass_future_error_code(connect_future);
368  cass_future_free(connect_future);
369  if (rc != CASS_OK) {
370  cass_session_free(session_);
371  session_ = NULL;
372  cass_cluster_free(cluster_);
373  cluster_ = NULL;
374  isc_throw(DbOpenError, error);
375  }
376 
377  // Get keyspace meta.
378  schema_meta_ = cass_session_get_schema_meta(session_);
379  keyspace_meta_ = cass_schema_meta_keyspace_by_name(schema_meta_, keyspace);
380  if (!keyspace_meta_) {
381  isc_throw(DbOpenError, "CqlConnection::openDatabase(): "
382  "!cass_schema_meta_keyspace_by_name()");
383  }
384 }
385 
386 void
388  CassError rc = CASS_OK;
389  for (StatementMapEntry it : statements) {
390  CqlTaggedStatement& tagged_statement = it.second;
391  if (statements_.find(tagged_statement.name_) != statements_.end()) {
393  "CqlConnection::prepareStatements(): "
394  "duplicate statement with name "
395  << tagged_statement.name_);
396  }
397 
398  CassFuture* future =
399  cass_session_prepare(session_, tagged_statement.text_);
400  cass_future_wait(future);
401  const std::string error =
402  checkFutureError("CqlConnection::prepareStatements():"
403  " cass_session_prepare() != CASS_OK",
404  future, tagged_statement.name_);
405  rc = cass_future_error_code(future);
406  if (rc != CASS_OK) {
407  cass_future_free(future);
408  isc_throw(DbOperationError, error);
409  }
410 
411  tagged_statement.prepared_statement_ = cass_future_get_prepared(future);
412  statements_.insert(it);
413  cass_future_free(future);
414  }
415 }
416 
417 void
419  CassConsistency consistency,
420  CassConsistency serial_consistency) {
421  force_consistency_ = force;
422  consistency_ = consistency;
423  serial_consistency_ = serial_consistency;
424 }
425 
426 void
429 }
430 
431 void
434 }
435 
436 void
439 }
440 
441 const std::string
442 CqlConnection::checkFutureError(const std::string& what,
443  CassFuture* future,
444  StatementTag statement_tag /* = NULL */) {
445  CassError cass_error = cass_future_error_code(future);
446  const char* error_message;
447  size_t error_message_size;
448  cass_future_error_message(future, &error_message, &error_message_size);
449 
450  std::stringstream stream;
451  if (statement_tag && std::strlen(statement_tag) > 0) {
452  // future is from cass_session_execute() call.
453  stream << "Statement ";
454  stream << statement_tag;
455  } else {
456  // future is from cass_session_*() call.
457  stream << "Session action ";
458  }
459  if (cass_error == CASS_OK) {
460  stream << " executed successfully.";
461  } else {
462  stream << " failed, Kea error: " << what
463  << ", Cassandra error code: " << cass_error_desc(cass_error)
464  << ", Cassandra future error: " << error_message;
465  }
466  return stream.str();
467 }
468 
469 } // namespace dhcp
470 } // namespace isc
We want to reuse the database backend connection and exchange code for other uses, in particular for hook libraries.
StatementMap statements_
Pointer to external array of tagged statements containing statement name, array of names of bind para...
static std::pair< uint32_t, uint32_t > getVersion(const ParameterMap &parameters)
Get the schema version.
StatementTag name_
Short description of the query.
virtual void commit()
Commit Transactions.
const CassPrepared * prepared_statement_
Internal Cassandra object representing the prepared statement.
DB_LOG & arg(T first, Args...args)
Pass parameters to replace logger placeholders.
Definition: db_log.h:144
bool force_consistency_
CQL consistency enabled.
void startTransaction()
Start transaction.
void prepareStatements(StatementMap &statements)
Prepare statements.
void openDatabase()
Open database.
virtual void rollback()
Rollback Transactions.
Common database connection class.
virtual ~CqlConnection()
Destructor.
Exception thrown on failure to open database.
char const *const StatementTag
Statement index representing the statement name.
const CassKeyspaceMeta * keyspace_meta_
Keyspace meta information, used for UDTs.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
void setConsistency(bool force, CassConsistency consistency, CassConsistency serial_consistency)
Set consistency.
Defines a single statement or query.
CassConsistency consistency_
CQL consistency.
CassSession * session_
CQL session handle.
static CassConsistency parseConsistency(std::string value)
parse Consistency value
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
CassCluster * cluster_
CQL connection handle.
const int DB_DBG_TRACE_DETAIL
Database logging levels.
Definition: db_log.cc:21
Defines the logger used by the top-level component of kea-dhcp-ddns.
Exchange used to retrieve schema version from the keyspace.
Definition: cql_exchange.h:236
std::string getParameter(const std::string &name) const
Returns value of a connection parameter.
std::unordered_map< StatementTag, CqlTaggedStatement, StatementTagHash, StatementTagEqual > StatementMap
A container for all statements.
CassConsistency serial_consistency_
CQL serial consistency.
const CassSchemaMeta * schema_meta_
static StatementMap tagged_statements_
Cassandra statements.
Definition: cql_exchange.h:280
Common CQL connector pool.
std::map< std::string, std::string > ParameterMap
Database configuration parameter map.
static const std::string checkFutureError(const std::string &what, CassFuture *future, StatementTag statement_tag=NULL)
Check for errors.
std::pair< StatementTag, CqlTaggedStatement > StatementMapEntry
A type for a single entry on the statements map.
Exception thrown on failure to execute a database function.
CqlConnection(const ParameterMap &parameters)
Constructor.
char const *const text_
Text representation of the actual query.