-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgrpc_client.py
84 lines (67 loc) · 4.03 KB
/
grpc_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from grpc_sc2reader_client_functions import process_replay, initialize_worker
from multiprocessing import Pool
import argparse
import os
import logging
from settings import LOGGING_FORMAT
# Replay processing is distributed across multiprocessing worker processes that use load_replay.
# Each replay must be handed out exactly once (e.g. by consuming a shared list) so that no two processes receive the same replay by accident.
def get_replays(replay_directory:str) -> list:
    """Return absolute paths of the ``.SC2Replay`` files located directly in a directory.

    Only the top level of ``replay_directory`` is inspected (no recursion).
    Entries are matched by the case-sensitive ``.SC2Replay`` suffix;
    sub-directories that happen to carry that suffix are skipped because
    they are not replay files.

    Args:
        replay_directory: Directory to scan for replay files.

    Returns:
        A list of absolute file paths, sorted by file name so the result
        does not depend on the arbitrary ordering of ``os.listdir``.

    Raises:
        OSError: Propagated from ``os.listdir`` when the directory cannot
            be listed (for example, when it does not exist).
    """
    absolute_filepaths = []
    # Sorting makes the processing order reproducible between runs.
    for replay in sorted(os.listdir(replay_directory)):
        if not replay.endswith(".SC2Replay"):
            continue
        candidate = os.path.join(replay_directory, replay)
        # Guard against directories that merely look like replays by name.
        if os.path.isfile(candidate):
            absolute_filepaths.append(os.path.abspath(candidate))
    return absolute_filepaths
def _process_replay_task(task):
    """Process one replay described by a ``(replay_path, output_dir, anonymize_toon, anonymize_chat)`` tuple.

    ``Pool.imap_unordered`` passes each work item to the callable as a
    single argument, so this adapter unpacks the tuple and forwards it to
    ``process_replay`` with the same keyword arguments that the sequential
    code path uses. It is defined at module level so that it can be pickled
    and sent to worker processes.

    A failure on one replay is logged and does not stop the remaining
    replays from being processed.
    """
    replay_path, output_dir, anonymize_toon, anonymize_chat = task
    try:
        process_replay(replay_file=replay_path,
                       output_dir=output_dir,
                       anonymize_toon=anonymize_toon,
                       anonymize_chat=anonymize_chat)
    except Exception:
        # Make the failure visible instead of losing it in the pool.
        logging.exception("Failed to process replay %s", replay_path)


def start_processing(args_replay_directory:str,
                     args_output_directory:str,
                     args_agents:int,
                     args_chunksize:int,
                     args_multiprocessing:bool,
                     args_anonymize_toon:bool,
                     args_anonymize_chat:bool):
    """Process every replay found in a directory, optionally using a process pool.

    Args:
        args_replay_directory: Directory that is scanned for replays via ``get_replays``.
        args_output_directory: Output directory forwarded to ``process_replay``.
        args_agents: Number of worker processes in the pool (multiprocessing mode only).
        args_chunksize: Chunk size passed to ``Pool.imap_unordered`` (multiprocessing mode only).
        args_multiprocessing: When true, replays are distributed over a ``Pool``;
            otherwise they are processed one by one in the current process.
        args_anonymize_toon: Forwarded to ``process_replay`` as ``anonymize_toon``.
        args_anonymize_chat: Forwarded to ``process_replay`` as ``anonymize_chat``.
    """
    logging.info("Entered start_processing()")

    # Getting a list of replay filepaths by using a helper function:
    list_of_replays = get_replays(args_replay_directory)
    logging.info(f"Got list_of_replays= {len(list_of_replays)}")

    if args_multiprocessing:
        logging.info("Detected args_multiprocessing = True")

        # One work item per replay; every item is dispatched exactly once.
        processing_arguments = [
            (replay_path, args_output_directory, args_anonymize_toon, args_anonymize_chat)
            for replay_path in list_of_replays
        ]

        # Defining available pool of processes for replay processing:
        with Pool(processes=args_agents, initializer=initialize_worker) as pool:
            # Drain the result iterator so that all tasks are known to have
            # finished and any error raised by the pool itself surfaces here.
            for _ in pool.imap_unordered(_process_replay_task,
                                         processing_arguments,
                                         args_chunksize):
                pass
            pool.close()
            pool.join()
    else:
        logging.info("Detected args_multiprocessing = False")
        initialize_worker()
        for replay_file in list_of_replays:
            process_replay(replay_file=replay_file,
                           output_dir=args_output_directory,
                           anonymize_toon=args_anonymize_toon,
                           anonymize_chat=args_anonymize_chat)
def str_to_bool(value):
    """Convert a command-line string into a boolean.

    ``argparse`` with ``type=bool`` treats every non-empty string as true,
    so ``--flag False`` would silently enable the flag. This converter
    interprets the text instead.

    Args:
        value: The raw command-line string (a ``bool`` is returned unchanged).

    Returns:
        ``True`` for ``true/t/yes/y/1`` and ``False`` for ``false/f/no/n/0``
        (case-insensitive, surrounding whitespace ignored). An empty string
        maps to ``False``, matching what ``bool("")`` produced before.

    Raises:
        argparse.ArgumentTypeError: If the text is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    normalized = value.strip().lower()
    if normalized in ("true", "t", "yes", "y", "1"):
        return True
    if normalized in ("false", "f", "no", "n", "0", ""):
        return False
    raise argparse.ArgumentTypeError(f"Expected a boolean value, got {value!r}.")


if __name__ == "__main__":

    parser = argparse.ArgumentParser(description="StarCraft II replay processing tool that uses multiprocessing.")
    parser.add_argument("--input_dir", type=str, default="./DEMOS/Input", help="Provide the path to the input directory that contains .SC2Replay files.")
    parser.add_argument("--output_dir", type=str, default="./DEMOS/Output/", help="Provide the path to the output directory that will contain .pickle files.")
    parser.add_argument("--agents", type=int, default=22, help="Provide how much agents will be available in the pool for execution.")
    parser.add_argument("--chunksize", type=int, default=1000, help="Provide how much replays are to be processed at once.")
    # Boolean options are parsed from their text so that "False" really disables them.
    parser.add_argument("--use_multiprocessing", type=str_to_bool, default=True, help="Set this flag to true if You would like to use multiprocessing.")
    parser.add_argument("--anonymize_toon", type=str_to_bool, default=True, help="Set this flag to true if You would like to perform toon/nickname anonymization.")
    parser.add_argument("--anonymize_chat", type=str_to_bool, default=True, help="Set this flag to true if You would like to perform chat anonymization.")
    args = parser.parse_args()

    # Setting up logging:
    logging.basicConfig(level=logging.DEBUG, format=LOGGING_FORMAT)

    # Starting the client:
    start_processing(args_replay_directory=args.input_dir,
                     args_output_directory=args.output_dir,
                     args_agents=args.agents,
                     args_chunksize=args.chunksize,
                     args_multiprocessing=args.use_multiprocessing,
                     args_anonymize_toon=args.anonymize_toon,
                     args_anonymize_chat=args.anonymize_chat)