How to use sqlite3 for data persistence in Python
Environment: Windows 10 x64
Python version: 3.9.2
SQLite version: 3.34.0
Data persistence comes up regularly in day-to-day Python development. This post records how to use sqlite3 for it, with example code and a tool for viewing the resulting data.

I. Background
If a Python application is killed while running (for example during a version upgrade), the data it holds in memory is lost. If that data can be persisted in near real time, the program can resume what it was doing the next time it starts.

Using a database for persistence is a fairly ideal choice, but in practice a Python script may have to persist its data locally. Compared with approaches such as pickle, sqlite3 is considerably more extensible and makes it easy to migrate to a database such as MySQL later on.

II. Implementation
1. Basic usage example
Check the version information
import sqlite3
print(sqlite3.version_info) # version of the sqlite3 module
print(sqlite3.sqlite_version) # version of the SQLite library
Create or connect to a database
conn = sqlite3.connect("test1.db")
cur = conn.cursor()
Create a table
dbCreate = '''
CREATE TABLE user(
    user_id int,
    user_name text,
    password text
)
'''
cur.executescript(dbCreate)
conn.commit()
Insert data
conn.execute("INSERT INTO user (user_id,user_name,password) VALUES(1,'user1','123456')")
conn.commit()
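As a side note (a minimal sketch, not part of the original example): a sqlite3 connection can also be used as a context manager, in which case the transaction is committed automatically on success and rolled back on an exception (the connection itself stays open):
with conn:
    conn.execute("INSERT INTO user (user_id,user_name,password) VALUES(3,'user3','123458')")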
Query data
cursor = conn.execute("SELECT * FROM user")
for row in cursor.fetchall():
    print(row)
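When the inserted or queried values come from user input, it is safer to pass them as parameters with ? placeholders rather than building the SQL string by hand; a small sketch:
conn.execute("INSERT INTO user (user_id,user_name,password) VALUES(?,?,?)", (4,'user4','123459'))
cursor = conn.execute("SELECT * FROM user WHERE user_name = ?", ('user4',))
print(cursor.fetchone())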
The complete example code:
#! /usr/bin/env python3
# -*- coding: utf-8 -*-

import sqlite3

dbCreate = '''
CREATE TABLE user(
    user_id int,
    user_name text,
    password text
)
'''

# Create or connect to the database
conn = sqlite3.connect("test1.db")
cur = conn.cursor()

cur.executescript(dbCreate)
conn.commit()

conn.execute("INSERT INTO user (user_id,user_name,password) VALUES(1,'user1','123456')")
conn.execute("INSERT INTO user (user_id,user_name,password) VALUES(2,'user2','123457')")
conn.commit()

cursor = conn.execute("SELECT * FROM user")
for row in cursor.fetchall():
    print(row)

conn.close()
Output:

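On a fresh test1.db, the script prints the two inserted rows:
(1, 'user1', '123456')
(2, 'user2', '123457')
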
The generated test1.db database file can be opened and inspected with SQLiteStudio; the tool looks like this:

Download: https://sqlitestudio.pl/
If the download is slow, it can also be obtained from the channel provided at the end of the post.

2. Encapsulation example
In real projects the code needs to be wrapped up so it can be reused; the process is demonstrated here with call detail records (CDRs) as the example.
1) Define a global cache gDCdrData that holds the data in memory;
2) Define a global queue gQCDR that passes data between threads;
3) Define the sqlite3 persistence file and its retention period;
4) Create the ObjCdrItem class that defines the structure of a single item;
5) Define the sqlite3 manager class ObjCDRMgr that manages the sqlite3 database;
6) Start a dataGenThrd thread that simulates data creation;
7) Start an updateThrd thread that updates the data;
8) Start a cleanThrd thread that cleans up old data;

The complete example code:
#-*- coding:utf-8 -*-
import sqlite3,json,uuid,queue,random
import logging,logging.handlers
import os,sys,traceback,time,datetime
from threading import Thread

logger = logging.getLogger()

def doDictClean(dCacheData,label="addTime",timeout=3600*6):
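    '''Drop entries from dCacheData whose <label> attribute is older than <timeout> seconds; modifies the dict in place.'''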
    curTime = int(time.time())
    dTmp = {}
    for callId,tmpObj in list(dCacheData.items()): 
        if not tmpObj : continue 
        tTime = tmpObj.__dict__.get(label) or 0
        if curTime - tTime > timeout: continue  
        dTmp[callId] = tmpObj
    dCacheData.clear() 
    dCacheData.update(dTmp)

class ObjCdrItem(): # only store data 
    def __init__(self):
        self.addTime = int(time.time())
        self.sip_call_id = ""
        self.caller_number = ""
        self.callee_number = ""
        self.start_time = None
        self.answer_time = None
        self.hangup_time = None

    def update(self,dUpdate:dict):
        self.__dict__.update(dUpdate)

    def dumps(self):
        retArr = []
        for k in ["sip_call_id","start_time","answer_time","hangup_time"]:
            v = self.__dict__.get(k)
            retArr.append(v)
        return retArr

class ObjCDRMgr(object):
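    '''Manage the sqlite3 call_detail table: connection handling, schema creation, insert/update/query and cleanup helpers.'''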
    def __init__(self,dbFile):
        self.dbFile = dbFile
        self.tbName = "call_detail"
        self.keyid = 'sip_call_id'
        self.tbFields = ['sip_call_id','caller_number','callee_number','start_time','answer_time','hangup_time']
        self.conn = None 
        self.cur = None 
        self.lastCommitTime = int(time.time())
        self.lastQueryTime = int(time.time())
        self.logger = logging.getLogger()

    def connect(self):
        try:
            self.conn = sqlite3.connect(self.dbFile)
            self.cur = self.conn.cursor()
        except:
            self.conn = None
        return self.conn,self.cur

    def initDB(self):
        bRet = False
        dbCreate = '''
            CREATE TABLE if not exists "{tbName}" (
                "id" INTEGER,
                "sip_call_id" VARCHAR(128) NULL DEFAULT NULL,
                "caller_number" VARCHAR(50) NULL DEFAULT NULL,
                "callee_number" VARCHAR(50) NULL DEFAULT NULL,

                "start_time" DATETIME DEFAULT NULL,
                "answer_time" DATETIME DEFAULT NULL,
                "hangup_time" DATETIME DEFAULT NULL,
                "create_time" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

                PRIMARY KEY("id" AUTOINCREMENT)
            );
            create index  if not exists "sip_call_id" on {tbName}("sip_call_id");
            create index  if not exists "start_time" on {tbName}("start_time");
        '''.format(tbName=self.tbName)
        self.connect()
        try:
            #self.cur.execute(dbCreate)
            self.cur.executescript(dbCreate)
            self.conn.commit()
            bRet = True
        except:
            pass 
        return bRet

    def execute(self,query,doCommit=False):
        '''exec sql insert update or delete'''
        retCount = 0
        try:
            self.logger.info(query)
            res = self.cur.executescript(query)
            retCount = res.rowcount
        except :
            self.logger.warning("execute fail : %s" % query)
            self.logger.debug(traceback.format_exc())
            self.connect()
            if self.conn :
                res = self.cur.executescript(query)
                retCount = res.rowcount
        if doCommit :
            self.commit()
        return retCount

    def commit(self):
        if self.conn :
            try :
                curTime = int(time.time())
                if int(curTime - self.lastCommitTime) > 0:
                    self.lastCommitTime = curTime
                    self.lastQueryTime = curTime
                    self.conn.commit()
            except :
                self.logger.warning("commit exception occur")
                self.logger.debug(traceback.format_exc())
        return None

    def rollback(self):
        if self.conn :
            try :
                self.conn.rollback()
                self.logger.info("rollback ...")
            except :
                pass
        return None

    def _real_execWithRet(self,query:str):
        self.logger.debug(query)
        self.lastQueryTime = int(time.time())
        res = self.cur.execute(query)
        arrResult = res.fetchall()
        self.logger.debug("count = {0}".format(len(arrResult)))
        return arrResult

    def execWithRet(self,query:str):
        '''exec query '''
        retList = []
        try :
            retList = list(self._real_execWithRet(query))
        except :
            self.connect()
            if self.conn :
                retList = list(self._real_execWithRet(query))
        return retList

    def close(self):
        if self.conn:
            self.conn.commit()
            self.conn.close()
            self.conn = None

    def genFieldSec(self,arrFields):
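        '''Build the quoted column-name section of an INSERT, e.g. ('sip_call_id','caller_number').'''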
        fstr = "','".join(arrFields)
        ret = "('%s')" % fstr
        return ret

    def genValueSec(self,arrFields):
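        '''Build a str.format template for the VALUES section, e.g. '{sip_call_id}','{caller_number}'.'''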
        fstr = "}','{".join(arrFields)
        ret = "'{%s}'" % fstr
        return ret

    def genInsertQuery(self,dValue):
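        '''Render a full INSERT statement from a field/value dict via str.format.'''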
        arrFields = dValue.keys()
        queryPattern = "insert into %s %s values (%s);" % (self.tbName,
        self.genFieldSec(arrFields),self.genValueSec(arrFields))
        query = queryPattern.format(**dValue)
        return query

    def genUpdateQuery(self,dValue,keyId,callId,extCond=""):
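        '''Render an UPDATE statement for rows where keyId equals callId, skipping the key column in the SET clause.'''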
        arrTmp = []
        for k in dValue.keys() :
            if k in [keyId] : continue
            arrTmp.append("'%s'='{%s}'" % (k,k))
        section = ",".join(arrTmp)
        queryPattern = "update %s set %s where %s='%s'" % (self.tbName,section,keyId,callId)
        if len(extCond) > 0 :
            queryPattern = "%s and %s" % (queryPattern,extCond)
        queryPattern += ";"
        query = queryPattern.format(**dValue)
        return query

    def doInsert(self,dItem:dict):
        dtmp = {}
        for k,v in dItem.items():
            if not k in self.tbFields: continue 
            dtmp[k] = v
        query = self.genInsertQuery(dtmp)
        return self.execute(query,True)

    def doUpdate(self,dItem:dict,callId:str):
        query = self.genUpdateQuery(dItem,self.keyid,callId)
        return self.execute(query,True)

    def dumpData(self):
        self.connect()
        query = "select %s from %s;" % (",".join(self.tbFields),self.tbName)
        return self.execWithRet(query)

    def doClean(self,timeout:int):
        self.connect()
        # start_time is stored as a datetime string, so compare it against a datetime cutoff
        cutoff = datetime.datetime.now() - datetime.timedelta(seconds=timeout)
        query = "delete from %s where start_time < '%s';" % (self.tbName,str(cutoff))
        self.logger.info(query)
        return self.execute(query,True)

def loadCdrFromDB(cdrMgr,dCdrData:dict,dbFile:str):
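    '''Rebuild the in-memory cache (dCdrData) from the rows already persisted in the sqlite3 file.'''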
    for item in cdrMgr.dumpData():
        dtmp = dict(zip(cdrMgr.tbFields,item))
        sipCallId = dtmp.get("sip_call_id") or ""
        if 0 == len(sipCallId) : continue
        cdrItem = ObjCdrItem()
        cdrItem.update(dtmp)
        dCdrData[sipCallId] = cdrItem
    return None

def initData(dCdrData,dbFile):
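    '''Reload persisted CDR items into the in-memory cache on startup, if the db file already exists.'''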
    logger.info("init data begin")
    sdfFileExist = os.path.isfile(dbFile)
    try:
        if sdfFileExist: 
            # load from db file
            cdrMgr = ObjCDRMgr(dbFile)
            cdrMgr.initDB()
            loadCdrFromDB(cdrMgr,dCdrData, dbFile)
            cdrMgr.close()
            logger.info("load data from %s ok , len(gDCdrData) = %d" % (dbFile,len(dCdrData)))
    except:
        logger.info("initData not success")
        logger.debug(traceback.format_exc())    
    logger.info("init data end")

class dataGenThrd(Thread):
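    '''Producer thread: pushes a simulated CDR (as a JSON string) onto the queue every 10 seconds.'''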
    def __init__(self,qCDR):
        super().__init__()
        self.qCDR = qCDR

    def run(self):
        logger.info("cdr generate task start")
        arrCaller = ["13012345678","13112345678","13212345678"]
        arrCallee = ["13312345678","13142345678","13512345678"]
        while True :
            dtmp = {
                'sip_call_id' : str(uuid.uuid4()),
                'caller_number' : random.choice(arrCaller),
                'callee_number' : random.choice(arrCallee),
                'start_time' : str(datetime.datetime.now()),
                'answer_time' : str(datetime.datetime.now()),
                'hangup_time' : str(datetime.datetime.now())
            }
            logger.debug(dtmp)
            self.qCDR.put(json.dumps(dtmp))
            time.sleep(10)


class updateThrd(Thread):
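    '''Consumer thread: applies queued CDR items to the in-memory cache and persists them through ObjCDRMgr.'''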
    def __init__(self,dbFile,qCDR,dCdrData):
        super().__init__()
        self.dbFile = dbFile
        self.qCDR = qCDR
        self.dCdrData = dCdrData
        self.cdrMgr = ObjCDRMgr(self.dbFile)

    def doTask(self):
        cdrItem,lastCdrItem = None,None
        while True:
            try :
                cdrItem = self.qCDR.get(timeout=3)
            except:
                cdrItem = None
            if not cdrItem : continue
            qSize = self.qCDR.qsize()
            if lastCdrItem == cdrItem :
                logger.debug("qCDR size : %d ,lastCdrItem == cdrItem , skip" % qSize)
                continue
            logger.info("qCDR size : %d ,cdrItem : %s" % (qSize,str(cdrItem)))
            sipCallId,dTmp = "",{}
            try:
                dTmp = json.loads(cdrItem)
                sipCallId = dTmp.get("sip_call_id") or ""
            except:
                sipCallId,dTmp = "",{}

            if 0 in [len(sipCallId),len(dTmp)]:
                logger.info("sipCallId or dTmp is empty , skip")
                continue
            try:
                flagInsert = False
                if not sipCallId in self.dCdrData:
                    self.dCdrData[sipCallId] = ObjCdrItem()
                    logger.info("add sipCallId(%s)" % sipCallId)
                    flagInsert = True
                refSDF = self.dCdrData[sipCallId]
                refSDF.update(dTmp)
                if flagInsert :
                    self.cdrMgr.doInsert(dTmp)
                else:
                    self.cdrMgr.doUpdate(dTmp,sipCallId)
                lastCdrItem = cdrItem
            except:
                logger.info("sdfUpdate task exception,please check")
                logger.debug(traceback.format_exc())
                time.sleep(1)

        return None

    def run(self):
        logger.info("cdr update task start")
        try :
            self.cdrMgr.initDB()
            loadCdrFromDB(self.cdrMgr,self.dCdrData,self.dbFile)
            self.doTask()
        except:
            logger.info("try next loop")
            logger.debug(traceback.format_exc())
        time.sleep(10)

class cleanThrd(Thread):
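    '''Maintenance thread: during the early-morning hours, expires items older than reserveDays from the cache and the sqlite3 file.'''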
    def __init__(self,dbFile,reserveDays,dCdrData):
        super().__init__()
        self.dbFile = dbFile
        self.reserveDays = reserveDays
        self.dCdrData = dCdrData

    def run(self):
        logger.info("clean data task start")
        #cacheTimeout = 5 * 3600 # seconds  
        reserveTimeout = self.reserveDays * 24 * 3600 # seconds
        arrHour = [2,3,4,5,6,]
        while True:
            try:
                now = datetime.datetime.now()
                if now.hour in arrHour :
                    # dCdrData maps call_id -> ObjCdrItem, so clean the cache dict as a whole
                    doDictClean(self.dCdrData,"addTime",reserveTimeout)
                    cdrMgr = ObjCDRMgr(self.dbFile)
                    cdrMgr.doClean(reserveTimeout)
                    cdrMgr.close()
                logger.info("dCdrData len : %d , memory size : %d" % (len(self.dCdrData),sys.getsizeof(self.dCdrData)))
            except:
                logger.warning("data clean fail")
                logger.debug(traceback.format_exc())
            time.sleep(2* 60 * 60)

def main():
    gDCdrData = {} # {"call_id" :ObjCdrItem,}
    gQCDR = queue.Queue()
    dbFile,reserveDays = "test1.db",15

    logging.basicConfig(
        level=logging.DEBUG, # DEBUG,INFO,WARNING,ERROR,CRITICAL
        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
        datefmt='%a, %d %b %Y %H:%M:%S'
    )

    logger.info("dbFile : %s " % dbFile)

    initData(gDCdrData,dbFile)
    arrTask = []

    arrTask.append(dataGenThrd(gQCDR))
    arrTask.append(updateThrd(dbFile,gQCDR,gDCdrData))
    arrTask.append(cleanThrd(dbFile,reserveDays,gDCdrData))

    for t in arrTask:
        t.start()

    while True:
        time.sleep(1)

if __name__ == "__main__":
    main()
else:
    print("import as module")
Output:

Opening the data file shows the following:

III. Summary
That's all for this post. Questions and comments are welcome!