钉钉

钉钉机器人

创建钉钉群组机器人

  1. 选中群→点击右上角的群设置→机器人
  2. 添加机器人→自定义→添加
  3. 机器人名称:期货(填写给机器人取的名称)
  4. 安全设置:加签(选中)
  5. 复制:SEC4c4262656fe9585e82ed9004f3688d0ff1257aee8e79de3c1c896db7d2b3f30d,脚本中的secret变量内容。
  6. 我已阅读并同意《自定义机器人服务及免责条款》→完成
  7. 复制Webhook: https://oapi.dingtalk.com/robot/send?access_token=a64487796eb55203949dfb7cbcd5f4cefa30ad718f2069f18bd155f5ae846784,脚本access_token变量为等号后面的内容

注意:access_token与secret的内容需要替换为实际内容,并且需要保密。

创建机器人

机器人脚本

dt_robot.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/python3

import requests
import json
import datetime
import time
import hmac
import hashlib
import base64
import urllib.parse
import configparser

class dt_robot():
config_file = 'config.ini'
parser = configparser.ConfigParser()

@classmethod
def readkey(cls, section, key):
cls.parser.read(cls.config_file)
value = cls.parser.get(section, key)
return value

@classmethod
def sections(cls):
sections = cls.parser.sections()
print(sections)
return sections

def __init__(self, access_token, secret, name):
self.access_token = access_token
self.secret = secret
self.name = name

def timestamp_sign(self):
timestamp = str(round(time.time() * 1000))
secret_enc = self.secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, self.secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))

return (timestamp, sign)

def url(self):
timestamp, sign = self.timestamp_sign()
url_template = 'https://oapi.dingtalk.com/robot/send?access_token={}&timestamp={}&sign={}'
url = url_template.format(self.access_token, timestamp, sign)
return url

def message_headers(self):
return {
'Content-Type': 'application/json'
}

def message_data(self, content):
data = {
'msgtype': 'text',
'text': {
'content': '{}: {}'.format(self.name, content)
}
}
return data

def send(self, content):
url = self.url()
response = requests.post(url, headers=self.message_headers(),
data=json.dumps(self.message_data(content)))
if response.status_code == 200:
print('{}: 消息发送成功'.format(self.name))
else:
print('{}: 消息发送失败'.format(self.name))


if __name__ == '__main__':
def main():
access_token = dt_robot.readkey('1min', 'access_token')
secret = dt_robot.readkey('1min', 'secret')
# print(access_token)
# print(secret)
content = datetime.datetime.now().strftime('%F %T')
dt_1m = dt_robot(access_token = access_token, secret = secret, name = "1分钟线")
dt_1m.send(content)

access_token = dt_robot.readkey('5min', 'access_token')
secret = dt_robot.readkey('5min', 'secret')
# print(access_token)
# print(secret)
content = datetime.datetime.now().strftime('%F %T')
dt_5m = dt_robot(access_token = access_token, secret = secret, name = "5分钟线")
dt_5m.send(content)


main()

config.ini

1
2
3
4
5
6
7
[1min]
access_token=47fe7914af8b0c1b791689b1ff94e35e2fd21e6bdd38a2d04d9c157001321234
secret=SEC517b26d52b5c1a85ad86b68351f31f2512c0b2e07962e60de91f5026da141234

[5min]
access_token=39fcd931f4efee7bce03d6f011ea515108b7f148bfa6d5791c348ea997ca1234
secret=SECb559605001187e90fa6e7e8b65e70fa18f84b74f4103196d67d65235f75a1234