The problem
Every database change is audited; it's a smart idea for the work that we do. Very quickly, though, the amount of audit data becomes enormous: it cripples backups, uses a boat-load of disk space and makes for some cry-worthy storage costs. Not to mention the anxiety caused by jumbo backup processes, freezing servers and early-morning monitoring alerts.
Chart showing increasing AWS costs.
Hmmmm, what to do?
So I brew another coffee that's blacker than a night in the Antarctic and wonder: how do I turn this horrible, unusable data into something useful? I know a little about Elasticsearch and how useful it can be for storing and analyzing large amounts of data.
OK I think I've got an idea!
- The audit is powered by database triggers in PostgreSQL.
- Postgres can also notify listeners asynchronously from triggers using NOTIFY and LISTEN.
- JavaScript is a good language for dealing with asynchronous tasks.
- Elasticsearch is good at analyzing data.
Right, looks like we have a combination of technology that will work to solve this problem. Now I just need to figure out how.
Several hours (days) later... postgres-to-elasticsearch was born.
The tool supports one-way, real-time data streaming from PostgreSQL to Elasticsearch. The way it works is, in essence, very simple.
What does it do?
It performs one-way data pushing from PostgreSQL to Elasticsearch and can optionally remove data from PostgreSQL after it's safely stored in Elasticsearch.
A nice bonus is that it will automatically look for the Hstore data type and parse it into JSON so that you can query the actual row data in Elasticsearch.
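For the curious, hstore values arrive inside the notification JSON as text in the "key"=>"value" format, so turning them into something queryable is roughly a parsing job like the one below. This is just an illustrative sketch, not the tool's actual code:

// Minimal sketch (not the tool's actual code): turn an hstore literal such as
// '"name"=>"Tom", "role"=>"admin"' into a plain object so it can be indexed as JSON.
function hstoreToObject(hstoreText) {
    const result = {};
    if (!hstoreText) return result;
    // Match "key"=>"value" or "key"=>NULL pairs.
    const pairPattern = /"((?:[^"\\]|\\.)*)"=>(?:"((?:[^"\\]|\\.)*)"|NULL)/g;
    let match;
    while ((match = pairPattern.exec(hstoreText)) !== null) {
        const key = match[1].replace(/\\(.)/g, '$1');
        const value = match[2] === undefined ? null : match[2].replace(/\\(.)/g, '$1');
        result[key] = value;
    }
    return result;
}

console.log(hstoreToObject('"name"=>"Tom", "deleted"=>NULL'));
// => { name: 'Tom', deleted: null }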
When it starts
- It checks Elasticsearch for the last UID that it archived
- If it finds a UID it will load all of the data from Postgres that has been added since the service was last run and push it to Elasticsearch. If it can't find the last UID it will push the whole lot.
- It runs a LISTEN statement on Postgres and waits for data to be pushed from the database (a rough sketch of this startup flow follows below).
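That startup flow might look roughly like this in Node. It's a sketch, not the tool's actual source: it assumes the node pg and elasticsearch client libraries, uses the index, channel and column names from the examples in this post, and makes up anything else (like the Elasticsearch host):

// Rough sketch of the startup flow described above (illustrative only).
const { Client } = require('pg');
const elasticsearch = require('elasticsearch');

// Connection settings come from the PG_* / ES_* environment variables.
const pg = new Client({
    user: process.env.PG_USERNAME,
    password: process.env.PG_PASSWORD,
    database: process.env.PG_DATABASE
});
const es = new elasticsearch.Client({ host: process.env.ES_PROTO + '://localhost:9200' });

async function start() {
    await pg.connect();
    // (The real tool also creates the index first if it doesn't exist yet.)

    // 1. Ask Elasticsearch for the last archived event_id (the "UID").
    const res = await es.search({
        index: 'audit', // ES_INDEX
        body: { size: 0, aggs: { last_id: { max: { field: 'event_id' } } } }
    });
    const lastId = res.aggregations.last_id.value; // null when the index is empty

    // 2. Load everything added since the last run (or the whole lot if nothing was found)
    //    and push it to Elasticsearch in chunks.
    const backlog = lastId === null
        ? await pg.query('SELECT * FROM audit.logged_actions ORDER BY event_id')
        : await pg.query('SELECT * FROM audit.logged_actions WHERE event_id > $1 ORDER BY event_id', [lastId]);
    // ...index backlog.rows...

    // 3. Start listening for new audit rows announced by the trigger.
    await pg.query('LISTEN audit');
    await pg.query('LISTEN audit_id');
}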
When it's running
- It waits patiently for data to be sent to it via the Postgres NOTIFY statement. Note that there are two NOTIFY channels that can run (which can be defined by PG_LISTEN_TO and PG_LISTEN_TO_ID). This is due to the 8000 character hard limit on the messages sent through NOTIFY.
- When it receives data it will cache it locally for a defined period or up to a defined cache size limit.
- It pushes the cached data to Elasticsearch (see the sketch below).
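The caching and flushing behaviour boils down to something like this. Again, just a sketch continuing from the startup snippet above (pg and es are the connected clients from there), with my own variable names:

// Sketch of the caching behaviour described above.
const queue = [];
let flushTimer = null;

pg.on('notification', (msg) => {
    if (msg.channel !== 'audit') return; // full-row payloads only; the id-only channel is covered later
    queue.push(JSON.parse(msg.payload));

    // The timeout starts when the first item lands in the cache...
    if (queue.length === 1) {
        flushTimer = setTimeout(flush, Number(process.env.INDEX_QUEUE_TIMEOUT) * 1000);
    }
    // ...and the cache is flushed early once it hits the size limit.
    if (queue.length >= Number(process.env.INDEX_QUEUE_LIMIT)) {
        flush();
    }
});

async function flush() {
    clearTimeout(flushTimer);
    const records = queue.splice(0, queue.length);
    if (records.length === 0) return;

    // Bulk-index the cached rows into Elasticsearch (ES_INDEX / ES_TYPE).
    const body = [];
    for (const record of records) {
        body.push({ index: { _index: 'audit', _type: 'record' } });
        body.push(record);
    }
    await es.bulk({ body });
}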
How this solves the problem
An option built into postgres-to-elasticsearch is to delete data from the table as it pushes it to Elasticsearch. This means that while it's running, the database table is almost always empty, or contains only a handful of items that are yet to be pushed.
As a result, our database backups aren't crippled by huge amounts of write-only data, and when we actually need to review the audit data we aren't plagued by huge query times, thanks to the speed of Elasticsearch.
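Under the hood the delete step amounts to something like this (sketch only; it assumes event_id is the audit table's primary key, as it is in the trigger further down, and pg is the connected client from the startup sketch above):

// Sketch only: after a batch has been safely flushed to Elasticsearch, remove those
// rows from Postgres (the real tool gates this behind the PG_DELETE_ON_INDEX option).
async function deleteIndexedRows(records) {
    const ids = records.map((record) => record.event_id);
    const result = await pg.query(
        'DELETE FROM audit.logged_actions WHERE event_id = ANY($1::bigint[])',
        [ids]
    );
    console.log(`Attempted to delete ${ids.length} rows, actually deleted ${result.rowCount} rows`);
}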
Lovely data
Observe your lovely real-time data in Kibana
What does the output actually look like?
On first run
tom@sweet:~/code/postgres-to-elasticsearch/src$ INFO=1 PG_SCHEMA=audit PG_TABLE=logged_actions PG_DELETE_ON_INDEX=1 INFO=1 ES_INDEX=audit ES_TYPE=record INDEX_QUEUE_TIMEOUT=60 PG_USERNAME=tom PG_PASSWORD=tom PG_DATABASE=my_database ES_PROTO=http STATUS_UPDATE_INTERVAL=2 INDEX_QUEUE_LIMIT=20 node index.js
INFO Starting
INFO Index audit does not exist, creating.
INFO Index audit created successfully
INFO Processing historic audit
INFO Found Hstore type id, Hstore processing enabled.
INFO LISTEN statement completed
INFO No historic audit found, cannot get last processed event_id
INFO Loading all available audit data for backlog processing
INFO Historic audit query completed, processing...
INFO No more historic rows to process, processed a total of 56 rows
INFO Flushed 20 records
INFO Flushed 20 records
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Flushed 20 records
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Flushed 8 records
INFO Attempted to delete 8 rows, actually deleted 8 rows
STATUS UPDATE: Created a total of 68 indexes with 12 queued at 1541617222
INFO Flushed 12 records
INFO Attempted to delete 12 rows, actually deleted 12 rows
So what's happened here is:
- It connects to Elasticsearch using the ES_* env variables (collected as in the config sketch after this list).
- The Elasticsearch index doesn't exist, so it creates it.
- It tries to find the last processed primary key of the audit.logged_actions table (as defined by PG_SCHEMA and PG_TABLE).
- It can't find it because we just created the index, so it's empty.
- It fetches all of the data from the audit.logged_actions table and processes it in chunks of 20 (defined by INDEX_QUEUE_LIMIT).
- It deletes the rows from the database as it processes them.
- A status update is shown after 2 minutes (STATUS_UPDATE_INTERVAL).
- It continues to listen to the NOTIFY statements run by the trigger function in Postgres until the number of rows in the cache hits INDEX_QUEUE_LIMIT or it has been INDEX_QUEUE_TIMEOUT seconds since the first item in the cache was added.
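Everything is driven by environment variables, exactly as in the command above. Assembling the configuration might look roughly like this (illustrative only; the variable names are the ones from the command, the structure is my own):

// Sketch: configuration pulled from the environment variables used in the command above.
const config = {
    pg: {
        user: process.env.PG_USERNAME,
        password: process.env.PG_PASSWORD,
        database: process.env.PG_DATABASE,
        schema: process.env.PG_SCHEMA,                 // audit
        table: process.env.PG_TABLE,                   // logged_actions
        deleteOnIndex: process.env.PG_DELETE_ON_INDEX === '1'
    },
    es: {
        proto: process.env.ES_PROTO,                   // http
        index: process.env.ES_INDEX,                   // audit
        type: process.env.ES_TYPE                      // record
    },
    queueLimit: Number(process.env.INDEX_QUEUE_LIMIT),         // flush after this many rows
    queueTimeout: Number(process.env.INDEX_QUEUE_TIMEOUT),     // or after this many seconds
    statusUpdateInterval: Number(process.env.STATUS_UPDATE_INTERVAL)
};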
On second run
INFO Starting
INFO Processing historic audit
INFO Found Hstore type id, Hstore processing enabled.
INFO LISTEN statement completed
INFO Found last processed event_id: 55748
INFO Historic audit query completed, processing...
INFO No more historic rows to process, processed a total of 5 rows
INFO Deleted a total of 0 rows
INFO Flushed 5 records
INFO Attempted to delete 5 rows, actually deleted 5 rows
- It's found the last processed id (the last record inserted into Elasticsearch before it was stopped)
- It deletes any audit with an id that is lower than that last processed id (to remove any audit that wasn't deleted before; see the sketch below).
- It finds 5 new rows since it last ran and adds them to the cache.
- It continues as before.
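That catch-up delete is essentially this (sketch only, using the pg client from the earlier snippets):

// Sketch of the catch-up step on restart: rows below the last processed id were already
// indexed before the service stopped, so any that weren't deleted yet are cleaned up now.
async function cleanUpAlreadyIndexed(lastProcessedId) {
    const result = await pg.query(
        'DELETE FROM audit.logged_actions WHERE event_id < $1',
        [lastProcessedId]
    );
    console.log(`Deleted a total of ${result.rowCount} rows`); // 0 in the run above
}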
The PostgreSQL trigger
If you're considering implementing this, writing your own trigger is a good idea; this is what ours looks like:
CREATE OR REPLACE FUNCTION audit.if_modified_func()
    RETURNS trigger AS
$BODY$
DECLARE
    audit_row audit.logged_actions;
    include_values BOOLEAN;
    log_diffs BOOLEAN;
    h_old hstore;
    h_new hstore;
    excluded_cols text[] = ARRAY[]::text[];
BEGIN
    IF TG_WHEN <> 'AFTER' THEN
        RAISE EXCEPTION 'audit.if_modified_func() may only run as an AFTER trigger';
    END IF;

    audit_row = ROW(
        NEXTVAL('audit.logged_actions_event_id_seq'), -- event_id
        TG_TABLE_SCHEMA::text, -- schema_name
        TG_TABLE_NAME::text, -- table_name
        NULL, -- table_id
        TG_RELID, -- relation OID for much quicker searches
        session_user::text, -- session_user_name
        statement_timestamp(), -- action_tstamp_stm
        inet_client_addr(), -- client_addr
        current_query(), -- top-level query or queries (if multistatement) from client
        SUBSTRING(TG_OP,1,1), -- action
        NULL, NULL, -- row_data, changed_fields
        'f' -- statement_only
    );

    IF NOT TG_ARGV[0]::BOOLEAN IS DISTINCT FROM 'f'::BOOLEAN THEN
        audit_row.client_query = NULL;
    END IF;

    IF TG_ARGV[1] IS NOT NULL THEN
        excluded_cols = TG_ARGV[1]::text[];
    END IF;

    IF (TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = OLD.id;
        END IF;
        audit_row.row_data = hstore(OLD.*);
        audit_row.changed_fields = (hstore(NEW.*) - audit_row.row_data) - excluded_cols;
        IF audit_row.changed_fields = hstore('') THEN
            -- All changed fields are ignored. Skip this update.
            RETURN NULL;
        END IF;
    ELSIF (TG_OP = 'DELETE' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = OLD.id;
        END IF;
        audit_row.row_data = hstore(OLD.*) - excluded_cols;
    ELSIF (TG_OP = 'INSERT' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = NEW.id;
        END IF;
        audit_row.row_data = hstore(NEW.*) - excluded_cols;
    ELSIF (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN
        audit_row.statement_only = 't';
    ELSE
        RAISE EXCEPTION '[audit.if_modified_func] - Trigger func added as trigger for unhandled case: %, %', TG_OP, TG_LEVEL;
        RETURN NULL;
    END IF;

    INSERT INTO audit.logged_actions VALUES (audit_row.*);

    -- Rows whose JSON representation would exceed the 8000 character NOTIFY limit are
    -- announced by event_id only; the listener fetches the full row itself.
    IF (length(CAST(row_to_json(audit_row) as text)) > 8000) THEN
        PERFORM pg_notify(CAST('audit_id' AS text), audit_row.event_id::text);
    ELSE
        PERFORM pg_notify(CAST('audit' AS text), CAST(row_to_json(audit_row) as text));
    END IF;

    RETURN NULL;
END;
$BODY$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER
COST 100;
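One detail worth calling out is the pair of pg_notify calls at the bottom: if the row's JSON is over the 8000 character NOTIFY limit, only the event_id is sent on the audit_id channel, and the listener has to fetch the full row back out of Postgres itself. On the Node side that might be handled roughly like this (a sketch; pg and queue are from the earlier snippets):

// Sketch of a listener that understands both channels used by the trigger above.
pg.on('notification', async (msg) => {
    if (msg.channel === 'audit') {
        // Small rows arrive as JSON directly in the NOTIFY payload.
        queue.push(JSON.parse(msg.payload));
    } else if (msg.channel === 'audit_id') {
        // Oversized rows arrive as an event_id only, so fetch the full row by id.
        const { rows } = await pg.query(
            'SELECT * FROM audit.logged_actions WHERE event_id = $1',
            [msg.payload]
        );
        if (rows.length > 0) queue.push(rows[0]);
    }
});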
I hope this article helps someone. Thanks for reading.
About the author
Tom is a Co-founder and CTO of OnSecurity. He has over a decade of web development experience and many years in IT Security. Feel free to connect with Tom on LinkedIn.