主页 > 其他  > 

Python实现定时查询数据库并发送消息的完整流程

Python实现定时查询数据库并发送消息的完整流程
Python 实现定时查询数据库并发送消息的完整流程

简介:在许多实际应用场景中,我们需要定时从数据库中查询特定数据,并将这些数据发送到指定的接口进行后续处理。本文将详细介绍如何使用 Python 编写一个程序,实现定时查询 MySQL 数据库中的数据,生成签名并发送消息到指定接口,同时在消息发送成功后更新数据库中的状态字段。

一、 整体流程概述 (一)整个程序的主要流程如下:

1、根据接口要求生成签名和时间戳,用于后续发送消息时的身份验证。 2、定时查询 MySQL 数据库中的msg表的数据。 3、对于查询到的每一行数据,如果其sfts字段为否,则将该行数据组织成消息内容,并结合之前生成的签名和时间戳发送到指定接口。a 4、在消息发送成功后,更新数据库中该行数据的sfts字段为是。 5、每小时重置一次数据。

(二)数据库数据处理 设置定时调度事件或者使用触发器,将需要的数据根据需求写入到msg表中。 CREATE DEFINER=`root`@`%` PROCEDURE `insert_zero_rows_data`() BEGIN -- 插入数据到 msg 表 INSERT INTO msg SELECT -- 插入数据到 msg 表 '否', --是否发送,默认为未发送 NOW() -- 记录数据插入的当前时间 FROM 来源表1 gg JOIN 来源表2 dr ON gg.bywm = dr.table_name WHERE dr.TABLE_ROWS = 0 AND gg.bywm NOT IN (SELECT bywm FROM msg);-- 避免重复插入数据 END

根据实际的业务需求,合理设置上述存储过程的调用周期或者触发条件。例如,我们可以通过以下代码调用该存储过程: BEGIN CALL insert_zero_rows_data(); END

(三)python代码实现 导入必要的库 import hashlib import base64 import hmac import time import requests import pymysql import schedule

这里导入了多个库: hashlib:用于哈希算法,如 SHA256。 base64:用于进行 Base64 编码和解码。 hmac:用于生成 HMAC(Hash - based Message Authentication Code)。 time:用于获取当前时间戳。 requests:用于发送 HTTP 请求。 pymysql:用于连接和操作 MySQL 数据库。 schedule:用于实现定时任务。

生成签名和时间戳 注:这里需要根据你的实际情况来进行修改。 def generate_signature(): """ 生成签名和时间戳 :return: 签名和时间戳 """ secret = "this is secret " # 预定义的密钥,需根据实际情况修改 timestamp = int(round(time.time())) # 获取当前时间戳 string_to_sign = f'{timestamp}@{secret}' # 组合时间戳和密钥生成待签名的字符串 hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()# 使用 HMAC-SHA256 算法生成哈希值 sign = base64.b64encode(hmac_code).decode('utf-8')# 对哈希值进行 Base64 编码得到签名 return sign, timestamp

这个函数首先获取当前时间戳,然后将时间戳和一个预定义的密钥(secret)组合成一个字符串。接着使用 HMAC 算法,以这个字符串为输入,使用 SHA256 哈希算法生成一个哈希值,最后将这个哈希值进行 Base64 编码得到签名。函数返回生成的签名和时间戳。

查询数据库 def query_database(): """ 查询数据库中 msg 表的数据 :return: 查询结果 """ try: connection = pymysql.connect( host="host", port=port, user="user", password=r"password", database="database" ) with connection.cursor() as cursor: sql = "SELECT jkmc, bywm, sydw, sfts, datatime FROM msg" cursor.execute(sql) results = cursor.fetchall() return results except pymysql.Error as e: print(f"数据库查询出错: {e}") return [] finally: if 'connection' in locals() and connection: connection.close()

此函数用于连接到指定的 MySQL 数据库,并执行 SQL 查询语句,从msg表中获取jkmc、bywm、sydw、sfts和datatime字段的数据。如果查询过程中出现错误,将打印错误信息并返回一个空列表。无论查询是否成功,最终都会关闭数据库连接。

发送消息到指定接口 def send_message(row, sign, timestamp): """ 发送消息到指定接口 :param row: 数据库查询结果的一行数据 :param sign: 签名 :param timestamp: 时间戳 :return: 响应对象 """ HOOK_TOKEN = "HOOK_TOKEN" url = "url" params = { "hook_token": HOOK_TOKEN } CONTENT = f"监测:\n名称 {row[0]}\n表英文名 {row[1]}\n其他 {row[2]}\n发现问题时间 {row[4]}" payload = { "timestamp": str(timestamp), "sign": sign, "msgType": "text", "msgData": { "text": { "content": CONTENT } } } try: response = requests.post( url, params=params, json=payload, headers={'Content-Type': 'application/json'} ) print(f"Status Code: {response.status_code}")# 打印响应状态码 print(f"Response: {response.text}")# 打印响应内容 return response except requests.RequestException as e: print(f"消息发送出错: {e}")# 打印发送错误信息 return None

该函数负责将数据库查询结果的一行数据组织成消息内容,并发送到指定的接口。它首先定义了接口的 URL、请求参数(hook_token)以及消息内容(CONTENT)。然后构建请求负载(payload),其中包含时间戳、签名、消息类型和具体的消息数据。使用requests.post方法发送 POST 请求,并在请求成功或失败时打印相应的状态码和响应信息,最后返回响应对象。

更新数据库 def update_database(row): """ 更新数据库中 msg 表的 sfts 字段 :param row: 数据库查询结果的一行数据 """ try: connection = pymysql.connect( host="host", port=port, user="user", password=r"password", database="database" ) with connection.cursor() as cursor: # 修改更新条件,避免更新多条记录 sql = "UPDATE msg SET sfts = '是' WHERE bywm = %s AND datatime = %s" cursor.execute(sql, (row[1], row[4])) connection mit() # 提交事务 except pymysql.Error as e: print(f"数据库更新出错: {e}") connection.rollback() finally: if 'connection' in locals() and connection: connection.close()

此函数用于更新数据库中msg表的sfts字段。它连接到数据库,执行 SQL 更新语句,将符合条件(bywm和datatime匹配)的记录的sfts字段更新为是。如果更新过程中出现错误,将打印错误信息并回滚事务。无论更新是否成功,最终都会关闭数据库连接。

主函数 def main(): sign, timestamp = generate_signature() results = query_database() for row in results: if row[3] == "否": send_message(row, sign, timestamp) update_database(row)

主函数main首先调用generate_signature函数生成签名和时间戳,然后调用query_database函数获取数据库查询结果。接着遍历查询结果,对于每一行数据,如果其sfts字段为否,则调用send_message函数发送消息,并在消息发送成功后调用update_database函数更新数据库.

定时任务设置 注:定时任务可以在代码中部署,也可以在服务器上进行部署(见往期:服务器上任务的定时调度)

本次采用python 定时调度

def job(): main() if __name__ == "__main__": # 每五分钟运行一次 job 函数 schedule.every(5).minutes.do(job) while True: schedule.run_pending() time.sleep(1) job()

这里定义了一个job函数,其内部调用main函数。在if name == "main"代码块中,使用schedule库设置了一个定时任务,每五分钟运行一次job函数。通过一个无限循环,不断检查是否有定时任务需要执行,并在每次循环中等待 1 秒,同时也额外调用一次job函数(这部分额外调用可能是代码编写时的一个小失误,正常情况下按照schedule的机制,每五分钟执行一次即可,这部分额外调用可能会导致某些逻辑异常,建议根据实际需求进行调整)。

二、注意事项 数据库连接配置:确保数据库的主机地址、端口、用户名、密码和数据库名称等配置信息正确无误,否则将无法成功连接数据库。签名和时间戳的有效性:生成的签名和时间戳与接口的验证机制紧密相关,确保接口方的验证逻辑与本地生成逻辑一致,并且时间戳的有效期在接口允许的范围内。错误处理:在实际应用中,应根据具体需求对错误处理进行进一步优化。例如,对于数据库查询失败或消息发送失败的情况,可以记录详细的日志信息,以便后续排查问题。定时任务的稳定性:schedule库在某些复杂环境下可能会出现一些稳定性问题,建议在生产环境中进行充分的测试和监控,确保定时任务能够稳定运行。

通过以上步骤,我们成功实现了一个定时查询数据库并发送消息的 Python 程序,希望这篇文章对你有所帮助。如果你在实际应用中遇到问题,欢迎在评论区留言讨论。 以上代码仅供参考,实际应用中请根据具体需求进行调整和优化。

三、完整代码 import hashlib import base64 import hmac import time import requests import pymysql import schedule def generate_signature(): """ 生成签名和时间戳 :return: 签名和时间戳 """ secret = "" timestamp = int(round(time.time())) string_to_sign = f'{timestamp}@{secret}' hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest() sign = base64.b64encode(hmac_code).decode('utf-8') return sign, timestamp def query_database(): """ 查询数据库中 msg 表的数据 :return: 查询结果 """ try: connection = pymysql.connect( host="", port=, user="", password=r"", database="" ) with connection.cursor() as cursor: sql = "SELECT jkmc, bywm, sydw, sfts, datatime FROM msg" cursor.execute(sql) results = cursor.fetchall() return results except pymysql.Error as e: print(f"数据库查询出错: {e}") return [] finally: if 'connection' in locals() and connection: connection.close() def send_message(row, sign, timestamp): """ 发送消息到指定接口 :param row: 数据库查询结果的一行数据 :param sign: 签名 :param timestamp: 时间戳 :return: 响应对象 """ HOOK_TOKEN = "" url = "" params = { "hook_token": HOOK_TOKEN } CONTENT = f"监测:\n接口名称 {row[0]}\n表英文名 {row[1]}\n其他 {row[2]}\n发现问题时间 {row[4]}" payload = { "timestamp": str(timestamp), "sign": sign, "msgType": "text", "msgData": { "text": { "content": CONTENT } } } try: response = requests.post( url, params=params, json=payload, headers={'Content-Type': 'application/json'} ) print(f"Status Code: {response.status_code}") print(f"Response: {response.text}") return response except requests.RequestException as e: print(f"消息发送出错: {e}") return None def update_database(row): """ 更新数据库中 msg 表的 sfts 字段 :param row: 数据库查询结果的一行数据 """ try: connection = pymysql.connect( host="", port=, user="", password=r"", database="" ) with connection.cursor() as cursor: # 修改更新条件,避免更新多条记录 sql = "UPDATE msg SET sfts = '是' WHERE bywm = %s AND datatime = %s" cursor.execute(sql, (row[1], row[4])) connection mit() except pymysql.Error as e: print(f"数据库更新出错: {e}") connection.rollback() finally: if 'connection' in locals() and connection: connection.close() def main(): sign, timestamp = generate_signature() results = query_database() for row in results: if row[3] == "否": send_message(row, sign, timestamp) update_database(row) def job(): main() if __name__ == "__main__": # 每五分钟运行一次 job 函数 schedule.every(5).minutes.do(job) while True: schedule.run_pending() time.sleep(1) job()
标签:

Python实现定时查询数据库并发送消息的完整流程由讯客互联其他栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Python实现定时查询数据库并发送消息的完整流程