环境:Windows 10_x64
python版本:3.9.2
sqlite3版本:3.34.0
日常python开发中会遇到数据持久化的问题,今天记录下如何使用sqlite3进行数据持久化,并提供示例代码及数据查看工具。
一、背景描述
python应用程序在运行过程中被kill掉(比如版本升级等情况),内存中的运行数据将会丢失,如果能够准实时将数据存储在缓存中,程序下次启动时将继续执行被kill之前的动作。
使用数据库作为持久化是比较理想的选择,可现实情况可能需要python脚本进行本地持久化,相较于pickle等方式,sqlite3的持久化方式可扩展性比较强,方便后续迁移到mysql等数据库。
二、具体实现
1、基础使用示例
查看版本信息
import sqlite3

# Version tuple of the sqlite3 Python module itself
module_version = sqlite3.version_info
print(module_version)

# Version string of the SQLite library the module is linked against
library_version = sqlite3.sqlite_version
print(library_version)
数据库创建或连接
# Open test1.db (the file is created if it does not exist yet) and
# obtain a cursor for running statements on that connection.
conn = sqlite3.connect(database="test1.db")
cur = conn.cursor()
数据表创建
# Schema of the demo table. IF NOT EXISTS keeps the snippet re-runnable:
# without it a second run against the same test1.db fails with
# "table user already exists".
dbCreate = '''
CREATE TABLE IF NOT EXISTS user(
    user_id int,
    user_name text,
    password text
)
'''
cur.executescript(dbCreate)
conn.commit()
插入数据
# Add one row to the user table, then make the change durable.
insert_sql = "INSERT INTO user (user_id,user_name,password) VALUES(1,'user1','123456')"
conn.execute(insert_sql)
conn.commit()
查询数据
# Read every row of the user table and print them one per line.
cursor = conn.execute("SELECT * FROM user")
rows = cursor.fetchall()
for row in rows:
    print(row)
完整示例代码如下:
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""Minimal sqlite3 demo: create a table, insert two rows, print all rows."""
import sqlite3

# IF NOT EXISTS keeps the script re-runnable against an existing test1.db;
# a plain CREATE TABLE fails on the second run with "table user already exists".
dbCreate = '''
CREATE TABLE IF NOT EXISTS user(
    user_id int,
    user_name text,
    password text
)
'''

# Create the database file on first use, otherwise open the existing one
conn = sqlite3.connect("test1.db")
try:
    cur = conn.cursor()
    cur.executescript(dbCreate)
    conn.commit()

    conn.execute("INSERT INTO user (user_id,user_name,password) VALUES(1,'user1','123456')")
    conn.execute("INSERT INTO user (user_id,user_name,password) VALUES(2,'user2','123457')")
    conn.commit()

    cursor = conn.execute("SELECT * FROM user")
    for row in cursor.fetchall():
        print(row)
finally:
    # Release the database handle even when one of the statements fails
    conn.close()
运行效果:
生成的 test1.db 数据库文件可以使用 sqlitestudio 进行查看,软件运行效果如下:
下载地址:https://sqlitestudio.pl/
如果下载过慢,可从文末提供的渠道获取。
2、代码封装示例
实际使用过程中,需要进行代码封装,以便后续使用,这里以通话记录为例演示下封装过程。
1)定义全局缓存 gDCdrData,用于在内存中存放数据;
2)定义全局队列 gQCDR ,用于数据传递;
3)定义sqlite3持久化文件,并定义保留时长;
4)创建 ObjCdrItem 类,用于定义item数据结构;
5)定义sqlite3管理类 ObjCDRMgr,用于管理sqlite3数据库;
6)启动 dataGenThrd 线程模拟数据创建;
7)启动 updateThrd 线程用于数据更新;
8)启动 cleanThrd 线程用于数据清理;
完整示例代码如下:
#-*- coding:utf-8 -*-
# 堆代码 duidaima.com
import sqlite3,json,uuid,queue,random
import logging,logging.handlers
import os,sys,traceback,time,datetime
from threading import Thread
# Root logger (getLogger() is called without a name); used by the
# module-level functions and thread classes below.
logger = logging.getLogger()
def doDictClean(dCacheData,lable="addTime",timeout=3600*6):
    """Drop expired entries from dCacheData in place.

    An entry is kept only when its value is truthy and the timestamp stored
    in the value's attribute named by ``lable`` (a missing or falsy
    timestamp counts as 0) is at most ``timeout`` seconds older than the
    current time.

    :param dCacheData: dict mapping a key to an object carrying the timestamp
    :param lable: name of the timestamp attribute on each value
    :param timeout: maximum allowed age in seconds
    """
    now = int(time.time())
    survivors = {
        key: obj
        for key, obj in list(dCacheData.items())
        if obj and now - (obj.__dict__.get(lable) or 0) <= timeout
    }
    # Rebuild the very same dict object so other holders see the result
    dCacheData.clear()
    dCacheData.update(survivors)
class ObjCdrItem():  # only store data
    """Plain in-memory record for one call, keyed elsewhere by sip_call_id."""

    def __init__(self):
        # Creation time in whole seconds since the epoch
        self.addTime = int(time.time())
        self.sip_call_id = ""
        self.caller_number = ""
        self.callee_number = ""
        self.start_time = self.answer_time = self.hangup_time = None

    def update(self,dUpdate:dict):
        """Copy every key/value of dUpdate onto this instance as attributes."""
        vars(self).update(dUpdate)

    def dumps(self):
        """Return [sip_call_id, start_time, answer_time, hangup_time]."""
        wanted = ("sip_call_id","start_time","answer_time","hangup_time")
        return [vars(self).get(name) for name in wanted]
class ObjCDRMgr(object):
    """Manage the SQLite table that stores call detail records (CDRs).

    One instance owns at most one connection to ``dbFile``. Row-level
    writes (doInsert / doUpdate / doClean) use bound parameters, so values
    containing quotes are stored verbatim and ``None`` becomes SQL NULL.
    """

    def __init__(self,dbFile):
        """Remember the database path; no connection is opened here."""
        self.dbFile = dbFile
        self.tbName = "call_detail"
        self.keyid = 'sip_call_id'
        self.tbFields = ['sip_call_id','caller_number','callee_number','start_time','answer_time','hangup_time']
        self.conn = None
        self.cur = None
        self.lastCommitTime = int(time.time())
        self.lastQueryTime = int(time.time())
        self.logger = logging.getLogger()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _quoteIdent(name):
        """Return name as a double-quoted SQL identifier."""
        return '"%s"' % str(name).replace('"', '""')

    @staticmethod
    def _sqlLiteral(value):
        """Return value as a SQL literal: NULL for None, else a quoted string."""
        if value is None:
            return "NULL"
        return "'%s'" % str(value).replace("'", "''")

    @staticmethod
    def _bindValue(value):
        """Return value unchanged if sqlite3 can bind it, else its str() form."""
        if value is None or isinstance(value, (int, float, str, bytes)):
            return value
        return str(value)

    def _ensureConnected(self):
        """Open a connection only when none is currently held."""
        if self.conn is None:
            self.connect()
        return self.conn is not None

    def _run(self, query, params):
        """Run query on the current cursor and return the cursor's rowcount."""
        if params is None:
            # Script mode: may hold several statements, cannot bind parameters
            self.cur.executescript(query)
        else:
            self.cur.execute(query, params)
        return self.cur.rowcount

    # ------------------------------------------------------------------
    # connection handling
    # ------------------------------------------------------------------
    def connect(self):
        """(Re)open the connection to dbFile.

        Any connection already held is closed first so handles do not pile
        up across reconnects.

        :return: (conn, cur); both are None when connecting failed
        """
        if self.conn is not None:
            try:
                self.conn.close()
            except sqlite3.Error:
                # e.g. the old connection belongs to another thread
                self.logger.debug(traceback.format_exc())
        self.conn, self.cur = None, None
        try:
            self.conn = sqlite3.connect(self.dbFile)
            self.cur = self.conn.cursor()
        except sqlite3.Error:
            self.logger.warning("connect fail : %s" % self.dbFile)
            self.logger.debug(traceback.format_exc())
            self.conn, self.cur = None, None
        return self.conn,self.cur

    def initDB(self):
        """Connect and create the table and its indexes if they are missing.

        :return: True on success, False when connecting or the DDL failed
        """
        dbCreate = '''
        CREATE TABLE if not exists "{tbName}" (
            "id" INTEGER,
            "sip_call_id" VARCHAR(128) NULL DEFAULT NULL,
            "caller_number" VARCHAR(50) NULL DEFAULT NULL,
            "callee_number" VARCHAR(50) NULL DEFAULT NULL,
            "start_time" DATETIME DEFAULT NULL,
            "answer_time" DATETIME DEFAULT NULL,
            "hangup_time" DATETIME DEFAULT NULL,
            "create_time" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY("id" AUTOINCREMENT)
        );
        create index if not exists "sip_call_id" on {tbName}("sip_call_id");
        create index if not exists "start_time" on {tbName}("start_time");
        '''.format(tbName=self.tbName)
        self.connect()
        if self.conn is None:
            return False
        try:
            self.cur.executescript(dbCreate)
            self.conn.commit()
        except sqlite3.Error:
            self.logger.warning("initDB fail : %s" % self.dbFile)
            self.logger.debug(traceback.format_exc())
            return False
        return True

    def close(self):
        """Commit pending work and close the connection, if one is open."""
        conn, self.conn, self.cur = self.conn, None, None
        if conn:
            try:
                conn.commit()
            finally:
                conn.close()

    # ------------------------------------------------------------------
    # statement execution
    # ------------------------------------------------------------------
    def execute(self,query,doCommit=False,params=None):
        '''Exec sql insert, update or delete.

        :param query: SQL text. Without ``params`` it is run as a script
            (several statements allowed); with ``params`` it must be a
            single statement using ``?`` placeholders.
        :param doCommit: commit after the statement ran
        :param params: optional sequence of values to bind
        :return: the cursor's rowcount; only meaningful when ``params`` is
            given, because script execution does not update it. 0 when no
            connection could be opened.
        :raises sqlite3.Error: when the statement still fails after one
            reconnect-and-retry
        '''
        retCount = 0
        self.logger.info(query)
        if not self._ensureConnected():
            return retCount
        try:
            retCount = self._run(query, params)
        except sqlite3.Error:
            self.logger.warning("execute fail : %s" % query)
            self.logger.debug(traceback.format_exc())
            # One retry on a fresh connection; a second failure propagates
            self.connect()
            if self.conn :
                retCount = self._run(query, params)
        if doCommit :
            self.commit()
        return retCount

    def commit(self):
        """Commit the current transaction; failures are logged, not raised."""
        if self.conn :
            try :
                # Always commit: skipping would leave writes pending and
                # lost if the process is killed before the next commit.
                self.conn.commit()
                curTime = int(time.time())
                self.lastCommitTime = curTime
                self.lastQueryTime = curTime
            except sqlite3.Error:
                self.logger.warning("commit exception occur")
                self.logger.debug(traceback.format_exc())
        return None

    def rollback(self):
        """Roll back the current transaction; failures are logged only."""
        if self.conn :
            try :
                self.conn.rollback()
                self.logger.info("rollback ...")
            except sqlite3.Error:
                self.logger.debug(traceback.format_exc())
        return None

    def _real_execWithRet(self,query:str,params=()):
        """Run a query and return all result rows as a list."""
        self.logger.debug(query)
        self.lastQueryTime = int(time.time())
        res = self.cur.execute(query, params)
        arrResult = res.fetchall()
        self.logger.debug("count = {0}".format(len(arrResult)))
        return arrResult

    def execWithRet(self,query:str):
        '''Exec a query and return its rows ([] when no connection is available).

        :raises sqlite3.Error: when the query still fails after one
            reconnect-and-retry
        '''
        retList = []
        if not self._ensureConnected():
            return retList
        try :
            retList = list(self._real_execWithRet(query))
        except sqlite3.Error:
            self.logger.debug(traceback.format_exc())
            self.connect()
            if self.conn :
                retList = list(self._real_execWithRet(query))
        return retList

    # ------------------------------------------------------------------
    # SQL text builders (kept for callers that want the statement text)
    # ------------------------------------------------------------------
    def genFieldSec(self,arrFields):
        """Return the column list section, e.g. ('a','b')."""
        fstr = "','".join(arrFields)
        ret = "('%s')" % fstr
        return ret

    def genValueSec(self,arrFields):
        """Return a str.format pattern for the values, e.g. '{a}','{b}'."""
        fstr = "}','{".join(arrFields)
        ret = "'{%s}'" % fstr
        return ret

    def genInsertQuery(self,dValue):
        """Return an INSERT statement for dValue with values escaped.

        Single quotes inside values are doubled and None is rendered as NULL.
        """
        arrFields = list(dValue.keys())
        values = ",".join(self._sqlLiteral(dValue[k]) for k in arrFields)
        return "insert into %s %s values (%s);" % (
            self.tbName, self.genFieldSec(arrFields), values)

    def genUpdateQuery(self,dValue,keyId,callId,extCond=""):
        """Return an UPDATE statement setting every dValue column except keyId.

        Values and callId are escaped; ``extCond`` is appended verbatim
        after ``and`` when it is not empty.
        """
        arrTmp = []
        for k, v in dValue.items():
            if k == keyId :
                continue
            arrTmp.append("'%s'=%s" % (k, self._sqlLiteral(v)))
        section = ",".join(arrTmp)
        query = "update %s set %s where %s=%s" % (
            self.tbName, section, keyId, self._sqlLiteral(callId))
        if len(extCond) > 0 :
            query = "%s and %s" % (query, extCond)
        return query + ";"

    # ------------------------------------------------------------------
    # record operations
    # ------------------------------------------------------------------
    def doInsert(self,dItem:dict):
        """Insert the tbFields columns of dItem as one row and commit.

        :return: number of inserted rows (0 when dItem has no known column)
        """
        fields = [k for k in dItem if k in self.tbFields]
        if not fields:
            return 0
        query = "insert into %s (%s) values (%s);" % (
            self._quoteIdent(self.tbName),
            ",".join(self._quoteIdent(k) for k in fields),
            ",".join("?" for _ in fields))
        params = [self._bindValue(dItem[k]) for k in fields]
        return self.execute(query, True, params)

    def doUpdate(self,dItem:dict,callId:str):
        """Update the row(s) whose key column equals callId and commit.

        Only columns listed in tbFields (except the key column) are written.

        :return: number of updated rows (0 when there is nothing to set)
        """
        fields = [k for k in dItem if k in self.tbFields and k != self.keyid]
        if not fields:
            return 0
        query = "update %s set %s where %s=?;" % (
            self._quoteIdent(self.tbName),
            ",".join("%s=?" % self._quoteIdent(k) for k in fields),
            self._quoteIdent(self.keyid))
        params = [self._bindValue(dItem[k]) for k in fields]
        params.append(self._bindValue(callId))
        return self.execute(query, True, params)

    def dumpData(self):
        """Return every row as a tuple ordered like tbFields."""
        query = "select %s from %s;" % (",".join(self.tbFields),self.tbName)
        return self.execWithRet(query)

    def doClean(self,timeout:int):
        """Delete rows whose start_time is more than timeout seconds old.

        start_time may be stored as epoch seconds or as a
        'YYYY-MM-DD HH:MM:SS[.ffffff]' local-time string. Each storage
        class is compared against a cutoff of its own kind, because SQLite
        orders every INTEGER below every TEXT value and a mixed comparison
        would never match text timestamps.

        :return: number of deleted rows
        """
        resveTime = int(time.time()) - timeout
        resveText = str(datetime.datetime.now() - datetime.timedelta(seconds=timeout))
        query = (
            "delete from %s where "
            "(typeof(start_time) in ('integer','real') and start_time < ?) "
            "or (typeof(start_time) = 'text' and start_time < ?);"
        ) % self._quoteIdent(self.tbName)
        return self.execute(query, True, (resveTime, resveText))
def loadCdrFromDB(cdrMgr,dCdrData:dict,dbFile:str):
    """Fill dCdrData with one ObjCdrItem per row returned by cdrMgr.dumpData().

    Rows are mapped to field names through cdrMgr.tbFields; rows without a
    sip_call_id are skipped. ``dbFile`` is accepted but not used here.

    :return: None
    """
    for row in cdrMgr.dumpData():
        record = dict(zip(cdrMgr.tbFields, row))
        callId = record.get("sip_call_id") or ""
        if len(callId) != 0:
            entry = ObjCdrItem()
            entry.update(record)
            dCdrData[callId] = entry
    return None
def initData(dCdrData,dbFile):
    """Preload dCdrData from dbFile when that file already exists.

    Any failure while loading is logged and swallowed so start-up can
    continue with whatever was loaded so far.

    :param dCdrData: dict to fill, keyed by sip_call_id
    :param dbFile: path of the SQLite file
    """
    logger.info("init data begin")
    sdfFileExist = os.path.isfile(dbFile)
    try:
        if sdfFileExist:
            # load from db file
            cdrMgr = ObjCDRMgr(dbFile)
            try:
                cdrMgr.initDB()
                loadCdrFromDB(cdrMgr,dCdrData, dbFile)
            finally:
                # Release the connection even when loading raised
                cdrMgr.close()
            logger.info("load data from %s ok , len(gDCdrData) = %d" % (dbFile,len(dCdrData)))
    except Exception:
        logger.info("initData not success")
        logger.debug(traceback.format_exc())
    logger.info("init data end")
class dataGenThrd(Thread):
    """Thread that simulates call traffic by queueing a fake CDR every 10 seconds."""

    def __init__(self,qCDR):
        super().__init__()
        self.qCDR = qCDR

    def run(self):
        """Loop forever: build a random record, queue it as JSON text, sleep."""
        logger.info("cdr generate task start")
        callers = ["13012345678","13112345678","13212345678"]
        callees = ["13312345678","13142345678","13512345678"]
        while True :
            record = {}
            record['sip_call_id'] = str(uuid.uuid4())
            record['caller_number'] = random.choice(callers)
            record['callee_number'] = random.choice(callees)
            # All three timestamps are taken back to back at creation time
            record['start_time'] = str(datetime.datetime.now())
            record['answer_time'] = str(datetime.datetime.now())
            record['hangup_time'] = str(datetime.datetime.now())
            logger.debug(record)
            payload = json.dumps(record)
            self.qCDR.put(payload)
            time.sleep(10)
class updateThrd(Thread):
    """Thread that drains the CDR queue into the in-memory cache and the database."""

    def __init__(self,dbFile,qCDR,dCdrData):
        """
        :param dbFile: path of the SQLite file
        :param qCDR: queue delivering CDRs as JSON text
        :param dCdrData: shared dict of sip_call_id -> ObjCdrItem
        """
        super().__init__()
        self.dbFile = dbFile
        self.qCDR = qCDR
        self.dCdrData = dCdrData
        self.cdrMgr = ObjCDRMgr(self.dbFile)

    def doTask(self):
        """Consume queue items forever; insert new calls, update known ones."""
        lastCdrItem = None
        while True:
            try :
                cdrItem = self.qCDR.get(timeout=3)
            except queue.Empty:
                continue
            if not cdrItem : continue
            qSize = self.qCDR.qsize()
            if lastCdrItem == cdrItem :
                # Identical to the item processed just before: nothing to do
                logger.debug("qCDR size : %d ,lastCdrItem == cdrItem , skip" % qSize)
                continue
            logger.info("qCDR size : %d ,cdrItem : %s" % (qSize,str(cdrItem)))
            try:
                dTmp = json.loads(cdrItem)
            except (TypeError, ValueError):
                dTmp = {}
            if not isinstance(dTmp, dict):
                # Valid JSON that is not an object cannot be a CDR
                dTmp = {}
            sipCallId = dTmp.get("sip_call_id") or ""
            if not sipCallId:
                logger.info("sipCallId or dTmp is empty , skip")
                continue
            try:
                flagInsert = False
                if not sipCallId in self.dCdrData:
                    self.dCdrData[sipCallId] = ObjCdrItem()
                    logger.info("add sipCallId(%s)" % sipCallId)
                    flagInsert = True
                refSDF = self.dCdrData[sipCallId]
                refSDF.update(dTmp)
                if flagInsert :
                    self.cdrMgr.doInsert(dTmp)
                else:
                    self.cdrMgr.doUpdate(dTmp,sipCallId)
                lastCdrItem = cdrItem
            except Exception:
                logger.info("sdfUpdate task exception,please check")
                logger.debug(traceback.format_exc())
            time.sleep(1)

    def run(self):
        """Initialise the database, preload the cache, then process the queue.

        When any of these steps raises, the whole sequence is retried after
        10 seconds instead of letting the thread end, so queued records
        keep being persisted.
        """
        logger.info("cdr update task start")
        while True:
            try :
                self.cdrMgr.initDB()
                loadCdrFromDB(self.cdrMgr,self.dCdrData,self.dbFile)
                self.doTask()
            except Exception:
                logger.info("try next loop")
                logger.debug(traceback.format_exc())
                time.sleep(10)
class cleanThrd(Thread):
    """Thread that periodically purges expired CDRs from the cache and the database."""

    def __init__(self,dbFile,reserveDays,dCdrData):
        """
        :param dbFile: path of the SQLite file
        :param reserveDays: number of days records are kept
        :param dCdrData: shared dict of sip_call_id -> ObjCdrItem
        """
        super().__init__()
        self.dbFile = dbFile
        self.reserveDays = reserveDays
        self.dCdrData = dCdrData

    def run(self):
        """Wake up every two hours; clean only when the local hour is 02-06."""
        logger.info("clean data task start")
        reserveTimeout = self.reserveDays * 24 * 3600 # seconds
        arrHour = [2,3,4,5,6,]
        while True:
            try:
                now = datetime.datetime.now()
                if now.hour in arrHour :
                    # doDictClean works on the whole cache dict; handing it
                    # the individual items raised AttributeError and
                    # aborted the database cleanup below.
                    doDictClean(self.dCdrData,"addTime",reserveTimeout)
                    cdrMgr = ObjCDRMgr(self.dbFile)
                    try:
                        cdrMgr.doClean(reserveTimeout)
                    finally:
                        # A manager is created per pass, so release it again
                        cdrMgr.close()
                    logger.info("dCdrData len : %d , memory size : %d" % (len(self.dCdrData),sys.getsizeof(self.dCdrData)))
            except Exception:
                logger.warning("data clean fail")
                logger.debug(traceback.format_exc())
            time.sleep(2* 60 * 60)
def main():
    """Configure logging, preload the cache and start the three worker threads.

    Never returns: after starting the workers the main thread idles in a
    one-second sleep loop.
    """
    dbFile,reserveDays = "test1.db",15
    gDCdrData = {} # {"call_id" :ObjCdrItem,}
    gQCDR = queue.Queue()

    logging.basicConfig(
        level=logging.DEBUG, # DEBUG,INFO,WARNING,ERROR,CRITICAL
        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
        datefmt='%a, %d %b %Y %H:%M:%S'
    )
    logger.info("dbFile : %s " % dbFile)
    initData(gDCdrData,dbFile)

    # Generator, persister and cleaner, started in that order
    workers = [
        dataGenThrd(gQCDR),
        updateThrd(dbFile,gQCDR,gDCdrData),
        cleanThrd(dbFile,reserveDays,gDCdrData),
    ]
    for worker in workers:
        worker.start()

    while True:
        time.sleep(1)
# Run the demo when executed as a script; just announce a plain import.
if __name__ != "__main__":
    print("import as module")
else:
    main()
运行效果如下:
数据文件打开效果如下:
三、总结
好,本文就说这么多了,有问题欢迎留言评论!