33 session_(NULL), consistency_(CASS_CONSISTENCY_QUORUM),
34 serial_consistency_(CASS_CONSISTENCY_UNKNOWN), schema_meta_(NULL),
35 keyspace_meta_(NULL), force_consistency_(true) {
41 CassError rc = CASS_OK;
55 CassFuture* close_future = cass_session_close(
session_);
56 cass_future_wait(close_future);
58 "CqlConnection::~CqlConnection(): cass_session_close() != CASS_OK",
60 rc = cass_future_error_code(close_future);
61 cass_future_free(close_future);
78 std::pair<uint32_t, uint32_t>
90 return version_exchange->retrieveVersion(conn);
94 static std::map<std::string, CassConsistency> consistency_map {
95 {
"any", CASS_CONSISTENCY_ANY},
96 {
"one", CASS_CONSISTENCY_ONE},
97 {
"two", CASS_CONSISTENCY_TWO},
98 {
"three", CASS_CONSISTENCY_THREE},
99 {
"quorum", CASS_CONSISTENCY_QUORUM},
100 {
"all", CASS_CONSISTENCY_ALL},
101 {
"local-quorum", CASS_CONSISTENCY_LOCAL_QUORUM},
102 {
"each-quorum", CASS_CONSISTENCY_EACH_QUORUM},
103 {
"serial", CASS_CONSISTENCY_SERIAL},
104 {
"local-serial", CASS_CONSISTENCY_LOCAL_SERIAL},
105 {
"local-one", CASS_CONSISTENCY_LOCAL_ONE}
107 if (consistency_map.find(value) == consistency_map.end()) {
108 return CASS_CONSISTENCY_UNKNOWN;
110 return consistency_map[value];
117 const char* contact_points =
"127.0.0.1";
118 std::string scontact_points;
121 contact_points = scontact_points.c_str();
126 const char* port = NULL;
130 port = sport.c_str();
135 const char* user = NULL;
139 user = suser.c_str();
144 const char* password = NULL;
145 std::string spassword;
148 password = spassword.c_str();
153 const char* keyspace =
"keatest";
154 std::string skeyspace;
157 keyspace = skeyspace.c_str();
162 const char* consistency = NULL;
163 std::string sconsistency;
166 consistency = sconsistency.c_str();
171 const char* serial_consistency = NULL;
172 std::string sserial_consistency;
174 sserial_consistency =
getParameter(
"serial-consistency");
175 serial_consistency = sserial_consistency.c_str();
180 const char* reconnect_wait_time = NULL;
181 std::string sreconnect_wait_time;
183 sreconnect_wait_time =
getParameter(
"reconnect-wait-time");
184 reconnect_wait_time = sreconnect_wait_time.c_str();
189 const char* connect_timeout = NULL;
190 std::string sconnect_timeout;
193 connect_timeout = sconnect_timeout.c_str();
198 const char* request_timeout = NULL;
199 std::string srequest_timeout;
202 request_timeout = srequest_timeout.c_str();
207 const char* tcp_keepalive = NULL;
208 std::string stcp_keepalive;
211 tcp_keepalive = stcp_keepalive.c_str();
216 std::string stcp_nodelay;
224 cass_cluster_set_contact_points(
cluster_, contact_points);
226 if (user && password) {
227 cass_cluster_set_credentials(
cluster_, user, password);
233 port_number = boost::lexical_cast<int32_t>(port);
234 if (port_number < 1 || port_number > 65535) {
236 "CqlConnection::openDatabase(): "
237 "port outside of range, expected "
238 "1-65535, instead got "
241 }
catch (
const boost::bad_lexical_cast& ex) {
243 "CqlConnection::openDatabase(): invalid "
244 "port, expected castable to int, instead got "
246 <<
"\", " << ex.
what());
248 cass_cluster_set_port(
cluster_, port_number);
253 CassConsistency desired_serial_consistency = CASS_CONSISTENCY_UNKNOWN;
254 if (serial_consistency) {
257 if (desired_consistency != CASS_CONSISTENCY_UNKNOWN) {
258 setConsistency(
true, desired_consistency, desired_serial_consistency);
262 if (reconnect_wait_time) {
263 int32_t reconnect_wait_time_number;
265 reconnect_wait_time_number =
266 boost::lexical_cast<int32_t>(reconnect_wait_time);
267 if (reconnect_wait_time_number < 0) {
269 "CqlConnection::openDatabase(): invalid reconnect "
270 "wait time, expected positive number, instead got "
271 << reconnect_wait_time);
273 }
catch (
const boost::bad_lexical_cast& ex) {
275 "CqlConnection::openDatabase(): "
276 "invalid reconnect wait time, expected "
277 "castable to int, instead got \""
278 << reconnect_wait_time <<
"\", " << ex.
what());
280 #if (CASS_VERSION_MAJOR > 2) || \
281 ((CASS_VERSION_MAJOR == 2) && (CASS_VERSION_MINOR >= 13))
282 cass_uint64_t delay_ms =
283 static_cast<cass_uint64_t
>(reconnect_wait_time_number);
284 cass_cluster_set_constant_reconnect(
cluster_, delay_ms);
286 cass_cluster_set_reconnect_wait_time(
cluster_,
287 reconnect_wait_time_number);
291 if (connect_timeout) {
292 int32_t connect_timeout_number;
294 connect_timeout_number =
295 boost::lexical_cast<int32_t>(connect_timeout);
296 if (connect_timeout_number < 0) {
298 "CqlConnection::openDatabase(): "
299 "invalid connect timeout, expected "
300 "positive number, instead got "
303 }
catch (
const boost::bad_lexical_cast& ex) {
305 "CqlConnection::openDatabase(): invalid connect timeout, "
306 "expected castable to int, instead got \""
307 << connect_timeout <<
"\", " << ex.
what());
309 cass_cluster_set_connect_timeout(
cluster_, connect_timeout_number);
312 if (request_timeout) {
313 int32_t request_timeout_number;
315 request_timeout_number =
316 boost::lexical_cast<int32_t>(request_timeout);
317 if (request_timeout_number < 0) {
319 "CqlConnection::openDatabase(): "
320 "invalid request timeout, expected "
321 "positive number, instead got "
324 }
catch (
const boost::bad_lexical_cast& ex) {
326 "CqlConnection::openDatabase(): invalid request timeout, "
327 "expected castable to int, instead got \""
328 << request_timeout <<
"\", " << ex.
what());
330 cass_cluster_set_request_timeout(
cluster_, request_timeout_number);
334 int32_t tcp_keepalive_number;
336 tcp_keepalive_number = boost::lexical_cast<int32_t>(tcp_keepalive);
337 if (tcp_keepalive_number < 0) {
339 "CqlConnection::openDatabase(): "
340 "invalid TCP keepalive, expected "
341 "positive number, instead got "
344 }
catch (
const boost::bad_lexical_cast& ex) {
346 "CqlConnection::openDatabase(): invalid TCP keepalive, "
347 "expected castable to int, instead got \""
348 << tcp_keepalive <<
"\", " << ex.
what());
350 cass_cluster_set_tcp_keepalive(
cluster_, cass_true,
351 tcp_keepalive_number);
354 if (stcp_nodelay ==
"true") {
355 cass_cluster_set_tcp_nodelay(
cluster_, cass_true);
360 CassFuture* connect_future =
362 cass_future_wait(connect_future);
363 const std::string
error =
365 "cass_session_connect_keyspace() != CASS_OK",
367 rc = cass_future_error_code(connect_future);
368 cass_future_free(connect_future);
382 "!cass_schema_meta_keyspace_by_name()");
388 CassError rc = CASS_OK;
393 "CqlConnection::prepareStatements(): "
394 "duplicate statement with name "
395 << tagged_statement.
name_);
400 cass_future_wait(future);
401 const std::string
error =
403 " cass_session_prepare() != CASS_OK",
404 future, tagged_statement.
name_);
405 rc = cass_future_error_code(future);
407 cass_future_free(future);
413 cass_future_free(future);
419 CassConsistency consistency,
420 CassConsistency serial_consistency) {
445 CassError cass_error = cass_future_error_code(future);
446 const char* error_message;
447 size_t error_message_size;
448 cass_future_error_message(future, &error_message, &error_message_size);
450 std::stringstream stream;
451 if (statement_tag && std::strlen(statement_tag) > 0) {
453 stream <<
"Statement ";
454 stream << statement_tag;
457 stream <<
"Session action ";
459 if (cass_error == CASS_OK) {
460 stream <<
" executed successfully.";
462 stream <<
" failed, Kea error: " << what
463 <<
", Cassandra error code: " << cass_error_desc(cass_error)
464 <<
", Cassandra future error: " << error_message;
We want to reuse the database backend connection and exchange code for other uses, in particular for hook libraries.
StatementMap statements_
Pointer to external array of tagged statements containing statement name, array of names of bind para...
static std::pair< uint32_t, uint32_t > getVersion(const ParameterMap &parameters)
Get the schema version.
StatementTag name_
Short description of the query.
virtual void commit()
Commit Transactions.
const CassPrepared * prepared_statement_
Internal Cassandra object representing the prepared statement.
DB_LOG & arg(T first, Args...args)
Pass parameters to replace logger placeholders.
bool force_consistency_
CQL consistency enabled.
void startTransaction()
Start transaction.
void prepareStatements(StatementMap &statements)
Prepare statements.
void openDatabase()
Open database.
virtual void rollback()
Rollback Transactions.
Common database connection class.
virtual ~CqlConnection()
Destructor.
Exception thrown on failure to open database.
char const *const StatementTag
Statement index representing the statement name.
const CassKeyspaceMeta * keyspace_meta_
Keyspace meta information, used for UDTs.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
void setConsistency(bool force, CassConsistency consistency, CassConsistency serial_consistency)
Set consistency.
Defines a single statement or query.
CassConsistency consistency_
CQL consistency.
CassSession * session_
CQL session handle.
static CassConsistency parseConsistency(std::string value)
Parse consistency value.
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
CassCluster * cluster_
CQL connection handle.
const int DB_DBG_TRACE_DETAIL
Database logging levels.
Defines the logger used by the top-level component of kea-dhcp-ddns.
Exchange used to retrieve schema version from the keyspace.
std::string getParameter(const std::string &name) const
Returns value of a connection parameter.
std::unordered_map< StatementTag, CqlTaggedStatement, StatementTagHash, StatementTagEqual > StatementMap
A container for all statements.
CassConsistency serial_consistency_
CQL serial consistency.
const CassSchemaMeta * schema_meta_
static StatementMap tagged_statements_
Cassandra statements.
Common CQL connector pool.
std::map< std::string, std::string > ParameterMap
Database configuration parameter map.
static const std::string checkFutureError(const std::string &what, CassFuture *future, StatementTag statement_tag=NULL)
Check for errors.
std::pair< StatementTag, CqlTaggedStatement > StatementMapEntry
A type for a single entry on the statements map.
Exception thrown on failure to execute a database function.
CqlConnection(const ParameterMap &parameters)
Constructor.
char const *const text_
Text representation of the actual query.