在设计中,经常会遇到数据的持久保存。简单设计的话就是保存到文件中。如果使用python的话,可以更容易的进行一些数据库操作来保存数据。
这里使用sqlite。sqlite不是默认安装的,因此需要从网上安装包
使用apt search sqlite搜索一下,发现有数据库的包
使用apt-get install python3-sqlite3 安装之后就可以使用了。
对数据库的操作还是要使用数据库的语言的。如果要执行数据库的语言,就需要sqlite.cursor对象了,通过这个对象执行数据库的语句。
在每次使用之前需要连接数据库,在使用完之后要关闭数据库。
参考代码如下:
import sqlite3
from sqlite3 import Error
import datetime
sensor_db = 'sensor_database.db'
class MyDB:
db = None
def sql_connection(self):
if self.db == None:
try:
self.db = sqlite3.connect(sensor_db)
except Error:
print(Error)
def sql_close(self):
self.db.close()
self.db = None
def put_data_to_db(self,id='',temperature=0):
cursorObj = self.db.cursor()
cmd = 'create table if not exists %s(id text, temperature real, time text)'%id
cursorObj.execute(cmd)
time = datetime.datetime.now()
time = time.strftime('%Y-%m-%d %H:%M:%S')
cmd = 'INSERT INTO %s VALUES (?,?,?)'%id
cursorObj.execute(cmd,(id,temperature,time))
self.db.commit()
def get_id_data_number(self,id):
cursorObj = self.db.cursor()
cmd = 'SELECT * FROM %s '%id
cursorObj.execute(cmd)
rows = cursorObj.fetchall()
number = len(rows)
print(f'sensor-{id} have {number} datas')
return number
def get_id_all_data_from_db(self,id):
cursorObj = self.db.cursor()
cmd = 'SELECT * FROM %s '%id
cursorObj.execute(cmd)
rows = cursorObj.fetchall()
return rows
def get_data_from_db(self,id,index):
number = self.get_id_data_number(id)
if index >= number:
return []
else:
cursorObj = self.db.cursor()
cmd = 'SELECT * FROM %s '%id
cursorObj.execute(cmd)
rows = cursorObj.fetchall()
return rows[index]
def write_data_to_db(self,id,temperature):
self.sql_connection()
self.put_data_to_db(id,temperature)
self.sql_close()
def read_data_from_db(self,id,index):
self.sql_connection()
value = self.get_data_from_db(id,index)
self.sql_close()
return value
def read_id_data_number(self,id):
self.sql_connection()
number = self.get_id_data_number(id)
self.sql_close()
return number
def read_id_all_data(self,id):
self.sql_connection()
datas = self.get_id_all_data_from_db(id)
self.sql_close()
return datas
def get_table_list(self):
self.sql_connection()
cursorObj = self.db.cursor()
cursorObj.execute('SELECT name from sqlite_master where type= "table"')
table_list = cursorObj.fetchall()
self.sql_close()
return table_list
if __name__ == '__main__':
db = MyDB()
db.write_data_to_db('t001',27)
value = db.read_data_from_db('t001',1)
print(value)
运行之后可以测试数据库的写入与读取操作,如下图所示,读出了数据库中的数据。