Fix: parse_robots_txt #205

Draft · wants to merge 2 commits into main
137 changes: 95 additions & 42 deletions src/web_analysis/parse_robots.py
@@ -1,5 +1,4 @@
import argparse
import gzip
import json
from collections import defaultdict
@@ -8,25 +7,46 @@

def parse_robots_txt(robots_txt):
"""Parses the robots.txt to create a map of agents, sitemaps, and parsing oddities."""
rules = {"ERRORS": []}
rules = {"ERRORS": [], "Sitemaps": []}
agent_map = {}
current_agents = []

for raw_line in robots_txt.splitlines():
line = raw_line.split("#", 1)[0].strip()
if not line:
continue
lower_line = line.lower()

if lower_line.startswith("user-agent:"):
agent_name_raw = line.split(":", 1)[1].strip()
agent_name = agent_name_raw.lower()

if agent_name not in agent_map:
agent_map[agent_name] = agent_name_raw
rules[agent_name_raw] = defaultdict(list)
current_agents.append(agent_name)

elif any(
lower_line.startswith(d + ":") for d in ["allow", "disallow", "crawl-delay"]
):
if not current_agents:
rules["ERRORS"].append(
f"Directive '{line}' found with no preceding user-agent."
)
continue
directive_name = lower_line.split(":", 1)[0].strip()
directive_value = line.split(":", 1)[1].strip()
directive_key = directive_name.capitalize()
for agent in current_agents:
original_name = agent_map[agent]
rules[original_name][directive_key].append(directive_value)

elif lower_line.startswith("sitemap:"):
sitemap_value = line.split(":", 1)[1].strip()
rules["Sitemaps"].append(sitemap_value)
else:
rules["ERRORS"].append(f"Unmatched line: {line}")
rules["ERRORS"].append(f"Unmatched line: {raw_line}")

return rules
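
A rough illustration of the parser's output (the agent, paths, and sitemap URL are invented for the example): a single-group robots.txt parses into one entry per user agent plus the shared "Sitemaps" and "ERRORS" keys.

    example = (
        "User-agent: *\n"
        "Crawl-delay: 10\n"
        "Disallow: /private/\n"
        "Allow: /private/public.html\n"
        "Sitemap: https://example.com/sitemap.xml\n"
    )
    parse_robots_txt(example)
    # -> {"ERRORS": [],
    #     "Sitemaps": ["https://example.com/sitemap.xml"],
    #     "*": {"Crawl-delay": ["10"],
    #           "Disallow": ["/private/"],
    #           "Allow": ["/private/public.html"]}}  # per-agent values are defaultdict(list)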


@@ -39,7 +59,11 @@ def parallel_parse_robots(data):
url_to_rules = {url: {} for url, txt in data.items() if not txt}
# interpret the robots.txt
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_url = {
executor.submit(parse_robots_txt, txt): url
for url, txt in data.items()
if txt
}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
@@ -54,9 +78,13 @@ def interpret_agent(rules):
agent_disallow = [x for x in rules.get("Disallow", []) if "?" not in x]
agent_allow = [x for x in rules.get("Allow", []) if "?" not in x]

if (
len(agent_disallow) == 0
or agent_disallow == [""]
or (agent_allow == agent_disallow)
):
disallow_type = "none"
elif any("/" == x.strip() for x in agent_disallow) and len(agent_allow) == 0:
disallow_type = "all"
else:
disallow_type = "some"
@@ -65,7 +93,7 @@ def interpret_agent(rules):


def interpret_robots(agent_rules, all_agents):
"""Given the robots.txt agent rules, and a list of relevant agents
"""Given the robots.txt agent rules, and a list of relevant agents
(a superset of the robots.txt), determine whether they are:

(1) blocked from scraping all parts of the website
@@ -80,20 +108,22 @@ def interpret_robots(agent_rules, all_agents):

for agent in all_agents:
rule = agent_rules.get(agent)
judgement = (
interpret_agent(rule) if rule is not None else agent_to_judgement["*"]
)
agent_to_judgement[agent] = judgement

return agent_to_judgement
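
A hedged sketch of interpret_robots on invented inputs, assuming the elided lines above seed agent_to_judgement["*"] by running interpret_agent on the "*" rules:

    agent_rules = {
        "*": {"Disallow": ["/private/"]},   # some paths blocked -> "some"
        "GPTBot": {"Disallow": ["/"]},      # everything blocked -> "all"
    }
    interpret_robots(agent_rules, ["GPTBot", "CCBot"])
    # -> {"*": "some", "GPTBot": "all", "CCBot": "some"}  # CCBot is absent, so it inherits "*"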


def aggregate_robots(url_to_rules, all_agents):
"""Across all robots.txt, determine basic stats:
(1) For each agent, how often it is explicitly mentioned
(2) For each agent, how often it is blocked from scraping all parts of the website
(3) For each agent, how often it is blocked from scraping part of the website
(4) For each agent, how often it is NOT blocked from scraping at all
"""
robots_stats = defaultdict(lambda: {"counter": 0, "all": 0, "some": 0, "none": 0})
url_decisions = {}

for url, robots in url_to_rules.items():
@@ -103,11 +133,11 @@ def aggregate_robots(url_to_rules, all_agents):
for agent in all_agents + ["*"]:
judgement = agent_to_judgement[agent]
robots_stats[agent][judgement] += 1
if agent in robots:  # Missing robots.txt means nothing blocked
robots_stats[agent]["counter"] += 1

# See if *All Agents* are blocked on all content,
# or at least some agents can access some or more content, or
# there are no blocks on any agents at all.
if all(v == "all" for v in agent_to_judgement.values()):
agg_judgement = "all"
@@ -118,14 +148,15 @@ def aggregate_robots(url_to_rules, all_agents):
robots_stats["*All Agents*"]["counter"] += 1
robots_stats["*All Agents*"][agg_judgement] += 1
url_decisions[url]["*All Agents*"] = agg_judgement

return robots_stats, url_decisions
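
The return shapes, sketched with placeholder values (actual counts depend on the crawl):

    robots_stats, url_decisions = aggregate_robots(url_to_rules, all_agents)
    # robots_stats["GPTBot"] -> {"counter": ..., "all": ..., "some": ..., "none": ...}
    #   "counter" only increments when the agent is named in that URL's robots.txt.
    # robots_stats["*All Agents*"] holds the per-URL aggregate judgement tallies.
    # url_decisions["https://example.com/robots.txt"]
    #   -> {"*": "some", "GPTBot": "all", ..., "*All Agents*": "some"}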


def read_robots_file(file_path):
with gzip.open(file_path, "rt") as file:
return json.load(file)
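
read_robots_file expects a gzip-compressed JSON object mapping URL -> robots.txt text. A minimal sketch for producing such a file (the path and contents are only examples):

    import gzip
    import json

    payload = {"https://example.com/robots.txt": "User-agent: *\nDisallow: /private/"}
    with gzip.open("robots.json.gz", "wt") as f:
        json.dump(payload, f)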


def analyze_robots(data):
"""Takes in {URL --> robots text}

@@ -136,28 +167,44 @@ def analyze_robots(data):
`some` is how many times it had some paths blocked
`none` is how many times it had no paths blocked
url_interpretations: {URL --> {Agent --> <all/some/none>}}
Only includes agents seen at this URL.
Agents not seen at this URL inherit the "*" judgement, or `none` if there are no "*" rules.
"""
url_to_rules = parallel_parse_robots(data)
# Collect all agents
all_agents = list(
set(
[
agent
for url, rules in url_to_rules.items()
for agent, _ in rules.items()
if agent not in ["ERRORS", "Sitemaps", "*"]
]
)
)

robots_stats, url_decisions = aggregate_robots(url_to_rules, all_agents)

# URL --> agents in its robots.txt and their decisions (all/some/none).
url_interpretations = {
k: {
agent: v
for agent, v in vs.items()
if agent not in ["ERRORS", "Sitemaps"]
and agent in list(url_to_rules[k]) + ["*All Agents*"]
}
for k, vs in url_decisions.items()
}

return robots_stats, url_interpretations
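
An end-to-end sketch on an in-memory mapping (URLs and rules invented):

    data = {
        "https://example.com/robots.txt": "User-agent: GPTBot\nDisallow: /",
        "https://example.org/robots.txt": "",  # robots.txt missing or empty
    }
    robots_stats, url_interpretations = analyze_robots(data)
    # robots_stats["GPTBot"]["all"] counts URLs where GPTBot is blocked from everything;
    # url_interpretations keeps, per URL, only the agents actually seen in that robots.txt
    # plus the "*All Agents*" aggregate.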


def main(args):
data = read_robots_file(args.file_path)
print(f"Total URLs: {len(data)}")
print(f"URLs with robots.txt: {sum(1 for robots_txt in data.values() if robots_txt)}")
print(
f"URLs with robots.txt: {sum(1 for robots_txt in data.values() if robots_txt)}"
)

robots_stats, url_interpretations = analyze_robots(data)

@@ -168,14 +215,20 @@ def main(args):
# import pdb; pdb.set_trace()
# print(url_interpretations["http://www.machinenoveltranslation.com/robots.txt"])

sorted_robots_stats = sorted(
robots_stats.items(),
key=lambda x: x[1]["counter"] / len(data) if len(data) > 0 else 0,
reverse=True,
)

print(f"{'Agent':<30} {'Observed': <10} {'Blocked All': >15} {'Blocked Some': >15} {'Blocked None': >15}")
print(
f"{'Agent':<30} {'Observed': <10} {'Blocked All': >15} {'Blocked Some': >15} {'Blocked None': >15}"
)
for i, (agent, stats) in enumerate(sorted_robots_stats):
counter_pct = stats["counter"] / len(data) * 100 if len(data) > 0 else 0
all_pct = stats["all"] / len(data) * 100 if len(data) > 0 else 0
some_pct = stats["some"] / len(data) * 100 if len(data) > 0 else 0
none_pct = stats["none"] / len(data) * 100 if len(data) > 0 else 0
print_outs = [
f"{agent:<20}",
f"{stats['counter']:>5} ({counter_pct:5.1f}%) {'':>5} ",