Extracting Alibaba Cloud CDN Access Logs with Python

Requirements

The operations team needs a map of where the website's visitors come from, as input for operational decision-making.

The company website is accelerated by Alibaba Cloud CDN, so the access logs on the origin server record the addresses of CDN nodes rather than real visitors, which makes the origin logs useless for this analysis. However, Alibaba Cloud supports exporting CDN access logs through its API. The plan is therefore: download the CDN logs to a server, feed them through the ELK pipeline, use logstash's Geoip plugin to turn each client IP into coordinates, and display the result on a kibana map.

Implementation steps

1. Export the CDN access logs

#!/usr/bin/env python
#coding=utf-8

from aliyunsdkcore.client import AcsClient
from aliyunsdkcdn.request.v20180510.DescribeCdnDomainLogsRequest import DescribeCdnDomainLogsRequest
import json
import requests
import gzip
import datetime
import os


def download_logs(domain_name, file_down_path, file_unzip_path):
    # Query window: from 00:00 yesterday to 00:00 today, in the API's time format
    in_date = datetime.datetime.now().strftime("%Y-%m-%d")
    dt = datetime.datetime.strptime(in_date, "%Y-%m-%d")
    start_time = (dt + datetime.timedelta(days=-1)).strftime("%Y-%m-%d") + "T00:00:00Z"
    end_time = in_date + "T00:00:00Z"

    client = AcsClient('<accessKeyId>', '<accessSecret>', 'cn-hangzhou')
    request = DescribeCdnDomainLogsRequest()
    request.set_accept_format('json')
    request.set_DomainName(domain_name)
    request.set_StartTime(start_time)
    request.set_EndTime(end_time)

    response = client.do_action_with_exception(request)
    result = json.loads(response)
    for detail in result["DomainLogDetails"]["DomainLogDetail"]:
        for log_info in detail["LogInfos"]["LogInfoDetail"]:
            file_name = log_info["LogName"]
            print("file_name:", file_name)
            # LogPath comes back without a scheme, so prepend https://
            r = requests.get("https://" + log_info["LogPath"])
            with open(file_down_path + "/" + file_name, "wb") as f:
                f.write(r.content)
            unzip_file(file_down_path, file_unzip_path, file_name)


def unzip_file(file_down_path, file_unzip_path, file_name):
    f_name = file_name.replace(".gz", "")
    with gzip.open(file_down_path + "/" + file_name, "rb") as g_file:
        with open(file_unzip_path + "/" + f_name, "wb") as out_file:
            out_file.write(g_file.read())
    # Delete the downloaded archive once it has been extracted
    os.remove(file_down_path + "/" + file_name)


if __name__ == '__main__':
    domain_name = "zdmall.feiersmart.com"
    file_down_path = "/work/admin/downloads/zdmall-cdn-log"
    file_unzip_path = "/work/admin/logs/zdmall-cdn-log"

    download_logs(domain_name, file_down_path, file_unzip_path)
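
Since the script always pulls the previous day's logs, it lends itself to a daily cron job. A minimal sketch, assuming the script is saved as /work/admin/scripts/download_cdn_logs.py (a hypothetical path):

# Hypothetical crontab entry: pull yesterday's CDN logs every day at 01:30
30 1 * * * /usr/bin/python /work/admin/scripts/download_cdn_logs.py >> /work/admin/logs/cdn-log-download.log 2>&1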

2. Configure filebeat to collect the logs

filebeat collects the logs and ships them to logstash, which writes them into kafka; buffering in kafka prevents log loss when the data volume spikes. A second logstash pipeline then consumes from kafka, resolves each IP to coordinates, and stores the events in ES.

The filebeat version is 5.5.3.

filebeat.prospectors:
- input_type: log
  paths:
    - /work/admin/logs/zdmall-cdn-log/zdmall.feiersmart.com*
  document_type: nginx-cdn-logs

output.logstash:
  hosts: ["logstash.feiersmart.local:5044"]
output.console:
  pretty: true
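
Before (re)starting the shipper, it is worth validating the YAML; filebeat 5.x supports a -configtest flag for this (the config file path here is an assumption):

# Validate the filebeat configuration file (path is an assumption)
filebeat -configtest -c /etc/filebeat/filebeat.yml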

3. logstash writes to kafka

output {
  if [type] == "nginx-cdn-logs" {
    # stdout {
    #   codec => rubydebug
    # }
    kafka {
      bootstrap_servers => "kafka1.feiersmart.local:9092,kafka2.feiersmart.local:9092,kafka3.feiersmart.local:9092"
      topic_id => "logstash-nginx-cdn-logs"
      codec => "json"
      compression_type => "gzip"
      max_request_size => 20000120
    }
  }
}
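
To confirm events are actually reaching the topic before wiring up the consumer pipeline, the stock kafka console consumer can tail it (the kafka install path is an assumption):

# Tail the topic to verify logstash is producing into it
/opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka1.feiersmart.local:9092 \
    --topic logstash-nginx-cdn-logs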

4. Resolve coordinates with the logstash Geoip plugin
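
For reference, an Alibaba Cloud CDN access log line looks roughly like this (an illustrative example following the documented format, not a real record):

[10/Jun/2018:01:58:09 +0800] 1.2.3.4 - 1542 "-" "GET http://zdmall.feiersmart.com/index.html" 200 191 2830 MISS "Mozilla/5.0" "text/html"

The grok pattern below captures the first 28 characters (the bracketed timestamp) into request_time and the field that follows as the client IP, which is all the Geoip plugin needs.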

input {
  kafka {
    bootstrap_servers => "kafka1.feiersmart.local:9092,kafka2.feiersmart.local:9092,kafka3.feiersmart.local:9092"
    topics => ["logstash-nginx-cdn-logs"]
    codec => "json"
  }
}

filter {
  # Capture the 28-character bracketed timestamp and the client IP that follows it
  grok {
    match => {
      "message" => "(?<request_time>([\s\S]{28})) %{IP:clientip} "
    }
  }
  geoip {
    source => "clientip"
    target => "geoip"
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    # Drop everything we do not need in ES. Note that [geoip][coordinates] and
    # [host] must NOT be removed: coordinates is converted below, and [host] is
    # referenced by the elasticsearch document_id.
    remove_field => ["[geoip][country_code2]","[geoip][country_code3]","[geoip][continent_code]","[geoip][region_code]","[geoip][latitude]","[geoip][longitude]","[geoip][timezone]","[input_type]","[beat][hostname]","[beat][name]","[beat][version]","[tags]","[source]","[clientip]"]
  }
  mutate {
    convert => [ "[geoip][coordinates]", "float" ]
    rename => { "request_time" => "message" }
  }
}

output {
  if [type] == "nginx-cdn-logs" {
    # stdout {
    #   codec => rubydebug
    # }
    elasticsearch {
      hosts => ["es-work.feiersmart.local:9200"]
      index => "logstash-nginx-cdn-logs-%{+YYYY.MM.dd}"
      document_id => "%{type}-%{+YYYY.MM.dd}-%{host}-%{offset}"
    }
  }
}

remove_field strips the fields we don't need from each event, which makes the documents stored in ES considerably smaller.
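
The Coordinate Map in the next step needs geoip.location mapped as geo_point; the default logstash index template applies to indices matching logstash-*, which this index name satisfies. A quick way to double-check the mapping (sketch):

# Inspect the mapping of the CDN log indices; geoip.location should be of type geo_point
curl -s 'http://es-work.feiersmart.local:9200/logstash-nginx-cdn-logs-*/_mapping?pretty' | grep -A 2 '"location"'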

5. Draw the map in kibana

Click Visualize in the left-hand kibana menu and choose the Coordinate Map view under Maps.

Once the parameters are tuned, you get a map showing the geographic distribution of visitors.
