Solving Our Problem With Audit Postgres and Elasticsearch

Discover the challenges of managing extensive database audits and our innovative solution using Elasticsearch, a custom Node tool, and PostgreSQL triggers.

Tom Lindley
Chief Technology Officer
November 13, 2018

The problem

Every database change is audited, which is a smart idea for the work that we do. Very quickly, though, the amount of audit data becomes enormous: it cripples backups, uses a boatload of disk space and makes for some cry-worthy storage costs. Not to mention the anxiety caused by jumbo backup processes, freezing servers and early-morning monitoring alerts.

[Chart: increasing AWS costs]

Hmmmm, what to do?

So, I brew another coffee that's blacker than night in the Antarctic and wonder: how do I turn this horrible, unusable data into something useful? I know a little about Elasticsearch and how useful it can be for storing and analyzing large amounts of data.

OK I think I've got an idea!

  • The audit is powered by database triggers in PostgreSQL.
  • Postgres can also send asynchronous notifications from triggers using NOTIFY and LISTEN.
  • JavaScript is a good language for dealing with asynchronous tasks.
  • Elasticsearch is good at analyzing data.

Right, looks like we have a combination of technologies that will solve this problem. Now I just need to figure out how.

Idea

Several hours (days) later... postgres-to-elasticsearch was born.

The tool supports 1-way push from PostgreSQL to Elasticsearch and real-time data streaming. The way it works is in essence very simple.

What does it do?

It pushes data one way, from PostgreSQL to Elasticsearch, and can optionally remove the data from PostgreSQL once it's safely stored in Elasticsearch.

A nice bonus is that it automatically looks for the Hstore data type and parses it into JSON, so that you can query the actual row data in Elasticsearch.
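
To give a feel for the Hstore handling, here is a minimal sketch in JavaScript. It assumes the hstore value arrives as PostgreSQL's text output, which is a list of "key"=>"value" pairs with an unquoted NULL for missing values. The parseHstore helper is hypothetical and is not taken from the tool itself.

```javascript
// Hypothetical helper: turn hstore text output into a plain object.
// Assumes PostgreSQL's output format, where keys and values are always
// double-quoted and a missing value is written as an unquoted NULL.
function parseHstore(text) {
  const result = {};
  // One pair: "key"=>"value" or "key"=>NULL. Quoted strings may contain
  // backslash escapes such as \" and \\.
  const pair = /"((?:[^"\\]|\\.)*)"\s*=>\s*(?:"((?:[^"\\]|\\.)*)"|NULL)/g;
  const unescape = (s) => s.replace(/\\(.)/g, '$1');
  let match;
  while ((match = pair.exec(text)) !== null) {
    const key = unescape(match[1]);
    // The second group is undefined when the NULL alternative matched.
    result[key] = match[2] === undefined ? null : unescape(match[2]);
  }
  return result;
}
```

Every value comes back as a string (or null), because hstore itself only stores text.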

When it starts

  • It checks Elasticsearch for the last UID that it archived.
  • If it finds a UID, it loads all of the data that has been added to Postgres since the service last ran and pushes it to Elasticsearch. If it can't find the last UID, it pushes the whole lot.
  • It runs a LISTEN statement against Postgres and waits for data to be pushed from the database.
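
The start-up decisions above could be sketched roughly as follows. This is an illustration under assumptions, not the tool's code: it assumes the UID is the event_id column of the audit table and that each Elasticsearch document carries it as a numeric field. All function names are hypothetical. The { text, values } shape is the query config that node-postgres accepts.

```javascript
// Hypothetical sketch of the start-up decisions; not the tool's actual code.

// Search body asking Elasticsearch for the newest archived document only.
// unmapped_type avoids a sort error on a freshly created, empty index.
function lastEventIdSearchBody() {
  return {
    size: 1,
    sort: [{ event_id: { order: 'desc', unmapped_type: 'long' } }],
    _source: ['event_id'],
  };
}

// Read the last archived event_id from a search response body,
// or return null when nothing has been archived yet.
function extractLastEventId(responseBody) {
  const hits = (responseBody.hits && responseBody.hits.hits) || [];
  return hits.length > 0 ? hits[0]._source.event_id : null;
}

// Quote an identifier so schema and table names can be interpolated safely.
function quoteIdent(name) {
  return '"' + String(name).replace(/"/g, '""') + '"';
}

// No last id: load the whole table. Otherwise: only rows added since then.
function buildBacklogQuery(schema, table, lastEventId) {
  const target = quoteIdent(schema) + '.' + quoteIdent(table);
  if (lastEventId === null) {
    return { text: `SELECT * FROM ${target} ORDER BY event_id`, values: [] };
  }
  return {
    text: `SELECT * FROM ${target} WHERE event_id > $1 ORDER BY event_id`,
    values: [lastEventId],
  };
}
```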

When it's running

  • It waits patiently for data to be sent to it via the Postgres NOTIFY statement. Note that there are two NOTIFY channels it can listen on (defined by PG_LISTEN_TO and PG_LISTEN_TO_ID). This is due to the hard limit on NOTIFY payloads, which must be shorter than 8000 bytes by default: rows that fit are sent in full on one channel, and for larger rows only the ID is sent on the other.
  • When it receives data, it caches it locally for a defined period or up to a defined cache size limit.
  • It then pushes the data to Elasticsearch.
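
The cache-then-push behaviour can be illustrated with a small queue that flushes when it is full or when the oldest item has waited long enough. This is a sketch under assumptions: the IndexQueue class and its option names are hypothetical, and flush stands in for whatever actually sends the batch to Elasticsearch.

```javascript
// Hypothetical sketch of a size-or-time bounded queue; not the tool's code.
class IndexQueue {
  // limit: maximum items held before flushing.
  // timeoutMs: maximum time the first item of a batch may wait.
  // flush: callback that receives each batch as an array.
  constructor({ limit, timeoutMs, flush }) {
    this.limit = limit;
    this.timeoutMs = timeoutMs;
    this.flush = flush;
    this.items = [];
    this.timer = null;
  }

  push(item) {
    this.items.push(item);
    if (this.items.length >= this.limit) {
      // Size limit reached: flush straight away.
      this.drain();
    } else if (this.timer === null) {
      // First item of a new batch: start the timeout clock.
      this.timer = setTimeout(() => this.drain(), this.timeoutMs);
    }
  }

  drain() {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.items.length === 0) return;
    const batch = this.items;
    this.items = [];
    this.flush(batch);
  }
}
```

The timer is started by the first item of each batch rather than reset on every push, so a slow trickle of rows cannot postpone a flush indefinitely.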

How this solves the problem

An option built into postgres-to-elasticsearch is to delete data from the table as it pushes it to Elasticsearch. This means that while it's running, the database table is almost always empty, or holds only a few rows that are yet to be pushed.

As a result, our database backups aren't crippled by huge amounts of write-only data, and when we actually need to review the audit data we aren't plagued by huge query times, thanks to the speed of Elasticsearch.
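
One way to make "delete only once it is safely stored" concrete is to delete just the rows that Elasticsearch acknowledged. The sketch below assumes the Elasticsearch bulk API, whose request body is newline-delimited JSON and whose response lists one result per action in request order. The function names are hypothetical, event_id is assumed to be the row identifier, and the _type field reflects the ES_TYPE setting of the time (mapping types were later deprecated and removed from Elasticsearch).

```javascript
// Hypothetical sketch; not the tool's actual code.

// Build the newline-delimited bulk body: one action line plus one document
// line per row, with a trailing newline as the bulk API requires.
function buildBulkBody(index, type, rows) {
  if (rows.length === 0) return '';
  const lines = [];
  for (const row of rows) {
    // Using event_id as the document id makes a retried push overwrite
    // the same document instead of creating a duplicate.
    lines.push(JSON.stringify({ index: { _index: index, _type: type, _id: String(row.event_id) } }));
    lines.push(JSON.stringify(row));
  }
  return lines.join('\n') + '\n';
}

// Keep only the event_ids whose bulk action succeeded, so that rows which
// failed to index stay in PostgreSQL and can be retried.
function idsSafeToDelete(rows, bulkResponseBody) {
  const safe = [];
  bulkResponseBody.items.forEach((item, i) => {
    const result = item.index;
    if (result && !result.error && result.status >= 200 && result.status < 300) {
      safe.push(rows[i].event_id);
    }
  });
  return safe;
}

// Delete query in the { text, values } shape that node-postgres accepts.
function buildDeleteQuery(schema, table, eventIds) {
  const quote = (name) => '"' + String(name).replace(/"/g, '""') + '"';
  return {
    text: `DELETE FROM ${quote(schema)}.${quote(table)} WHERE event_id = ANY($1)`,
    values: [eventIds],
  };
}
```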

Lovely data

Observe your lovely real-time data in Kibana

[Screenshot: Kibana]

What does the output actually look like?

Behind the scenes

On first run

tom@sweet:~/code/postgres-to-elasticsearch/src$ INFO=1 PG_SCHEMA=audit PG_TABLE=logged_actions PG_DELETE_ON_INDEX=1 ES_INDEX=audit ES_TYPE=record INDEX_QUEUE_TIMEOUT=60 PG_USERNAME=tom PG_PASSWORD=tom PG_DATABASE=my_database ES_PROTO=http STATUS_UPDATE_INTERVAL=2 INDEX_QUEUE_LIMIT=20 node index.js
INFO Starting
INFO Index audit does not exist, creating.
INFO Index audit created successfully
INFO Processing historic audit
INFO Found Hstore type id, Hstore processing enabled.
INFO LISTEN statement completed
INFO No historic audit found, cannot get last processed event_id
INFO Loading all available audit data for backlog processing
INFO Historic audit query completed, processing...
INFO No more historic rows to process, processed a total of 56 rows
INFO Flushed 20 records
INFO Flushed 20 records
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Flushed 20 records
INFO Attempted to delete 20 rows, actually deleted 20 rows
INFO Flushed 8 records
INFO Attempted to delete 8 rows, actually deleted 8 rows
STATUS UPDATE: Created a total of 68 indexes with 12 queued at 1541617222
INFO Flushed 12 records
INFO Attempted to delete 12 rows, actually deleted 12 rows

So what's happened here is:

  • It connects to Elasticsearch using the ES_* environment variables.
  • The Elasticsearch index doesn't exist, so it creates it.
  • It tries to find the last processed primary key of the audit.logged_actions table (as defined by PG_SCHEMA and PG_TABLE).
  • It can't find it, because we just created the index and it's empty.
  • It fetches all of the data from the audit.logged_actions table and processes it in chunks of 20 (defined by INDEX_QUEUE_LIMIT).
  • It deletes the rows from the database as it processes them.
  • A status update is shown after 2 minutes (STATUS_UPDATE_INTERVAL)
  • It continues to listen to the NOTIFY statements run by the trigger function in Postgres until the number of rows in the cache hits INDEX_QUEUE_LIMIT or it has been INDEX_QUEUE_TIMEOUT seconds since the first item in the cache was added.
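
For orientation, the environment variables mentioned above could be gathered into a config object along these lines. The variable names come from the run shown earlier. Which of them are required, the default values, and the loadConfig helper itself are assumptions made for this sketch; the channel defaults simply mirror the names used by our trigger.

```javascript
// Hypothetical sketch of reading the settings; not the tool's actual code.
function loadConfig(env) {
  // Assumption: these settings have no sensible default and must be set.
  const required = (name) => {
    if (!env[name]) throw new Error(`Missing required environment variable ${name}`);
    return env[name];
  };
  // Parse a positive integer, falling back to an assumed default when unset.
  const positiveInt = (name, fallback) => {
    const value = env[name] === undefined ? fallback : Number.parseInt(env[name], 10);
    if (!Number.isInteger(value) || value <= 0) {
      throw new Error(`${name} must be a positive integer`);
    }
    return value;
  };
  return {
    schema: required('PG_SCHEMA'),
    table: required('PG_TABLE'),
    deleteOnIndex: env.PG_DELETE_ON_INDEX === '1',
    listenTo: env.PG_LISTEN_TO || 'audit',          // assumed default
    listenToId: env.PG_LISTEN_TO_ID || 'audit_id',  // assumed default
    esIndex: required('ES_INDEX'),
    esType: required('ES_TYPE'),
    queueLimit: positiveInt('INDEX_QUEUE_LIMIT', 20),            // assumed default
    queueTimeoutSeconds: positiveInt('INDEX_QUEUE_TIMEOUT', 60), // assumed default
  };
}
```

In a real process this would be called as loadConfig(process.env). Passing the environment in as an argument keeps the function easy to test.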

On second run

INFO Starting
INFO Processing historic audit
INFO Found Hstore type id, Hstore processing enabled.
INFO LISTEN statement completed
INFO Found last processed event_id: 55748
INFO Historic audit query completed, processing...
INFO No more historic rows to process, processed a total of 5 rows
INFO Deleted a total of 0 rows
INFO Flushed 5 records
INFO Attempted to delete 5 rows, actually deleted 5 rows

This time:

  • It finds the last processed id (the last record inserted into Elasticsearch before it was stopped).
  • It deletes any audit rows with an id lower than that last processed id (to remove any audit that wasn't deleted before).
  • It finds 5 new rows since it last ran and adds them to the cache.
  • It continues as before.

The PostgreSQL trigger

If you're considering implementing this, then writing your own trigger is a good idea. This is what ours looks like:

CREATE OR REPLACE FUNCTION audit.if_modified_func()
  RETURNS trigger AS
$BODY$
DECLARE
    audit_row audit.logged_actions;
    include_values BOOLEAN;
    log_diffs BOOLEAN;
    h_old hstore;
    h_new hstore;
    excluded_cols text[] = ARRAY[]::text[];
BEGIN
    IF TG_WHEN <> 'AFTER' THEN
        RAISE EXCEPTION 'audit.if_modified_func() may only run as an AFTER trigger';
    END IF;
 
    audit_row = ROW(
        NEXTVAL('audit.logged_actions_event_id_seq'), -- event_id
        TG_TABLE_SCHEMA::text,                        -- schema_name
        TG_TABLE_NAME::text,                          -- table_name
        NULL,                                         -- table_id
        TG_RELID,                                     -- relation OID for much quicker searches
        session_user::text,                           -- session_user_name
        statement_timestamp(),                        -- action_tstamp_stm
        inet_client_addr(),                           -- client_addr
        current_query(),                              -- top-level query or queries (if multistatement) from client
        SUBSTRING(TG_OP,1,1),                         -- action
        NULL, NULL,                                   -- row_data, changed_fields
        'f'                                           -- statement_only
        );
 
    IF NOT TG_ARGV[0]::BOOLEAN IS DISTINCT FROM 'f'::BOOLEAN THEN
        audit_row.client_query = NULL;
    END IF;
 
    IF TG_ARGV[1] IS NOT NULL THEN
        excluded_cols = TG_ARGV[1]::text[];
    END IF;
 
    IF (TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = OLD.id;
        END IF;
        audit_row.row_data = hstore(OLD.*);
        audit_row.changed_fields =  (hstore(NEW.*) - audit_row.row_data) - excluded_cols;
        IF audit_row.changed_fields = hstore('') THEN
            -- All changed fields are ignored. Skip this update.
            RETURN NULL;
        END IF;
    ELSIF (TG_OP = 'DELETE' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = OLD.id;
        END IF;
        audit_row.row_data = hstore(OLD.*) - excluded_cols;
    ELSIF (TG_OP = 'INSERT' AND TG_LEVEL = 'ROW') THEN
        IF TG_TABLE_NAME != 'migrations' AND TG_TABLE_NAME != 'sessions' THEN
            audit_row.table_id = NEW.id;
        END IF;
        audit_row.row_data = hstore(NEW.*) - excluded_cols;
    ELSIF (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN
        audit_row.statement_only = 't';
    ELSE
        RAISE EXCEPTION '[audit.if_modified_func] - Trigger func added as trigger for unhandled case: %, %',TG_OP, TG_LEVEL;
        RETURN NULL;
    END IF;
    INSERT INTO audit.logged_actions VALUES (audit_row.*);

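    -- NOTIFY payloads must be shorter than 8000 bytes, so measure bytes (not characters)
    -- and send only the event_id when the full row would not fit.
    IF (octet_length(CAST(row_to_json(audit_row) AS text)) >= 8000) THEN
        PERFORM pg_notify(CAST('audit_id' AS text), audit_row.event_id::text);
    ELSE
        PERFORM pg_notify(CAST('audit' AS text), CAST(row_to_json(audit_row) AS text));
    END IF;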
    
    RETURN NULL;
END;
$BODY$
  LANGUAGE plpgsql VOLATILE SECURITY DEFINER
  COST 100;
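
On the receiving side, the two channels need different handling: a message on 'audit' already contains the whole row as JSON, while a message on 'audit_id' only carries an event_id that has to be looked up. A minimal sketch of that routing follows. The channel and payload fields match what node-postgres passes to its notification event; the routeNotification function and the shapes it returns are hypothetical.

```javascript
// Hypothetical sketch of routing NOTIFY messages; not the tool's actual code.
// msg is expected to look like { channel: 'audit', payload: '...' }.
function routeNotification(msg, channels) {
  if (msg.channel === channels.listenTo) {
    // Full row sent as JSON text by row_to_json in the trigger.
    return { kind: 'row', row: JSON.parse(msg.payload) };
  }
  if (msg.channel === channels.listenToId) {
    // Row was too large for NOTIFY: only the event_id was sent.
    // It is kept as a string so large ids are not rounded by Number.
    if (!/^\d+$/.test(msg.payload)) {
      throw new Error(`Unexpected event_id payload: ${msg.payload}`);
    }
    return { kind: 'fetch', eventId: msg.payload };
  }
  // Some other channel that this process is not interested in.
  return { kind: 'ignore' };
}
```

A caller would index the row directly for 'row' results, and for 'fetch' results first select the row from the audit table by event_id before indexing it.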

I hope this article helps someone. Thanks for reading.


About the author

Tom is a Co-founder and CTO of OnSecurity. He has over a decade of web development experience and many years in IT Security. Feel free to connect with Tom on LinkedIn.
