Kea  1.9.9-git
cql_exchange.cc
Go to the documentation of this file.
1 // Copyright (C) 2018-2020 Internet Systems Consortium, Inc. ("ISC")
2 // Copyright (C) 2016-2017 Deutsche Telekom AG.
3 //
4 // Authors: Razvan Becheriu <razvan.becheriu@qualitance.com>
5 // Andrei Pavel <andrei.pavel@qualitance.com>
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 // http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 
19 #include <config.h>
20 
21 #include <cql/cql_connection.h>
22 #include <cql/cql_exchange.h>
23 #include <cql/sql_common.h>
24 #include <database/db_exceptions.h>
25 
26 #include <boost/multi_index/hashed_index.hpp>
27 #include <boost/multi_index/member.hpp>
28 #include <boost/multi_index/sequenced_index.hpp>
29 #include <boost/multi_index_container.hpp>
30 #include <boost/noncopyable.hpp>
31 #include <boost/shared_ptr.hpp>
32 
33 #include <fstream>
34 #include <iostream>
35 #include <map>
36 #include <string>
37 #include <utility>
38 #include <vector>
39 
40 namespace isc {
41 namespace db {
42 
/// @brief Macro to return directly from caller function.
///
/// Evaluates @c cass_error exactly once. If the result differs from CASS_OK,
/// that result is returned from the enclosing function; otherwise execution
/// continues after the macro.
///
/// The argument is stored in a local first because callers pass function
/// calls as the argument: expanding it twice would run a failing call a
/// second time just to produce the return value. The brace-block form is
/// kept so the macro works with or without a trailing semicolon.
#define KEA_CASS_CHECK(cass_error)                                             \
    {                                                                          \
        const CassError kea_cass_check_rc_ = (cass_error);                     \
        if (kea_cass_check_rc_ != CASS_OK) {                                   \
            return kea_cass_check_rc_;                                         \
        }                                                                      \
    }
50 
54 public:
55  size_t operator()(const ExchangeDataType& key) const {
56  return std::hash<size_t>{}(static_cast<size_t>(key));
57  }
58 };
59 
61 typedef std::unordered_map<ExchangeDataType, CqlFunction, ExchangeDataTypeHash>
64 
72 std::size_t
73 hash_value(const CassValueType& key) {
74  return key;
75 }
76 
79 
81 typedef std::unordered_map<std::type_index, ExchangeDataType> AnyTypeMap;
82 
83 // Declare uint8_t as key here for compatibility with g++ version 5. Ideally,
84 // it would be CassValueType
85 typedef std::unordered_map<uint8_t, ExchangeDataType> CassTypeMap;
87 
89 static AnyTypeMap ANY_TYPE_MAP = {
90  {typeid(NULL), EXCHANGE_DATA_TYPE_NONE},
91  {typeid(cass_bool_t*), EXCHANGE_DATA_TYPE_BOOL},
92  {typeid(cass_int8_t*), EXCHANGE_DATA_TYPE_INT8},
93  {typeid(cass_int16_t*), EXCHANGE_DATA_TYPE_INT16},
94  {typeid(cass_int32_t*), EXCHANGE_DATA_TYPE_INT32},
95  {typeid(cass_int64_t*), EXCHANGE_DATA_TYPE_INT64},
96  {typeid(std::string*), EXCHANGE_DATA_TYPE_STRING},
98  {typeid(CassUuid*), EXCHANGE_DATA_TYPE_UUID},
99  {typeid(Udt*), EXCHANGE_DATA_TYPE_UDT}, // user data type
101 
103 static CassTypeMap CASS_TYPE_MAP = {
104  {CASS_VALUE_TYPE_CUSTOM, EXCHANGE_DATA_TYPE_UDT},
105  {CASS_VALUE_TYPE_ASCII, EXCHANGE_DATA_TYPE_STRING},
106  {CASS_VALUE_TYPE_BIGINT, EXCHANGE_DATA_TYPE_INT64},
107  {CASS_VALUE_TYPE_BLOB, EXCHANGE_DATA_TYPE_BYTES},
108  {CASS_VALUE_TYPE_BOOLEAN, EXCHANGE_DATA_TYPE_BOOL},
109  {CASS_VALUE_TYPE_COUNTER, EXCHANGE_DATA_TYPE_INT32},
110  {CASS_VALUE_TYPE_DECIMAL, EXCHANGE_DATA_TYPE_INT32},
111  {CASS_VALUE_TYPE_DOUBLE, EXCHANGE_DATA_TYPE_INT64},
112  {CASS_VALUE_TYPE_FLOAT, EXCHANGE_DATA_TYPE_INT32},
113  {CASS_VALUE_TYPE_INT, EXCHANGE_DATA_TYPE_INT32},
114  {CASS_VALUE_TYPE_TEXT, EXCHANGE_DATA_TYPE_STRING},
115  {CASS_VALUE_TYPE_TIMESTAMP, EXCHANGE_DATA_TYPE_INT64},
116  {CASS_VALUE_TYPE_UUID, EXCHANGE_DATA_TYPE_UUID},
117  {CASS_VALUE_TYPE_VARCHAR, EXCHANGE_DATA_TYPE_STRING},
118  {CASS_VALUE_TYPE_VARINT, EXCHANGE_DATA_TYPE_INT32},
119  {CASS_VALUE_TYPE_TIMEUUID, EXCHANGE_DATA_TYPE_INT64},
120  {CASS_VALUE_TYPE_INET, EXCHANGE_DATA_TYPE_NONE},
121  {CASS_VALUE_TYPE_DATE, EXCHANGE_DATA_TYPE_INT64},
122  {CASS_VALUE_TYPE_TIME, EXCHANGE_DATA_TYPE_INT64},
123  {CASS_VALUE_TYPE_SMALL_INT, EXCHANGE_DATA_TYPE_INT16},
124  {CASS_VALUE_TYPE_TINY_INT, EXCHANGE_DATA_TYPE_INT8},
125  {CASS_VALUE_TYPE_LIST, EXCHANGE_DATA_TYPE_COLLECTION},
126  {CASS_VALUE_TYPE_MAP, EXCHANGE_DATA_TYPE_COLLECTION},
127  {CASS_VALUE_TYPE_SET, EXCHANGE_DATA_TYPE_COLLECTION},
128  {CASS_VALUE_TYPE_UDT, EXCHANGE_DATA_TYPE_UDT},
129  {CASS_VALUE_TYPE_TUPLE, EXCHANGE_DATA_TYPE_UDT}};
130 
/// @brief Builds a UDT container for the user-defined type called @c name.
///
/// Looks the type up by name in the connection's keyspace metadata, then
/// creates a Cassandra user-type object from the data type that was found.
/// Both results are stored in members; each is checked for null.
///
/// NOTE(review): the statements opening the two error-message expressions
/// below (presumably isc_throw calls) are not visible in this listing, so
/// the exception type raised on failure cannot be confirmed here.
///
/// @param connection connection whose keyspace metadata is searched
/// @param name name of the user-defined type
133 Udt::Udt(const CqlConnection& connection, const std::string& name)
134  : AnyArray(), connection_(connection), name_(name) {
135  // Create type.
136  cass_data_type_ = cass_keyspace_meta_user_type_by_name(
137  connection_.keyspace_meta_, name_.c_str());
138  if (!cass_data_type_) {
140  "Udt::Udt(): UDT " << name_ << " does not exist ");
141  }
142  // Create container.
143  cass_user_type_ = cass_user_type_new_from_data_type(cass_data_type_);
144  if (!cass_user_type_) {
146  "Udt::Udt(): Type " << name_
147  << " is not a UDT as expected. ");
148  }
149 }
150 
153  //
154  // Bug: it seems that if there is no call to
155  // cass_user_type_set_*(cass_user_type_), then
156  // cass_user_type_free(cass_user_type_) might SIGSEGV, so we never
157  // free. Udt objects should have application scope though.
158  // cass_user_type_free(cass_user_type_);
159 }
161 
164 void
165 AnyArray::add(const boost::any& value) {
166  push_back(value);
167 }
168 
/// @brief Removes the element at position @c index.
///
/// An index at or beyond size() takes the error branch; otherwise the
/// element at that position is erased.
///
/// NOTE(review): the statement opening the error-message expression
/// (presumably an isc_throw) is not visible in this listing. Also, when the
/// array is empty the message computes (size() - 1), which presumably wraps
/// around for an unsigned size type -- confirm and adjust the message.
///
/// @param index zero-based position of the element to remove
169 void
170 AnyArray::remove(const size_t& index) {
171  if (size() <= index) {
173  "AnyArray::remove(): index "
174  << index << " out of bounds: [0, " << (size() - 1)
175  << "]");
176  }
177  erase(begin() + index);
178 }
180 
184 static CassError
185 CqlBindNone(const boost::any& /* value */,
186  const size_t& index,
187  CassStatement* statement) {
188  return cass_statement_bind_null(statement, index);
189 }
190 
191 static CassError
192 CqlBindBool(const boost::any& value,
193  const size_t& index,
194  CassStatement* statement) {
195  return cass_statement_bind_bool(statement, index,
196  *boost::any_cast<cass_bool_t*>(value));
197 }
198 
199 static CassError
200 CqlBindInt8(const boost::any& value,
201  const size_t& index,
202  CassStatement* statement) {
203  return cass_statement_bind_int8(statement, index,
204  *boost::any_cast<cass_int8_t*>(value));
205 }
206 
207 static CassError
208 CqlBindInt16(const boost::any& value,
209  const size_t& index,
210  CassStatement* statement) {
211  return cass_statement_bind_int16(statement, index,
212  *boost::any_cast<cass_int16_t*>(value));
213 }
214 
215 static CassError
216 CqlBindInt32(const boost::any& value,
217  const size_t& index,
218  CassStatement* statement) {
219  return cass_statement_bind_int32(statement, index,
220  *boost::any_cast<cass_int32_t*>(value));
221 }
222 
223 static CassError
224 CqlBindInt64(const boost::any& value,
225  const size_t& index,
226  CassStatement* statement) {
227  return cass_statement_bind_int64(statement, index,
228  *boost::any_cast<cass_int64_t*>(value));
229 }
230 
231 static CassError
232 CqlBindString(const boost::any& value,
233  const size_t& index,
234  CassStatement* statement) {
235  return cass_statement_bind_string(
236  statement, index, boost::any_cast<std::string*>(value)->c_str());
237 }
238 
239 static CassError
240 CqlBindBytes(const boost::any& value,
241  const size_t& index,
242  CassStatement* statement) {
243  CassBlob* blob_value = boost::any_cast<CassBlob*>(value);
244  return cass_statement_bind_bytes(statement, index, blob_value->data(),
245  blob_value->size());
246 }
247 
248 static CassError
249 CqlBindUuid(const boost::any& value,
250  const size_t& index,
251  CassStatement* statement) {
252  return cass_statement_bind_uuid(statement, index,
253  *boost::any_cast<CassUuid*>(value));
254 }
255 
/// @brief Binds a user-defined type value at parameter @c index.
///
/// @c value must hold a Udt*. Every element of the Udt is written into the
/// Udt's Cassandra user-type object, using the UDT setter registered in
/// CQL_FUNCTIONS for the element's exchange type. The user-type object is
/// then bound to the statement.
///
/// A null Udt pointer raises BadValue. A boost::bad_any_cast raised while
/// setting an element is rethrown as DbOperationError.
///
/// NOTE(review): the line that opens the setter call inside the try block
/// is not visible in this listing; the unmatched closing parenthesis
/// suggests it is wrapped in KEA_CASS_CHECK -- confirm.
256 static CassError
257 CqlBindUdt(const boost::any& value,
258  const size_t& index,
259  CassStatement* statement) {
260  Udt* udt = boost::any_cast<Udt*>(value);
261 
262  if (!udt) {
263  isc_throw(BadValue, "Invalid value specified, not an Udt object");
264  }
265 
    // Position of the current element inside the UDT.
266  size_t i = 0u;
267 
268  // Let's iterate over all elements in udt and check that we indeed
269  // can assign the set function for each specified type.
270  for (boost::any& element : *udt) {
271  try {
273  CQL_FUNCTIONS[exchangeType(element)].cqlUdtSetFunction_(
274  element, i, udt->cass_user_type_));
275  } catch (const boost::bad_any_cast& exception) {
276  isc_throw(DbOperationError,
277  "CqlCommon::udtSetData(): "
278  << exception.what()
279  << " when binding parameter of type "
280  << element.type().name()
281  << "in UDT with function CQL_FUNCTIONS["
282  << exchangeType(element) << "].cqlUdtSetFunction_");
283  }
284  ++i;
285  }
286 
287  return cass_statement_bind_user_type(statement, index,
288  udt->cass_user_type_);
289 }
290 
291 static CassError
292 CqlBindCollection(const boost::any& value,
293  const size_t& index,
294  CassStatement* statement) {
295  AnyCollection* elements = boost::any_cast<AnyCollection*>(value);
296 
297  CassCollection* collection =
298  cass_collection_new(CASS_COLLECTION_TYPE_SET, elements->size());
299 
300  // Iterate over all elements and assign appropriate append function
301  // for each.
302  for (boost::any& element : *elements) {
303  ExchangeDataType type = exchangeType(element);
304  KEA_CASS_CHECK(CQL_FUNCTIONS[type].cqlCollectionAppendFunction_(
305  element, collection));
306  }
307 
308  const CassError cass_error =
309  cass_statement_bind_collection(statement, index, collection);
310  cass_collection_free(collection);
311 
312  return cass_error;
313 }
315 
319 static CassError
320 CqlUdtSetNone(const boost::any& /* udt_member */,
321  const size_t& position,
322  CassUserType* cass_user_type) {
323  return cass_user_type_set_null(cass_user_type, position);
324 }
325 
326 static CassError
327 CqlUdtSetBool(const boost::any& udt_member,
328  const size_t& position,
329  CassUserType* cass_user_type) {
330  return cass_user_type_set_bool(cass_user_type, position,
331  *boost::any_cast<cass_bool_t*>(udt_member));
332 }
333 
334 static CassError
335 CqlUdtSetInt8(const boost::any& udt_member,
336  const size_t& position,
337  CassUserType* cass_user_type) {
338  return cass_user_type_set_int8(cass_user_type, position,
339  *boost::any_cast<cass_int8_t*>(udt_member));
340 }
341 
342 static CassError
343 CqlUdtSetInt16(const boost::any& udt_member,
344  const size_t& position,
345  CassUserType* cass_user_type) {
346  return cass_user_type_set_int16(
347  cass_user_type, position, *boost::any_cast<cass_int16_t*>(udt_member));
348 }
349 
350 static CassError
351 CqlUdtSetInt32(const boost::any& udt_member,
352  const size_t& position,
353  CassUserType* cass_user_type) {
354  return cass_user_type_set_int32(
355  cass_user_type, position, *boost::any_cast<cass_int32_t*>(udt_member));
356 }
357 
358 static CassError
359 CqlUdtSetInt64(const boost::any& udt_member,
360  const size_t& position,
361  CassUserType* cass_user_type) {
362  return cass_user_type_set_int64(
363  cass_user_type, position, *boost::any_cast<cass_int64_t*>(udt_member));
364 }
365 
366 static CassError
367 CqlUdtSetString(const boost::any& udt_member,
368  const size_t& position,
369  CassUserType* cass_user_type) {
370  return cass_user_type_set_string(
371  cass_user_type, position,
372  boost::any_cast<std::string*>(udt_member)->c_str());
373 }
374 
375 static CassError
376 CqlUdtSetBytes(const boost::any& udt_member,
377  const size_t& position,
378  CassUserType* cass_user_type) {
379  CassBlob* blob_value = boost::any_cast<CassBlob*>(udt_member);
380  return cass_user_type_set_bytes(cass_user_type, position,
381  blob_value->data(), blob_value->size());
382 }
383 
384 static CassError
385 CqlUdtSetUuid(const boost::any& udt_member,
386  const size_t& position,
387  CassUserType* cass_user_type) {
388  return cass_user_type_set_uuid(cass_user_type, position,
389  *boost::any_cast<CassUuid*>(udt_member));
390 }
391 
392 static CassError
393 CqlUdtSetUdt(const boost::any& udt_member,
394  const size_t& position,
395  CassUserType* cass_user_type) {
396  return cass_user_type_set_user_type(
397  cass_user_type, position,
398  boost::any_cast<Udt*>(udt_member)->cass_user_type_);
399 }
400 
401 static CassError
402 CqlUdtSetCollection(const boost::any& udt_member,
403  const size_t& position,
404  CassUserType* cass_user_type) {
405  return cass_user_type_set_collection(
406  cass_user_type, position, boost::any_cast<CassCollection*>(udt_member));
407 }
409 
413 static CassError
414 CqlCollectionAppendNone(const boost::any& /* value */,
415  CassCollection* /* collection */) {
416  return CASS_OK;
417 }
418 
419 static CassError
420 CqlCollectionAppendBool(const boost::any& value, CassCollection* collection) {
421  return cass_collection_append_bool(collection,
422  *boost::any_cast<cass_bool_t*>(value));
423 }
424 
425 static CassError
426 CqlCollectionAppendInt8(const boost::any& value, CassCollection* collection) {
427  return cass_collection_append_int8(collection,
428  *boost::any_cast<cass_int8_t*>(value));
429 }
430 
431 static CassError
432 CqlCollectionAppendInt16(const boost::any& value, CassCollection* collection) {
433  return cass_collection_append_int16(collection,
434  *boost::any_cast<cass_int16_t*>(value));
435 }
436 
437 static CassError
438 CqlCollectionAppendInt32(const boost::any& value, CassCollection* collection) {
439  return cass_collection_append_int32(collection,
440  *boost::any_cast<cass_int32_t*>(value));
441 }
442 
443 static CassError
444 CqlCollectionAppendInt64(const boost::any& value, CassCollection* collection) {
445  return cass_collection_append_int64(collection,
446  *boost::any_cast<cass_int64_t*>(value));
447 }
448 
449 static CassError
450 CqlCollectionAppendString(const boost::any& value, CassCollection* collection) {
451  return cass_collection_append_string(
452  collection, boost::any_cast<std::string*>(value)->c_str());
453 }
454 
455 static CassError
456 CqlCollectionAppendBytes(const boost::any& value, CassCollection* collection) {
457  CassBlob* blob_value = boost::any_cast<CassBlob*>(value);
458  return cass_collection_append_bytes(collection, blob_value->data(),
459  blob_value->size());
460 }
461 
462 static CassError
463 CqlCollectionAppendUuid(const boost::any& value, CassCollection* collection) {
464  return cass_collection_append_uuid(collection,
465  *boost::any_cast<CassUuid*>(value));
466 }
467 
468 static CassError
469 CqlCollectionAppendUdt(const boost::any& value, CassCollection* collection) {
470  Udt* udt = boost::any_cast<Udt*>(value);
471  size_t i = 0u;
472  for (boost::any& element : *udt) {
473  KEA_CASS_CHECK(CQL_FUNCTIONS[exchangeType(element)].cqlUdtSetFunction_(
474  element, i, udt->cass_user_type_));
475  ++i;
476  }
477  return cass_collection_append_user_type(collection, udt->cass_user_type_);
478 }
479 
480 static CassError
481 CqlCollectionAppendCollection(const boost::any& value,
482  CassCollection* collection) {
483  return cass_collection_append_collection(
484  collection, boost::any_cast<CassCollection*>(value));
485 }
486 // @}
487 
490 static CassError
491 CqlGetNone(const boost::any& /* data */, const CassValue* /* value */) {
492  return CASS_OK;
493 }
494 
495 static CassError
496 CqlGetBool(const boost::any& data, const CassValue* value) {
497  return cass_value_get_bool(value, boost::any_cast<cass_bool_t*>(data));
498 }
499 
500 static CassError
501 CqlGetInt8(const boost::any& data, const CassValue* value) {
502  return cass_value_get_int8(value, boost::any_cast<cass_int8_t*>(data));
503 }
504 
505 static CassError
506 CqlGetInt16(const boost::any& data, const CassValue* value) {
507  return cass_value_get_int16(value, boost::any_cast<cass_int16_t*>(data));
508 }
509 
510 static CassError
511 CqlGetInt32(const boost::any& data, const CassValue* value) {
512  return cass_value_get_int32(value, boost::any_cast<cass_int32_t*>(data));
513 }
514 
515 static CassError
516 CqlGetInt64(const boost::any& data, const CassValue* value) {
517  return cass_value_get_int64(value, boost::any_cast<cass_int64_t*>(data));
518 }
519 
520 static CassError
521 CqlGetString(const boost::any& data, const CassValue* value) {
522  char const* data_value;
523  size_t size_value;
524  CassError cass_error = cass_value_get_string(
525  value, static_cast<char const**>(&data_value), &size_value);
526  boost::any_cast<std::string*>(data)->assign(data_value,
527  data_value + size_value);
528  return cass_error;
529 }
530 
531 static CassError
532 CqlGetBytes(const boost::any& data, const CassValue* value) {
533  const cass_byte_t* data_value;
534  size_t size_value;
535  CassError cass_error = cass_value_get_bytes(
536  value, static_cast<const cass_byte_t**>(&data_value), &size_value);
537  boost::any_cast<CassBlob*>(data)->assign(data_value,
538  data_value + size_value);
539  return cass_error;
540 }
541 
542 static CassError
543 CqlGetUuid(const boost::any& data, const CassValue* value) {
544  return cass_value_get_uuid(value, boost::any_cast<CassUuid*>(data));
545 }
546 
547 static CassError
548 CqlGetUdt(const boost::any& data, const CassValue* value) {
549  Udt* udt = boost::any_cast<Udt*>(data);
550 
551  CassIterator* fields = cass_iterator_fields_from_user_type(value);
552  if (!fields) {
553  isc_throw(DbOperationError, "CqlGetUdt(): column is not a UDT");
554  }
555  Udt::const_iterator it = udt->begin();
556  while (cass_iterator_next(fields)) {
557  const CassValue* field_value =
558  cass_iterator_get_user_type_field_value(fields);
559  if (cass_value_is_null(field_value)) {
560  isc_throw(DbOperationError,
561  "CqlGetUdt(): null value returned in UDT");
562  }
563  const CassValueType& type = cass_value_type(field_value);
564  KEA_CASS_CHECK(CQL_FUNCTIONS[exchangeType(type)].cqlGetFunction_(
565  *it, field_value));
566  ++it;
567  // If cqlGetFunction_() returns != CASS_OK, don't
568  // cass_iterator_free(items_iterator) because we're returning from this
569  // function and throwing from the callee.
570  }
571  cass_iterator_free(fields);
572  return CASS_OK;
573 }
574 
575 static CassError
576 CqlGetCollection(const boost::any& data, const CassValue* value) {
577  AnyCollection* collection = boost::any_cast<AnyCollection*>(data);
578  if (!collection) {
579  isc_throw(DbOperationError, "CqlGetCollection(): column is not a collection");
580  }
581 
582  BOOST_ASSERT(collection->size() == 1);
583 
586  boost::any underlying_object = *collection->begin();
587 
588  collection->clear();
589 
590  CassIterator* items = cass_iterator_from_collection(value);
591  if (!items) {
592  isc_throw(DbOperationError,
593  "CqlGetCollection(): column is not a collection");
594  }
595  while (cass_iterator_next(items)) {
596  const CassValue* item_value = cass_iterator_get_value(items);
597  if (cass_value_is_null(item_value)) {
598  isc_throw(DbOperationError,
599  "CqlGetCollection(): null value returned in collection");
600  }
601  const CassValueType& type = cass_value_type(item_value);
602 
603  collection->push_back(underlying_object);
604  KEA_CASS_CHECK(CQL_FUNCTIONS[exchangeType(type)].cqlGetFunction_(
605  *collection->rbegin(), item_value));
606  // If cqlGetFunction_() returns != CASS_OK, don't call
607  // cass_iterator_free(items_iterator) because we're returning from this
608  // function and throwing from the callee.
609  }
610  cass_iterator_free(items);
611  return CASS_OK;
612 }
614 
616 CqlFunctionMap CQL_FUNCTIONS = //
618  {CqlBindNone, CqlUdtSetNone, CqlCollectionAppendNone, CqlGetNone}},
620  {CqlBindBool, CqlUdtSetBool, CqlCollectionAppendBool, CqlGetBool}},
622  {CqlBindInt8, CqlUdtSetInt8, CqlCollectionAppendInt8, CqlGetInt8}},
624  {CqlBindInt16, CqlUdtSetInt16, CqlCollectionAppendInt16, CqlGetInt16}},
626  {CqlBindInt32, CqlUdtSetInt32, CqlCollectionAppendInt32, CqlGetInt32}},
628  {CqlBindInt64, CqlUdtSetInt64, CqlCollectionAppendInt64, CqlGetInt64}},
630  {CqlBindString, CqlUdtSetString, CqlCollectionAppendString,
631  CqlGetString}},
633  {CqlBindBytes, CqlUdtSetBytes, CqlCollectionAppendBytes, CqlGetBytes}},
635  {CqlBindUuid, CqlUdtSetUuid, CqlCollectionAppendUuid, CqlGetUuid}},
637  {CqlBindUdt, CqlUdtSetUdt, CqlCollectionAppendUdt, CqlGetUdt}},
639  {CqlBindCollection, CqlUdtSetCollection, CqlCollectionAppendCollection,
640  CqlGetCollection}}};
641 
/// @brief Determine exchange type based on boost::any type.
///
/// Looks up the type held by @c object in ANY_TYPE_MAP and returns the
/// exchange data type it maps to. A type that is absent from the map takes
/// the first error branch; an exchange type not smaller than
/// CQL_FUNCTIONS.size() takes the second.
///
/// NOTE(review): the return-type line and the statements opening both
/// error-message expressions (presumably isc_throw calls) are not visible
/// in this listing, so the exception type cannot be confirmed here.
///
/// @param object value whose held type is examined
/// @return exchange data type registered for the held type
643 exchangeType(const boost::any& object) {
644  const std::type_index type = object.type();
645  AnyTypeMap::const_iterator exchange_type_it = ANY_TYPE_MAP.find(type);
646  if (exchange_type_it == ANY_TYPE_MAP.end()) {
648  "exchangeType(): boost::any type "
649  << type.name() << " does not map to any exchange type");
650  }
651  const ExchangeDataType exchange_type = exchange_type_it->second;
    // Sanity check of the exchange type against the number of entries in
    // CQL_FUNCTIONS.
652  if (exchange_type >= CQL_FUNCTIONS.size()) {
654  "exchangeType(): index " << exchange_type << " out of bounds "
655  << 0 << " - "
656  << (CQL_FUNCTIONS.size() - 1));
657  }
658  return exchange_type;
659 }
660 
/// @brief Determine exchange type based on a Cassandra value type.
///
/// Looks up @c type in CASS_TYPE_MAP and returns the exchange data type it
/// maps to. A type that is absent from the map takes the first error
/// branch; an exchange type not smaller than CQL_FUNCTIONS.size() takes the
/// second.
///
/// NOTE(review): the return-type line and the statements opening both
/// error-message expressions (presumably isc_throw calls) are not visible
/// in this listing, so the exception type cannot be confirmed here.
///
/// @param type Cassandra value type reported by the driver
/// @return exchange data type registered for that value type
662 exchangeType(const CassValueType& type) {
663  CassTypeMap::const_iterator exchange_type_it = CASS_TYPE_MAP.find(type);
664  if (exchange_type_it == CASS_TYPE_MAP.end()) {
666  "exchangeType(): Cassandra value type "
667  << type << " does not map to any exchange type");
668  }
669  const ExchangeDataType exchange_type = exchange_type_it->second;
    // Sanity check of the exchange type against the number of entries in
    // CQL_FUNCTIONS.
670  if (exchange_type >= CQL_FUNCTIONS.size()) {
672  "exchangeType(): index " << exchange_type << " out of bounds "
673  << 0 << " - "
674  << CQL_FUNCTIONS.size() - 1);
675  }
676  return exchange_type;
677 }
678 
/// @brief Binds every element of @c data to @c statement.
///
/// Elements are bound in order, the element at position i to statement
/// parameter i, using the bind function registered in CQL_FUNCTIONS for the
/// element's exchange type. A boost::bad_any_cast raised by the bind
/// function and a result other than CASS_OK are both turned into error
/// reports naming the parameter position and type.
///
/// NOTE(review): the statements opening the two error-message expressions
/// (presumably isc_throw calls) are not visible in this listing, so the
/// exception type cannot be confirmed here.
///
/// @param data values to bind, in parameter order
/// @param statement statement receiving the values
679 void
680 CqlCommon::bindData(const AnyArray& data, CassStatement* statement) {
    // Index of the statement parameter being bound.
681  size_t i = 0u;
682  for (const boost::any& element : data) {
683  CassError cass_error;
684  try {
685  cass_error = CQL_FUNCTIONS[exchangeType(element)].cqlBindFunction_(
686  element, i, statement);
687  } catch (const boost::bad_any_cast& exception) {
689  "CqlCommon::bindData(): "
690  << exception.what() << " when binding parameter " << i
691  << " which is of type " << element.type().name()
692  << " with function CQL_FUNCTIONS["
693  << exchangeType(element) << "].cqlBindFunction_()");
694  }
695  if (cass_error != CASS_OK) {
697  "CqlCommon::bindData(): unable to bind parameter "
698  << i << " which is of type " << element.type().name()
699  << " with function CQL_FUNCTIONS["
700  << exchangeType(element)
701  << "].cqlBindFunction_(), Cassandra error code: "
702  << cass_error_desc(cass_error));
703  }
704  ++i;
705  }
706 }
707 
/// @brief Retrieves data returned by Cassandra.
///
/// For every element of @c data, the column of @c row at the same position
/// is read into the element, using the getter registered in CQL_FUNCTIONS
/// for the element's exchange type. A boost::bad_any_cast raised by the
/// getter and a result other than CASS_OK are both turned into error
/// reports naming the column position.
///
/// NOTE(review): the lines naming the exception type in both error reports
/// are not visible in this listing.
///
/// @param row row to read from
/// @param data destinations, one per column, in column order
708 void
709 CqlCommon::getData(const CassRow* row, AnyArray& data) {
    // Index of the column being read.
710  size_t i = 0u;
711  for (boost::any& element : data) {
712  const CassValue* value = cass_row_get_column(row, i);
713  CassError cass_error;
714  try {
715  cass_error = CQL_FUNCTIONS[exchangeType(element)].cqlGetFunction_(
716  element, value);
717  } catch (const boost::bad_any_cast& exception) {
719  "CqlCommon::getData(): "
720  << exception.what() << " when retrieving parameter "
721  << i << " which is of type " << element.type().name()
722  << " with function CQL_FUNCTIONS["
723  << exchangeType(element) << "].cqlGetFunction_()");
724  }
725  if (cass_error != CASS_OK) {
726  isc_throw(
728  "CqlCommon::getData(): Cassandra error when retrieving column "
729  << i << ", Cassandra error code: "
730  << cass_error_desc(cass_error));
731  }
732  ++i;
733  }
734 }
735 
737 }
738 
740 }
741 
742 void
744  const uint32_t& valid_lifetime,
745  cass_int64_t& expire) {
746  // Calculate expire time.
747  cass_int64_t expire_time = static_cast<cass_int64_t>(cltt) +
748  static_cast<cass_int64_t>(valid_lifetime);
749 
750  expire = expire_time;
751 }
752 
753 void
754 CqlExchange::convertFromDatabaseTime(const cass_int64_t& expire,
755  const cass_int64_t& valid_lifetime,
756  time_t& cltt) {
757  // Convert to local time.
758  cltt = static_cast<time_t>(expire - valid_lifetime);
759 }
760 
/// @brief Executes a SELECT statement and collects the retrieved objects.
///
/// Steps performed by the visible code:
/// -# Look up @c statement_tag in the connection's statement map.
/// -# Create the driver statement. For a raw statement the query text is
///    taken from the last element of the data and removed from it; otherwise
///    the prepared statement is bound.
/// -# Apply the connection's consistency settings when forced.
/// -# Bind the remaining data, execute, and wait for completion.
/// -# For each returned row: createBindForSelect(), CqlCommon::getData(),
///    then add the result of retrieve() to the returned array.
///
/// NOTE(review): several lines naming the exception type in the error
/// reports below are not visible in this listing. Also, an exception raised
/// by CqlCommon::bindData(), CqlCommon::getData() or retrieve() would leave
/// this function without freeing the driver objects created here -- confirm
/// and consider scoped cleanup.
///
/// @param connection connection used to execute the statement
/// @param data values for the statement parameters; for raw statements the
///     last element holds a pointer to the query text
/// @param statement_tag tag identifying the statement
/// @param single when true, more than one returned row is reported as an error
/// @return array with one retrieve() result per returned row
761 AnyArray
762 CqlExchange::executeSelect(const CqlConnection& connection, const AnyArray& data,
763  StatementTag statement_tag, const bool& single /* = false */) {
764  CassError rc;
765  CassStatement* statement = NULL;
766  CassFuture* future = NULL;
    // Work on a copy because the raw-statement path removes an element.
767  AnyArray local_data = data;
768 
769  // Find the query statement first.
770  StatementMap::const_iterator it = connection.statements_.find(statement_tag);
771  if (it == connection.statements_.end()) {
773  "CqlExchange::executeSelect(): Statement "
774  << statement_tag << "has not been prepared.");
775  }
776 
777  // Bind the data before the query is executed.
778  CqlTaggedStatement tagged_statement = it->second;
779  if (tagged_statement.is_raw_) {
780  // The entire query is the last element in data.
781  std::string* query = boost::any_cast<std::string*>(local_data.back());
782  local_data.pop_back();
783  statement = cass_statement_new(query->c_str(), local_data.size());
784  } else {
785  statement = cass_prepared_bind(tagged_statement.prepared_statement_);
786  if (!statement) {
788  "CqlExchange::executeSelect(): unable to bind statement "
789  << tagged_statement.name_);
790  }
791  }
792 
793  // Set specific level of consistency if we're told to do so.
794  if (connection.force_consistency_) {
795  rc = cass_statement_set_consistency(statement, connection.consistency_);
796  if (rc != CASS_OK) {
797  cass_statement_free(statement);
799  "CqlExchange::executeSelect(): unable to set statement "
800  "consistency for statement "
801  << tagged_statement.name_
802  << ", Cassandra error code: " << cass_error_desc(rc));
803  }
804  if (connection.serial_consistency_ != CASS_CONSISTENCY_UNKNOWN) {
805  rc = cass_statement_set_serial_consistency(statement, connection.serial_consistency_);
806  if (rc != CASS_OK) {
807  cass_statement_free(statement);
809  "CqlExchange::executeSelect(): unable to set statement "
810  "serial consistency for statement "
811  << tagged_statement.name_
812  << ", Cassandra error code: " << cass_error_desc(rc));
813  }
814  }
815  }
816 
817  CqlCommon::bindData(local_data, statement);
818 
819  // Everything's ready. Call the actual statement.
820  future = cass_session_execute(connection.session_, statement);
821  if (!future) {
822  cass_statement_free(statement);
824  "CqlExchange::executeSelect(): no CassFuture for statement "
825  << tagged_statement.name_);
826  }
827 
828  // Wait for the statement execution to complete.
829  cass_future_wait(future);
830  const std::string error = connection.checkFutureError(
831  "CqlExchange::executeSelect(): cass_session_execute() != CASS_OK",
832  future, statement_tag);
833  rc = cass_future_error_code(future);
834  if (rc != CASS_OK) {
835  cass_future_free(future);
836  cass_statement_free(statement);
837  isc_throw(DbOperationError, error);
838  }
839 
840  // Get column values.
841  const CassResult* result_collection = cass_future_get_result(future);
842  if (single && cass_result_row_count(result_collection) > 1) {
843  cass_result_free(result_collection);
844  cass_future_free(future);
845  cass_statement_free(statement);
846  isc_throw(
848  "CqlExchange::executeSelect(): multiple records were found in "
849  "the database where only one was expected for statement "
850  << tagged_statement.name_);
851  }
852 
853  // Get results.
854  AnyArray return_values;
855  AnyArray collection;
856  CassIterator* rows = cass_iterator_from_result(result_collection);
857  while (cass_iterator_next(rows)) {
858  const CassRow* row = cass_iterator_get_row(rows);
859  createBindForSelect(return_values, statement_tag);
860  CqlCommon::getData(row, return_values);
861  collection.add(retrieve());
862  }
863 
864  // Free resources.
865  cass_iterator_free(rows);
866  cass_result_free(result_collection);
867  cass_future_free(future);
868  cass_statement_free(statement);
869 
870  return collection;
871 }
872 
/// @brief Executes INSERT, UPDATE or DELETE statements.
///
/// Steps performed by the visible code:
/// -# Look up @c statement_tag in the connection's statement map.
/// -# Bind the prepared statement.
/// -# Apply the connection's consistency settings when forced.
/// -# Bind the data, execute, and wait for completion.
/// -# Check statementApplied() and report an error when it returns false.
///
/// NOTE(review): several lines naming the exception type in the error
/// reports below are not visible in this listing. The "has not been
/// prepared" message names executeSelect() although it is raised from
/// executeMutation(). An exception raised by CqlCommon::bindData() or
/// statementApplied() would leave this function without freeing the driver
/// objects created here -- confirm and consider scoped cleanup.
///
/// @param connection connection used to execute the statement
/// @param data values for the statement parameters
/// @param statement_tag tag identifying the statement
873 void
874 CqlExchange::executeMutation(const CqlConnection& connection, const AnyArray& data,
875  StatementTag statement_tag) {
876  CassError rc;
877  CassStatement* statement = NULL;
878  CassFuture* future = NULL;
879 
880  // Find the statement on a list of prepared statements.
881  StatementMap::const_iterator it =
882  connection.statements_.find(statement_tag);
883  if (it == connection.statements_.end()) {
884  isc_throw(DbOperationError, "CqlExchange::executeSelect(): Statement "
885  << statement_tag << "has not been prepared.");
886  }
887  // Bind the statement.
888  CqlTaggedStatement tagged_statement = it->second;
889  statement = cass_prepared_bind(tagged_statement.prepared_statement_);
890  if (!statement) {
892  "CqlExchange::executeMutation(): unable to bind statement "
893  << tagged_statement.name_);
894  }
895 
896  // Set specific level of consistency, if told to do so.
897  if (connection.force_consistency_) {
898  rc = cass_statement_set_consistency(statement, connection.consistency_);
899  if (rc != CASS_OK) {
900  cass_statement_free(statement);
901  isc_throw(DbOperationError, "CqlExchange::executeMutation(): unable to set"
902  " statement consistency for statement " << tagged_statement.name_
903  << ", Cassandra error code: " << cass_error_desc(rc));
904  }
905  if (connection.serial_consistency_ != CASS_CONSISTENCY_UNKNOWN) {
906  rc = cass_statement_set_serial_consistency(statement, connection.serial_consistency_);
907  if (rc != CASS_OK) {
908  cass_statement_free(statement);
910  "CqlExchange::executeMutation(): unable to set statement "
911  "serial consistency for statement "
912  << tagged_statement.name_
913  << ", Cassandra error code: " << cass_error_desc(rc));
914  }
915  }
916  }
917 
918  CqlCommon::bindData(data, statement);
919 
    // Execute the statement and wait for it to complete.
920  future = cass_session_execute(connection.session_, statement);
921  if (!future) {
922  cass_statement_free(statement);
924  "CqlExchange::executeMutation(): unable to execute statement "
925  << tagged_statement.name_);
926  }
927  cass_future_wait(future);
928  const std::string error = connection.checkFutureError("CqlExchange::executeMutation():"
929  "cass_session_execute() != CASS_OK", future, statement_tag);
930  rc = cass_future_error_code(future);
931  if (rc != CASS_OK) {
932  cass_future_free(future);
933  cass_statement_free(statement);
934  isc_throw(DbOperationError, error);
935  }
936 
937  // Check if statement has been applied.
938  bool applied = statementApplied(future);
939 
940  // Free resources.
941  cass_future_free(future);
942  cass_statement_free(statement);
943 
944  if (!applied) {
945  isc_throw(
947  "CqlExchange::executeMutation(): [applied] is false for statement "
948  << tagged_statement.name_);
949  }
950 }
951 
952 bool
953 CqlExchange::statementApplied(CassFuture* future,
954  size_t* row_count,
955  size_t* column_count) {
956  const CassResult* result_collection = cass_future_get_result(future);
957  if (!result_collection) {
958  isc_throw(DbOperationError, "CqlExchange::statementApplied(): unable to get"
959  " results collection");
960  }
961  if (row_count) {
962  *row_count = cass_result_row_count(result_collection);
963  }
964  if (column_count) {
965  *column_count = cass_result_column_count(result_collection);
966  }
967  CassIterator* rows = cass_iterator_from_result(result_collection);
968  AnyArray data;
969  cass_bool_t applied = cass_true;
970  while (cass_iterator_next(rows)) {
971  const CassRow* row = cass_iterator_get_row(rows);
972  // [applied]: bool
973  data.add(&applied);
974  CqlCommon::getData(row, data);
975  }
976  cass_iterator_free(rows);
977  cass_result_free(result_collection);
978  return applied == cass_true;
979 }
980 
982 
984  {GET_VERSION, {GET_VERSION, "SELECT version, minor FROM schema_version "}}
985 };
986 
988 }
989 
991 }
992 
993 void
995  data.clear(); // Start with a fresh array.
996  data.add(&version_); // first column is a major version
997  data.add(&minor_); // second column is a minor version
998 }
999 
1000 boost::any
1002  pair_ = VersionPair(version_, minor_);
1003  return &pair_;
1004 }
1005 
1008  // Run statement.
1009  const AnyArray where_values;
1010  AnyArray version_collection =
1011  executeSelect(connection, where_values, GET_VERSION, true);
1012 
1013  if (!version_collection.empty()) {
1014  return *boost::any_cast<VersionPair*>(*version_collection.begin());
1015  }
1016 
1017  return VersionPair();
1018 }
1019 
1020 } // namespace db
1021 } // namespace isc
Database statement not applied.
Definition: db_exceptions.h:20
#define KEA_CASS_CHECK(cass_error)
Macro to return directly from caller function.
Definition: cql_exchange.cc:44
StatementMap statements_
Pointer to external array of tagged statements containing statement name, array of names of bind para...
void executeMutation(const CqlConnection &connection, const AnyArray &assigned_values, StatementTag statement_tag)
Executes INSERT, UPDATE or DELETE statements.
virtual void createBindForSelect(AnyArray &data, StatementTag statement_tag=NULL) override
Create BIND array to receive C++ data.
ExchangeDataType exchangeType(const boost::any &object)
Determine exchange type based on boost::any type.
const CassDataType * cass_data_type_
Internal Cassandra driver object representing a Cassandra data type.
Definition: cql_exchange.h:83
Structure used to bind C++ input values to dynamic CQL parameters.
Definition: cql_exchange.h:50
StatementTag name_
Short description of the query.
const CassPrepared * prepared_statement_
Internal Cassandra object representing the prepared statement.
std::unordered_map< std::type_index, ExchangeDataType > AnyTypeMap
Map types used to determine exchange type.
Definition: cql_exchange.cc:81
bool force_consistency_
CQL consistency enabled.
static constexpr StatementTag GET_VERSION
Statement tags definitions.
Definition: cql_exchange.h:276
virtual ~CqlVersionExchange()
Destructor.
size_t operator()(const ExchangeDataType &key) const
Definition: cql_exchange.cc:55
static void getData(const CassRow *row, AnyArray &data)
Retrieves data returned by Cassandra.
const CqlConnection & connection_
Connection to the Cassandra database.
Definition: cql_exchange.h:76
std::unordered_map< ExchangeDataType, CqlFunction, ExchangeDataTypeHash > CqlFunctionMap
Defines a type for storing aux. Cassandra functions.
Definition: cql_exchange.cc:62
CqlFunctionMap CQL_FUNCTIONS
Functions used to interface with the Cassandra C++ driver.
char const *const StatementTag
Statement index representing the statement name.
const CassKeyspaceMeta * keyspace_meta_
Keyspace meta information, used for UDTs.
bool statementApplied(CassFuture *future, size_t *row_count=NULL, size_t *column_count=NULL)
Check if CQL statement has been applied.
Multiple lease records found where one expected.
Definition: db_exceptions.h:28
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
CassUserType * cass_user_type_
Internal Cassandra driver object representing a user defined type.
Definition: cql_exchange.h:86
Defines a single statement or query.
void add(const boost::any &value)
Add a value at the end of the vector.
static void convertToDatabaseTime(const time_t &cltt, const uint32_t &valid_lifetime, cass_int64_t &expire)
CassConsistency consistency_
CQL consistency.
virtual boost::any retrieve() override
Copy received data into the pair.
std::vector< cass_byte_t > CassBlob
Host identifier converted to Cassandra data type.
Definition: cql_exchange.h:37
CassSession * session_
CQL session handle.
std::pair< uint32_t, uint32_t > VersionPair
Pair containing major and minor versions.
ExchangeDataType
Used to map server data types with internal backend storage data types.
Definition: sql_common.h:26
Udt(const CqlConnection &connection, const std::string &name)
Parameterized constructor.
const std::string name_
Name of the UDT in the schema: CREATE TYPE ___ { ... }.
Definition: cql_exchange.h:79
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
Collection (used in Cassandra)
Definition: sql_common.h:38
Defines the logger used by the top-level component of kea-dhcp-ddns.
static void convertFromDatabaseTime(const cass_int64_t &expire, const cass_int64_t &valid_lifetime, time_t &cltt)
Converts time from Cassandra format.
std::unordered_map< uint8_t, ExchangeDataType > CassTypeMap
Definition: cql_exchange.cc:85
const Name & name_
Definition: dns/message.cc:693
static void bindData(const AnyArray &data, CassStatement *statement)
Assigns values to every column of an INSERT or an UPDATE statement.
std::unordered_map< StatementTag, CqlTaggedStatement, StatementTagHash, StatementTagEqual > StatementMap
A container for all statements.
CassConsistency serial_consistency_
CQL serial consistency.
AnyArray executeSelect(const CqlConnection &connection, const AnyArray &where_values, StatementTag statement_tag, const bool &single=false)
Executes SELECT statements.
void remove(const size_t &index)
Remove the void pointer to the data value from a specified position inside the vector.
A helper structure with a function call operator that returns key value in a format expected by std::...
Definition: cql_exchange.cc:53
virtual boost::any retrieve()=0
Copy received data into the derived class' object.
virtual ~CqlExchange()
Destructor.
std::size_t hash_value(const CassValueType &key)
Hash function for CassTypeMap.
Definition: cql_exchange.cc:73
static StatementMap tagged_statements_
Cassandra statements.
Definition: cql_exchange.h:280
Common CQL connector pool.
static const std::string checkFutureError(const std::string &what, CassFuture *future, StatementTag statement_tag=NULL)
Check for errors.
~Udt()
Destructor.
User-Defined Type (used in Cassandra)
Definition: sql_common.h:37
virtual VersionPair retrieveVersion(const CqlConnection &connection)
Standalone method used to retrieve schema version.
Exception thrown on failure to execute a database function.
bool is_raw_
Should the statement be executed raw or with binds?
CqlExchange()
Constructor.
CqlVersionExchange()
Constructor.
AnyArray AnyCollection
Defines an array of arbitrary objects (used by Cassandra backend)
Definition: cql_exchange.h:90
virtual void createBindForSelect(AnyArray &data, StatementTag statement_tag=NULL)=0
Create BIND array to receive C++ data.