| From 23e80b75508187baaa823a68ea019b72e0b2305c Mon Sep 17 00:00:00 2001 |
| From: Budai Laszlo <lbudai@balabit.hu> |
| Date: Tue, 12 Nov 2013 13:19:04 +0100 |
| Subject: [PATCH] afsql: afsql_dd_insert_db() refactor |
| |
| Upstream-Status: Backport |
| |
| A lot of the code that was previously in afsql_dd_insert_db() have been |
| extracted to smaller functions, and afsql_dd_insert_db() was rebuilt on |
| top of these. At the same time, memory leaks were plugged, and in case |
| of a transaction error, backlog rewinding has been fixed too, to not |
| loose messages since the last BEGIN command. |
| |
| Signed-off-by: Juhasz Viktor <jviktor@balabit.hu> |
| Signed-off-by: Laszlo Budai <lbudai@balabit.hu> |
| --- |
| modules/afsql/afsql.c | 301 ++++++++++++++++++++++++++++++++------------------ |
| 1 file changed, 192 insertions(+), 109 deletions(-) |
| |
| diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c |
| index 12f6aab..a6a8190 100644 |
| --- a/modules/afsql/afsql.c |
| +++ b/modules/afsql/afsql.c |
| @@ -456,24 +456,21 @@ afsql_dd_create_index(AFSqlDestDriver *s |
| * |
| * NOTE: This function can only be called from the database thread. |
| **/ |
| -static GString * |
| -afsql_dd_validate_table(AFSqlDestDriver *self, LogMessage *msg) |
| +static gboolean |
| +afsql_dd_validate_table(AFSqlDestDriver *self, GString *table) |
| { |
| - GString *query_string, *table; |
| + GString *query_string; |
| dbi_result db_res; |
| gboolean success = FALSE; |
| gint i; |
| |
| - table = g_string_sized_new(32); |
| - log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table); |
| - |
| if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES) |
| - return table; |
| + return TRUE; |
| |
| afsql_dd_check_sql_identifier(table->str, TRUE); |
| |
| if (g_hash_table_lookup(self->validated_tables, table->str)) |
| - return table; |
| + return TRUE; |
| |
| query_string = g_string_sized_new(32); |
| g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table->str); |
| @@ -544,14 +541,9 @@ afsql_dd_validate_table(AFSqlDestDriver |
| /* we have successfully created/altered the destination table, record this information */ |
| g_hash_table_insert(self->validated_tables, g_strdup(table->str), GUINT_TO_POINTER(TRUE)); |
| } |
| - else |
| - { |
| - g_string_free(table, TRUE); |
| - table = NULL; |
| - } |
| g_string_free(query_string, TRUE); |
| |
| - return table; |
| + return success; |
| } |
| |
| /** |
| @@ -581,6 +573,20 @@ afsql_dd_begin_txn(AFSqlDestDriver *self |
| } |
| |
| /** |
| + * afsql_dd_handle_transaction_error: |
| + * |
| + * Handle errors inside during a SQL transaction (e.g. INSERT or COMMIT failures). |
| + * |
| + * NOTE: This function can only be called from the database thread. |
| + **/ |
| +static void |
| +afsql_dd_handle_transaction_error(AFSqlDestDriver *self) |
| +{ |
| + log_queue_rewind_backlog(self->queue); |
| + self->flush_lines_queued = 0; |
| +} |
| + |
| +/** |
| * afsql_dd_begin_txn: |
| * |
| * Commit SQL transaction. |
| @@ -596,14 +602,14 @@ afsql_dd_commit_txn(AFSqlDestDriver *sel |
| if (success) |
| { |
| log_queue_ack_backlog(self->queue, self->flush_lines_queued); |
| + self->flush_lines_queued = 0; |
| } |
| else |
| { |
| - msg_notice("SQL transaction commit failed, rewinding backlog and starting again", |
| - NULL); |
| - log_queue_rewind_backlog(self->queue); |
| + msg_error("SQL transaction commit failed, rewinding backlog and starting again", |
| + NULL); |
| + afsql_dd_handle_transaction_error(self); |
| } |
| - self->flush_lines_queued = 0; |
| return success; |
| } |
| |
| @@ -644,12 +650,13 @@ afsql_dd_set_dbd_opt_numeric(gpointer ke |
| } |
| |
| static gboolean |
| -afsql_dd_connect(AFSqlDestDriver *self) |
| +afsql_dd_ensure_initialized_connection(AFSqlDestDriver *self) |
| { |
| if (self->dbi_ctx) |
| return TRUE; |
| |
| self->dbi_ctx = dbi_conn_new(self->type); |
| + |
| if (!self->dbi_ctx) |
| { |
| msg_error("No such DBI driver", |
| @@ -659,10 +666,12 @@ afsql_dd_connect(AFSqlDestDriver *self) |
| } |
| |
| dbi_conn_set_option(self->dbi_ctx, "host", self->host); |
| + |
| if (strcmp(self->type, "mysql")) |
| dbi_conn_set_option(self->dbi_ctx, "port", self->port); |
| else |
| dbi_conn_set_option_numeric(self->dbi_ctx, "port", atoi(self->port)); |
| + |
| dbi_conn_set_option(self->dbi_ctx, "username", self->user); |
| dbi_conn_set_option(self->dbi_ctx, "password", self->password); |
| dbi_conn_set_option(self->dbi_ctx, "dbname", self->database); |
| @@ -691,6 +700,7 @@ afsql_dd_connect(AFSqlDestDriver *self) |
| evt_tag_str("database", self->database), |
| evt_tag_str("error", dbi_error), |
| NULL); |
| + |
| return FALSE; |
| } |
| |
| @@ -713,104 +723,145 @@ afsql_dd_connect(AFSqlDestDriver *self) |
| return TRUE; |
| } |
| |
| -static gboolean |
| -afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg, |
| - LogPathOptions *path_options) |
| +static GString * |
| +afsql_dd_ensure_accessible_database_table(AFSqlDestDriver *self, LogMessage *msg) |
| { |
| - if (self->failed_message_counter < self->num_retries - 1) |
| - { |
| - log_queue_push_head(self->queue, msg, path_options); |
| - |
| - /* database connection status sanity check after failed query */ |
| - if (dbi_conn_ping(self->dbi_ctx) != 1) |
| - { |
| - const gchar *dbi_error; |
| - |
| - dbi_conn_error(self->dbi_ctx, &dbi_error); |
| - msg_error("Error, no SQL connection after failed query attempt", |
| - evt_tag_str("type", self->type), |
| - evt_tag_str("host", self->host), |
| - evt_tag_str("port", self->port), |
| - evt_tag_str("username", self->user), |
| - evt_tag_str("database", self->database), |
| - evt_tag_str("error", dbi_error), |
| - NULL); |
| - return FALSE; |
| - } |
| + GString *table = g_string_sized_new(32); |
| + log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table); |
| |
| - self->failed_message_counter++; |
| - return FALSE; |
| + if (!afsql_dd_validate_table(self, table)) |
| + { |
| + /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */ |
| + msg_error("Error checking table, disconnecting from database, trying again shortly", |
| + evt_tag_int("time_reopen", self->time_reopen), |
| + NULL); |
| + g_string_free(table, TRUE); |
| + return NULL; |
| } |
| |
| - msg_error("Multiple failures while inserting this record into the database, message dropped", |
| - evt_tag_int("attempts", self->num_retries), |
| - NULL); |
| - stats_counter_inc(self->dropped_messages); |
| - log_msg_drop(msg, path_options); |
| - self->failed_message_counter = 0; |
| - return TRUE; |
| + return table; |
| } |
| |
| static GString * |
| -afsql_dd_construct_query(AFSqlDestDriver *self, GString *table, |
| - LogMessage *msg) |
| +afsql_dd_build_insert_command(AFSqlDestDriver *self, LogMessage *msg, GString *table) |
| { |
| - GString *value; |
| - GString *query_string; |
| - gint i; |
| + GString *insert_command = g_string_sized_new(256); |
| + GString *value = g_string_sized_new(512); |
| + gint i, j; |
| |
| - value = g_string_sized_new(256); |
| - query_string = g_string_sized_new(512); |
| + g_string_printf(insert_command, "INSERT INTO %s (", table->str); |
| |
| - g_string_printf(query_string, "INSERT INTO %s (", table->str); |
| for (i = 0; i < self->fields_len; i++) |
| { |
| - g_string_append(query_string, self->fields[i].name); |
| - if (i != self->fields_len - 1) |
| - g_string_append(query_string, ", "); |
| + if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL) |
| + { |
| + g_string_append(insert_command, self->fields[i].name); |
| + |
| + j = i + 1; |
| + while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT) |
| + j++; |
| + |
| + if (j < self->fields_len) |
| + g_string_append(insert_command, ", "); |
| + } |
| } |
| - g_string_append(query_string, ") VALUES ("); |
| + |
| + g_string_append(insert_command, ") VALUES ("); |
| |
| for (i = 0; i < self->fields_len; i++) |
| { |
| gchar *quoted; |
| |
| - if (self->fields[i].value == NULL) |
| - { |
| - /* the config used the 'default' value for this column -> the fields[i].value is NULL, use SQL default */ |
| - g_string_append(query_string, "DEFAULT"); |
| - } |
| - else |
| + if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL) |
| { |
| log_template_format(self->fields[i].value, msg, &self->template_options, LTZ_SEND, self->seq_num, NULL, value); |
| - |
| if (self->null_value && strcmp(self->null_value, value->str) == 0) |
| { |
| - g_string_append(query_string, "NULL"); |
| + g_string_append(insert_command, "NULL"); |
| } |
| else |
| { |
| dbi_conn_quote_string_copy(self->dbi_ctx, value->str, "ed); |
| if (quoted) |
| { |
| - g_string_append(query_string, quoted); |
| + g_string_append(insert_command, quoted); |
| free(quoted); |
| } |
| else |
| { |
| - g_string_append(query_string, "''"); |
| + g_string_append(insert_command, "''"); |
| } |
| } |
| - } |
| |
| - if (i != self->fields_len - 1) |
| - g_string_append(query_string, ", "); |
| + j = i + 1; |
| + while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT) |
| + j++; |
| + if (j < self->fields_len) |
| + g_string_append(insert_command, ", "); |
| + } |
| } |
| - g_string_append(query_string, ")"); |
| + |
| + g_string_append(insert_command, ")"); |
| |
| g_string_free(value, TRUE); |
| |
| - return query_string; |
| + return insert_command; |
| +} |
| + |
| +static inline gboolean |
| +afsql_dd_is_transaction_handling_enabled(const AFSqlDestDriver *self) |
| +{ |
| + return self->flush_lines_queued != -1; |
| +} |
| + |
| +static inline gboolean |
| +afsql_dd_should_start_new_transaction(const AFSqlDestDriver *self) |
| +{ |
| + return self->flush_lines_queued == 0; |
| +} |
| + |
| +static inline gboolean |
| +afsql_dd_should_commit_transaction(const AFSqlDestDriver *self) |
| +{ |
| + return afsql_dd_is_transaction_handling_enabled(self) && self->flush_lines_queued == self->flush_lines; |
| +} |
| + |
| +static inline gboolean |
| +afsql_dd_handle_insert_row_error_depending_on_connection_availability(AFSqlDestDriver *self, |
| + LogMessage *msg, |
| + LogPathOptions *path_options) |
| +{ |
| + const gchar *dbi_error, *error_message; |
| + |
| + if (dbi_conn_ping(self->dbi_ctx) == 1) |
| + { |
| + log_queue_push_head(self->queue, msg, path_options); |
| + return TRUE; |
| + } |
| + |
| + if (afsql_dd_is_transaction_handling_enabled(self)) |
| + { |
| + error_message = "SQL connection lost in the middle of a transaction," |
| + " rewinding backlog and starting again"; |
| + afsql_dd_handle_transaction_error(self); |
| + } |
| + else |
| + { |
| + error_message = "Error, no SQL connection after failed query attempt"; |
| + log_queue_push_head(self->queue, msg, path_options); |
| + } |
| + |
| + dbi_conn_error(self->dbi_ctx, &dbi_error); |
| + msg_error(error_message, |
| + evt_tag_str("type", self->type), |
| + evt_tag_str("host", self->host), |
| + evt_tag_str("port", self->port), |
| + evt_tag_str("username", self->user), |
| + evt_tag_str("database", self->database), |
| + evt_tag_str("error", dbi_error), |
| + NULL); |
| + |
| + return FALSE; |
| } |
| |
| /** |
| @@ -824,61 +875,93 @@ afsql_dd_construct_query(AFSqlDestDriver |
| static gboolean |
| afsql_dd_insert_db(AFSqlDestDriver *self) |
| { |
| - GString *table, *query_string; |
| + GString *table = NULL; |
| + GString *insert_command = NULL; |
| LogMessage *msg; |
| gboolean success; |
| LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; |
| |
| - afsql_dd_connect(self); |
| + if (!afsql_dd_ensure_initialized_connection(self)) |
| + return FALSE; |
| |
| - success = log_queue_pop_head(self->queue, &msg, &path_options, (self->flags & AFSQL_DDF_EXPLICIT_COMMITS), FALSE); |
| + /* connection established, try to insert a message */ |
| + success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, self->flags & AFSQL_DDF_EXPLICIT_COMMITS); |
| if (!success) |
| return TRUE; |
| |
| msg_set_context(msg); |
| |
| - table = afsql_dd_validate_table(self, msg); |
| + table = afsql_dd_ensure_accessible_database_table(self, msg); |
| + |
| if (!table) |
| { |
| - /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */ |
| - msg_error("Error checking table, disconnecting from database, trying again shortly", |
| - evt_tag_int("time_reopen", self->time_reopen), |
| - NULL); |
| - msg_set_context(NULL); |
| - g_string_free(table, TRUE); |
| - return afsql_dd_insert_fail_handler(self, msg, &path_options); |
| + success = FALSE; |
| + goto out; |
| } |
| |
| - query_string = afsql_dd_construct_query(self, table, msg); |
| + if (afsql_dd_should_start_new_transaction(self) && !afsql_dd_begin_txn(self)) |
| + { |
| + success = FALSE; |
| + goto out; |
| + } |
| |
| - if (self->flush_lines_queued == 0 && !afsql_dd_begin_txn(self)) |
| - return FALSE; |
| + insert_command = afsql_dd_build_insert_command(self, msg, table); |
| + success = afsql_dd_run_query(self, insert_command->str, FALSE, NULL); |
| |
| - success = afsql_dd_run_query(self, query_string->str, FALSE, NULL); |
| if (success && self->flush_lines_queued != -1) |
| { |
| self->flush_lines_queued++; |
| |
| - if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self)) |
| - return FALSE; |
| + if (afsql_dd_should_commit_transaction(self) && !afsql_dd_commit_txn(self)) |
| + { |
| + /* Assuming that in case of error, the queue is rewound by afsql_dd_commit_txn() */ |
| + |
| + g_string_free(insert_command, TRUE); |
| + msg_set_context(NULL); |
| + |
| + return FALSE; |
| + } |
| } |
| |
| - g_string_free(table, TRUE); |
| - g_string_free(query_string, TRUE); |
| + out: |
| + |
| + if (table != NULL) |
| + g_string_free(table, TRUE); |
| + |
| + if (insert_command != NULL) |
| + g_string_free(insert_command, TRUE); |
| |
| msg_set_context(NULL); |
| |
| - if (!success) |
| - return afsql_dd_insert_fail_handler(self, msg, &path_options); |
| + if (success) |
| + { |
| + log_msg_ack(msg, &path_options); |
| + log_msg_unref(msg); |
| + step_sequence_number(&self->seq_num); |
| + self->failed_message_counter = 0; |
| + } |
| + else |
| + { |
| + if (self->failed_message_counter < self->num_retries - 1) |
| + { |
| + if (!afsql_dd_handle_insert_row_error_depending_on_connection_availability(self, msg, &path_options)) |
| + return FALSE; |
| |
| - /* we only ACK if each INSERT is a separate transaction */ |
| - if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0) |
| - log_msg_ack(msg, &path_options); |
| - log_msg_unref(msg); |
| - step_sequence_number(&self->seq_num); |
| - self->failed_message_counter = 0; |
| + self->failed_message_counter++; |
| + } |
| + else |
| + { |
| + msg_error("Multiple failures while inserting this record into the database, message dropped", |
| + evt_tag_int("attempts", self->num_retries), |
| + NULL); |
| + stats_counter_inc(self->dropped_messages); |
| + log_msg_drop(msg, &path_options); |
| + self->failed_message_counter = 0; |
| + success = TRUE; |
| + } |
| + } |
| |
| - return TRUE; |
| + return success; |
| } |
| |
| static void |
| @@ -895,7 +978,7 @@ afsql_dd_message_became_available_in_the |
| static void |
| afsql_dd_wait_for_suspension_wakeup(AFSqlDestDriver *self) |
| { |
| - /* we got suspended, probably because of a connection error, |
| + /* we got suspended, probably because of a connection error, |
| * during this time we only get wakeups if we need to be |
| * terminated. */ |
| if (!self->db_thread_terminate) |
| @@ -974,8 +1057,7 @@ afsql_dd_database_thread(gpointer arg) |
| |
| afsql_dd_commit_txn(self); |
| } |
| - |
| - exit: |
| +exit: |
| afsql_dd_disconnect(self); |
| |
| msg_verbose("Database thread finished", |
| -- |
| 1.8.4.1 |
| |