Python 操作 Mysql 进行数据库的 diff
off999 2024-10-12 06:16 29 浏览 0 评论
分享主题:如何使用 Python 操作 Mysql 实现不同环境相同库的 diff
一、适用场景
项目工作中,我们会遇到测试环境特别多的情况,例如:n套beta环境,多套预生产环境,多套uat环境等各种测试环境。为保证所有测试环境表结构上的一致性,如果只是单纯地靠人工来检查并更新,未免显得太过吃力且效率低下,还容易在实施的过程中出现遗漏的问题。
因此使用自动化脚本来完成这项工作就显得格外重要了,今天分享的主要内容就是通过自动化脚本协助你找到不同测试环境之间差异化的内容,进而可以避免同步过程中出现的遗漏问题。
二、代码实现
1、前置处理与基础配置
1import pymysql
2from email.mime.multipart import MIMEMultipart
3from email.mime.text import MIMEText
4import smtplib
5import time
6
7#忽略掉的db字典
8ignoreDb={'information_schema': 'information_schema',
9 'mysql': 'mysql',
10 'performance_schema': 'performance_schema',
11 'sys': 'sys'
12 }
13
14#忽略掉的db在查询sql中使用的
15ignoreDbSQL="('information_schema', 'mq_store','performance_schema', 'sys','edsystem')"
16
17#收diff报告的邮箱地址
18emails='tony.wu@test-la.com'
19
20#排除不在diff范围内的表名称列表
21exclude_table = ['tt', 'hurdle_policy_back', 'preferences_0309', 'preferences_0524', 'preferences_0310','t1','t2','mock_data']
2、用来发送diff报告
1def send_mail(receivers, title, content):
2 sender = 'qa.notice@test-la.com'
3 mailto = receivers.split(",")
4 try:
5 msg = MIMEMultipart()
6 msg['Subject'] = title
7 to_user = ",".join(mailto)
8
9 print("receivers...", to_user)
10 msg['to'] = to_user
11 msg['From'] = sender
12
13 body = MIMEText(content, _subtype='html', _charset='utf-8')
14 msg.attach(body)
15 smtp = smtplib.SMTP('smtp.office365.com', 587)
16 smtp.starttls()
17 print("sending")
18 smtp.login("qa.notice@test-la.com", "test123456")
19 smtp.sendmail(sender, mailto, msg.as_string())
20 print("send")
21 smtp.quit()
22 except smtplib.SMTPException as e:
23 print(e)
3、查询获取所有的列名数据
1def queryAllColumns(mycursor):
2 sql = " SELECT TABLE_SCHEMA 库名,TABLE_NAME 表名,COLUMN_NAME 列名, COLUMN_TYPE 数据类型, DATA_TYPE 字段类型, " \
3 "CHARACTER_MAXIMUM_LENGTH 长度, IS_NULLABLE 是否为空, COLUMN_DEFAULT 默认值, " \
4 "COLUMN_COMMENT 备注 FROM INFORMATION_SCHEMA.COLUMNS " \
5 "where table_schema not in " + ignoreDbSQL;
6 # print(sql)
7 mycursor.execute(sql)
8 result = mycursor.fetchall()
9
10 d = {};
11 for x in result:
12 r={}
13 r['TABLE_SCHEMA'] = x[0];
14 r['TABLE_NAME'] = x[1];
15 if(x[1] in exclude_table):
16 continue
17 r['COLUMN_NAME'] = x[2];
18 r['COLUMN_TYPE'] = x[3];
19 r['DATA_TYPE'] = x[4];
20 r['CHARACTER_MAXIMUM_LENGTH'] = x[5];
21 r['IS_NULLABLE'] = x[6];
22 r['COLUMN_DEFAULT'] = x[7];
23 r['COLUMN_COMMENT'] = x[8];
24 d[x[0] + "." + x[1] + "." + x[2]] = r;
4、最终d的数据模式如下并将其返回
1 d=
2 {'route_config_meta.entity_sharding_config.create_date':
3 {'TABLE_SCHEMA': 'route_config_meta', 'TABLE_NAME': 'entity_sharding_config', 'COLUMN_NAME': 'create_date',
4 'COLUMN_TYPE': 'datetime', 'DATA_TYPE': 'datetime', 'CHARACTER_MAXIMUM_LENGTH': None, 'IS_NULLABLE': 'NO',
5 'COLUMN_DEFAULT': None, 'COLUMN_COMMENT': '创建时间'},
6 'route_config_meta.entity_sharding_config.update_time':
7 {'TABLE_SCHEMA': 'route_config_meta', 'TABLE_NAME': 'entity_sharding_config', 'COLUMN_NAME': 'update_time',
8 'COLUMN_TYPE': 'datetime', 'DATA_TYPE': 'datetime', 'CHARACTER_MAXIMUM_LENGTH': None, 'IS_NULLABLE': 'NO',
9 'COLUMN_DEFAULT': 'CURRENT_TIMESTAMP', 'COLUMN_COMMENT': '更新时间'}
10 }
11 return d;
5、查询获取所有的索引数据
1def queryAllIndex(mycursor):
2 sql = "Select TABLE_SCHEMA 库名称,TABLE_NAME 表名称,INDEX_NAME 索引的名称,SEQ_IN_INDEX 索引中的列序列号,COLUMN_NAME 列名称 from INFORMATION_SCHEMA.STATISTICS " \
3 "where TABLE_SCHEMA not in " + ignoreDbSQL;
4
5 sql=
6 Select TABLE_SCHEMA,TABLE_NAME,INDEX_NAME,SEQ_IN_INDEX,COLUMN_NAME from INFORMATION_SCHEMA.STATISTICS where TABLE_SCHEMA
7 not in ('information_schema', 'mq_store', 'mysql', 'performance_schema', 'slow_query_log','repeater',
8 'repeater_console','sys','edsystem','crs_adapter','oxi-adapter')
9
10 mycursor.execute(sql)
11
12 result = mycursor.fetchall()
13 # print(result)
14
15 result=
16 (
17 ('auth', 'authorities', 'PRIMARY', 1, 'id'),
18 ('auth', 'authorities', 'idx_code', 1, 'code')
19 )
20
21 d = {};
22 for x in result:
23 r = {}
24 r['TABLE_SCHEMA'] = x[0];
25 r['TABLE_NAME'] = x[1];
26 if (x[1] in exclude_table):
27 continue
28 r['INDEX_NAME'] = x[2];
29 r['SEQ_IN_INDEX'] = x[3];
30 r['COLUMN_NAME'] = x[4];
31 d[x[0]+"."+x[1]+"."+x[2]]=r;
32
33 d=
34 {
35 'auth.authorities.PRIMARY': {'TABLE_SCHEMA': 'auth', 'TABLE_NAME': 'authorities', 'INDEX_NAME': 'PRIMARY', 'SEQ_IN_INDEX': 1, 'COLUMN_NAME': 'id'},
36 'auth.authorities.idx_code': {'TABLE_SCHEMA': 'auth', 'TABLE_NAME': 'authorities', 'INDEX_NAME': 'idx_code', 'SEQ_IN_INDEX': 1, 'COLUMN_NAME': 'code'}
37 }
38 return d;
6、查询获取数据库实例名称
1def queryDbs(mycursor):
2 sql = 'show databases';
3 mycursor.execute(sql)
4 result = mycursor.fetchall()
5
6 d = {};
7 for x in result:
8 d[x[0]] = x[0];
9 return d;
7、构建成html格式的diff报表出来
1def buildHtml(db1,db2,cst,ist,tip):
2 # tip = db2.get('name') + "(" + db2.get('host') + ")"+ " 对比 " + db1.get('name') + "(" + db1.get('host') + ")"
3 str = '<!DOCTYPE html> <html> <meta charset="utf-8"> <head> <style type="text/css"> table.gridtable {font-family: verdana,arial,sans-serif; font-size:11px; color:#333333; border-width: 1px; border-color: #666666; border-collapse: collapse; } table.gridtable th {border-width: 1px; padding: 8px; border-style: solid; border-color: #666666; background-color: #dedede; } table.gridtable td {border-width: 1px; padding: 8px; border-style: solid; border-color: #666666; background-color: #ffffff; } </style> </head>';
4 str = str+'<body>'
5
6 str = str+' <table class="gridtable">'
7 str = str+ tip+' 缺少字段:'
8 str = str+' <tr> <th>database</th> <th>table</th> <th>column</th> <tr/> '
9
10 for x in cst:
11 xs = x.split('.')
12 str=str + '<tr> <td>'+xs[0]+'</td> <td>'+xs[1]+'</td> <td>'+xs[2]+'</td> <tr/>'
13 str = str + ' </table>'
14
15
16 str = str + ' <table class="gridtable">'
17 str = str + tip + ' 缺少索引:'
18 str = str + ' <tr> <th>database</th> <th>table</th> <th>index</th> <tr/> <tr>'
19
20 for x in ist:
21 xs = x.split('.')
22 str = str + '<tr> <td>' + xs[0] + '</td> <td>' + xs[1] + '</td> <td>' + xs[2] + '</td> <tr/>'
23
24 str = str + ' </table>'
25 str = str+'</body> </html>';
26 return str;
8、数据库的字段与索引diff 对比,并生成diff报告,发送邮件
1def diff(db1,db2):
2 mydb1 = pymysql.connect(
3 host=db1.get('host'),
4 user=db1.get('user'),
5 passwd=db1.get('password')
6 );
7
8 mydb2 = pymysql.connect(
9 host=db2.get('host'),
10 user=db2.get('user'),
11 passwd=db2.get('password')
12 )
13
14 mycursor1 = mydb1.cursor()
15 mycursor2 = mydb2.cursor()
16
17 #获取两个库里面的所有字段相关值
18 all_columns1 = queryAllColumns(mycursor1)
19 all_columns2 = queryAllColumns(mycursor2)
20
21 #获取两个库里面的所有索引相关值
22 all_index1 = queryAllIndex(mycursor1)
23 all_index2 = queryAllIndex(mycursor2)
24
25 mycursor1.close()
26 mycursor2.close()
27 mydb1.close()
28 mydb2.close()
29
30 #定义了一个提示信息标题头
31 tip = db2.get('name') +" 对比 " + db1.get('name') + "--(" + db2.get('host') + " 对比 "+ db1.get('host') + ")"
32
33 all_columns1的数据格式与如下all_index1雷同({key:value}),但是数据值上是有差异的。
34
35 cs1 = set(all_columns1)
36 cs2 = set(all_columns2)
37 cst = cs1.difference(cs2)
9、索引的示例数据如下
1 all_index1=
2 {
3 'auth.authorities.PRIMARY': {'TABLE_SCHEMA': 'auth', 'TABLE_NAME': 'authorities', 'INDEX_NAME': 'PRIMARY', 'SEQ_IN_INDEX': 1, 'COLUMN_NAME': 'id'},
4 'auth.authorities.idx_code': {'TABLE_SCHEMA': 'auth', 'TABLE_NAME': 'authorities', 'INDEX_NAME': 'idx_code', 'SEQ_IN_INDEX': 1, 'COLUMN_NAME': 'code'}
5 }
10、使用set做去重处理,只留下唯一的key
1 is1 = set(all_index1)
2 is2 = set(all_index2)
3 #
4 ist = is1.difference(is2)
5
6 ist=
7 {'rate.tt.PRIMARY', 'entity_storage_1.reservation_log_0.transaction_id', 'rate.rate_header.idx_tcode_tax_include'}
8
9 content = buildHtml(db1,db2,sorted(cst),sorted(ist),tip)
10
11 fmt = '%Y-%m-%d %a %H:%M:%S' # 定义时间显示格式
12 nowtime = time.strftime(fmt, time.localtime(time.time())) # 把传入的元组按照格式,输出字符串
13 print ('当前的时间:', nowtime)
14
15 if len(ist) >= 0 or len(cst) >0:
16 send_mail(emails,'DBDIFF:'+tip ,content)
17
18if __name__ == '__main__':
19 beta1 = {'name': 'beta1', 'host': '10.7.36.34', 'user': 'root', 'password': '123456'}
20 beta2 = {'name': 'beta2', 'host': '10.7.36.2', 'user': 'root', 'password': '123456'}
21
22 diff(beta1, beta2)
11、输出的邮件内容如下
1beta2 对比 beta1--(10.7.36.2 对比 10.7.36.34) 缺少字段:
2database table column
3db1 task_statistic create_date
4db1 task_statistic execute_task_count
5
6beta2 对比 beta1--(10.7.36.2 对比 10.7.36.34) 缺少索引:
7database table index
8db1 award_test idx_membership_related_id
9db2 record_flow idx_related_member_id
三、总结
今天分享的内容实操性比较强,Python 实现代码都是干货,建议动手实操更有助于加深理解哟~
相关推荐
- 面试官:来,讲一下枚举类型在开发时中实际应用场景!
-
一.基本介绍枚举是JDK1.5新增的数据类型,使用枚举我们可以很好的描述一些特定的业务场景,比如一年中的春、夏、秋、冬,还有每周的周一到周天,还有各种颜色,以及可以用它来描述一些状态信息,比如错...
- 一日一技:11个基本Python技巧和窍门
-
1.两个数字的交换.x,y=10,20print(x,y)x,y=y,xprint(x,y)输出:102020102.Python字符串取反a="Ge...
- Python Enum 技巧,让代码更简洁、更安全、更易维护
-
如果你是一名Python开发人员,你很可能使用过enum.Enum来创建可读性和可维护性代码。今天发现一个强大的技巧,可以让Enum的境界更进一层,这个技巧不仅能提高可读性,还能以最小的代价增...
- Python元组编程指导教程(python元组的概念)
-
1.元组基础概念1.1什么是元组元组(Tuple)是Python中一种不可变的序列类型,用于存储多个有序的元素。元组与列表(list)类似,但元组一旦创建就不能修改(不可变),这使得元组在某些场景...
- 你可能不知道的实用 Python 功能(python有哪些用)
-
1.超越文件处理的内容管理器大多数开发人员都熟悉使用with语句进行文件操作:withopen('file.txt','r')asfile:co...
- Python 2至3.13新特性总结(python 3.10新特性)
-
以下是Python2到Python3.13的主要新特性总结,按版本分类整理:Python2到Python3的重大变化Python3是一个不向后兼容的版本,主要改进包括:pri...
- Python中for循环访问索引值的方法
-
技术背景在Python编程中,我们经常需要在循环中访问元素的索引值。例如,在处理列表、元组等可迭代对象时,除了要获取元素本身,还需要知道元素的位置。Python提供了多种方式来实现这一需求,下面将详细...
- Python enumerate核心应用解析:索引遍历的高效实践方案
-
喜欢的条友记得关注、点赞、转发、收藏,你们的支持就是我最大的动力源泉。根据GitHub代码分析统计,使用enumerate替代range(len())写法可减少38%的索引错误概率。本文通过12个生产...
- Python入门到脱坑经典案例—列表去重
-
列表去重是Python编程中常见的操作,下面我将介绍多种实现列表去重的方法,从基础到进阶,帮助初学者全面掌握这一技能。方法一:使用集合(set)去重(最简单)pythondefremove_dupl...
- Python枚举类工程实践:常量管理的标准化解决方案
-
本文通过7个生产案例,系统解析枚举类在工程实践中的应用,覆盖状态管理、配置选项、错误代码等场景,适用于Web服务开发、自动化测试及系统集成领域。一、基础概念与语法演进1.1传统常量与枚举类对比#传...
- 让Python枚举更强大!教你玩转Enum扩展
-
为什么你需要关注Enum?在日常开发中,你是否经常遇到这样的代码?ifstatus==1:print("开始处理")elifstatus==2:pri...
- Python枚举(Enum)技巧,你值得了解
-
枚举(Enum)提供了更清晰、结构化的方式来定义常量。通过为枚举添加行为、自动分配值和存储额外数据,可以提升代码的可读性、可维护性,并与数据库结合使用时,使用字符串代替数字能简化调试和查询。Pytho...
- 78行Python代码帮你复现微信撤回消息!
-
来源:悟空智能科技本文约700字,建议阅读5分钟。本文基于python的微信开源库itchat,教你如何收集私聊撤回的信息。[导读]Python曾经对我说:"时日不多,赶紧用Python"。于是看...
- 登录人人都是产品经理即可获得以下权益
-
文章介绍如何利用Cursor自动开发Playwright网页自动化脚本,实现从选题、写文、生图的全流程自动化,并将其打包成API供工作流调用,提高工作效率。虽然我前面文章介绍了很多AI工作流,但它们...
- Python常用小知识-第二弹(python常用方法总结)
-
一、Python中使用JsonPath提取字典中的值JsonPath是解析Json字符串用的,如果有一个多层嵌套的复杂字典,想要根据key和下标来批量提取value,这是比较困难的,使用jsonpat...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- python计时 (73)
- python安装路径 (56)
- python类型转换 (93)
- python自定义函数 (53)
- python进度条 (67)
- python吧 (67)
- python字典遍历 (54)
- python的for循环 (65)
- python格式化字符串 (61)
- python串口编程 (60)
- python读取文件夹下所有文件 (59)
- java调用python脚本 (56)
- python操作mysql数据库 (66)
- python字典增加键值对 (53)
- python获取列表的长度 (64)
- python接口 (63)
- python调用函数 (57)
- python人脸识别 (54)
- python多态 (60)
- python命令行参数 (53)
- python匿名函数 (59)
- python打印九九乘法表 (65)
- python赋值 (62)
- python异常 (69)
- python元祖 (57)