linux服务器接入rsyslog日志审计

一、收集服务器操作命令

使用rsyslog统一收集服务器操作命令

1.1 rsyslog 客户端接入配置

使用rsyslog将服务器的操作指令和登录信息发送到目标rsyslog服务端进行统一存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# vim /etc/rsyslog.conf

$ModLoad imuxsock # provides support for local system logging (e.g. via logger command)
$ModLoad imjournal # provides access to the systemd journal
$ModLoad imklog # reads kernel messages (the same are read from journald)
$WorkDirectory /var/lib/rsyslog
$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat
$IncludeConfig /etc/rsyslog.d/*.conf
$OmitLocalLogging on
$IMJournalStateFile imjournal.state
*.info;mail.none;authpriv.none;cron.none /var/log/messages
authpriv.* /var/log/secure
mail.* -/var/log/maillog
cron.* /var/log/cron
*.emerg :omusrmsg:*
uucp,news.crit /var/log/spooler
local7.* /var/log/boot.log
local5.* /var/log/history.log

authpriv.info /var/log/authpriv.log

kern.warning /var/log/kern.log

*.err /var/log/errors.log

auth.none /var/log/auth.log
$template message_template,"message %syslogtag% %rawmsg% \n"
$template history_template,"history %syslogtag% %rawmsg%\n"
$template authpriv_template,"authpriv %syslogtag% %rawmsg%\n"
local5.* @192.168.223.137:514;history_template
authpriv.* @192.168.223.137:514;authpriv_template

配置完后,重启 rsyslog 服务

1
# systemctl restart rsyslog.service

重新开启终端 查看/var/log/secure 日志中有Accepted password关键字就配置成功了

1
2
3
4
# tail -n1 -f /var/log/secure

Feb 22 10:58:07 czz sshd[3332]: Accepted password for czz from 192.168.223.1 port 57338 ssh2
Feb 22 10:58:08 czz sshd[3332]: pam_unix(sshd:session): session opened for user czz by (uid=0)

若没有Accepted password日志,1.重新启动rsyslog服务,关闭当前终端并重新登录;2.测试环境和日志中心网络不通,rsyslog无法将日志发送到日志中心

1.2 修改 /etc/bashrc 记录操作日志

修改/etc/bashrc在最下方添加如下内容

1
2
3
4
5
6
7
8
9
10
11
12
13
# vim  /etc/bashrc

WHO=$(who am i|awk '{print $1}')
SSH_CLIENT(){
SSH_CLIENT=`grep "who=$WHO.*from=[0-9]\{1,3\}" /var/log/history.log|tail -n1|sed -r 's/.*from=(.*),pwd.*/\1/'`
}
if [[ `whoami` != $WHO || $SSH_CLIENT == "" ]] ;then SSH_CLIENT;fi
HISTFILESIZE=2000
HISTSIZE=2000
HISTTIMEFORMAT="%Y%m%d-%H%M%S: "
export HISTTIMEFORMAT
export SSH_CLIENT
export PROMPT_COMMAND='{ command=$(history 1 | { read x y; echo $y; }); logger -p local5.notice -t bash -i "hostname=`hostname`,user=$USER,who=$WHO,ppid=$PPID,from=$SSH_CLIENT,pwd=$PWD,command:$command"; }'

使其生效

1
# source /etc/bashrc

添加普通用户对 /var/log/history.log 日志读取权限

1
# chmod o+r,g+r /var/log/history.log

重新开启终端 查看history日志,以下输出日志所有=号后面没有空字符就配置成功了

1
2
3
# tail /var/log/history.log

Feb 23 11:26:16 czz bash[4434]: hostname=czz.com,user=root,who=czz,ppid=4380,from=192.168.223.1 61460 22,pwd=/home/czz,command:20230223-112616: chmod o+r,g+r /var/log/history.log

=号后面为空处理方法:1.重新启动rsyslog服务,关闭当前终端并重新登录,随便执行一条指令后查看日志;2.执行source /etc/bashrc时有报错,返回1.2重新配置;3./var/log/history.log日志文件需要对普通用户添加 权限

1.3 验证

rsyslog服务端机器,查看/syslogbak/2022-05-07目录下当前日期的文件夹下,是否已包含配置ip的日志文件

1
2
3
4
# ll /syslog/2023-02-22/
aspire-history-192.168.223.131.log
aspire-authpriv-192.168.223.131.log
aspire-message-192.168.63.68.log

二、Logstash 解析日志配置

filebeat 增加authpriv 日志采集

1
2
3
4
5
6
7
8
9
10
11
12
- type: log
enabled: true
paths:
- /syslog/*/authpriv-*.log
multiline.pattern: '^[[:blank:]]'
multiline.negate: false
multiline.match: after
fields:
dtype: authpriv
cip: '192.168.223.137'
fields_under_root: true
tags: ["authpriv"]

logstash 修改日志解析配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
input {
kafka {
bootstrap_servers => ["192.168.223.137:9092"]
client_id => "message"
group_id => "message"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["message"]
type => "message"
}
kafka {
bootstrap_servers => ["192.168.223.137:9092"]
client_id => "history"
group_id => "history"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["history"]
type => "history"
}
kafka {
bootstrap_servers => ["192.168.223.137:9092"]
client_id => "history"
group_id => "history"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["authpriv"]
type => "authpriv"
}
}

filter {
json {
source => "message"
}
date {
match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
locale => "cn"
}
if [type] == "history" {
mutate{
split => ["message",","]
add_field => {
"hostname" => "%{[message][0]}"
"user" => "%{[message][1]}"
"exec_time" => "%{[message][0]}"
}
add_field => {
"who" => "%{[message][2]}"
}
add_field => {
"server_session_pid" => "%{[message][3]}"
}
add_field => {
"from" => "%{[message][4]}"
}
add_field => {
"pwd" => "%{[message][5]}"
}
add_field => {
"command_time" => "%{[message][6]}"
"command" => "%{[message][6]}"
"command_split" => "%{[message][6]}"
}
}
mutate{
gsub => ["hostname","\S.* hostname=",""]
gsub => ["user","user=",""]
gsub => ["exec_time"," [a-z].*",""]
gsub => ["who","who=",""]
gsub => ["server_session_pid","ppid=",""]
gsub => ["from","from=",""]
gsub => ["pwd","pwd=",""]
gsub => ["command","command:|\d{8}-\d{6}: ",""]
gsub => ["command_split","(command:|\d{8}-\d{6}: )|( [^\s]*)",""]
}
grok {
match => {
"message" => ["%{IP:client_ip}\s+(?<client_random_port>(\d{1,5}))\s(?<server_ssh_port>(\d{1,5}))"]
}
#remove_field => ["@metadata","cip","input","ecs","agent","tags","dtype","host","_id","_score","_type","@version","log.offset"]
}
grok {
match => ["[log][file][path]","(%{IP:server_ip})"]
}
date {
match=> ["exec_time","MMM dd HH:mm:ss"]
target=>"@timestamp"
}
}
if [type] == "authpriv" {
if "Failed password" in [message] {
grok {
match => {
"message" => ["(?<time>%{SYSLOGTIMESTAMP}).*(?<server_session_pid>(?<=\[)(.*)(?=\])/?)]: (?<state>[A-z]+).*(?<state2>(?<=for )([a-z]+)?(?= user)?).*(?<user>(?<=[user|for] )(.*)(?= from)/?).*%{IP:client_ip} \S+ (?<client_random_port>%{POSINT})"]
}
}
} else if "pam_unix(sshd:session)" in [message] {
grok {
match => {
"message" => ["(?<time>%{SYSLOGTIMESTAMP}).*(?<server_session_pid>(?<=\[)(.*)(?=\])/?)]: .*(?<state>(?<=: session )([a-z]+)?(?= for)?).*(?<user>(?<=user )(\S+)?)"]
}
}
} else {
drop{}
}
date {
match=> ["time","MMM dd HH:mm:ss"]
target=>"@timestamp"
}
grok {
match => ["[log][file][path]","(%{IP:server_ip})"]
}
}
#删除不需要的字段
mutate {
remove_field => ["@metadata","cip","input","ecs","agent","tags","dtype","host","_id","_score","_type","@version","log.offset"]
}
}


output {
if [type] == "message" {
elasticsearch {
hosts => ["192.168.223.137:9200"]
#user => "elastic"
#password => "QI++e00c"
index => "message-%{+yyyy.MM.dd}"
}
}else if [type] == "history" {
elasticsearch {
hosts => ["192.168.223.137:9200"]
index => "history-%{+yyyy.MM.dd}"
}
}else if [type] == "authpriv" {
elasticsearch {
hosts => ["192.168.223.137:9200"]
index => "authpriv-%{+yyyy.MM.dd}"
}
stdout {
codec => rubydebug
}
}
}
-------------本文结束感谢您的阅读-------------