如何实现自动化运维?自动化运维之日志统一管理.doc

上传人:白大夫 文档编号:3426910 上传时间:2019-08-24 格式:DOC 页数:10 大小:46KB
返回 下载 相关 举报
如何实现自动化运维?自动化运维之日志统一管理.doc_第1页
第1页 / 共10页
亲,该文档总共10页,到这儿已超出免费预览范围,如果喜欢就下载吧!
资源描述

《如何实现自动化运维?自动化运维之日志统一管理.doc》由会员分享,可在线阅读,更多相关《如何实现自动化运维?自动化运维之日志统一管理.doc(10页珍藏版)》请在三一文库上搜索。

1、如何实现自动化运维?自动化运维之日志统一管理一、日志收集及告警项目背景近来安全测试项目较少,想着把安全设备、nginx日志收集起来并告警, 话不多说,直接说重点,搭建背景:1. 日志源:安全设备日志(Imperva WAF、绿盟WAF、paloalto防火墙)、nginx日志等;2. 日志分析开源软件:ELK,告警插件:Sentinl 或elastalert,告警方式:钉钉和邮件;3. 安全设备日志-logstash-es,nginx日志由于其他部门已有一份(flume-kafka)我们通过kafka-logstash-es再输出一份,其中logstash的正则过滤规则需要配置正确,不然比较消

2、耗性能,建议写之前使用grokdebug先测试好再放入配置文件;4. 搭建系统:centos 7 , JDK 1.8, Python 2.75. ELK统一版本为5.5.2由于es和kibana的安装都比较简单,就不在下文中说明安装及配置方法了。相关软件的下载链接如下:Es:https:/artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.2.tar.gzKibana:https:/artifacts.elastic.co/downloads/kibana/kibana-5.5.2-linux-x86_64.tar.gz

3、Logstash:https:/artifacts.elastic.co/downloads/logstash/logstash-5.5.2.tar.gz测试链接Grokdebug:http:/grokdebug.herokuapp二、安全设备日志收集2.1 Imperva WAF配置策略-操作集-新增日志告警规则我这边没有用设备自身的一些日志规则,而是根据手册自定义了一些需要的日志字段,可在自定义策略的消息填入如下字段:StartTime=$!Alert.createTimeAlarmID=$!Alert.dn EventID=$!Event.dn AggregationInfo=$!Ale

4、rt.aggregationInfo.occurrencesAlert_level=$!Alert.severity RuleName=$!Alert.alertMetadata.alertName Category=$!Alert.alertType Alert_description=$!Alert.descriptionEventType=$!Event.eventType PolicyName=$!Rule.parent.displayNameSrcIP=$!Event.sourceInfo.sourceIp SrcPort=$!Event.sourceInfo.sourcePortP

5、roto=$!Event.sourceInfo.ipProtocol DstIP=$!Event.destInfo.serverIpDstPort=$!Event.destInfo.serverPort WebMethod=$!Event.struct.httpRequest.url.methodDomain=$!Alert.serverGroupName URL=$!Event.struct.httpRequest.url.pathResponseCode=$!Event.struct.httpResponse.responseCodeAlert_key=$!Event.struct.htt

6、pRequest.url.queryStringAction=$!Alert.immediateAction ResponseTime=$!Event.struct.responseTimeResponseSize=$!Event.struct.responseSize Headers_value=$!Event.struct.httpRequest.headers.valueParameters_value=$!Event.struct.httpRequest.parameters.value参数说明:告警开始时间、告警ID、事件ID、事情数量、告警级别. HTTP返回码、触发告警字符串、响

7、应动作、响应时间、响应大小、http包头的值,中间省略的部分请自行查看手册。其实Imperva WAF的总日志字段数不少于一两百个,单从这一点可以看出确实好于国产WAF太多。针对Imperva WAF的logstash配置如下:input syslog type = syslog port = 514 filter grok match =message,%GREEDYDATA:StartTime AlarmID=%NUMBER:AlarmID EventID=%NUMBER:EventIDAggregationInfo=%NUMBER:AggregationInfo Alert_level=

8、%DATA:Alert_levelRuleName=%GREEDYDATA:RuleName Category=%GREEDYDATA:CategoryAlert_description=%GREEDYDATA:Alert_description EventType=%DATA:EventTypePolicyName=%GREEDYDATA:PolicyName SrcIP=%IPV4:SrcIP SrcPort=%NUMBER:SrcPortProto=%DATA:Proto DstIP=%IPV4:DstIP DstPort=%NUMBER:DstPortWebMethod=%GREEDY

9、DATA:WebMethod Domain=%DATA:DomainURL=%GREEDYDATA:URL ResponseCode=%GREEDYDATA:ResponseCodeAlert_key=%GREEDYDATA:Alert_key Action=%DATA:ActionResponseTime=%GREEDYDATA:ResponseTime ResponseSize=%GREEDYDATA:ResponseSizeHeaders_value=%GREEDYDATA:Headers_valueParameters_value=%GREEDYDATA:Parameters_valu

10、e # 实际复制粘贴可能会有点格式问题,注意参数之间空一个空格即可 remove_field = message geoip source = SrcIP #IP归属地解析插件 output elasticsearch hosts = es单机或集群地址:9200 index = impervasyslog 说明:其中geoip为elk自带插件,可以解析ip归属地,比如ip归属的国家及城市,在仪表盘配置“地图炮”装X、查看攻击源地理位置的时候有点用,2.2 绿盟WAF配置日志报表-日志管理配置-Syslog配置针对绿盟WAF的logstash配置如下:input和output参照imperva

11、 waf,贴出最要的grok部分,如下:grok match = message,%DATA:syslog_flag site_id:%NUMBER:site_id protect_id:%NUMBER:protect_id dst_ip:%IPV4:dst_ip dst_port:%NUMBER:dst_port src_ip:%IPV4:src_ipsrc_port:%NUMBER:src_port method:%DATA:method domain:%DATA:domain uri:%DATA:uri alertlevel:%DATA:alert_level event_type:%D

12、ATA:Attack_types stat_time:%GREEDYDATA:stat_time policy_id:%NUMBER:policy_idrule_id:%NUMBER:rule_id action:%DATA:action block:%DATA:block block_info:%DATA:block_info http:%GREEDYDATA:URL2.3 paloalto防火墙配置(6.1版本,其他版本可能会有点差异)新建syslog服务器-日志转发,具体看截图针对PA的logstash配置如下:input和output参照imperva waf,贴出最要的grok部分,

13、如下:grok match =message,%DATA:PA_Name,%GREEDYDATA:Time,%NUMBER:Eventid,%DATA:Category,%DATA:Subcategory,%NUMBER:NULL,%GREEDYDATA:Generate_Time,%IPV4:SourceIP,%IPV4:DestinationIP,%IPV4:NAT_SourceIP,%IPV4:NAT_DestinationIP,%DATA:RuleName,%DATA:SourceUser,%DATA:DestinationUser,%DATA:Application,%DATA:VM

14、sys,%DATA:SourceZone,%DATA:DestinationZone,%DATA:IN_interface,%DATA:OUT_interface,%DATA:Syslog,%DATA:GREEDYDATA:TimeTwo,%NUMBER:SessionID,%NUMBER:Repeat,%NUMBER:SourcePort,%NUMBER:DestinationPort,%NUMBER:NAT-SourcePort,%NUMBER:NAT-DestinationPort,%DATA:Flag,%DATA:Proto,%DATA:Action,%DATA:NUll2,%DATA

15、:ThreatName,%DATA:Category2,%DATA:Priority,%DATA:Direction,%NUMBER:Serialnum,%GREEDYDATA:NULL3贴两张最终的效果图:三、Nginx日志收集由于nginx日志已经被其他大数据部门收集过一遍了,为避免重复读取,我们从其他部门的kafka拉取过来即可,这里说一下nginx收集的方式,flume-kafka 示例配置方式如下:Flume配置如下配置扫描日志文件log_analysis_test.conf配置文件a1.sources=s1 #可以理解为输入端,定义名称为s1a1.channels=c1 #传输频道

16、,类似队列,定义为c1,设置为内存模式a1.sinks=k1 #可以理解为输出端,定义为sk1#source配置a1.sources.s1.type=execa1.sources.s1mand=tail -F/data/log/nginx/crf_crm.access.loga1.sources.s1.channels=c1#channel配置a1.channels.c1.type=memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=org.apache.flume.s

17、ink.kafka.KafkaSink 设置Kafka接收器a1.sinks.k1.channel=c1a1.sinks.k1.topic=crm_nginx_log_topic #设置Kafka的Topica1.sinks.k1.brokerList=x.x.x.x:9092 #设置Kafka的broker地址和端口号a1.sinks.k1.requiredAcks=1a1.sinks.k1.batchSize=20kafka配置kafka下载wgethttp:/mirror.bit.edu/apache/kafka/0.8.2.2/kafka_2.9.1-0.8.2.2.tgz配置zook

18、eeper,根据机器状况更改jvm 内存设置vimbin/zookeeper-server-start.sh配置kafkavimbin/kafka-server-start.sh 根据机器状况更改jvm 内存设置启动zookeepernohupbin/zookeeper-server-start.sh config/zookeeper.properties 启动kafkanohup bin/kafka-server-start.shconfig/server.properties 创建应用服务器的topickafka-topics.sh -create zookeeper x.x.x.x:21

19、81-replication-factor 2 -partitions 5 -topic crm_nginx_log_topic分区及副本以自己的情况而定查看Topic是否创建成功./bin/kafka-topics.sh -list -zookeeperx.x.x.x:2181启动kafka生产者bin/kafka-console-producer.sh -broker-listx.x.x.x:9092 -topic pay启动kafka消费者kafka-console-consumer.sh -zookeeperx.x.x.x:2181-topic crm_nginx_log_topicl

20、ogstash 配置文件input kafka codec = plain group_id = logstash1 auto_offset_reset = smallest reset_beginning = true topic_id = crm_nginx_log_topic zk_connect = x.x.x.x:2181 # zookeeper的地址 grok 省略 output elasticsearch hosts = es单机或集群地址:9200 index = impervasyslog 四、安全告警安全告警可以是Sentinl或 elastalert,Sentinl是ki

21、bana插件,可以集成到kibana内图形化展示,但是写规则时需要对JS较熟悉,elastalert 是es的插件,不支持集成到kibana界面进行图形化展示。下面分别对这2个插件进行安装及配置说明。4.1 Sentinl安装如下:kibana-plugin install https:/githubsirensolutionssentinlreleasesdownloadtag-5.4sentinl-v5.4.1.zip安装后,以IP请求频次告警设置为例:每2分钟去es查询一次针对需要监控的IP字段Input的body内容即es查询语法编写过滤条件 script: script:payloa

22、d.json=超过阀值40;var match=false;var threshold=40;varfirst=payload.aggregations.code.buckets;for(var i=0;i= threshold)match=true;payload.json += 【ip】: + firsti.key + 【count】: +firsti.doc_count + n;match; 如果想对状态码做监控,参考如下:Action里面添加钉钉群机器人的webhook钉钉报警如下钉钉的接口文档链接:https:/open-doc.dingtalk/docs/doc?spm=a219a

23、.7629140.0.0.karFPe邮件告警设置如下:4.2 elastalert前置条件:JDK 1.8python 2.7easy_install -U setuptools (最新setuptools-39.2.0)yum install gccyum install python-develyum install libffi-devel安装git clonehttps:/github/Yelp/elastalert.git cd elastalert python setup.py install pip install -r requirements.txt cp config.

24、yaml.example config.yaml下载https:/github/xuyaoqiang/elastalert-dingtalk-pluginelastalert-dingtalk-plugin中的elastalert_modules复制到elastalert下创建索引 elastalert-create-index输入 es的IP 及 es的端口,其余根据自己的环境写,一般默认即可配置config.yaml,以下为关键配置信息:rules_folder: example_rules #指定rule的目录run_every:minutes: 1 #每一分钟去探测一次buffer_t

25、ime:minutes: 15 #缓存15分钟es_host: x.x.x.xes_port: 9200writeback_index: elastalert_status #创建的告警索引alert_time_limit:days: 2 #失败重试的时间限制下面先以钉钉告警为例:在example_rules里面新建钉钉告警配置文件,内容如下es_host: x.x.x.xes_port: 9200name: xxx安全告警type: cardinalityindex: nsfocuswaf_syslogcardinality_field: src_ipmax_cardinality: 30t

26、imeframe:minutes: 5 #单IP 5分钟内访问超过30次就会告警aggregation_key: src_ipsummary_table_fields:src_ipdst_ipdst_portAttack_typesstat_timealert:-“elastalert_modules.dingtalk_alert.DingTalkAlerter”dingtalk_webhook: “your webhook”dingtalk_msgtype: text群机器人的配置比较简单,自己搜索一下即可注意如果告警匹配了N条,却只发出1条告警,修改elastalert.py代码,在856

27、行后面增加如下代码:修改dingtalk_alert.py的代码,增加如下内容:src_ip = matches0src_ip #src_ip为grok过滤后自定义的字段,以下相同 dst_ip = matches0dst_ip dst_port = matches0dst_port attack_types = matches0Attack_types start_time = matches0stat_time 修改payload初的代码,如下: payload = msgtype: self.dingtalk_msgtype, text: content: r发现源IP地址:在5分钟内,

28、针对服务器IP:的端口发动了3次攻击,攻击类型是:,攻击时间:,请及时处理,谢谢!.format(src_ip,dst_ip,dst_port,attack_types,start_time) ,如果没有过滤除自定义的字段,只有message字段的话,可以新增如下代码:message = matches0message src_ip_pattern =rsrc_ip:(d+.d+.d+.d+) dst_ip_pattern =rdst_ip:(d+.d+.d+.d+) dst_port_pattern =rdst_port:(d+) time_pattern =rstat_time:(d4-d

29、2-d2s+d2:d2:d2) type_pattern =revent_type:(w+) src_ip= re.findall(src_ip_pattern, message)0 dst_ip= re.findall(dst_ip_pattern, message)0 dst_port= re.findall(dst_port_pattern, message)0 start_time= re.findall(time_pattern, message)0 attack_types= re.findall(type_pattern, message)0最终效果图如下:邮件的rule配置文件

30、如下:es_host: x.x.x.x es_port: 9200 name: 信息安全告警 type: cardinality index: nsfocuswaf_syslog cardinality_field: src_ip max_cardinality: 5 timeframe: minutes: 5 alert: - email smtp_host: mail.crfchina smtp_port: 25 smtp_auth_file:/opt/elastalert/smtp_auth_file.yaml email_reply_to: xxxxxqq #回复给谁 from_add

31、r: xxxxxxqq #发件人 email: - xxxxxqq #收件人 alert_subject: 信息安全告警 alert_text_type: alert_text_only alert_text: | XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 您好, 网站:正被人恶意攻击,请及时处理! 当前匹配XX规则数: 发生时间: 攻击者的IP: 攻击类型: 请求的URL: alert_text_args: - domain -num_hits -stat_tim

32、e -src_ip -Attack_types -URL邮件告警最终效果如下:五、总结相关进程运行命令如下:后台启动es nohup su - elasticsearch -c/opt/elasticsearch-5.5.2/bin/elasticsearch -d 后台启动logstash nohup ./bin/logstash -f x.x.x.conf 后台启动kibana nohup ./kibana 启动flumeflume-ng agent -conf conf -conf-fileconf/log_analysis_test.conf -name a1 Dflume.root.logger=INFO,console启动kafka nohup bin/kafka-server-start.shconfig/server.properties 后台启动elastalertnohup python -m elastalert.elastalert-config ./config.yaml -verbose -rule ./example_rules/DD_rule.yaml 常见的告警策略除了来自安全设备的正则之外,大量的IP请求、错误状态码、nginx的request请求中包含的特征码也都是常见的告警规则。

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 其他


经营许可证编号:宁ICP备18001539号-1