feat: add batch chunking logic to node-related queries in task cleanup #7668 #7686

Merged
1 change: 1 addition & 0 deletions config/default.py
@@ -781,6 +781,7 @@ def monitor_report_config():
 CLEAN_EXPIRED_V2_TASK_CRON = env.CLEAN_EXPIRED_V2_TASK_CRON
 V2_TASK_VALIDITY_DAY = env.V2_TASK_VALIDITY_DAY
 CLEAN_EXPIRED_V2_TASK_BATCH_NUM = env.CLEAN_EXPIRED_V2_TASK_BATCH_NUM
+CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM = env.CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM
 CLEAN_EXPIRED_V2_TASK_INSTANCE = env.CLEAN_EXPIRED_V2_TASK_INSTANCE
 CLEAN_EXPIRED_V2_TASK_CREATE_METHODS = env.CLEAN_EXPIRED_V2_TASK_CREATE_METHODS
 CLEAN_EXPIRED_V2_TASK_PROJECTS = env.CLEAN_EXPIRED_V2_TASK_PROJECTS
1 change: 1 addition & 0 deletions env.py
@@ -92,6 +92,7 @@
 CLEAN_EXPIRED_V2_TASK_CRON = tuple(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_CRON", "30 0 * * *").split())
 V2_TASK_VALIDITY_DAY = int(os.getenv("BKAPP_V2_TASK_VALIDITY_DAY", 730))
 CLEAN_EXPIRED_V2_TASK_BATCH_NUM = int(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_BATCH_NUM", 100))
+CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM = int(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM", 5000))
 CLEAN_EXPIRED_V2_TASK_INSTANCE = bool(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_INSTANCE", False))
 CLEAN_EXPIRED_V2_TASK_CREATE_METHODS = os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_CREATE_METHODS", "periodic").split(",")
 # If not configured, clean all projects by default
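The two hunks above wire the new knob through the usual env-to-settings path: env.py parses BKAPP_CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM (default 5000) and config/default.py re-exports it as a Django setting, capping how many node ids a single cleanup query may carry. A minimal sketch of how a deployment would override it (the value 2000 is invented for illustration, not from the PR):

import os

# Must be set before Django settings load, e.g. in the app's deployment
# environment; "2000" is a hypothetical override value.
os.environ["BKAPP_CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM"] = "2000"

# env.py then coerces the string to int; int() also accepts the int
# default 5000 when the variable is unset.
batch_num = int(os.getenv("BKAPP_CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM", 5000))
assert batch_num == 2000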
55 changes: 32 additions & 23 deletions gcloud/contrib/cleaner/pipeline/bamboo_engine_tasks.py
@@ -10,27 +10,31 @@
 an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 specific language governing permissions and limitations under the License.
 """
-from typing import List, Dict
+import logging
+from typing import Dict, List
 
+from django.conf import settings
 from django.db.models import QuerySet
 
 from pipeline.contrib.periodic_task.models import PeriodicTaskHistory
 from pipeline.eri.models import (
-    ContextValue,
+    CallbackData,
     ContextOutputs,
-    Process,
-    Node,
+    ContextValue,
     Data,
-    State,
-    ExecutionHistory,
     ExecutionData,
-    CallbackData,
+    ExecutionHistory,
+    Node,
+    Process,
     Schedule,
+    State,
 )
-from pipeline.models import PipelineInstance, TreeInfo, Snapshot
+from pipeline.models import PipelineInstance, Snapshot, TreeInfo
+
+from gcloud.utils.data_handler import chunk_data
 from pipeline_web.core.models import NodeInInstance
 
+logger = logging.getLogger("root")


 def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, QuerySet]:
     """
@@ -48,19 +52,24 @@ def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, QuerySet]:
     execution_snapshot = Snapshot.objects.filter(id__in=list(execution_snapshot_ids))
 
     pipeline_ids = instance_ids
+    logger.info(
+        f"[get_clean_pipeline_instance_data] fetching pipeline_ids number: {len(pipeline_ids)}, e.g. {pipeline_ids[:3]}..."
+    )
     context_value = ContextValue.objects.filter(pipeline_id__in=pipeline_ids)
     context_outputs = ContextOutputs.objects.filter(pipeline_id__in=pipeline_ids)
     process = Process.objects.filter(root_pipeline_id__in=pipeline_ids)
     periodic_task_history = PeriodicTaskHistory.objects.filter(pipeline_instance_id__in=pipeline_ids)
 
     node_ids = list(nodes_in_pipeline.values_list("node_id", flat=True)) + instance_ids
-    nodes = Node.objects.filter(node_id__in=node_ids)
-    data = Data.objects.filter(node_id__in=node_ids)
-    states = State.objects.filter(node_id__in=node_ids)
-    execution_history = ExecutionHistory.objects.filter(node_id__in=node_ids)
-    execution_data = ExecutionData.objects.filter(node_id__in=node_ids)
-    callback_data = CallbackData.objects.filter(node_id__in=node_ids)
-    schedules = Schedule.objects.filter(node_id__in=node_ids)
+    logger.info(f"[get_clean_pipeline_instance_data] fetching node_ids number: {len(node_ids)}, e.g. {node_ids[:3]}...")
+    chunk_size = settings.CLEAN_EXPIRED_V2_TASK_NODE_BATCH_NUM
+    nodes_list = chunk_data(node_ids, chunk_size, lambda x: Node.objects.filter(node_id__in=x))
+    data_list = chunk_data(node_ids, chunk_size, lambda x: Data.objects.filter(node_id__in=x))
+    states_list = chunk_data(node_ids, chunk_size, lambda x: State.objects.filter(node_id__in=x))
+    execution_history_list = chunk_data(node_ids, chunk_size, lambda x: ExecutionHistory.objects.filter(node_id__in=x))
+    execution_data_list = chunk_data(node_ids, chunk_size, lambda x: ExecutionData.objects.filter(node_id__in=x))
+    callback_data_list = chunk_data(node_ids, chunk_size, lambda x: CallbackData.objects.filter(node_id__in=x))
+    schedules_list = chunk_data(node_ids, chunk_size, lambda x: Schedule.objects.filter(node_id__in=x))

     return {
         "tree_info": tree_info,
@@ -69,13 +78,13 @@ def get_clean_pipeline_instance_data(instance_ids: List[str]) -> Dict[str, QuerySet]:
         "context_value": context_value,
         "context_outputs": context_outputs,
         "process": process,
-        "node": nodes,
-        "data": data,
-        "state": states,
-        "execution_history": execution_history,
-        "execution_data": execution_data,
-        "callback_data": callback_data,
-        "schedules": schedules,
         "periodic_task_history": periodic_task_history,
         "pipeline_instances": pipeline_instances,
+        "node_list": nodes_list,
+        "data_list": data_list,
+        "state_list": states_list,
+        "execution_history_list": execution_history_list,
+        "execution_data_list": execution_data_list,
+        "callback_data_list": callback_data_list,
+        "schedules_list": schedules_list,
     }
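The batching above hinges on chunk_data from gcloud.utils.data_handler, whose implementation is not part of this diff. From the call sites — a list of ids, a chunk size, and a callable applied per chunk, with the result later consumed as a list of QuerySets — its behavior is presumably equivalent to this sketch (signature and details assumed, not taken from the source):

from typing import Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunk_data(data: List[T], chunk_size: int, func: Callable[[List[T]], R]) -> List[R]:
    """Split `data` into consecutive chunks of at most `chunk_size` items and
    apply `func` to each chunk, collecting the per-chunk results."""
    return [func(data[i : i + chunk_size]) for i in range(0, len(data), chunk_size)]


# Shape of the call sites above: each element of nodes_list is an unevaluated
# QuerySet whose IN clause holds at most chunk_size node ids.
# nodes_list = chunk_data(node_ids, chunk_size, lambda x: Node.objects.filter(node_id__in=x))

Because Django QuerySets are lazy, building these lists issues no SQL by itself; each bounded IN query runs only when the cleanup task later calls .delete() on the chunk.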
8 changes: 6 additions & 2 deletions gcloud/contrib/cleaner/tasks.py
@@ -64,9 +64,13 @@ def clean_expired_v2_task_data():
     instance_fields = ["tasks", "pipeline_instances"]
     with transaction.atomic():
         for field, qs in data_to_clean.items():
-            if field not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE:
+            if field.endswith("_list") and isinstance(qs, list):
+                logger.info(f"[clean_expired_v2_task_data] clean field: {field}, {len(qs)} batches")
+                for batch_qs in qs:
+                    batch_qs.delete()
+            elif field not in instance_fields or settings.CLEAN_EXPIRED_V2_TASK_INSTANCE:
                 logger.info(
                     f"[clean_expired_v2_task_data] clean field: {field}, "
                     f"qs ids: {qs.values_list('id', flat=True)[:10]}..."
                 )
                 qs.delete()
             elif field == "pipeline_instances":
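Taken together, the cleanup task now deletes node-scoped rows in bounded batches rather than with one arbitrarily large IN clause per table. A rough illustration of the effect, with numbers invented for the example:

# Hypothetical sizing: one cleanup run covering 100,000 expired node ids
# with the default batch size of 5,000.
node_ids = [f"node_{i}" for i in range(100_000)]
chunk_size = 5_000

batches = (len(node_ids) + chunk_size - 1) // chunk_size  # ceiling division
print(batches)  # 20 -> per table: 20 DELETEs, each with an IN list of
                # at most 5,000 ids, instead of one 100,000-id IN list.

Note that the loop still runs inside a single transaction.atomic() block, so batching bounds the size of each statement, not the size of the overall transaction.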