Patrick Williams | b48b7b4 | 2016-08-17 15:04:38 -0500 | [diff] [blame] | 1 | From 23e80b75508187baaa823a68ea019b72e0b2305c Mon Sep 17 00:00:00 2001 |
| 2 | From: Budai Laszlo <lbudai@balabit.hu> |
| 3 | Date: Tue, 12 Nov 2013 13:19:04 +0100 |
| 4 | Subject: [PATCH] afsql: afsql_dd_insert_db() refactor |
| 5 | |
| 6 | Upstream-Status: Backport |
| 7 | |
| 8 | A lot of the code that was previously in afsql_dd_insert_db() have been |
| 9 | extracted to smaller functions, and afsql_dd_insert_db() was rebuilt on |
| 10 | top of these. At the same time, memory leaks were plugged, and in case |
| 11 | of a transaction error, backlog rewinding has been fixed too, to not |
| 12 | loose messages since the last BEGIN command. |
| 13 | |
| 14 | Signed-off-by: Juhasz Viktor <jviktor@balabit.hu> |
| 15 | Signed-off-by: Laszlo Budai <lbudai@balabit.hu> |
| 16 | --- |
| 17 | modules/afsql/afsql.c | 301 ++++++++++++++++++++++++++++++++------------------ |
| 18 | 1 file changed, 192 insertions(+), 109 deletions(-) |
| 19 | |
| 20 | diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c |
| 21 | index 12f6aab..a6a8190 100644 |
| 22 | --- a/modules/afsql/afsql.c |
| 23 | +++ b/modules/afsql/afsql.c |
| 24 | @@ -456,24 +456,21 @@ afsql_dd_create_index(AFSqlDestDriver *s |
| 25 | * |
| 26 | * NOTE: This function can only be called from the database thread. |
| 27 | **/ |
| 28 | -static GString * |
| 29 | -afsql_dd_validate_table(AFSqlDestDriver *self, LogMessage *msg) |
| 30 | +static gboolean |
| 31 | +afsql_dd_validate_table(AFSqlDestDriver *self, GString *table) |
| 32 | { |
| 33 | - GString *query_string, *table; |
| 34 | + GString *query_string; |
| 35 | dbi_result db_res; |
| 36 | gboolean success = FALSE; |
| 37 | gint i; |
| 38 | |
| 39 | - table = g_string_sized_new(32); |
| 40 | - log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table); |
| 41 | - |
| 42 | if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES) |
| 43 | - return table; |
| 44 | + return TRUE; |
| 45 | |
| 46 | afsql_dd_check_sql_identifier(table->str, TRUE); |
| 47 | |
| 48 | if (g_hash_table_lookup(self->validated_tables, table->str)) |
| 49 | - return table; |
| 50 | + return TRUE; |
| 51 | |
| 52 | query_string = g_string_sized_new(32); |
| 53 | g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table->str); |
| 54 | @@ -544,14 +541,9 @@ afsql_dd_validate_table(AFSqlDestDriver |
| 55 | /* we have successfully created/altered the destination table, record this information */ |
| 56 | g_hash_table_insert(self->validated_tables, g_strdup(table->str), GUINT_TO_POINTER(TRUE)); |
| 57 | } |
| 58 | - else |
| 59 | - { |
| 60 | - g_string_free(table, TRUE); |
| 61 | - table = NULL; |
| 62 | - } |
| 63 | g_string_free(query_string, TRUE); |
| 64 | |
| 65 | - return table; |
| 66 | + return success; |
| 67 | } |
| 68 | |
| 69 | /** |
| 70 | @@ -581,6 +573,20 @@ afsql_dd_begin_txn(AFSqlDestDriver *self |
| 71 | } |
| 72 | |
| 73 | /** |
| 74 | + * afsql_dd_handle_transaction_error: |
| 75 | + * |
| 76 | + * Handle errors inside during a SQL transaction (e.g. INSERT or COMMIT failures). |
| 77 | + * |
| 78 | + * NOTE: This function can only be called from the database thread. |
| 79 | + **/ |
| 80 | +static void |
| 81 | +afsql_dd_handle_transaction_error(AFSqlDestDriver *self) |
| 82 | +{ |
| 83 | + log_queue_rewind_backlog(self->queue); |
| 84 | + self->flush_lines_queued = 0; |
| 85 | +} |
| 86 | + |
| 87 | +/** |
| 88 | * afsql_dd_begin_txn: |
| 89 | * |
| 90 | * Commit SQL transaction. |
| 91 | @@ -596,14 +602,14 @@ afsql_dd_commit_txn(AFSqlDestDriver *sel |
| 92 | if (success) |
| 93 | { |
| 94 | log_queue_ack_backlog(self->queue, self->flush_lines_queued); |
| 95 | + self->flush_lines_queued = 0; |
| 96 | } |
| 97 | else |
| 98 | { |
| 99 | - msg_notice("SQL transaction commit failed, rewinding backlog and starting again", |
| 100 | - NULL); |
| 101 | - log_queue_rewind_backlog(self->queue); |
| 102 | + msg_error("SQL transaction commit failed, rewinding backlog and starting again", |
| 103 | + NULL); |
| 104 | + afsql_dd_handle_transaction_error(self); |
| 105 | } |
| 106 | - self->flush_lines_queued = 0; |
| 107 | return success; |
| 108 | } |
| 109 | |
| 110 | @@ -644,12 +650,13 @@ afsql_dd_set_dbd_opt_numeric(gpointer ke |
| 111 | } |
| 112 | |
| 113 | static gboolean |
| 114 | -afsql_dd_connect(AFSqlDestDriver *self) |
| 115 | +afsql_dd_ensure_initialized_connection(AFSqlDestDriver *self) |
| 116 | { |
| 117 | if (self->dbi_ctx) |
| 118 | return TRUE; |
| 119 | |
| 120 | self->dbi_ctx = dbi_conn_new(self->type); |
| 121 | + |
| 122 | if (!self->dbi_ctx) |
| 123 | { |
| 124 | msg_error("No such DBI driver", |
| 125 | @@ -659,10 +666,12 @@ afsql_dd_connect(AFSqlDestDriver *self) |
| 126 | } |
| 127 | |
| 128 | dbi_conn_set_option(self->dbi_ctx, "host", self->host); |
| 129 | + |
| 130 | if (strcmp(self->type, "mysql")) |
| 131 | dbi_conn_set_option(self->dbi_ctx, "port", self->port); |
| 132 | else |
| 133 | dbi_conn_set_option_numeric(self->dbi_ctx, "port", atoi(self->port)); |
| 134 | + |
| 135 | dbi_conn_set_option(self->dbi_ctx, "username", self->user); |
| 136 | dbi_conn_set_option(self->dbi_ctx, "password", self->password); |
| 137 | dbi_conn_set_option(self->dbi_ctx, "dbname", self->database); |
| 138 | @@ -691,6 +700,7 @@ afsql_dd_connect(AFSqlDestDriver *self) |
| 139 | evt_tag_str("database", self->database), |
| 140 | evt_tag_str("error", dbi_error), |
| 141 | NULL); |
| 142 | + |
| 143 | return FALSE; |
| 144 | } |
| 145 | |
| 146 | @@ -713,104 +723,145 @@ afsql_dd_connect(AFSqlDestDriver *self) |
| 147 | return TRUE; |
| 148 | } |
| 149 | |
| 150 | -static gboolean |
| 151 | -afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg, |
| 152 | - LogPathOptions *path_options) |
| 153 | +static GString * |
| 154 | +afsql_dd_ensure_accessible_database_table(AFSqlDestDriver *self, LogMessage *msg) |
| 155 | { |
| 156 | - if (self->failed_message_counter < self->num_retries - 1) |
| 157 | - { |
| 158 | - log_queue_push_head(self->queue, msg, path_options); |
| 159 | - |
| 160 | - /* database connection status sanity check after failed query */ |
| 161 | - if (dbi_conn_ping(self->dbi_ctx) != 1) |
| 162 | - { |
| 163 | - const gchar *dbi_error; |
| 164 | - |
| 165 | - dbi_conn_error(self->dbi_ctx, &dbi_error); |
| 166 | - msg_error("Error, no SQL connection after failed query attempt", |
| 167 | - evt_tag_str("type", self->type), |
| 168 | - evt_tag_str("host", self->host), |
| 169 | - evt_tag_str("port", self->port), |
| 170 | - evt_tag_str("username", self->user), |
| 171 | - evt_tag_str("database", self->database), |
| 172 | - evt_tag_str("error", dbi_error), |
| 173 | - NULL); |
| 174 | - return FALSE; |
| 175 | - } |
| 176 | + GString *table = g_string_sized_new(32); |
| 177 | + log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table); |
| 178 | |
| 179 | - self->failed_message_counter++; |
| 180 | - return FALSE; |
| 181 | + if (!afsql_dd_validate_table(self, table)) |
| 182 | + { |
| 183 | + /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */ |
| 184 | + msg_error("Error checking table, disconnecting from database, trying again shortly", |
| 185 | + evt_tag_int("time_reopen", self->time_reopen), |
| 186 | + NULL); |
| 187 | + g_string_free(table, TRUE); |
| 188 | + return NULL; |
| 189 | } |
| 190 | |
| 191 | - msg_error("Multiple failures while inserting this record into the database, message dropped", |
| 192 | - evt_tag_int("attempts", self->num_retries), |
| 193 | - NULL); |
| 194 | - stats_counter_inc(self->dropped_messages); |
| 195 | - log_msg_drop(msg, path_options); |
| 196 | - self->failed_message_counter = 0; |
| 197 | - return TRUE; |
| 198 | + return table; |
| 199 | } |
| 200 | |
| 201 | static GString * |
| 202 | -afsql_dd_construct_query(AFSqlDestDriver *self, GString *table, |
| 203 | - LogMessage *msg) |
| 204 | +afsql_dd_build_insert_command(AFSqlDestDriver *self, LogMessage *msg, GString *table) |
| 205 | { |
| 206 | - GString *value; |
| 207 | - GString *query_string; |
| 208 | - gint i; |
| 209 | + GString *insert_command = g_string_sized_new(256); |
| 210 | + GString *value = g_string_sized_new(512); |
| 211 | + gint i, j; |
| 212 | |
| 213 | - value = g_string_sized_new(256); |
| 214 | - query_string = g_string_sized_new(512); |
| 215 | + g_string_printf(insert_command, "INSERT INTO %s (", table->str); |
| 216 | |
| 217 | - g_string_printf(query_string, "INSERT INTO %s (", table->str); |
| 218 | for (i = 0; i < self->fields_len; i++) |
| 219 | { |
| 220 | - g_string_append(query_string, self->fields[i].name); |
| 221 | - if (i != self->fields_len - 1) |
| 222 | - g_string_append(query_string, ", "); |
| 223 | + if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL) |
| 224 | + { |
| 225 | + g_string_append(insert_command, self->fields[i].name); |
| 226 | + |
| 227 | + j = i + 1; |
| 228 | + while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT) |
| 229 | + j++; |
| 230 | + |
| 231 | + if (j < self->fields_len) |
| 232 | + g_string_append(insert_command, ", "); |
| 233 | + } |
| 234 | } |
| 235 | - g_string_append(query_string, ") VALUES ("); |
| 236 | + |
| 237 | + g_string_append(insert_command, ") VALUES ("); |
| 238 | |
| 239 | for (i = 0; i < self->fields_len; i++) |
| 240 | { |
| 241 | gchar *quoted; |
| 242 | |
| 243 | - if (self->fields[i].value == NULL) |
| 244 | - { |
| 245 | - /* the config used the 'default' value for this column -> the fields[i].value is NULL, use SQL default */ |
| 246 | - g_string_append(query_string, "DEFAULT"); |
| 247 | - } |
| 248 | - else |
| 249 | + if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL) |
| 250 | { |
| 251 | log_template_format(self->fields[i].value, msg, &self->template_options, LTZ_SEND, self->seq_num, NULL, value); |
| 252 | - |
| 253 | if (self->null_value && strcmp(self->null_value, value->str) == 0) |
| 254 | { |
| 255 | - g_string_append(query_string, "NULL"); |
| 256 | + g_string_append(insert_command, "NULL"); |
| 257 | } |
| 258 | else |
| 259 | { |
| 260 | dbi_conn_quote_string_copy(self->dbi_ctx, value->str, "ed); |
| 261 | if (quoted) |
| 262 | { |
| 263 | - g_string_append(query_string, quoted); |
| 264 | + g_string_append(insert_command, quoted); |
| 265 | free(quoted); |
| 266 | } |
| 267 | else |
| 268 | { |
| 269 | - g_string_append(query_string, "''"); |
| 270 | + g_string_append(insert_command, "''"); |
| 271 | } |
| 272 | } |
| 273 | - } |
| 274 | |
| 275 | - if (i != self->fields_len - 1) |
| 276 | - g_string_append(query_string, ", "); |
| 277 | + j = i + 1; |
| 278 | + while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT) |
| 279 | + j++; |
| 280 | + if (j < self->fields_len) |
| 281 | + g_string_append(insert_command, ", "); |
| 282 | + } |
| 283 | } |
| 284 | - g_string_append(query_string, ")"); |
| 285 | + |
| 286 | + g_string_append(insert_command, ")"); |
| 287 | |
| 288 | g_string_free(value, TRUE); |
| 289 | |
| 290 | - return query_string; |
| 291 | + return insert_command; |
| 292 | +} |
| 293 | + |
| 294 | +static inline gboolean |
| 295 | +afsql_dd_is_transaction_handling_enabled(const AFSqlDestDriver *self) |
| 296 | +{ |
| 297 | + return self->flush_lines_queued != -1; |
| 298 | +} |
| 299 | + |
| 300 | +static inline gboolean |
| 301 | +afsql_dd_should_start_new_transaction(const AFSqlDestDriver *self) |
| 302 | +{ |
| 303 | + return self->flush_lines_queued == 0; |
| 304 | +} |
| 305 | + |
| 306 | +static inline gboolean |
| 307 | +afsql_dd_should_commit_transaction(const AFSqlDestDriver *self) |
| 308 | +{ |
| 309 | + return afsql_dd_is_transaction_handling_enabled(self) && self->flush_lines_queued == self->flush_lines; |
| 310 | +} |
| 311 | + |
| 312 | +static inline gboolean |
| 313 | +afsql_dd_handle_insert_row_error_depending_on_connection_availability(AFSqlDestDriver *self, |
| 314 | + LogMessage *msg, |
| 315 | + LogPathOptions *path_options) |
| 316 | +{ |
| 317 | + const gchar *dbi_error, *error_message; |
| 318 | + |
| 319 | + if (dbi_conn_ping(self->dbi_ctx) == 1) |
| 320 | + { |
| 321 | + log_queue_push_head(self->queue, msg, path_options); |
| 322 | + return TRUE; |
| 323 | + } |
| 324 | + |
| 325 | + if (afsql_dd_is_transaction_handling_enabled(self)) |
| 326 | + { |
| 327 | + error_message = "SQL connection lost in the middle of a transaction," |
| 328 | + " rewinding backlog and starting again"; |
| 329 | + afsql_dd_handle_transaction_error(self); |
| 330 | + } |
| 331 | + else |
| 332 | + { |
| 333 | + error_message = "Error, no SQL connection after failed query attempt"; |
| 334 | + log_queue_push_head(self->queue, msg, path_options); |
| 335 | + } |
| 336 | + |
| 337 | + dbi_conn_error(self->dbi_ctx, &dbi_error); |
| 338 | + msg_error(error_message, |
| 339 | + evt_tag_str("type", self->type), |
| 340 | + evt_tag_str("host", self->host), |
| 341 | + evt_tag_str("port", self->port), |
| 342 | + evt_tag_str("username", self->user), |
| 343 | + evt_tag_str("database", self->database), |
| 344 | + evt_tag_str("error", dbi_error), |
| 345 | + NULL); |
| 346 | + |
| 347 | + return FALSE; |
| 348 | } |
| 349 | |
| 350 | /** |
| 351 | @@ -824,61 +875,93 @@ afsql_dd_construct_query(AFSqlDestDriver |
| 352 | static gboolean |
| 353 | afsql_dd_insert_db(AFSqlDestDriver *self) |
| 354 | { |
| 355 | - GString *table, *query_string; |
| 356 | + GString *table = NULL; |
| 357 | + GString *insert_command = NULL; |
| 358 | LogMessage *msg; |
| 359 | gboolean success; |
| 360 | LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; |
| 361 | |
| 362 | - afsql_dd_connect(self); |
| 363 | + if (!afsql_dd_ensure_initialized_connection(self)) |
| 364 | + return FALSE; |
| 365 | |
| 366 | - success = log_queue_pop_head(self->queue, &msg, &path_options, (self->flags & AFSQL_DDF_EXPLICIT_COMMITS), FALSE); |
| 367 | + /* connection established, try to insert a message */ |
| 368 | + success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, self->flags & AFSQL_DDF_EXPLICIT_COMMITS); |
| 369 | if (!success) |
| 370 | return TRUE; |
| 371 | |
| 372 | msg_set_context(msg); |
| 373 | |
| 374 | - table = afsql_dd_validate_table(self, msg); |
| 375 | + table = afsql_dd_ensure_accessible_database_table(self, msg); |
| 376 | + |
| 377 | if (!table) |
| 378 | { |
| 379 | - /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */ |
| 380 | - msg_error("Error checking table, disconnecting from database, trying again shortly", |
| 381 | - evt_tag_int("time_reopen", self->time_reopen), |
| 382 | - NULL); |
| 383 | - msg_set_context(NULL); |
| 384 | - g_string_free(table, TRUE); |
| 385 | - return afsql_dd_insert_fail_handler(self, msg, &path_options); |
| 386 | + success = FALSE; |
| 387 | + goto out; |
| 388 | } |
| 389 | |
| 390 | - query_string = afsql_dd_construct_query(self, table, msg); |
| 391 | + if (afsql_dd_should_start_new_transaction(self) && !afsql_dd_begin_txn(self)) |
| 392 | + { |
| 393 | + success = FALSE; |
| 394 | + goto out; |
| 395 | + } |
| 396 | |
| 397 | - if (self->flush_lines_queued == 0 && !afsql_dd_begin_txn(self)) |
| 398 | - return FALSE; |
| 399 | + insert_command = afsql_dd_build_insert_command(self, msg, table); |
| 400 | + success = afsql_dd_run_query(self, insert_command->str, FALSE, NULL); |
| 401 | |
| 402 | - success = afsql_dd_run_query(self, query_string->str, FALSE, NULL); |
| 403 | if (success && self->flush_lines_queued != -1) |
| 404 | { |
| 405 | self->flush_lines_queued++; |
| 406 | |
| 407 | - if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self)) |
| 408 | - return FALSE; |
| 409 | + if (afsql_dd_should_commit_transaction(self) && !afsql_dd_commit_txn(self)) |
| 410 | + { |
| 411 | + /* Assuming that in case of error, the queue is rewound by afsql_dd_commit_txn() */ |
| 412 | + |
| 413 | + g_string_free(insert_command, TRUE); |
| 414 | + msg_set_context(NULL); |
| 415 | + |
| 416 | + return FALSE; |
| 417 | + } |
| 418 | } |
| 419 | |
| 420 | - g_string_free(table, TRUE); |
| 421 | - g_string_free(query_string, TRUE); |
| 422 | + out: |
| 423 | + |
| 424 | + if (table != NULL) |
| 425 | + g_string_free(table, TRUE); |
| 426 | + |
| 427 | + if (insert_command != NULL) |
| 428 | + g_string_free(insert_command, TRUE); |
| 429 | |
| 430 | msg_set_context(NULL); |
| 431 | |
| 432 | - if (!success) |
| 433 | - return afsql_dd_insert_fail_handler(self, msg, &path_options); |
| 434 | + if (success) |
| 435 | + { |
| 436 | + log_msg_ack(msg, &path_options); |
| 437 | + log_msg_unref(msg); |
| 438 | + step_sequence_number(&self->seq_num); |
| 439 | + self->failed_message_counter = 0; |
| 440 | + } |
| 441 | + else |
| 442 | + { |
| 443 | + if (self->failed_message_counter < self->num_retries - 1) |
| 444 | + { |
| 445 | + if (!afsql_dd_handle_insert_row_error_depending_on_connection_availability(self, msg, &path_options)) |
| 446 | + return FALSE; |
| 447 | |
| 448 | - /* we only ACK if each INSERT is a separate transaction */ |
| 449 | - if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0) |
| 450 | - log_msg_ack(msg, &path_options); |
| 451 | - log_msg_unref(msg); |
| 452 | - step_sequence_number(&self->seq_num); |
| 453 | - self->failed_message_counter = 0; |
| 454 | + self->failed_message_counter++; |
| 455 | + } |
| 456 | + else |
| 457 | + { |
| 458 | + msg_error("Multiple failures while inserting this record into the database, message dropped", |
| 459 | + evt_tag_int("attempts", self->num_retries), |
| 460 | + NULL); |
| 461 | + stats_counter_inc(self->dropped_messages); |
| 462 | + log_msg_drop(msg, &path_options); |
| 463 | + self->failed_message_counter = 0; |
| 464 | + success = TRUE; |
| 465 | + } |
| 466 | + } |
| 467 | |
| 468 | - return TRUE; |
| 469 | + return success; |
| 470 | } |
| 471 | |
| 472 | static void |
| 473 | @@ -895,7 +978,7 @@ afsql_dd_message_became_available_in_the |
| 474 | static void |
| 475 | afsql_dd_wait_for_suspension_wakeup(AFSqlDestDriver *self) |
| 476 | { |
| 477 | - /* we got suspended, probably because of a connection error, |
| 478 | + /* we got suspended, probably because of a connection error, |
| 479 | * during this time we only get wakeups if we need to be |
| 480 | * terminated. */ |
| 481 | if (!self->db_thread_terminate) |
| 482 | @@ -974,8 +1057,7 @@ afsql_dd_database_thread(gpointer arg) |
| 483 | |
| 484 | afsql_dd_commit_txn(self); |
| 485 | } |
| 486 | - |
| 487 | - exit: |
| 488 | +exit: |
| 489 | afsql_dd_disconnect(self); |
| 490 | |
| 491 | msg_verbose("Database thread finished", |
| 492 | -- |
| 493 | 1.8.4.1 |
| 494 | |