Skip to content

Commit

Permalink
optimize: change mongo dependency judgment logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shengchenyang committed Apr 30, 2024
1 parent 3e076a5 commit 625ad1c
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 21 deletions.
33 changes: 19 additions & 14 deletions ayugespidertools/common/mongodbpipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,22 @@ def process_item_template(
self,
item_dict: dict,
db: "PymongoDataBaseT",
sys_ver_low: Optional[bool] = None,
pymongo_ver_low: Optional[bool] = None,
):
"""模板方法,用于处理 mongodb pipeline 存储的模板方法类
Args:
item_dict: item 的 dict 类型
db: mongodb 数据库连接
sys_ver_low: 是否是 py3.11 以下
pymongo_ver_low: pymongo 版本是否 4.0 以下
"""
insert_data, table_name = self._get_insert_data(item_dict)
self._data_storage_logic(
db=db,
item_dict=item_dict,
collection_name=table_name,
insert_data=insert_data,
sys_ver_low=sys_ver_low,
pymongo_ver_low=pymongo_ver_low,
)

def _default_storage(
Expand All @@ -69,9 +69,9 @@ def _default_storage(
item_dict: dict,
collection_name: str,
insert_data: dict,
sys_ver_low: Optional[bool] = None,
pymongo_ver_low: Optional[bool] = None,
):
if sys_ver_low:
if pymongo_ver_low:
# 如果没有查重字段时,就直接插入数据(不去重)
if not item_dict.get("_mongo_update_rule"):
db[collection_name].insert(insert_data)
Expand All @@ -94,7 +94,7 @@ def _data_storage_logic(
item_dict: dict,
collection_name: str,
insert_data: dict,
sys_ver_low: Optional[bool] = None,
pymongo_ver_low: Optional[bool] = None,
*args,
**kwargs,
) -> None:
Expand All @@ -105,6 +105,7 @@ def _data_storage_logic(
item_dict: item 的 dict 类型
collection_name: 集合名称
insert_data: 要插入的数据
pymongo_ver_low: pymongo 版本是否小于 4.0
*args: 可变参数
**kwargs:关键字参数
"""
Expand All @@ -120,11 +121,13 @@ def _data_storage_logic(
item_dict: dict,
collection_name: str,
insert_data: dict,
sys_ver_low: Optional[bool] = None,
pymongo_ver_low: Optional[bool] = None,
*args,
**kwargs,
) -> None:
self._default_storage(db, item_dict, collection_name, insert_data, sys_ver_low)
self._default_storage(
db, item_dict, collection_name, insert_data, pymongo_ver_low
)


class TwistedAsynchronous(AbstractClass):
Expand All @@ -136,11 +139,13 @@ def _data_storage_logic(
item_dict: dict,
collection_name: str,
insert_data: dict,
sys_ver_low: Optional[bool] = None,
pymongo_ver_low: Optional[bool] = None,
*args,
**kwargs,
) -> None:
self._default_storage(db, item_dict, collection_name, insert_data, sys_ver_low)
self._default_storage(
db, item_dict, collection_name, insert_data, pymongo_ver_low
)


class AsyncioAsynchronous(AbstractClass):
Expand All @@ -152,7 +157,7 @@ async def _data_storage_logic( # type: ignore[override]
item_dict: dict,
collection_name: str,
insert_data: dict,
sys_ver_low: Optional[bool] = None,
pymongo_ver_low: Optional[bool] = None,
*args,
**kwargs,
): # @override to fix mypy [override] error.
Expand All @@ -167,7 +172,7 @@ async def process_item_template(
self,
item_dict: dict,
db: "PymongoDataBaseT",
sys_ver_low: Optional[bool] = None,
pymongo_ver_low: Optional[bool] = None,
):
insert_data, table_name = self._get_insert_data(item_dict)
await self._data_storage_logic(
Expand All @@ -182,7 +187,7 @@ def mongodb_pipe(
abstract_class: AbstractClass,
item_dict: dict,
db: "PymongoDataBaseT",
sys_ver_low: bool = True,
pymongo_ver_low: bool = True,
) -> None:
"""mongodb pipeline 存储的通用调用方法"""
abstract_class.process_item_template(item_dict, db, sys_ver_low)
abstract_class.process_item_template(item_dict, db, pymongo_ver_low)
9 changes: 7 additions & 2 deletions ayugespidertools/mongoclient.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING, Optional, Tuple

import pymongo
from gridfs import GridFS
from pymongo import MongoClient

Expand All @@ -11,7 +12,6 @@
]

if TYPE_CHECKING:
import pymongo
from pymongo import database

from ayugespidertools.common.typevars import authMechanismStr
Expand All @@ -37,7 +37,7 @@ def connects(
authMechanism: "authMechanismStr" = "SCRAM-SHA-1",
database: Optional[str] = None,
uri: Optional[str] = None,
) -> Tuple["pymongo.MongoClient", "database.Database"]:
) -> Tuple[pymongo.MongoClient, "database.Database"]:
"""初始化 mongo 连接句柄
可传入 user, password, host 等参数的形式,也可只传入 uri 的方式
Expand Down Expand Up @@ -67,6 +67,11 @@ def connects(
db = conn[database]
return conn, db

@staticmethod
def ver_low() -> bool:
    """Report whether the installed pymongo is older than 4.0.

    The result is used by the storage pipelines to choose between the
    legacy and the current collection write calls.

    The check compares the numeric major version. Comparing the raw
    version strings is lexicographic and misorders releases such as
    "3.9.0" vs "3.13.0" ("9" > "1"), which would classify a 3.x install
    as a modern one.

    Returns:
        True when the pymongo major version is below 4, otherwise False.
    """
    # Prefer ``pymongo.version``; fall back to ``__version__`` if it is
    # missing or empty.
    pymongo_ver = getattr(pymongo, "version", None) or pymongo.__version__
    # Only the leading component matters, so suffixes such as ".dev0"
    # in later components cannot break the parse.
    major = int(str(pymongo_ver).split(".", 1)[0])
    return major < 4

@staticmethod
def getFileMd5(db, _id, collection):
gridfs_col = GridFS(db, collection)
Expand Down
7 changes: 3 additions & 4 deletions ayugespidertools/scraper/pipelines/mongo/fantasy.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import sys
from typing import TYPE_CHECKING, Any

from ayugespidertools.common.mongodbpipe import Synchronize, mongodb_pipe
Expand All @@ -17,11 +16,11 @@
class AyuFtyMongoPipeline:
conn: "pymongo.MongoClient"
db: "database.Database"
sys_ver_low: bool
pymongo_ver_low: bool

def open_spider(self, spider: "AyuSpider") -> None:
assert hasattr(spider, "mongodb_conf"), "未配置 MongoDB 连接信息!"
self.sys_ver_low = sys.version_info < (3, 11)
self.pymongo_ver_low = MongoDbBase.ver_low()
mongodb_conf_dict = spider.mongodb_conf._asdict()
self.conn, self.db = MongoDbBase.connects(**mongodb_conf_dict)

Expand All @@ -41,7 +40,7 @@ def process_item(self, item: Any, spider: "AyuSpider") -> Any:
Synchronize(),
item_dict=item_dict,
db=self.db,
sys_ver_low=self.sys_ver_low,
pymongo_ver_low=self.pymongo_ver_low,
)
return item

Expand Down
2 changes: 1 addition & 1 deletion ayugespidertools/scraper/pipelines/mongo/twisted.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ def db_insert(self, item, out):
TwistedAsynchronous(),
item_dict=item_dict,
db=self.db,
sys_ver_low=self.sys_ver_low,
pymongo_ver_low=self.pymongo_ver_low,
)
reactor.callFromThread(out.callback, item_dict)

0 comments on commit 625ad1c

Please sign in to comment.