-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathai_crud.py
67 lines (56 loc) · 1.83 KB
/
ai_crud.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import yaml
from sqlmodel import create_engine, SQLModel, Session, select
from sqlalchemy.dialects.mysql import insert
from ai_model import SQLMODEL
# Load the "database" section of the local config file; it supplies every
# component of the connection URL assembled below.
with open("aws.yaml", "r", encoding="utf-8") as file:
    db_config: dict[str, object] = yaml.safe_load(file)["database"]

# Assemble "<driver>://<user>:<password>@<host>:<port>/<database>?charset=utf8".
# Every configured value goes through str() because YAML parses an unquoted
# scalar such as `port: 3306` as an int, which "".join() rejects with TypeError.
# NOTE(review): username and password are inserted verbatim, so reserved URL
# characters ("@", ":", "/") would need to be percent-encoded in aws.yaml.
URL_components: list[str] = [
    str(db_config["drivername"]),
    "://",
    str(db_config["username"]),
    ":",
    str(db_config["password"]),
    "@",
    str(db_config["host"]),
    ":",
    str(db_config["port"]),
    "/",
    str(db_config["database"]),
    "?",
    "charset=utf8",
]
engine = create_engine("".join(URL_components))

# Emit CREATE TABLE for the tables registered on SQLModel.metadata; runs once
# at import time against the engine built above.
SQLModel.metadata.create_all(engine)
def upsert_newspapers(newspapers: list[SQLMODEL.NewsPaper]) -> None:
    """Write the given newspapers with MySQL ``INSERT ... ON DUPLICATE KEY UPDATE``.

    One statement is executed per newspaper and the whole batch is committed
    once at the end, so a failure on any row rolls back every row of the call.

    Args:
        newspapers: Model instances to write; each is serialized with
            ``model_dump()`` to provide the column values.

    Raises:
        Exception: Whatever the session raised, re-raised unchanged after the
            transaction has been rolled back.
    """
    with Session(engine, expire_on_commit=False) as session:
        try:
            for newspaper in newspapers:
                stmt = insert(SQLMODEL.NewsPaper).values(newspaper.model_dump())
                # NOTE(review): on a duplicate key only link_hash is reassigned
                # (to the value being inserted), so the other columns of an
                # existing row appear to be left as they are - confirm that
                # this insert-or-keep behavior is intended.
                stmt = stmt.on_duplicate_key_update(link_hash=stmt.inserted.link_hash)
                session.exec(stmt)
            session.commit()
        except Exception:
            # Undo the partial batch, then propagate the original error with
            # its traceback intact.
            session.rollback()
            raise
def read_newspaper(link_hash: str) -> SQLMODEL.NewsPaper:
    """Fetch the single newspaper row whose ``link_hash`` matches.

    Args:
        link_hash: Value compared for equality against ``NewsPaper.link_hash``.

    Returns:
        The matching ``NewsPaper`` instance.

    Raises:
        ValueError: If the lookup fails for any reason, including zero or
            multiple matching rows (``.one()`` requires exactly one). The
            underlying exception is attached as ``__cause__``.
    """
    print("[START]read_newspaper, link_hash:", link_hash)
    with Session(engine, expire_on_commit=False) as session:
        try:
            statement = select(SQLMODEL.NewsPaper).where(
                SQLMODEL.NewsPaper.link_hash == link_hash
            )
            return session.exec(statement).one()
        except Exception as e:
            print("[EXCEPTION]", e)
            # Keep the ValueError contract callers rely on, but say what
            # failed and chain the root cause instead of discarding it.
            raise ValueError(
                f"could not read exactly one newspaper for link_hash={link_hash!r}"
            ) from e
# def test():
# try:
# newspaper = read_newspaper(
# link_hash="a2de8839d7f32bb07bd9505612796d7abda66a8bc2a32e0a7ac046f20f279fb2"
# )
# print(newspaper.__dir__)
# except Exception:
# print("ERROR")
# test()