python操作MySQL数据库源码实例

by admin on 2019年9月5日

我采用的是MySQLdb操作的MYSQL数据库。先来一个简单的例子吧:

坚持每天学一点,每天积累一点点,作为自己每天的业余收获,这个文章是我在吃饭的期间写的,利用自己零散的时间学了一下python操作MYSQL,所以整理一下。

这是python3下的MySQL基本操作。其他类型的数据库用法基本一样。就是库的名字不同。因为python官方很早之前就规定了数据库第三方库的借口,来避免API混乱的情况。

我采用的是MySQLdb操作的MYSQL数据库。先来一个简单的例子吧:

1.工具

MySQL,MySQL可视化工具navicat,Python3.5,pycharm

 

我采用的是MySQLdb操作的MYSQL数据库。先来一个简单的例子吧:

安装与准备

这是python3的库,所以windows下安装不会像python2那样各种奇葩VC错误。是比较方便的傻瓜安装。

  • Windows平台下: py -3 -m pip install PyMySQL
  • Linux:python3 pip install PyMySQL

当然,引入的时候:import pymysql


1
2
3
4
5
6
7
8
9
10
import MySQLdb
 
try:
    conn=MySQLdb.connect(host='localhost',user='root',passwd='root',db='test',port=3306)
    cur=conn.cursor()
    cur.execute('select * from user')
    cur.close()
    conn.close()
except MySQLdb.Error,e:
     print "Mysql Error %d: %s" % (e.args[0], e.args[1])

2.代码

# 创建连接

conn = pymysql.connect(host='localhost',port=3306,user='root',passwd='root',db='lexicon',charset='utf8')

# 创建游标

cursor_neg = conn.cursor()

cursor_neg.execute('select*from edu_01_neg')

cursor_pos = conn.cursor()

#定位到数据库中的两个表

cursor_pos.execute('select*from edu_01_pos')

# 获取剩余结果的第一行数据

# row_1 = cursor.fetchall()

# print(row_1)

#读取两个表中的数据

neg_cop=[]

pos_cop=[]

forr_negincursor_neg:

# print(str(r[1]))

neg_cop.append(str(r_neg[1]))

for r_pos in cursor_pos:

pos_cop.append(str(r_pos[1]))

#关闭连接

conn.commit()

cursor_neg.close()

cursor_pos.close()

conn.close()

PS:操作数据库之前需要首先打开数据库的

1
2
3
4
5
6
7
8
9
10
import MySQLdb
 
try:
    conn=MySQLdb.connect(host='localhost',user='root',passwd='root',db='test',port=3306)
    cur=conn.cursor()
    cur.execute('select * from user')
    cur.close()
    conn.close()
except MySQLdb.Error,e:
     print "Mysql Error %d: %s" % (e.args[0], e.args[1])
1
2
3
4
5
6
7
8
9
10
import MySQLdb
 
try:
    conn=MySQLdb.connect(host='localhost',user='root',passwd='root',db='test',port=3306)
    cur=conn.cursor()
    cur.execute('select * from user')
    cur.close()
    conn.close()
except MySQLdb.Error,e:
     print "Mysql Error %d: %s" % (e.args[0], e.args[1])

数据库连接对象connection

Function 描述
connection 创建connection对象
cursor() 使用该链接创建+返回游标
commit() 提交当前事务
rollback() 回滚当前十五
close() 关闭连接

介绍一下connection的参数

  1. host mysql服务器地址
  2. port 数字类型 端口
  3. user 用户名
  4. passwd 密码
  5. db 数据库名称
  6. charset 连接编码,需要显式指明编码方式

上一段代码示例:

conn = pymysql.Connect(host='127.0.0.1',port=3306,user='root',passwd='dyx240030',db='imooc',charset='utf8')
cursor = conn.cursor()
print(conn)
print(cursor)
cursor.close()
conn.close()

OUT:
<pymysql.connections.Connection object at 0x00000051C15BFDA0>
<pymysql.cursors.Cursor object at 0x00000051C15BFD68>

  请注意修改你的数据库,主机名,用户名,密码。

  请注意修改你的数据库,主机名,用户名,密码。

  请注意修改你的数据库,主机名,用户名,密码。

数据库游标对象cursor

Function 描述
execute(op[,args]) 执行一个数据库查询和命令
fetchone() 取得结果集下一行
fetchmany(size) 取得结果集size行
fetchall() 取得结果集剩下所有行
rowcount 最近一次execute返回数据的行数或影响行数
close() 关闭cursor

代码实现:

conn = pymysql.Connect(host='127.0.0.1',port=3306,user='root',passwd='dyx240030',db='imooc',charset='utf8')
cursor = conn.cursor()
sql = "select * from user"
cursor.execute(sql)
print("cursor.excute:",cursor.rowcount)

rs = cursor.fetchone()
print("rs:",rs)

for each in cursor.fetchmany(2):
    print(each)
print()
for each in cursor.fetchall():
    print(each)

OUT:
cursor.excute: 4
rs: ('1', 'name1')
('2', 'name2')
('3', 'name3')

('4', 'name4')

下面来大致演示一下插入数据,批量插入数据,更新数据的例子吧:

下面来大致演示一下插入数据,批量插入数据,更新数据的例子吧:

下面来大致演示一下插入数据,批量插入数据,更新数据的例子吧:

更新数据库insert/update/delete

不同于select操作,这三个操作修改了数据库内容,所以需要commit(),否则数据库没有做相应的更改,但是也不会报错。

按照一般的思路,一般是以下套路:

  1. 关闭自动commit:conn.autocommit(False)
  2. 出现:conn.rowback()回滚
  3. 未出现:conn.commit()

下面这段脚本,实现insert/update/delete操作。其实这种检错模式不对,这里只做简单raise,后面有更好的方法。

conn = pymysql.Connect(host='127.0.0.1',port=3306,user='root',passwd='dyx240030',db='imooc',charset='utf8')
conn.autocommit(False)
cursor = conn.cursor()

sqlInsert = "insert into user(userid,username) values('6','name6')"
sqlUpdate = "update user set username='name41' where userd='4'"
sqlDelete = "delete from user where userid='1'"
try:
    cursor.execute(sqlInsert)
    print(cursor.rowcount)
    cursor.execute(sqlUpdate)
    print(cursor.rowcount)
    cursor.execute(sqlDelete)
    print(cursor.rowcount)

    conn.commit()
except Exception as e:
    print("Reason:",e)
    conn.rollback()

cursor.close()
cursor.close()

[OUT]:
1
Reason: (1054, "Unknown column 'userd' in 'where clause'")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import MySQLdb
 
try:
    conn=MySQLdb.connect(host='localhost',user='root',passwd='root',port=3306)
    cur=conn.cursor()
     
    cur.execute('create database if not exists python')
    conn.select_db('python')
    cur.execute('create table test(id int,info varchar(20))')
     
    value=[1,'hi rollen']
    cur.execute('insert into test values(%s,%s)',value)
     
    values=[]
    for in range(20):
        values.append((i,'hi rollen'+str(i)))
         
    cur.executemany('insert into test values(%s,%s)',values)
 
    cur.execute('update test set info="I am rollen" where id=3')
 
    conn.commit()
    cur.close()
    conn.close()
 
except MySQLdb.Error,e:
     print "Mysql Error %d: %s" % (e.args[0], e.args[1])

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import MySQLdb
 
try:
    conn=MySQLdb.connect(host='localhost',user='root',passwd='root',port=3306)
    cur=conn.cursor()
     
    cur.execute('create database if not exists python')
    conn.select_db('python')
    cur.execute('create table test(id int,info varchar(20))')
     
    value=[1,'hi rollen']
    cur.execute('insert into test values(%s,%s)',value)
     
    values=[]
    for in range(20):
        values.append((i,'hi rollen'+str(i)))
         
    cur.executemany('insert into test values(%s,%s)',values)
 
    cur.execute('update test set info="I am rollen" where id=3')
 
    conn.commit()
    cur.close()
    conn.close()
 
except MySQLdb.Error,e:
     print "Mysql Error %d: %s" % (e.args[0], e.args[1])

实例 银行转账

可以看一下类思想的SQL操作,其中之前提到过的高级报错模式用到了之前看似无用的rowcount函数,通过查看操作对于数据库的影响来检错。

import os
import sys
import pymysql

class transferMoney(object):
    def __init__(self,conn):
        self.conn = conn
    def transfer(self,sourceID,targetID,money):
        #   其他函数中若是有错会抛出异常而被检测到。
        try:
            self.checkIdAvailable(sourceID)
            self.checkIdAvailable(targetID)
            self.ifEnoughMoney(sourceID,money)
            self.reduceMoney(sourceID,money)
            self.addMoney(targetID,money)
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            raise e
    def checkIdAvailable(self,ID):
        cursor = self.conn.cursor()
        try:
            sql = "select * from account where id = %d" % ID #select语句判断可以用len(rs)
            cursor.execute(sql)
            rs=  cursor.fetchall()
            if len(rs) != 1:#   数据库类思想的报错模式,检查操作对数据库的影响条目。没有达到目标,抛出异常
                raise Exception("账号 %d 不存在"%ID)
        finally:
            cursor.close()

    def ifEnoughMoney(self,ID,money):
        cursor = self.conn.cursor()
        try:
            sql = "select * from account where id = %d and money>=%d" % (ID,money)
            cursor.execute(sql)
            rs=  cursor.fetchall()
            if len(rs) != 1:
                raise Exception("账号 %d 不存在 %d Yuan"%(ID,money))
        finally:
            cursor.close()

    def reduceMoney(self,ID,money):
        cursor = self.conn.cursor()
        try:
            sql = "update account set money = money-%d where id=%d"%(money,ID)
            cursor.execute(sql)
            if cursor.rowcount != 1:
                raise Exception("失败减钱")
        finally:
            cursor.close()

    def addMoney(self,ID,money):
        cursor = self.conn.cursor()
        try:
            sql = "update account set money = money+%d where id=%d"%(money,ID)
            cursor.execute(sql)
            if cursor.rowcount != 1:
                raise Exception("失败加款")
        finally:
            cursor.close()


if __name__=="__main__":
    if len(sys.argv)>=2:
        sourceID = int(sys.argv[1])
        targetID = int(sys.argv[2])
        money = int(sys.argv[3])

        conn = pymysql.Connect(host='127.0.0.1',port=3306,user='root',passwd='dyx240030',db='imooc',charset='utf8')
        trMoney = transferMoney(conn)

        try:
            trMoney.transfer(sourceID,targetID,money)
        except  Exception as e:
            print("出现问题"+str(e))
        finally:
            conn.close()

  请注意一定要有conn.commit()这句来提交事务要不然不能真正的插入数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import MySQLdb
 
try:
    conn=MySQLdb.connect(host='localhost',user='root',passwd='root',port=3306)
    cur=conn.cursor()
     
    cur.execute('create database if not exists python')
    conn.select_db('python')
    cur.execute('create table test(id int,info varchar(20))')
     
    value=[1,'hi rollen']
    cur.execute('insert into test values(%s,%s)',value)
     
    values=[]
    for i in range(20):
        values.append((i,'hi rollen'+str(i)))
         
    cur.executemany('insert into test values(%s,%s)',values)
 
    cur.execute('update test set info="I am rollen" where id=3')
 
    conn.commit()
    cur.close()
    conn.close()
 
except MySQLdb.Error,e:
     print "Mysql Error %d: %s" % (e.args[0], e.args[1])

  请注意一定要有conn.commit()这句来提交事务要不然不能真正的插入数据。

我踩过的坑。。。(这里是Python3哦)

运行之后我的MySQL数据库的结果就不上图了。

  请注意一定要有conn.commit()这句来提交事务要不然不能真正的插入数据。

运行之后我的MySQL数据库的结果就不上图了。

‘NoneType’ object has no attribute ‘encoding’ ,之前指明的charset必须是”UTF8″,不是”utf-8″/”UTF-8″
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import MySQLdb
 
try:
    conn=MySQLdb.connect(host='localhost',user='root',passwd='root',port=3306)
    cur=conn.cursor()
     
    conn.select_db('python')
 
    count=cur.execute('select * from test')
    print 'there has %s rows record' % count
 
    result=cur.fetchone()
    print result
    print 'ID: %s info %s' % result
 
    results=cur.fetchmany(5)
    for in results:
        print r
 
    print '=='*10
    cur.scroll(0,mode='absolute')
 
    results=cur.fetchall()
    for in results:
        print r[1]
     
 
    conn.commit()
    cur.close()
    conn.close()
 
except MySQLdb.Error,e:
     print "Mysql Error %d: %s" % (e.args[0], e.args[1])

运行之后我的MySQL数据库的结果就不上图了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import MySQLdb
 
try:
    conn=MySQLdb.connect(host='localhost',user='root',passwd='root',port=3306)
    cur=conn.cursor()
     
    conn.select_db('python')
 
    count=cur.execute('select * from test')
    print 'there has %s rows record' % count
 
    result=cur.fetchone()
    print result
    print 'ID: %s info %s' % result
 
    results=cur.fetchmany(5)
    for in results:
        print r
 
    print '=='*10
    cur.scroll(0,mode='absolute')
 
    results=cur.fetchall()
    for in results:
        print r[1]
     
 
    conn.commit()
    cur.close()
    conn.close()
 
except MySQLdb.Error,e:
     print "Mysql Error %d: %s" % (e.args[0], e.args[1])
MySQL语句后面必须有’;’,否则不会报错,也难以发现

  运行结果就不贴了,太长了。

 

  运行结果就不贴了,太长了。

数据库insert/update/delete操作需要commit()

查询后中文会正确显示,但在数据库中却是乱码的。经过我从网上查找,发现用一个属性有可搞定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import MySQLdb
 
try:
    conn=MySQLdb.connect(host='localhost',user='root',passwd='root',port=3306)
    cur=conn.cursor()
     
    conn.select_db('python')
 
    count=cur.execute('select * from test')
    print 'there has %s rows record' % count
 
    result=cur.fetchone()
    print result
    print 'ID: %s info %s' % result
 
    results=cur.fetchmany(5)
    for r in results:
        print r
 
    print '=='*10
    cur.scroll(0,mode='absolute')
 
    results=cur.fetchall()
    for r in results:
        print r[1]
     
 
    conn.commit()
    cur.close()
    conn.close()
 
except MySQLdb.Error,e:
     print "Mysql Error %d: %s" % (e.args[0], e.args[1])

查询后中文会正确显示,但在数据库中却是乱码的。经过我从网上查找,发现用一个属性有可搞定:

在构造命令的时候,注意用 ” 包裹起来,因为SQL语句字符串需要 ‘ 包裹。所以,” 比较简单的避免歧义。

在Python代码 

  运行结果就不贴了,太长了。

在Python代码 

conn = MySQLdb.Connect(host=’localhost’, user=’root’, passwd=’root’,
db=’python’) 中加一个属性:
 改为:
conn = MySQLdb.Connect(host=’localhost’, user=’root’, passwd=’root’,
db=’python’,charset=’utf8′) 
charset是要跟你数据库的编码一样,如果是数据库是gb2312
,则写charset=’gb2312’。

查询后中文会正确显示,但在数据库中却是乱码的。经过我从网上查找,发现用一个属性有可搞定:

conn = MySQLdb.Connect(host=’localhost’, user=’root’, passwd=’root’,
db=’python’) 中加一个属性:
 改为:
conn = MySQLdb.Connect(host=’localhost’, user=’root’, passwd=’root’,
db=’python’,charset=’utf8′) 
charset是要跟你数据库的编码一样,如果是数据库是gb2312
,则写charset=’gb2312’。

 

在Python代码 

 

下面贴一下常用的函数:

conn = MySQLdb.Connect(host=’localhost’, user=’root’, passwd=’root’,
db=’python’) 中加一个属性:
 改为:
conn = MySQLdb.Connect(host=’localhost’, user=’root’, passwd=’root’,
db=’python’,charset=’utf8′) 
charset是要跟你数据库的编码一样,如果是数据库是gb2312
,则写charset=’gb2312′。

下面贴一下常用的函数:

然后,这个连接对象也提供了对事务操作的支持,标准的方法
commit() 提交
rollback() 回滚

 

然后,这个连接对象也提供了对事务操作的支持,标准的方法
commit() 提交
rollback() 回滚

cursor用来执行命令的方法:
callproc(self, procname,
args):用来执行存储过程,接收的参数为存储过程名和参数列表,返回值为受影响的行数
execute(self, query,
args):执行单条sql语句,接收的参数为sql语句本身和使用的参数列表,返回值为受影响的行数
executemany(self, query,
args):执行单挑sql语句,但是重复执行参数列表里的参数,返回值为受影响的行数
nextset(self):移动到下一个结果集

下面贴一下常用的函数:

cursor用来执行命令的方法:
callproc(self, procname,
args):用来执行存储过程,接收的参数为存储过程名和参数列表,返回值为受影响的行数
execute(self, query,
args):执行单条sql语句,接收的参数为sql语句本身和使用的参数列表,返回值为受影响的行数
executemany(self, query,
args):执行单挑sql语句,但是重复执行参数列表里的参数,返回值为受影响的行数
nextset(self):移动到下一个结果集

cursor用来接收返回值的方法:
fetchall(self):接收全部的返回结果行.
fetchmany(self,
size=None):接收size条返回结果行.如果size的值大于返回的结果行的数量,则会返回cursor.arraysize条数据.
fetchone(self):返回一条结果行.
scroll(self, value,
mode=’relative’):移动指针到某一行.如果mode=’relative’,则表示从当前所在行移动value条,如果
mode=’absolute’,则表示从结果集的第一行移动value条.

然后,这个连接对象也提供了对事务操作的支持,标准的方法
commit() 提交
rollback() 回滚

cursor用来接收返回值的方法:
fetchall(self):接收全部的返回结果行.
fetchmany(self,
size=None):接收size条返回结果行.如果size的值大于返回的结果行的数量,则会返回cursor.arraysize条数据.
fetchone(self):返回一条结果行.
scroll(self, value,
mode=’relative’):移动指针到某一行.如果mode=’relative’,则表示从当前所在行移动value条,如果
mode=’absolute’,则表示从结果集的第一行移动value条.

参考资料:

cursor用来执行命令的方法:
callproc(self, procname,
args):用来执行存储过程,接收的参数为存储过程名和参数列表,返回值为受影响的行数
execute(self, query,
args):执行单条sql语句,接收的参数为sql语句本身和使用的参数列表,返回值为受影响的行数
executemany(self, query,
args):执行单挑sql语句,但是重复执行参数列表里的参数,返回值为受影响的行数
nextset(self):移动到下一个结果集

参考资料:

MySQLdb‘s user guide

cursor用来接收返回值的方法:
fetchall(self):接收全部的返回结果行.
fetchmany(self,
size=None):接收size条返回结果行.如果size的值大于返回的结果行的数量,则会返回cursor.arraysize条数据.
fetchone(self):返回一条结果行.
scroll(self, value,
mode=’relative’):移动指针到某一行.如果mode=’relative’,则表示从当前所在行移动value条,如果
mode=’absolute’,则表示从结果集的第一行移动value条.

MySQLdb‘s user guide

package
MySQLdb

参考资料:

package
MySQLdb

 

MySQLdb‘s user guide

package MySQLdb

1 2 3 4 5 6 7 8 9 10 import MySQLdb try : conn = MySQLdb.connect(host =
localhost ,user = root ,pas…

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图