Postgresql利用触发器对表进行审计

01 12月
作者:PK|分类:工作|标签:Postgresql 触发器 审计

    业务特殊需要,需要多人对一个数据库中的某个表进行增删改,那么问题来了,数据出了问题,怎么定位到是谁动了数据呢?利用Posgresql自带的触发器可以轻松实现,让月光宝盒再次回到历史的那一刻,直接上代码(代码来自老外的一个博客)。

--添加需要的扩展
CREATE EXTENSION IF NOT EXISTS hstore;
--建表,各字段的都有注释,连我这种英语水平都可以轻松看懂
CREATE TABLE logged_actions (
event_id bigserial primary key,
schema_name text not null,
table_name text not null,
relid oid not null,
session_user_name text,
action_tstamp_tx TIMESTAMP WITH TIME ZONE NOT NULL,
action_tstamp_stm TIMESTAMP WITH TIME ZONE NOT NULL,
action_tstamp_clk TIMESTAMP WITH TIME ZONE NOT NULL,
transaction_id bigint,
application_name text,
client_addr inet,
client_port integer,
client_query text,
action TEXT NOT NULL CHECK (action IN ('I','D','U', 'T')),
row_data hstore,
changed_fields hstore,
statement_only boolean not null
);
REVOKE ALL ON logged_actions FROM public;

COMMENT ON TABLE logged_actions IS 'History of auditable actions on audited tables, from if_modified_func()';
COMMENT ON COLUMN logged_actions.event_id IS 'Unique identifier for each auditable event';
COMMENT ON COLUMN logged_actions.schema_name IS 'Database schema audited table for this event is in';
COMMENT ON COLUMN logged_actions.table_name IS 'Non-schema-qualified table name of table event occured in';
COMMENT ON COLUMN logged_actions.relid IS 'Table OID. Changes with drop/create. Get with ''tablename''::regclass';
COMMENT ON COLUMN logged_actions.session_user_name IS 'Login / session user whose statement caused the audited event';
COMMENT ON COLUMN logged_actions.action_tstamp_tx IS 'Transaction start timestamp for tx in which audited event occurred';
COMMENT ON COLUMN logged_actions.action_tstamp_stm IS 'Statement start timestamp for tx in which audited event occurred';
COMMENT ON COLUMN logged_actions.action_tstamp_clk IS 'Wall clock time at which audited event''s trigger call occurred';
COMMENT ON COLUMN logged_actions.transaction_id IS 'Identifier of transaction that made the change. May wrap, but unique paired with action_tstamp_tx.';
COMMENT ON COLUMN logged_actions.client_addr IS 'IP address of client that issued query. Null for unix domain socket.';
COMMENT ON COLUMN logged_actions.client_port IS 'Remote peer IP port address of client that issued query. Undefined for unix socket.';
COMMENT ON COLUMN logged_actions.client_query IS 'Top-level query that caused this auditable event. May be more than one statement.';
COMMENT ON COLUMN logged_actions.application_name IS 'Application name set when this audit event occurred. Can be changed in-session by client.';
COMMENT ON COLUMN logged_actions.action IS 'Action type; I = insert, D = delete, U = update, T = truncate';
COMMENT ON COLUMN logged_actions.row_data IS 'Record value. Null for statement-level trigger. For INSERT this is the new tuple. For DELETE and UPDATE it is the old tuple.';
COMMENT ON COLUMN logged_actions.changed_fields IS 'New values of fields changed by UPDATE. Null except for row-level UPDATE events.';
COMMENT ON COLUMN logged_actions.statement_only IS '''t'' if audit event is from an FOR EACH STATEMENT trigger, ''f'' for FOR EACH ROW';

CREATE INDEX logged_actions_relid_idx ON logged_actions(relid);
CREATE INDEX logged_actions_action_tstamp_tx_stm_idx ON logged_actions(action_tstamp_stm);
CREATE INDEX logged_actions_action_idx ON logged_actions(action);

--建立触发器函数
CREATE OR REPLACE FUNCTION if_modified_func() RETURNS TRIGGER AS $body$
DECLARE
audit_row logged_actions;
include_values boolean;
log_diffs boolean;
h_old hstore;
h_new hstore;
excluded_cols text[] = ARRAY[]::text[];
BEGIN
IF TG_WHEN <> 'AFTER' THEN
RAISE EXCEPTION 'if_modified_func() may only run as an AFTER trigger';
END IF;
audit_row = ROW(
nextval('logged_actions_event_id_seq'), -- event_id
TG_TABLE_SCHEMA::text,                        -- schema_name
TG_TABLE_NAME::text,                          -- table_name
TG_RELID,                                     -- relation OID for much quicker searches
session_user::text,                           -- session_user_name
current_timestamp,                            -- action_tstamp_tx
statement_timestamp(),                        -- action_tstamp_stm
clock_timestamp(),                            -- action_tstamp_clk
txid_current(),                               -- transaction ID
current_setting('application_name'),          -- client application
inet_client_addr(),                           -- client_addr
inet_client_port(),                           -- client_port
current_query(),                              -- top-level query or queries (if multistatement) from client
substring(TG_OP,1,1),                         -- action
NULL, NULL,                                   -- row_data, changed_fields
'f'                                           -- statement_only
);
IF NOT TG_ARGV[0]::boolean IS DISTINCT FROM 'f'::boolean THEN
audit_row.client_query = NULL;
END IF;
IF TG_ARGV[1] IS NOT NULL THEN
excluded_cols = TG_ARGV[1]::text[];
END IF;
IF (TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW') THEN
audit_row.row_data = hstore(OLD.*);
audit_row.changed_fields =  (hstore(NEW.*) - audit_row.row_data) - excluded_cols;
IF audit_row.changed_fields = hstore('') THEN
-- All changed fields are ignored. Skip this update.
RETURN NULL;
END IF;
ELSIF (TG_OP = 'DELETE' AND TG_LEVEL = 'ROW') THEN
audit_row.row_data = hstore(OLD.*) - excluded_cols;
ELSIF (TG_OP = 'INSERT' AND TG_LEVEL = 'ROW') THEN
audit_row.row_data = hstore(NEW.*) - excluded_cols;
ELSIF (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN
audit_row.statement_only = 't';
ELSE
RAISE EXCEPTION '[if_modified_func] - Trigger func added as trigger for unhandled case: %, %',TG_OP, TG_LEVEL;
RETURN NULL;
END IF;
INSERT INTO logged_actions VALUES (audit_row.*);
RETURN NULL;
END;
$body$
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, public;

CREATE OR REPLACE FUNCTION audit_table(target_table regclass, audit_rows boolean, audit_query_text boolean, ignored_cols text[]) RETURNS void AS $body$
DECLARE
stm_targets text = 'INSERT OR UPDATE OR DELETE OR TRUNCATE';
_q_txt text;
_ignored_cols_snip text = '';
BEGIN
EXECUTE 'DROP TRIGGER IF EXISTS audit_trigger_row ON ' || target_table;
EXECUTE 'DROP TRIGGER IF EXISTS audit_trigger_stm ON ' || target_table;
IF audit_rows THEN
IF array_length(ignored_cols,1) > 0 THEN
_ignored_cols_snip = ', ' || quote_literal(ignored_cols);
END IF;
_q_txt = 'CREATE TRIGGER audit_trigger_row AFTER INSERT OR UPDATE OR DELETE ON ' ||
target_table ||
' FOR EACH ROW EXECUTE PROCEDURE if_modified_func(' ||
quote_literal(audit_query_text) || _ignored_cols_snip || ');';
RAISE NOTICE '%',_q_txt;
EXECUTE _q_txt;
stm_targets = 'TRUNCATE';
ELSE
END IF;
_q_txt = 'CREATE TRIGGER audit_trigger_stm AFTER ' || stm_targets || ' ON ' ||
target_table ||
' FOR EACH STATEMENT EXECUTE PROCEDURE if_modified_func('||
quote_literal(audit_query_text) || ');';
RAISE NOTICE '%',_q_txt;
EXECUTE _q_txt;
END;
$body$
language 'plpgsql';

--上面的触发器创建完之后无法直接使用,需要再建一个,调整参数传入。
CREATE OR REPLACE FUNCTION audit_table(target_table regclass, audit_rows boolean, audit_query_text boolean) RETURNS void AS $body$
SELECT audit_table($1, $2, $3, ARRAY[]::text[]);
$body$ LANGUAGE SQL;
-- And provide a convenience call wrapper for the simplest case
-- of row-level logging with no excluded cols and query logging enabled.
--
CREATE OR REPLACE FUNCTION audit_table(target_table regclass) RETURNS void AS $$
SELECT audit_table($1, BOOLEAN 't', BOOLEAN 't');
$$ LANGUAGE 'sql';

--使用方法
select audit_table('xxxx');

--xxxx就是需要开启审计的表名,这样只要对xxxx表进行了增删改,同库下的logged_actions表里会留下记录,妈妈再也不用担心我背锅了。


浏览1332 评论0
返回
目录
返回
首页
PostgreSQL触发器使用(实现建表时表名必须统一) 《琅琊榜》读书笔记