首页 Postgresql利用触发器对表进行审计
文章
取消

Postgresql利用触发器对表进行审计

Postgresql利用触发器对表进行审计

业务特殊需要,需要多人对一个数据库中的某个表进行增删改,那么问题来了,数据出了问题,怎么定位到是谁动了数据呢?利用PostgreSQL自带的触发器可以轻松实现,让月光宝盒再次回到历史的那一刻,直接上代码(代码来自老外的一个博客)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
-- Install the hstore extension: the row_data and changed_fields columns of
-- logged_actions are of type hstore.
CREATE EXTENSION IF NOT EXISTS hstore;

-- Audit log table. One row is written per audited event; every column is
-- described by a COMMENT ON COLUMN statement below.
CREATE TABLE logged_actions (
    event_id          bigserial PRIMARY KEY,
    schema_name       text NOT NULL,
    table_name        text NOT NULL,
    relid             oid NOT NULL,
    session_user_name text,
    action_tstamp_tx  TIMESTAMP WITH TIME ZONE NOT NULL,
    action_tstamp_stm TIMESTAMP WITH TIME ZONE NOT NULL,
    action_tstamp_clk TIMESTAMP WITH TIME ZONE NOT NULL,
    transaction_id    bigint,
    application_name  text,
    client_addr       inet,
    client_port       integer,
    client_query      text,
    action            text NOT NULL CHECK (action IN ('I', 'D', 'U', 'T')),
    row_data          hstore,
    changed_fields    hstore,
    statement_only    boolean NOT NULL
);

-- Remove all privileges on the audit log from the PUBLIC pseudo-role.
REVOKE ALL ON logged_actions FROM public;

-- Catalog documentation, listed in column order.
COMMENT ON TABLE logged_actions IS 'History of auditable actions on audited tables, from if_modified_func()';
COMMENT ON COLUMN logged_actions.event_id IS 'Unique identifier for each auditable event';
COMMENT ON COLUMN logged_actions.schema_name IS 'Database schema audited table for this event is in';
COMMENT ON COLUMN logged_actions.table_name IS 'Non-schema-qualified table name of table event occurred in';
COMMENT ON COLUMN logged_actions.relid IS 'Table OID. Changes with drop/create. Get with ''tablename''::regclass';
COMMENT ON COLUMN logged_actions.session_user_name IS 'Login / session user whose statement caused the audited event';
COMMENT ON COLUMN logged_actions.action_tstamp_tx IS 'Transaction start timestamp for tx in which audited event occurred';
COMMENT ON COLUMN logged_actions.action_tstamp_stm IS 'Statement start timestamp for tx in which audited event occurred';
COMMENT ON COLUMN logged_actions.action_tstamp_clk IS 'Wall clock time at which audited event''s trigger call occurred';
COMMENT ON COLUMN logged_actions.transaction_id IS 'Identifier of transaction that made the change. May wrap, but unique paired with action_tstamp_tx.';
COMMENT ON COLUMN logged_actions.application_name IS 'Application name set when this audit event occurred. Can be changed in-session by client.';
COMMENT ON COLUMN logged_actions.client_addr IS 'IP address of client that issued query. Null for unix domain socket.';
COMMENT ON COLUMN logged_actions.client_port IS 'Remote peer IP port address of client that issued query. Undefined for unix socket.';
COMMENT ON COLUMN logged_actions.client_query IS 'Top-level query that caused this auditable event. May be more than one statement.';
COMMENT ON COLUMN logged_actions.action IS 'Action type; I = insert, D = delete, U = update, T = truncate';
COMMENT ON COLUMN logged_actions.row_data IS 'Record value. Null for statement-level trigger. For INSERT this is the new tuple. For DELETE and UPDATE it is the old tuple.';
COMMENT ON COLUMN logged_actions.changed_fields IS 'New values of fields changed by UPDATE. Null except for row-level UPDATE events.';
COMMENT ON COLUMN logged_actions.statement_only IS '''t'' if audit event is from a FOR EACH STATEMENT trigger, ''f'' for FOR EACH ROW';

-- Secondary indexes on the audited relation, the statement timestamp and the
-- action code. NOTE: despite its name, the second index covers only
-- action_tstamp_stm; the name is kept so existing references stay valid.
CREATE INDEX logged_actions_relid_idx ON logged_actions (relid);
CREATE INDEX logged_actions_action_tstamp_tx_stm_idx ON logged_actions (action_tstamp_stm);
CREATE INDEX logged_actions_action_idx ON logged_actions (action);
 
-- Trigger function that records changes to an audited table in logged_actions.
--
-- Attach with CREATE TRIGGER ... AFTER ... EXECUTE PROCEDURE
-- if_modified_func([log_query_text [, excluded_cols]]):
--   TG_ARGV[0]  boolean as text; when it is false, client_query is stored as
--               NULL. When omitted (NULL) the query text is kept.
--   TG_ARGV[1]  text[] literal naming columns to strip from the logged hstore
--               values. When omitted, nothing is excluded.
--
-- Row-level INSERT logs the new row and row-level DELETE the old row in
-- row_data. Row-level UPDATE logs the old row in row_data and the changed new
-- values in changed_fields, and writes nothing when no column outside
-- excluded_cols changed. Statement-level triggers log a single row with
-- statement_only = 't' and no row data.
--
-- Raises an exception when fired as anything other than an AFTER trigger, or
-- for an unhandled TG_OP / TG_LEVEL combination. Always returns NULL.
--
-- NOTE(review): search_path is pinned to pg_catalog, public, so logged_actions
-- and the hstore operators are presumably expected to live in one of those
-- schemas -- confirm against the installation.
CREATE OR REPLACE FUNCTION if_modified_func() RETURNS TRIGGER AS $body$
DECLARE
    audit_row logged_actions;
    excluded_cols text[] = ARRAY[]::text[];
BEGIN
    IF TG_WHEN <> 'AFTER' THEN
        RAISE EXCEPTION 'if_modified_func() may only run as an AFTER trigger';
    END IF;

    -- Positional ROW(): the values must stay in logged_actions column order.
    audit_row = ROW(
        nextval('logged_actions_event_id_seq'),  -- event_id
        TG_TABLE_SCHEMA::text,                   -- schema_name
        TG_TABLE_NAME::text,                     -- table_name
        TG_RELID,                                -- relid
        session_user::text,                      -- session_user_name
        current_timestamp,                       -- action_tstamp_tx
        statement_timestamp(),                   -- action_tstamp_stm
        clock_timestamp(),                       -- action_tstamp_clk
        txid_current(),                          -- transaction_id
        current_setting('application_name'),     -- application_name
        inet_client_addr(),                      -- client_addr
        inet_client_port(),                      -- client_port
        current_query(),                         -- client_query
        substring(TG_OP, 1, 1),                  -- action: first letter of TG_OP
        NULL, NULL,                              -- row_data, changed_fields
        'f'                                      -- statement_only
    );

    -- Drop the query text only when the first argument is explicitly false.
    IF TG_ARGV[0]::boolean IS NOT DISTINCT FROM false THEN
        audit_row.client_query = NULL;
    END IF;

    IF TG_ARGV[1] IS NOT NULL THEN
        excluded_cols = TG_ARGV[1]::text[];
    END IF;

    IF (TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW') THEN
        -- Strip excluded columns from the old row too, so ignored columns are
        -- never written to the log (consistent with INSERT and DELETE below).
        audit_row.row_data = hstore(OLD.*) - excluded_cols;
        audit_row.changed_fields = (hstore(NEW.*) - audit_row.row_data) - excluded_cols;
        IF audit_row.changed_fields = hstore('') THEN
            -- All changed fields are ignored. Skip this update.
            RETURN NULL;
        END IF;
    ELSIF (TG_OP = 'DELETE' AND TG_LEVEL = 'ROW') THEN
        audit_row.row_data = hstore(OLD.*) - excluded_cols;
    ELSIF (TG_OP = 'INSERT' AND TG_LEVEL = 'ROW') THEN
        audit_row.row_data = hstore(NEW.*) - excluded_cols;
    ELSIF (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN
        audit_row.statement_only = 't';
    ELSE
        RAISE EXCEPTION '[if_modified_func] - Trigger func added as trigger for unhandled case: %, %',TG_OP, TG_LEVEL;
    END IF;

    INSERT INTO logged_actions VALUES (audit_row.*);
    RETURN NULL;
END;
$body$
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, public;
 
-- (Re)installs the audit triggers on target_table.
--
--   target_table      table to audit
--   audit_rows        when true, a FOR EACH ROW trigger logs INSERT, UPDATE
--                     and DELETE and the statement trigger covers only
--                     TRUNCATE; otherwise the statement trigger covers all
--                     four operations
--   audit_query_text  passed to if_modified_func as its first argument
--   ignored_cols      when non-empty, passed to the row-level trigger as the
--                     second argument of if_modified_func
--
-- Existing audit_trigger_row / audit_trigger_stm triggers are dropped first.
-- Each CREATE TRIGGER statement is echoed with RAISE NOTICE before it runs.
CREATE OR REPLACE FUNCTION audit_table(target_table regclass, audit_rows boolean, audit_query_text boolean, ignored_cols text[]) RETURNS void AS $body$
DECLARE
    stmt_events text = 'INSERT OR UPDATE OR DELETE OR TRUNCATE';
    trigger_ddl text;
    ignored_arg text = '';
BEGIN
    -- Start from a clean slate so the function can be re-run on a table.
    EXECUTE 'DROP TRIGGER IF EXISTS audit_trigger_row ON ' || target_table;
    EXECUTE 'DROP TRIGGER IF EXISTS audit_trigger_stm ON ' || target_table;

    IF audit_rows THEN
        IF array_length(ignored_cols, 1) > 0 THEN
            ignored_arg = ', ' || quote_literal(ignored_cols);
        END IF;

        trigger_ddl = 'CREATE TRIGGER audit_trigger_row AFTER INSERT OR UPDATE OR DELETE ON '
            || target_table
            || ' FOR EACH ROW EXECUTE PROCEDURE if_modified_func('
            || quote_literal(audit_query_text)
            || ignored_arg
            || ');';
        RAISE NOTICE '%', trigger_ddl;
        EXECUTE trigger_ddl;

        -- Row changes are now covered; leave only TRUNCATE to the statement trigger.
        stmt_events = 'TRUNCATE';
    END IF;

    trigger_ddl = 'CREATE TRIGGER audit_trigger_stm AFTER ' || stmt_events || ' ON '
        || target_table
        || ' FOR EACH STATEMENT EXECUTE PROCEDURE if_modified_func('
        || quote_literal(audit_query_text)
        || ');';
    RAISE NOTICE '%', trigger_ddl;
    EXECUTE trigger_ddl;
END;
$body$
LANGUAGE plpgsql;
 
-- Three-argument overload: forwards to the four-argument audit_table with an
-- empty ignored-column list, so callers do not have to supply ignored_cols.
CREATE OR REPLACE FUNCTION audit_table(target_table regclass, audit_rows boolean, audit_query_text boolean) RETURNS void AS $body$
    SELECT audit_table($1, $2, $3, '{}'::text[]);
$body$ LANGUAGE sql;
-- Single-argument convenience overload for the simplest case: forwards to the
-- three-argument audit_table with row-level logging and query-text logging
-- both switched on, which in turn excludes no columns.
CREATE OR REPLACE FUNCTION audit_table(target_table regclass) RETURNS void AS $$
    SELECT audit_table($1, true, true);
$$ LANGUAGE sql;
 
-- Usage: enable auditing on a table (replace xxxx with the table name).
SELECT audit_table('xxxx');

-- With auditing enabled, every insert, update or delete on table xxxx leaves
-- a record in the logged_actions table of the same database.