Motivated by the first commentor's suggestion on my MSAccess question: Python SQL insert to MSAccess with VBScript
I am moving to use SQLite databases for a python application and I created this query builder class to help implement the SQL logic. One constraint is having to use Python 3.6 so that is why I used .format()
instead of the more modern F-string for string formatting. I'm looking for an overall review of my code quality, etc. Any suggested improvements to the code...
import sqlite3
from typing import Union, List, Optional
import logging
class QueryBuilder:
'''
Query Builder for SQLite databases
Usage:
query_builder = QueryBuilder("database.db")
query_builder.insert("users", {"username": "John", "password": "johnspassword"})
query_builder.update("users", "password = ?", "username = ?")
query_builder.delete("users", {"username": "John"})
query_builder.select("users", ["username", "password"], "username = ?")
'''
def __init__(self, db_name: str, log_file_path: str = 'querybuilder.log'):
'''
Establish database connection and cursor creation and configure logging.
Arguments:
- db_name: The name of the SQLite database
Usage:
query_builder = QueryBuilder("database.db")
'''
logging.basicConfig(filename=log_file_path, level=logging.INFO,
format='%(asctime)s:%(levelname)s:%(message)s')
self.conn = sqlite3.connect(db_name, isolation_level=None)
self.cursor = self.conn.cursor()
def __execute_query(self, query: str, params: Union[tuple, list] = None):
'''
Execute SQL query
Arguments:
- query: SQL query as a string
- params: Parameters for the SQL query
'''
try:
if params:
self.cursor.execute(query, params)
else:
self.cursor.execute(query)
return self.cursor.fetchall()
except sqlite3.Error as error:
logging.error("Error while executing SQLite Script: {}".format(error))
raise
def select(self, table: str, fields: List[str] = ['*'], where: Optional[str] = None,
join: Optional[str] = None, group_by: Optional[str] = None,
order_by: Optional[str] = None):
'''
Select records from a table with various clauses
Arguments:
- table: Table to select from
- fields: List of fields to be selected
- where: WHERE clause
- join: JOIN clause
- group_by: GROUP BY clause
- order_by: ORDER BY clause
Usage:
result = query_builder.select("users", ["username", "password"], "username = ?", "LEFT JOIN orders ON users.id = orders.user_id", "username", "orders.id DESC")
'''
selected_fields = ', '.join(fields)
base_query = "SELECT {} FROM {}".format(selected_fields, table)
if join:
base_query += " {}".format(join)
if where:
base_query += " WHERE {}".format(where)
if group_by:
base_query += " GROUP BY {}".format(group_by)
if order_by:
base_query += " ORDER BY {}".format(order_by)
return self.__execute_query(base_query)
def insert(self, table: str, data: dict):
'''
Insert a new record into a table
Arguments:
- table: Table to insert into
- data: Dictionary of columns and values to insert
Usage:
query_builder.insert("users", {"username": "John", "password": "johnspassword"})
This will execute: INSERT INTO users (username,password) VALUES (?,?) with parameters ('John', 'johnspassword')
'''
fields = ', '.join(data.keys())
placeholder = ', '.join('?' * len(data))
query = "INSERT INTO {} ({}) VALUES ({})".format(table, fields, placeholder)
return self.__execute_query(query, list(data.values()))
def update(self, table: str, data: Optional[dict] = None, where: Optional[dict] = None):
'''
Update an existing record in a table
Arguments:
- table: Table to update
- data: Dictionary of new data for the record
- where: Dictionary for WHERE clause to select the record
Usage:
query_builder.update("users", {"password": "new_password"}, {"username": "John"})
This will execute: UPDATE users SET password = ? WHERE username = ? with parameters ('new_password', 'John')
'''
if data:
set_query = ', '.join(["{} = ?".format(k) for k in data.keys()])
query = "UPDATE {} SET {}".format(table, set_query)
else:
print("Error: No data provided to update.")
return
if where:
where_query = ' AND '.join(["{} = ?".format(k) for k in where.keys()])
query += " WHERE {}".format(where_query)
params = tuple(list(data.values()) + list(where.values() if where else []))
return self.__execute_query(query, params)
def delete(self, table: str, where: Optional[dict] = None):
'''
Delete an existing record from a table
Arguments:
- table: Table to delete from
- where: WHERE clause to select the record
Usage:
query_builder.delete("users", {"username": "John"})
This will execute: DELETE FROM users WHERE username = ? with parameter ('John')
'''
query = "DELETE FROM {}".format(table)
if where is not None:
condition = ' AND '.join(["{} = ?".format(key) for key in where.keys()])
query += " WHERE {}".format(condition)
params = tuple(where.values())
self.__execute_query(query, params)
return self.__execute_query(query, params)
def close_connection(self):
'''
Close the connection to the database
Usage:
query_builder.close_connection()
'''
self.conn.close()
def __del__(self):
'''
Class destructor. Closes the connection to the database
'''
self.conn.close()