-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpydbapi.py
89 lines (74 loc) · 2.91 KB
/
pydbapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# -*- coding: utf-8 -*-
'''
Some wrappers for Python [synchronous] database operations.
Tested with PyMySQL. NEEDS MORE TESTING.
'''
import sys
def runInteraction(conn, interaction, *args):
    """
    Run ``interaction`` as one transactional unit of work on ``conn``.

    A cursor is opened on ``conn`` and passed to ``interaction`` as its first
    argument, followed by ``*args``. On success the cursor is closed, the
    connection is committed, and the interaction's return value is returned.
    If the interaction, the cursor close, or the commit raises, the
    connection is rolled back and the original exception propagates with its
    original traceback.

    Condition: autocommit must be off on ``conn``.

    Parameters:
        conn: connection object providing ``cursor()``, ``commit()`` and
            ``rollback()``.
        interaction: callable invoked as ``interaction(cursor, *args)``.
        *args: extra positional arguments forwarded to ``interaction``.

    Returns:
        Whatever ``interaction`` returns.

    Raises:
        Any exception raised by ``interaction``, ``cursor.close()`` or
        ``conn.commit()``.

    Example:
        def interaction(txn, name, age):
            sql = "select * from user where name=%s and age=%s"
            txn.execute(sql, (name, age))
            return txn.fetchall()

        rows = runInteraction(conn, interaction, "bob", 30)
    """
    trans = conn.cursor()
    cursorClosed = False
    committed = False
    try:
        result = interaction(trans, *args)
        trans.close()
        cursorClosed = True
        conn.commit()
        committed = True
        return result
    finally:
        # ``finally`` plus a success flag (instead of ``except`` + re-raise)
        # lets the in-flight exception continue unchanged, traceback
        # included, without the Python 2-only three-argument ``raise``.
        if not committed:
            try:
                conn.rollback()
            except Exception:
                # Best effort: a failing rollback must not mask the error
                # that caused it.
                pass
            if not cursorClosed:
                try:
                    # Do not leak the cursor on the failure path.
                    trans.close()
                except Exception:
                    pass
def runInsertWithId(conn, interaction, *args):
    """
    Execute an insert interaction and return the id reported by the
    connection.

    The interaction is run through ``runInteraction`` (its own return value
    is discarded); afterwards ``conn.insert_id()`` is queried and its result
    returned.
    """
    runInteraction(conn, interaction, *args)
    lastInsertId = conn.insert_id()
    return lastInsertId
def runUpdateWithUpdateRows(self, conn, interaction, *args, **kw):
    """
    Run an update interaction in a transaction and return its result.

    You may need the affected row count when implementing an optimistic lock
    with select - update; have ``interaction`` return it (for example the
    value returned by ``txn.execute``).

    Parameters:
        self: unused. This is a module-level function; the parameter is kept
            only so existing call sites keep working.
        conn: connection handed to ``runInteraction``.
        interaction: callable invoked as ``interaction(txn, *args, **kw)``.
        *args, **kw: forwarded to ``interaction``.

    Returns:
        Whatever ``interaction`` returns.
    """
    # runInteraction only forwards positional arguments, so bind both the
    # positional and keyword arguments here. Passing ``args`` and ``kw``
    # through as-is would hand the interaction a raw tuple and dict.
    def boundInteraction(txn):
        return interaction(txn, *args, **kw)

    rowCount = runInteraction(conn, boundInteraction)
    return rowCount
def getModifiedSql(conn, sql, *args):
    """
    Get the escaped SQL string for ``sql``.

    A temporary cursor is opened on ``conn``, its ``mogrify`` method is
    called with ``sql`` followed by ``*args``, and the cursor is closed
    again before returning.

    Parameters:
        conn: connection whose cursors provide ``mogrify``.
        sql: SQL text containing bind placeholders.
        *args: forwarded unchanged to ``cursor.mogrify``.

    Returns:
        The value returned by ``cursor.mogrify``.
    """
    cursor = conn.cursor()
    try:
        return cursor.mogrify(sql, *args)
    finally:
        # The cursor exists only for this call; close it so it is not leaked.
        cursor.close()
"""
Simple Not-ORM interaction api wrapper
"""
def insert(txn, tableName, dataDict):
    """
    Insert one row into ``tableName``.

    The keys of ``dataDict`` become the column list and its values are
    passed as bind parameters. The table and column names are formatted
    directly into the SQL text; only the values are bound.

    Returns whatever ``txn.execute`` returns.
    """
    columns = list(dataDict.keys())
    placeholders = ",".join(["%s"] * len(columns))
    columnList = ",".join(columns)
    statement = "insert into {TABLENAME}({FIELDS}) values({BIND_PARAM})".format(
        TABLENAME=tableName,
        FIELDS=columnList,
        BIND_PARAM=placeholders,
    )
    params = tuple(dataDict.values())
    return txn.execute(statement, params)
def select(txn, tableName, fields=None, wheres=None):
    """
    Select rows from ``tableName`` and return ``txn.fetchall()``.

    Parameters:
        txn: cursor-like object providing ``execute`` and ``fetchall``.
        tableName: table name, formatted directly into the SQL text.
        fields: iterable of column expressions, joined with ",". Defaults to
            ``["*"]`` when omitted.
        wheres: mapping of column name -> value; each entry becomes a
            ``column=%s`` equality test, all combined with "and", with the
            values passed as bind parameters. When omitted it defaults to
            ``{"1": "1"}``. An empty mapping selects without a WHERE clause.

    Returns:
        The result of ``txn.fetchall()``.

    Note: the table, field and where-key names are formatted directly into
    the SQL text; only the where values are bound.
    """
    # None sentinels replace the mutable default arguments; the effective
    # defaults are unchanged.
    if fields is None:
        fields = ["*"]
    if wheres is None:
        wheres = {"1": "1"}

    fieldParam = ",".join(fields)
    sql = "select {FIELDS} from {TABLENAME}".format(
        FIELDS=fieldParam, TABLENAME=tableName)
    if wheres:
        # Only emit WHERE when there is a condition; a bare trailing
        # "where " is not valid SQL.
        whereParam = " and ".join(
            ["{KEY}=%s".format(KEY=key) for key in wheres.keys()])
        sql += " where {CONDITION}".format(CONDITION=whereParam)
    txn.execute(sql, tuple(wheres.values()))
    return txn.fetchall()
def update(txn, tableName, dataDict, wheres):
    """
    Update rows of ``tableName``.

    Parameters:
        txn: cursor-like object providing ``execute``.
        tableName: table name, formatted directly into the SQL text.
        dataDict: mapping of column name -> new value; each entry becomes a
            ``column=%s`` assignment in the SET clause.
        wheres: mapping of column name -> value; each entry becomes a
            ``column=%s`` equality test, all combined with "and".

    Returns:
        Whatever ``txn.execute`` returns.

    Note: the table and column names are formatted directly into the SQL
    text; only the values are bound. The bind parameters are the
    ``dataDict`` values followed by the ``wheres`` values, matching the
    placeholder order.
    """
    fieldParam = ",".join(
        ["{KEY}=%s".format(KEY=key) for key in dataDict.keys()])
    whereParam = " and ".join(
        ["{KEY}=%s".format(KEY=key) for key in wheres.keys()])
    sql = "update {TABLENAME} set {FIELDS} where {CONDITION}".format(
        FIELDS=fieldParam, TABLENAME=tableName, CONDITION=whereParam)
    # Convert each values() result to a tuple before concatenating: on
    # Python 3 dict views do not support "+", so adding them directly
    # raises TypeError.
    params = tuple(dataDict.values()) + tuple(wheres.values())
    return txn.execute(sql, params)
def delete(txn, tableName, wheres):
    """
    Delete rows of ``tableName`` that match every entry of ``wheres``.

    Each key of ``wheres`` becomes a ``column=%s`` equality test, all
    combined with "and"; the values are passed as bind parameters. The
    table and column names are formatted directly into the SQL text.

    Returns whatever ``txn.execute`` returns.
    """
    conditions = []
    for column in wheres.keys():
        conditions.append("{KEY}=%s".format(KEY=column))
    statement = "delete from {TABLENAME} where {CONDITION}".format(
        TABLENAME=tableName,
        CONDITION=" and ".join(conditions),
    )
    params = tuple(wheres.values())
    return txn.execute(statement, params)