grafana企业微信报警webhook
2019-04-29
项目地址:https://github.com/fish2018/grafana-wechat
这是用flask实现的一个web服务,grafana配置报警时选择POST方式,通过query方式指定接收报警的微信分组(HOST:PORT?toparty=1),不显式传递toparty会使用配置里的默认分组。
- app.py
# _*_ coding:utf-8 _*_
from flask import Flask,request,jsonify,current_app
from utils import WeChat
app = Flask(__name__)
app.config.from_object('config.APP_ENV')
@app.route('/wechat',methods=['POST'])
def send_wechat():
# 在grafana使用query方式指定接收报警的微信分组
toparty = request.args.get('toparty')
req = request.json
res,token,expires_in = WeChat.send_card(req,toparty=toparty)
current_app.logger.info(res.json())
current_app.logger.info(req)
current_app.logger.info(expires_in)
return jsonify({"msg":req})
实现时主要有2个重点:
1、token校验有效期,这里将失效期写入到本地文件,下次读取出来做时间校验,失效了就重新获取
2、grafana报警会有多种结果,处理多种情况的数据结构
- wechat.py
# _*_coding:utf-8_*_
import time
import pickle
import datetime
import json
import requests
from config import APP_ENV
class WeChat(object):
CORPID = APP_ENV.CORPID
CORPSECRET = APP_ENV.CORPSECRET
BASEURL = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={0}&corpsecret={1}'.format(CORPID, CORPSECRET)
URL = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s"
@classmethod
def get_token(cls):
token_file = 'token'
try:
with open(token_file, 'rb') as f:
data_dict = pickle.load(f)
except:
data_dict = {}
try:
expires_time = data_dict['expires_time']
except:
expires_time = 0
now_time = int(time.mktime(datetime.datetime.now().timetuple()))
if now_time >= expires_time:
result = requests.get(cls.BASEURL).json()
expires_time = now_time + 7000
result['expires_time'] = expires_time
result['expires_in'] = expires_time - now_time
with open(token_file, 'wb') as f:
pickle.dump(result, f)
access_token = result['access_token']
expires_in = {"expires_in":result['expires_in']}
else:
result = data_dict
expires_time = data_dict['expires_time']
result['expires_in'] = expires_time - now_time
with open(token_file, 'wb') as f:
pickle.dump(result, f)
access_token = data_dict['access_token']
expires_in = {"expires_in":result['expires_in']}
return access_token,expires_in
@classmethod
def handler(cls,cont):
content = ""
c = dict()
title = cont['title']
ruleurl = cont['ruleUrl']
if cont['state'] == 'ok':
content = "已恢复正常"
elif not cont['evalMatches']:
content = "监控项异常"
else:
i = 1
for item in cont['evalMatches']:
value = item["value"]
if item["tags"]:
instance = item["tags"]["instance"]
else:
instance = "instance%s" % i
c['%s-%s' % (i,instance)] = value
i+=1
for k,v in c.items():
content += "%s:%s,\n" % (k,v)
return title,ruleurl,content
@classmethod
def send_card(cls,cont,**kwargs):
team_token,expires_in = cls.get_token()
url = cls.URL % (team_token)
title,ruleurl,content = cls.handler(cont)
wechat_json = {
"touser": APP_ENV.TOUSER,
"toparty": kwargs.get('toparty') or APP_ENV.TOPARTY,
"totag": APP_ENV.TOTAG,
"msgtype": "textcard",
"agentid": APP_ENV.AGENTID,
"textcard": {
"title": title,
"description": content,
"url": ruleurl,
"btntxt": "更多"
}
}
response = requests.post(url, data=json.dumps(wechat_json))
return response,team_token,expires_in
配置文件
- settings.py
class WeChatConf:
CORPID = "XXX"
CORPSECRET = "XXX"
TOUSER = ""
TOPARTY = "1"
TOTAG = ""
AGENTID = "xxx"
#开发环境
class DevelopmentConfig(LogConf,WeChatConf):
pass
APP_ENV = DevelopmentConfig
附企业微信平台的使用:
企业微信平台:https://work.weixin.qq.com/
注册完微信平台账号后,需要获取如下信息:
CorpID :点击 我的企业-企业信息,在最后一行查看
AgentId和Secret:点击 企业应用,创建自定义应用后显示
扫码关注企业微信平台:点击 连接微信-微信工作台
标题:grafana企业微信报警webhook
作者:fish2018
地址:https://www.devopser.org/articles/2019/04/24/1556064883572.html