blob: 42e181bb1f4733becae439d09db6a9087e41a354 [file] [log] [blame]
Patrick Williamsb48b7b42016-08-17 15:04:38 -05001From 23e80b75508187baaa823a68ea019b72e0b2305c Mon Sep 17 00:00:00 2001
2From: Budai Laszlo <lbudai@balabit.hu>
3Date: Tue, 12 Nov 2013 13:19:04 +0100
4Subject: [PATCH] afsql: afsql_dd_insert_db() refactor
5
6Upstream-Status: Backport
7
8A lot of the code that was previously in afsql_dd_insert_db() have been
9extracted to smaller functions, and afsql_dd_insert_db() was rebuilt on
10top of these. At the same time, memory leaks were plugged, and in case
11of a transaction error, backlog rewinding has been fixed too, to not
12loose messages since the last BEGIN command.
13
14Signed-off-by: Juhasz Viktor <jviktor@balabit.hu>
15Signed-off-by: Laszlo Budai <lbudai@balabit.hu>
16---
17 modules/afsql/afsql.c | 301 ++++++++++++++++++++++++++++++++------------------
18 1 file changed, 192 insertions(+), 109 deletions(-)
19
20diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c
21index 12f6aab..a6a8190 100644
22--- a/modules/afsql/afsql.c
23+++ b/modules/afsql/afsql.c
24@@ -456,24 +456,21 @@ afsql_dd_create_index(AFSqlDestDriver *s
25 *
26 * NOTE: This function can only be called from the database thread.
27 **/
28-static GString *
29-afsql_dd_validate_table(AFSqlDestDriver *self, LogMessage *msg)
30+static gboolean
31+afsql_dd_validate_table(AFSqlDestDriver *self, GString *table)
32 {
33- GString *query_string, *table;
34+ GString *query_string;
35 dbi_result db_res;
36 gboolean success = FALSE;
37 gint i;
38
39- table = g_string_sized_new(32);
40- log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table);
41-
42 if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES)
43- return table;
44+ return TRUE;
45
46 afsql_dd_check_sql_identifier(table->str, TRUE);
47
48 if (g_hash_table_lookup(self->validated_tables, table->str))
49- return table;
50+ return TRUE;
51
52 query_string = g_string_sized_new(32);
53 g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table->str);
54@@ -544,14 +541,9 @@ afsql_dd_validate_table(AFSqlDestDriver
55 /* we have successfully created/altered the destination table, record this information */
56 g_hash_table_insert(self->validated_tables, g_strdup(table->str), GUINT_TO_POINTER(TRUE));
57 }
58- else
59- {
60- g_string_free(table, TRUE);
61- table = NULL;
62- }
63 g_string_free(query_string, TRUE);
64
65- return table;
66+ return success;
67 }
68
69 /**
70@@ -581,6 +573,20 @@ afsql_dd_begin_txn(AFSqlDestDriver *self
71 }
72
73 /**
74+ * afsql_dd_handle_transaction_error:
75+ *
76+ * Handle errors inside during a SQL transaction (e.g. INSERT or COMMIT failures).
77+ *
78+ * NOTE: This function can only be called from the database thread.
79+ **/
80+static void
81+afsql_dd_handle_transaction_error(AFSqlDestDriver *self)
82+{
83+ log_queue_rewind_backlog(self->queue);
84+ self->flush_lines_queued = 0;
85+}
86+
87+/**
88 * afsql_dd_begin_txn:
89 *
90 * Commit SQL transaction.
91@@ -596,14 +602,14 @@ afsql_dd_commit_txn(AFSqlDestDriver *sel
92 if (success)
93 {
94 log_queue_ack_backlog(self->queue, self->flush_lines_queued);
95+ self->flush_lines_queued = 0;
96 }
97 else
98 {
99- msg_notice("SQL transaction commit failed, rewinding backlog and starting again",
100- NULL);
101- log_queue_rewind_backlog(self->queue);
102+ msg_error("SQL transaction commit failed, rewinding backlog and starting again",
103+ NULL);
104+ afsql_dd_handle_transaction_error(self);
105 }
106- self->flush_lines_queued = 0;
107 return success;
108 }
109
110@@ -644,12 +650,13 @@ afsql_dd_set_dbd_opt_numeric(gpointer ke
111 }
112
113 static gboolean
114-afsql_dd_connect(AFSqlDestDriver *self)
115+afsql_dd_ensure_initialized_connection(AFSqlDestDriver *self)
116 {
117 if (self->dbi_ctx)
118 return TRUE;
119
120 self->dbi_ctx = dbi_conn_new(self->type);
121+
122 if (!self->dbi_ctx)
123 {
124 msg_error("No such DBI driver",
125@@ -659,10 +666,12 @@ afsql_dd_connect(AFSqlDestDriver *self)
126 }
127
128 dbi_conn_set_option(self->dbi_ctx, "host", self->host);
129+
130 if (strcmp(self->type, "mysql"))
131 dbi_conn_set_option(self->dbi_ctx, "port", self->port);
132 else
133 dbi_conn_set_option_numeric(self->dbi_ctx, "port", atoi(self->port));
134+
135 dbi_conn_set_option(self->dbi_ctx, "username", self->user);
136 dbi_conn_set_option(self->dbi_ctx, "password", self->password);
137 dbi_conn_set_option(self->dbi_ctx, "dbname", self->database);
138@@ -691,6 +700,7 @@ afsql_dd_connect(AFSqlDestDriver *self)
139 evt_tag_str("database", self->database),
140 evt_tag_str("error", dbi_error),
141 NULL);
142+
143 return FALSE;
144 }
145
146@@ -713,104 +723,145 @@ afsql_dd_connect(AFSqlDestDriver *self)
147 return TRUE;
148 }
149
150-static gboolean
151-afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg,
152- LogPathOptions *path_options)
153+static GString *
154+afsql_dd_ensure_accessible_database_table(AFSqlDestDriver *self, LogMessage *msg)
155 {
156- if (self->failed_message_counter < self->num_retries - 1)
157- {
158- log_queue_push_head(self->queue, msg, path_options);
159-
160- /* database connection status sanity check after failed query */
161- if (dbi_conn_ping(self->dbi_ctx) != 1)
162- {
163- const gchar *dbi_error;
164-
165- dbi_conn_error(self->dbi_ctx, &dbi_error);
166- msg_error("Error, no SQL connection after failed query attempt",
167- evt_tag_str("type", self->type),
168- evt_tag_str("host", self->host),
169- evt_tag_str("port", self->port),
170- evt_tag_str("username", self->user),
171- evt_tag_str("database", self->database),
172- evt_tag_str("error", dbi_error),
173- NULL);
174- return FALSE;
175- }
176+ GString *table = g_string_sized_new(32);
177+ log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table);
178
179- self->failed_message_counter++;
180- return FALSE;
181+ if (!afsql_dd_validate_table(self, table))
182+ {
183+ /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */
184+ msg_error("Error checking table, disconnecting from database, trying again shortly",
185+ evt_tag_int("time_reopen", self->time_reopen),
186+ NULL);
187+ g_string_free(table, TRUE);
188+ return NULL;
189 }
190
191- msg_error("Multiple failures while inserting this record into the database, message dropped",
192- evt_tag_int("attempts", self->num_retries),
193- NULL);
194- stats_counter_inc(self->dropped_messages);
195- log_msg_drop(msg, path_options);
196- self->failed_message_counter = 0;
197- return TRUE;
198+ return table;
199 }
200
201 static GString *
202-afsql_dd_construct_query(AFSqlDestDriver *self, GString *table,
203- LogMessage *msg)
204+afsql_dd_build_insert_command(AFSqlDestDriver *self, LogMessage *msg, GString *table)
205 {
206- GString *value;
207- GString *query_string;
208- gint i;
209+ GString *insert_command = g_string_sized_new(256);
210+ GString *value = g_string_sized_new(512);
211+ gint i, j;
212
213- value = g_string_sized_new(256);
214- query_string = g_string_sized_new(512);
215+ g_string_printf(insert_command, "INSERT INTO %s (", table->str);
216
217- g_string_printf(query_string, "INSERT INTO %s (", table->str);
218 for (i = 0; i < self->fields_len; i++)
219 {
220- g_string_append(query_string, self->fields[i].name);
221- if (i != self->fields_len - 1)
222- g_string_append(query_string, ", ");
223+ if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL)
224+ {
225+ g_string_append(insert_command, self->fields[i].name);
226+
227+ j = i + 1;
228+ while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT)
229+ j++;
230+
231+ if (j < self->fields_len)
232+ g_string_append(insert_command, ", ");
233+ }
234 }
235- g_string_append(query_string, ") VALUES (");
236+
237+ g_string_append(insert_command, ") VALUES (");
238
239 for (i = 0; i < self->fields_len; i++)
240 {
241 gchar *quoted;
242
243- if (self->fields[i].value == NULL)
244- {
245- /* the config used the 'default' value for this column -> the fields[i].value is NULL, use SQL default */
246- g_string_append(query_string, "DEFAULT");
247- }
248- else
249+ if ((self->fields[i].flags & AFSQL_FF_DEFAULT) == 0 && self->fields[i].value != NULL)
250 {
251 log_template_format(self->fields[i].value, msg, &self->template_options, LTZ_SEND, self->seq_num, NULL, value);
252-
253 if (self->null_value && strcmp(self->null_value, value->str) == 0)
254 {
255- g_string_append(query_string, "NULL");
256+ g_string_append(insert_command, "NULL");
257 }
258 else
259 {
260 dbi_conn_quote_string_copy(self->dbi_ctx, value->str, &quoted);
261 if (quoted)
262 {
263- g_string_append(query_string, quoted);
264+ g_string_append(insert_command, quoted);
265 free(quoted);
266 }
267 else
268 {
269- g_string_append(query_string, "''");
270+ g_string_append(insert_command, "''");
271 }
272 }
273- }
274
275- if (i != self->fields_len - 1)
276- g_string_append(query_string, ", ");
277+ j = i + 1;
278+ while (j < self->fields_len && (self->fields[j].flags & AFSQL_FF_DEFAULT) == AFSQL_FF_DEFAULT)
279+ j++;
280+ if (j < self->fields_len)
281+ g_string_append(insert_command, ", ");
282+ }
283 }
284- g_string_append(query_string, ")");
285+
286+ g_string_append(insert_command, ")");
287
288 g_string_free(value, TRUE);
289
290- return query_string;
291+ return insert_command;
292+}
293+
294+static inline gboolean
295+afsql_dd_is_transaction_handling_enabled(const AFSqlDestDriver *self)
296+{
297+ return self->flush_lines_queued != -1;
298+}
299+
300+static inline gboolean
301+afsql_dd_should_start_new_transaction(const AFSqlDestDriver *self)
302+{
303+ return self->flush_lines_queued == 0;
304+}
305+
306+static inline gboolean
307+afsql_dd_should_commit_transaction(const AFSqlDestDriver *self)
308+{
309+ return afsql_dd_is_transaction_handling_enabled(self) && self->flush_lines_queued == self->flush_lines;
310+}
311+
312+static inline gboolean
313+afsql_dd_handle_insert_row_error_depending_on_connection_availability(AFSqlDestDriver *self,
314+ LogMessage *msg,
315+ LogPathOptions *path_options)
316+{
317+ const gchar *dbi_error, *error_message;
318+
319+ if (dbi_conn_ping(self->dbi_ctx) == 1)
320+ {
321+ log_queue_push_head(self->queue, msg, path_options);
322+ return TRUE;
323+ }
324+
325+ if (afsql_dd_is_transaction_handling_enabled(self))
326+ {
327+ error_message = "SQL connection lost in the middle of a transaction,"
328+ " rewinding backlog and starting again";
329+ afsql_dd_handle_transaction_error(self);
330+ }
331+ else
332+ {
333+ error_message = "Error, no SQL connection after failed query attempt";
334+ log_queue_push_head(self->queue, msg, path_options);
335+ }
336+
337+ dbi_conn_error(self->dbi_ctx, &dbi_error);
338+ msg_error(error_message,
339+ evt_tag_str("type", self->type),
340+ evt_tag_str("host", self->host),
341+ evt_tag_str("port", self->port),
342+ evt_tag_str("username", self->user),
343+ evt_tag_str("database", self->database),
344+ evt_tag_str("error", dbi_error),
345+ NULL);
346+
347+ return FALSE;
348 }
349
350 /**
351@@ -824,61 +875,93 @@ afsql_dd_construct_query(AFSqlDestDriver
352 static gboolean
353 afsql_dd_insert_db(AFSqlDestDriver *self)
354 {
355- GString *table, *query_string;
356+ GString *table = NULL;
357+ GString *insert_command = NULL;
358 LogMessage *msg;
359 gboolean success;
360 LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
361
362- afsql_dd_connect(self);
363+ if (!afsql_dd_ensure_initialized_connection(self))
364+ return FALSE;
365
366- success = log_queue_pop_head(self->queue, &msg, &path_options, (self->flags & AFSQL_DDF_EXPLICIT_COMMITS), FALSE);
367+ /* connection established, try to insert a message */
368+ success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, self->flags & AFSQL_DDF_EXPLICIT_COMMITS);
369 if (!success)
370 return TRUE;
371
372 msg_set_context(msg);
373
374- table = afsql_dd_validate_table(self, msg);
375+ table = afsql_dd_ensure_accessible_database_table(self, msg);
376+
377 if (!table)
378 {
379- /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */
380- msg_error("Error checking table, disconnecting from database, trying again shortly",
381- evt_tag_int("time_reopen", self->time_reopen),
382- NULL);
383- msg_set_context(NULL);
384- g_string_free(table, TRUE);
385- return afsql_dd_insert_fail_handler(self, msg, &path_options);
386+ success = FALSE;
387+ goto out;
388 }
389
390- query_string = afsql_dd_construct_query(self, table, msg);
391+ if (afsql_dd_should_start_new_transaction(self) && !afsql_dd_begin_txn(self))
392+ {
393+ success = FALSE;
394+ goto out;
395+ }
396
397- if (self->flush_lines_queued == 0 && !afsql_dd_begin_txn(self))
398- return FALSE;
399+ insert_command = afsql_dd_build_insert_command(self, msg, table);
400+ success = afsql_dd_run_query(self, insert_command->str, FALSE, NULL);
401
402- success = afsql_dd_run_query(self, query_string->str, FALSE, NULL);
403 if (success && self->flush_lines_queued != -1)
404 {
405 self->flush_lines_queued++;
406
407- if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self))
408- return FALSE;
409+ if (afsql_dd_should_commit_transaction(self) && !afsql_dd_commit_txn(self))
410+ {
411+ /* Assuming that in case of error, the queue is rewound by afsql_dd_commit_txn() */
412+
413+ g_string_free(insert_command, TRUE);
414+ msg_set_context(NULL);
415+
416+ return FALSE;
417+ }
418 }
419
420- g_string_free(table, TRUE);
421- g_string_free(query_string, TRUE);
422+ out:
423+
424+ if (table != NULL)
425+ g_string_free(table, TRUE);
426+
427+ if (insert_command != NULL)
428+ g_string_free(insert_command, TRUE);
429
430 msg_set_context(NULL);
431
432- if (!success)
433- return afsql_dd_insert_fail_handler(self, msg, &path_options);
434+ if (success)
435+ {
436+ log_msg_ack(msg, &path_options);
437+ log_msg_unref(msg);
438+ step_sequence_number(&self->seq_num);
439+ self->failed_message_counter = 0;
440+ }
441+ else
442+ {
443+ if (self->failed_message_counter < self->num_retries - 1)
444+ {
445+ if (!afsql_dd_handle_insert_row_error_depending_on_connection_availability(self, msg, &path_options))
446+ return FALSE;
447
448- /* we only ACK if each INSERT is a separate transaction */
449- if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0)
450- log_msg_ack(msg, &path_options);
451- log_msg_unref(msg);
452- step_sequence_number(&self->seq_num);
453- self->failed_message_counter = 0;
454+ self->failed_message_counter++;
455+ }
456+ else
457+ {
458+ msg_error("Multiple failures while inserting this record into the database, message dropped",
459+ evt_tag_int("attempts", self->num_retries),
460+ NULL);
461+ stats_counter_inc(self->dropped_messages);
462+ log_msg_drop(msg, &path_options);
463+ self->failed_message_counter = 0;
464+ success = TRUE;
465+ }
466+ }
467
468- return TRUE;
469+ return success;
470 }
471
472 static void
473@@ -895,7 +978,7 @@ afsql_dd_message_became_available_in_the
474 static void
475 afsql_dd_wait_for_suspension_wakeup(AFSqlDestDriver *self)
476 {
477- /* we got suspended, probably because of a connection error,
478+ /* we got suspended, probably because of a connection error,
479 * during this time we only get wakeups if we need to be
480 * terminated. */
481 if (!self->db_thread_terminate)
482@@ -974,8 +1057,7 @@ afsql_dd_database_thread(gpointer arg)
483
484 afsql_dd_commit_txn(self);
485 }
486-
487- exit:
488+exit:
489 afsql_dd_disconnect(self);
490
491 msg_verbose("Database thread finished",
492--
4931.8.4.1
494