import sqlite3 import shutil import os import json import hashlib import database_api PATH_INIT_SCRIPT = "./init_database.sql" PATH_FILESYSTEM = "./storage" PATH_DATABASE = "./crisis_events.db" PATH_METADATA = "./crisis_events_database_metadata.json" class SqliteDatabaseAPI(database_api.CrisisEventsDatabase): def _connect(self): connection = sqlite3.connect(PATH_DATABASE) return connection, connection.cursor() def _disconnect(self,connection, cursor): cursor.close() connection.commit() connection.close() def get_next_user_id(self): next = None metadata = None with open(PATH_METADATA, "r") as f: #im lazy :) metadata = json.loads(f.read()) next = metadata["Users"] metadata["Users"] += 1 with open(PATH_METADATA, "w") as f: #who needs r+ json.dump(metadata,f) return next def get_next_collection_id(self): next = None metadata = None with open(PATH_METADATA, "r") as f: #im lazy :) metadata = json.loads(f.read()) next = metadata["Collections"] metadata["Collections"] += 1 with open(PATH_METADATA, "w") as f: #who needs r+ json.dump(metadata,f) return next def create_collection(self,user_id: int, collection_name:str): connection, cur = self._connect() collection_id = self.get_next_collection_id() cur.execute("INSERT INTO event_collections VALUES (?, ?, ?, ?, ?)", (collection_id,user_id,"{}","summary",collection_name,)) print(f"Created new collection '{collection_name}' owned by {user_id}") self._disconnect(connection, cur) return collection_id def create_user(self, username: str, password: str): connection, cur = self._connect() user_id = self.get_next_user_id() user_hash = hashlib.md5(password.encode()).digest().hex() print(user_hash) cur.execute("INSERT INTO users VALUES (?, ?, ?)", (user_id,username,user_hash,)) print(f"Created new user '{username}'") self._disconnect(connection, cur) return user_id def get_collection(self,collection_id:int, user_id: int): connection, cur = self._connect() cur.execute("SELECT * FROM event_collections WHERE collection_id == ? AND owner_id == ?;", (collection_id,user_id,)) result = cur.fetchone() json_result = { "collection_id":result[0], "user_id":result[1], "collection_data":result[2], "collection_summary":result[3], "collection_name":result[4], } self._disconnect(connection, cur) return json_result def set_collection(self,collection_id:int, user_id: int, collection_json:str, summary:str, name:str): connection, cur = self._connect() cur.execute("UPDATE event_collections SET collection_data = ?, collection_summary = ?, collection_name = ? WHERE collection_id == ? AND owner_id == ?;", (collection_json,summary,name,collection_id,user_id,)) # result = cur.fetchone() self._disconnect(connection, cur) # return result def get_sample_of_collections(self): connection, cur = self._connect() cur.execute("SELECT * FROM event_collections LIMIT 10;") result = cur.fetchall() self._disconnect(connection, cur) return result def get_sample_of_users(self): connection, cur = self._connect() cur.execute("SELECT * FROM users LIMIT 10;") result = cur.fetchall() self._disconnect(connection, cur) return result def initialize(self, reset = False): #try to delete the databse and all relevant data # try: # shutil.rmtree(PATH_FILESYSTEM) # except FileNotFoundError: # pass # finally: # os.mkdir(PATH_FILESYSTEM) connection, cur = self._connect() with open(PATH_INIT_SCRIPT,"r") as f: sql = f.read() cur.executescript(sql) with open(PATH_METADATA, "w") as f: metadata = { "Users": 0, "Collections": 0, } json.dump(metadata,f) self._disconnect(connection, cur) def get_info(self) -> str: return "SQLITE Backend" IMPLEMENTATION = SqliteDatabaseAPI if __name__ == "__main__": crisis_db = SqliteDatabaseAPI() crisis_db.initialize() print(crisis_db.get_next_user_id()) print(crisis_db.get_next_user_id()) print(crisis_db.get_next_user_id()) print(crisis_db.get_next_user_id()) user_id = crisis_db.get_next_user_id() collection_id = crisis_db.create_collection(user_id,"My collection") print(crisis_db.get_collection(0,4)) print(crisis_db.set_collection(0,4,"{'hi':'bye'}","new summary", "new name")) print(crisis_db.get_collection(0,4)) # print(get_collection_title(collection_id))