Skip to content

Commit

Permalink
chore: remove pymongo_ver_low & update mongodb pipeline & update dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
shengchenyang committed Oct 19, 2024
1 parent 2130092 commit 206271d
Show file tree
Hide file tree
Showing 5 changed files with 1,296 additions and 1,110 deletions.
54 changes: 11 additions & 43 deletions ayugespidertools/common/mongodbpipe.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional, Tuple, TypeVar
from typing import TYPE_CHECKING, Tuple, TypeVar

from ayugespidertools.common.multiplexing import ReuseOperation

Expand Down Expand Up @@ -41,26 +41,19 @@ def _get_insert_data(self, item_dict: dict) -> Tuple[dict, str]:
table_name = item_dict["_table"].key_value
return insert_data, table_name

def process_item_template(
self,
item_dict: dict,
db: "PymongoDataBaseT",
pymongo_ver_low: Optional[bool] = None,
):
def process_item_template(self, item_dict: dict, db: "PymongoDataBaseT"):
"""模板方法,用于处理 mongodb pipeline 存储的模板方法类
Args:
item_dict: item 的 dict 类型
db: mongodb 数据库连接
pymongo_ver_low: pymongo 版本是否 4.0 以下
"""
insert_data, table_name = self._get_insert_data(item_dict)
self._data_storage_logic(
db=db,
item_dict=item_dict,
collection_name=table_name,
insert_data=insert_data,
pymongo_ver_low=pymongo_ver_low,
)

def _default_storage(
Expand All @@ -69,23 +62,13 @@ def _default_storage(
item_dict: dict,
collection_name: str,
insert_data: dict,
pymongo_ver_low: Optional[bool] = None,
):
if pymongo_ver_low:
# 如果没有查重字段时,就直接插入数据(不去重)
if not item_dict.get("_mongo_update_rule"):
db[collection_name].insert(insert_data)
else:
db[collection_name].update(
item_dict["_mongo_update_rule"], {"$set": insert_data}, upsert=True
)
if not item_dict.get("_mongo_update_rule"):
db[collection_name].insert_one(insert_data)
else:
if not item_dict.get("_mongo_update_rule"):
db[collection_name].insert_one(insert_data)
else:
db[collection_name].update_one(
item_dict["_mongo_update_rule"], {"$set": insert_data}, upsert=True
)
db[collection_name].update_one(
item_dict["_mongo_update_rule"], {"$set": insert_data}, upsert=True
)

@abstractmethod
def _data_storage_logic(
Expand All @@ -94,7 +77,6 @@ def _data_storage_logic(
item_dict: dict,
collection_name: str,
insert_data: dict,
pymongo_ver_low: Optional[bool] = None,
*args,
**kwargs,
) -> None:
Expand All @@ -105,7 +87,6 @@ def _data_storage_logic(
item_dict: item 的 dict 类型
collection_name: 集合名称
insert_data: 要插入的数据
pymongo_ver_low: pymongo 版本是否小于 4.0
*args: 可变参数
**kwargs:关键字参数
"""
Expand All @@ -123,13 +104,10 @@ def _data_storage_logic(
item_dict: dict,
collection_name: str,
insert_data: dict,
pymongo_ver_low: Optional[bool] = None,
*args,
**kwargs,
) -> None:
self._default_storage(
db, item_dict, collection_name, insert_data, pymongo_ver_low
)
self._default_storage(db, item_dict, collection_name, insert_data)


class TwistedAsynchronous(AbstractClass):
Expand All @@ -141,13 +119,10 @@ def _data_storage_logic(
item_dict: dict,
collection_name: str,
insert_data: dict,
pymongo_ver_low: Optional[bool] = None,
*args,
**kwargs,
) -> None:
self._default_storage(
db, item_dict, collection_name, insert_data, pymongo_ver_low
)
self._default_storage(db, item_dict, collection_name, insert_data)


class AsyncioAsynchronous(AbstractClass):
Expand All @@ -159,7 +134,6 @@ async def _data_storage_logic( # type: ignore[override]
item_dict: dict,
collection_name: str,
insert_data: dict,
pymongo_ver_low: Optional[bool] = None,
*args,
**kwargs,
): # @override to fix mypy [override] error.
Expand All @@ -170,12 +144,7 @@ async def _data_storage_logic( # type: ignore[override]
item_dict["_mongo_update_rule"], {"$set": insert_data}, upsert=True
)

async def process_item_template(
self,
item_dict: dict,
db: "PymongoDataBaseT",
pymongo_ver_low: Optional[bool] = None,
):
async def process_item_template(self, item_dict: dict, db: "PymongoDataBaseT"):
insert_data, table_name = self._get_insert_data(item_dict)
await self._data_storage_logic(
db=db,
Expand All @@ -189,7 +158,6 @@ def mongodb_pipe(
abstract_class: AbstractClass,
item_dict: dict,
db: "PymongoDataBaseT",
pymongo_ver_low: bool = True,
) -> None:
"""mongodb pipeline 存储的通用调用方法"""
abstract_class.process_item_template(item_dict, db, pymongo_ver_low)
abstract_class.process_item_template(item_dict, db)
9 changes: 1 addition & 8 deletions ayugespidertools/scraper/pipelines/mongo/fantasy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
class AyuFtyMongoPipeline:
conn: "pymongo.MongoClient"
db: "database.Database"
pymongo_ver_low: bool

def open_spider(self, spider: "AyuSpider") -> None:
assert hasattr(spider, "mongodb_conf"), "未配置 MongoDB 连接信息!"
self.pymongo_ver_low = MongoDbBase.ver_low()
mongodb_conf_dict = spider.mongodb_conf._asdict()
self.conn, self.db = MongoDbBase.connects(**mongodb_conf_dict)

Expand All @@ -36,12 +34,7 @@ def process_item(self, item: Any, spider: "AyuSpider") -> Any:
item: scrapy item
"""
item_dict = ReuseOperation.item_to_dict(item)
mongodb_pipe(
Synchronize(),
item_dict=item_dict,
db=self.db,
pymongo_ver_low=self.pymongo_ver_low,
)
mongodb_pipe(Synchronize(), item_dict=item_dict, db=self.db)
return item

def close_spider(self, spider: "AyuSpider") -> None:
Expand Down
12 changes: 6 additions & 6 deletions ayugespidertools/scraper/pipelines/mongo/twisted.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ def process_item(self, item, spider):

def db_insert(self, item, out):
item_dict = ReuseOperation.item_to_dict(item)
mongodb_pipe(
TwistedAsynchronous(),
item_dict=item_dict,
db=self.db,
pymongo_ver_low=self.pymongo_ver_low,
)
mongodb_pipe(TwistedAsynchronous(), item_dict=item_dict, db=self.db)
reactor.callFromThread(out.callback, item_dict)


"""psycopg = {version = "~3.1.13", optional = true}
psycopg-binary = {version = "~3.1.13", optional = true}
psycopg-pool = {version = "~3.2.0", optional = true}"""
Loading

0 comments on commit 206271d

Please sign in to comment.