forked from Animenosekai/japanterebi-xmltv
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfilter.py
154 lines (140 loc) · 4.56 KB
/
filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""Filter channels from the iptv-org/database repository"""
import argparse
import datetime
import json
import pathlib
import typing
import tqdm
from model import Channel
def read_file(file_path: pathlib.Path) -> typing.Iterable[Channel]:
"""
Read a file and return its content as a list of strings.
Parameters
----------
file_path: Path | str
The path to the file.
Returns
-------
Iterable
Generator
list
The content of the file as a list of strings.
"""
with file_path.open() as file:
file.readline()
for line in file:
line = line.strip()
(
id,
name,
alt_names,
network,
owners,
country,
subdivision,
city,
broadcast_area,
languages,
categories,
is_nsfw,
launched,
closed,
replaced_by,
website,
logo,
) = line.split(",")
yield Channel(
id=id,
name=name,
alt_names=alt_names.split(";") if alt_names else [],
network=network or None,
owners=owners.split(";") if owners else [],
country=country,
subdivision=subdivision or None,
city=city or None,
broadcast_area=broadcast_area,
languages=languages.split(";") if languages else [],
categories=categories.split(";") if categories else [],
is_nsfw=is_nsfw == "TRUE",
launched=datetime.datetime.strptime(launched, "%Y-%m-%d")
if launched
else None,
closed=datetime.datetime.strptime(closed, "%Y-%m-%d")
if closed
else None,
replaced_by=replaced_by or None,
website=website or None,
logo=logo,
)
def main(
file_path: pathlib.Path,
languages: list[str],
countries: list[str],
categories: list[str],
add: list[str],
remove: list[str],
progress: bool = False,
) -> typing.Iterable[Channel]:
"""
The main function of the script.
Parameters
----------
file_path: Path
languages: list
countries: list
categories: list
add: list
remove: list
progress: bool, default = True
language: list
category: list
Returns
-------
Iterable
Generator
"""
for channel in tqdm.tqdm(read_file(file_path), disable=not progress):
if not channel.id in add:
if channel.id in remove:
continue
if not (
any((lang in channel.languages for lang in languages))
or any((cat in channel.categories for cat in categories))
or any((country == channel.country for country in countries))
):
continue
yield channel
def entry():
"""The entry point of the script."""
parser = argparse.ArgumentParser(prog="filter", description="Filter channels")
parser.add_argument("--language", help="The language of the channels", nargs="*")
parser.add_argument("--country", help="The country of the channels", nargs="*")
parser.add_argument("--category", help="The category of the channels", nargs="*")
parser.add_argument("--add", help="Add a channel to the list", nargs="*")
parser.add_argument("--remove", help="Remove a channel from the list", nargs="*")
parser.add_argument("--input", "-i", help="The input database file", required=True)
parser.add_argument(
"--minify", "-m", action="store_true", help="Minify the JSON result"
)
parser.add_argument("output", default="-", help="The output path", nargs="?")
args = parser.parse_args()
stdout = not (args.output and args.output != "-")
results = main(
file_path=pathlib.Path(args.input),
languages=args.language or [],
countries=args.country or [],
categories=args.category or [],
add=args.add or [],
remove=args.remove or [],
progress=not stdout,
)
extra_args = {"separators": (",", ":")} if args.minify else {"indent": 4}
encoded_result = json.dumps(
[result.as_dict for result in results], ensure_ascii=False, **extra_args
)
if stdout:
print(encoded_result)
else:
pathlib.Path(args.output).write_text(encoded_result)
if __name__ == "__main__":
entry()