Добавлена БД
This commit is contained in:
parent
f9fc25c64b
commit
003ccabeda
33
app.py
33
app.py
@ -1,26 +1,30 @@
|
||||
from fastapi import FastAPI
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Optional
|
||||
from typing import List, Optional, Dict
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
class Item(BaseModel):
|
||||
module_name: str
|
||||
module_version: str
|
||||
|
||||
@app.get('/', response_model=List[Item])
|
||||
@app.get('/', response_model=List[Dict[str, str]])
|
||||
def get_items():
|
||||
with open('.version') as f:
|
||||
module_version = f.read().strip()
|
||||
try:
|
||||
with open('.version') as f:
|
||||
module_version = f.read().strip()
|
||||
if not module_version:
|
||||
raise ValueError("Version file is empty")
|
||||
except FileNotFoundError:
|
||||
raise HTTPException(status_code=404, detail="Version file not found")
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
data_info = [Item(module_name="Crypto Licensing", module_version=module_version)]
|
||||
data_info = [{"module_name": "Crypto Licensing", "module_version": module_version}]
|
||||
return data_info
|
||||
|
||||
# @app.get('/items/{item_id}', response_model=Optional[Item])
|
||||
# def get_item(item_id: int):
|
||||
# item = next((item for item in data if item.id == item_id), None)
|
||||
# return item
|
||||
|
||||
@app.get('/check_license/{license_key}', response_model=Dict[str, str])
|
||||
def get_item(license_key: str):
|
||||
return {"license_key": license_key}
|
||||
|
||||
|
||||
|
||||
# @app.post('/items', response_model=Item)
|
||||
@ -31,4 +35,5 @@ def get_items():
|
||||
|
||||
if __name__ == '__main__':
|
||||
import uvicorn
|
||||
|
||||
uvicorn.run(app, host='127.0.0.1', port=8000)
|
||||
|
110
db.py
Normal file
110
db.py
Normal file
@ -0,0 +1,110 @@
|
||||
import psycopg2
|
||||
from psycopg2 import sql
|
||||
|
||||
# Настройки подключения
|
||||
host = "localhost"
|
||||
port = "5432"
|
||||
user = "postgres"
|
||||
password = "postgres"
|
||||
database_name = "postgres"
|
||||
table_name = "licenses"
|
||||
|
||||
|
||||
def connect_to_database():
|
||||
connection = psycopg2.connect(
|
||||
host=host,
|
||||
port=port,
|
||||
user=user,
|
||||
password=password,
|
||||
database=database_name
|
||||
)
|
||||
return connection, connection.cursor()
|
||||
|
||||
|
||||
class LicenseDatabase:
|
||||
def __init__(self):
|
||||
self.connection, self.cursor = connect_to_database()
|
||||
self.create_table()
|
||||
|
||||
def create_table(self):
|
||||
create_table_query = f"""
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
id SERIAL PRIMARY KEY,
|
||||
owner_license VARCHAR(255) NOT NULL,
|
||||
expiration_date DATE NOT NULL,
|
||||
device_count INT NOT NULL,
|
||||
license_key VARCHAR(500) -- Добавлена запятая здесь
|
||||
);
|
||||
"""
|
||||
self.cursor.execute(create_table_query)
|
||||
print(f"Таблица '{table_name}' успешно создана или уже существует.")
|
||||
|
||||
def add_record(self, owner_license, expiration_date, device_count, license_key):
|
||||
insert_query = f"""
|
||||
INSERT INTO {table_name} (owner_license, expiration_date, device_count, license_key)
|
||||
VALUES (%s, %s, %s, %s);
|
||||
"""
|
||||
self.cursor.execute(insert_query, (owner_license, expiration_date, device_count, license_key))
|
||||
self.connection.commit()
|
||||
print("Запись успешно добавлена.")
|
||||
|
||||
def delete_record(self, record_id):
|
||||
delete_query = f"DELETE FROM {table_name} WHERE id = %s;"
|
||||
self.cursor.execute(delete_query, (record_id,))
|
||||
self.connection.commit()
|
||||
print("Запись успешно удалена.")
|
||||
|
||||
def update_record(self, record_id, owner_license, expiration_date, device_count, license_key):
|
||||
update_query = f"""
|
||||
UPDATE {table_name}
|
||||
SET owner_license = %s, expiration_date = %s, device_count = %s
|
||||
WHERE id = %s;
|
||||
"""
|
||||
self.cursor.execute(update_query, (owner_license, expiration_date, device_count, record_id, license_key))
|
||||
self.connection.commit()
|
||||
print("Запись успешно обновлена.")
|
||||
|
||||
def get_record_by_id(self, record_id):
|
||||
select_query = f"SELECT * FROM {table_name} WHERE id = %s;"
|
||||
self.cursor.execute(select_query, (record_id,))
|
||||
record = self.cursor.fetchone()
|
||||
|
||||
if record:
|
||||
return {
|
||||
"id": record[0],
|
||||
"owner_license": record[1],
|
||||
"expiration_date": record[2],
|
||||
"device_count": record[3],
|
||||
"license_key": record[4]
|
||||
}
|
||||
else:
|
||||
print(f"Запись с ID {record_id} не найдена.")
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
if self.cursor:
|
||||
self.cursor.close()
|
||||
if self.connection:
|
||||
self.connection.close()
|
||||
print("Соединение с базой данных закрыто.")
|
||||
|
||||
|
||||
# Пример использования класса
|
||||
if __name__ == "__main__":
|
||||
db = LicenseDatabase()
|
||||
# Добавление записи
|
||||
db.add_record("John Doe", "2024-12-31", 5, "ABC123")
|
||||
|
||||
# Получение записи по ID (например, с ID 1)
|
||||
record = db.get_record_by_id(2)
|
||||
if record:
|
||||
print("Полученная запись:", record)
|
||||
|
||||
# Обновление записи (например, с ID 1)
|
||||
db.update_record(2, "Jane Doe", "2025-01-01", 10, "DEF456")
|
||||
|
||||
# Удаление записи (например, с ID 1)
|
||||
db.delete_record(2)
|
||||
|
||||
# Закрытие соединения
|
||||
db.close()
|
Loading…
Reference in New Issue
Block a user