# -*- coding: utf-8 -*-
# Heavily based on https://gist.github.com/996120
import os
import sys
import csv
import requests
from urlparse import urlsplit, urljoin
import lxml.html
from item import Item
from request import Request
from response import Response
import gevent
from gevent import monkey, queue, Greenlet, pool, event
monkey.patch_all(thread=False)
import traceback
import logging
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('crawly')


class Crawly(object):
    # Based on gcrawler
    def __init__(self, start_urls=[], timeout=2, worker_count=10, pipeline_size=100, csv_file=None):
        self.timeout = timeout
        self.count = worker_count
        self.in_queue = queue.Queue()                # Queue of URLs to be scheduled
        self.out_queue = queue.Queue(pipeline_size)  # Queue to the pipeline greenlet
        self.worker_pool = pool.Pool(worker_count)
        self.seen_requests = set()
        self.allowed_domains = set()
        self.worker_finished = event.Event()

        for url in start_urls:
            self.allowed_domains.add(urlsplit(url).hostname)
            self.add_request(url)
        log.debug("Allowed domains: %s" % self.allowed_domains)

        if csv_file is not None:
            # DictWriter requires fieldnames; 'title' matches the single field
            # produced by the default process(). Extend this list if you
            # extract more fields.
            self.csv_file = csv.DictWriter(open(csv_file, 'wb'), fieldnames=['title'])
        else:
            self.csv_file = None

    # Use this method to add new URLs to be crawled; do not put items on
    # in_queue directly.
    def add_request(self, req):
        if isinstance(req, basestring):
            req = Request(req)
        if req in self.seen_requests:
            return
        self.seen_requests.add(req)
        log.debug("Adding request: %s" % req)
        self.in_queue.put(req)

    def start(self):
        # Start scheduler
        self.scheduler_greenlet = gevent.spawn(self.scheduler)
        # Start pipeline
        self.pipeline_greenlet = gevent.spawn(self.pipeline)
        # Wait until scheduler is complete
        self.scheduler_greenlet.join()

    def scheduler(self):
        log = logging.getLogger('crawly.scheduler')
        while True:
            # Remove dead greenlets
            for thread in list(self.worker_pool):
                if thread.dead:
                    self.worker_pool.discard(thread)

            # Assign requests
            try:
                req = self.in_queue.get_nowait()
                log.debug("Fetching request from in_queue")
            # If we have no more requests in the queue
            except queue.Empty:
                log.debug("No requests remaining!")
                # Check and wait for existing workers to complete
                if self.worker_pool.free_count() != self.worker_pool.size:
                    log.debug("%d workers remaining, waiting.." % (self.worker_pool.size - self.worker_pool.free_count()))
                    self.worker_finished.wait()
                    self.worker_finished.clear()
                    continue  # Keep looping until all workers have finished
                else:
                    log.info("No workers left, shutting down!")
                    return self.shutdown()

            # Skip requests whose hostname does not fall under our allowed domains
            if urlsplit(req.url).hostname not in self.allowed_domains:
                log.debug("Request hostname (%s) does not fall in the allowed domains: %s" % (urlsplit(req.url).hostname, self.allowed_domains))
                continue

            # Spawn a worker to handle the request
            self.worker_pool.spawn(self.worker, req)

    def shutdown(self):
        self.worker_pool.join()            # Should not block
        self.out_queue.put(StopIteration)  # Notify the pipeline greenlet that we are at the end
        self.pipeline_greenlet.join()      # Wait until the pipeline finishes processing the last data, StopIteration
        return True

    def worker(self, req):
        log = logging.getLogger('crawly.worker')
        try:
            resp = requests.get(req.url, timeout=self.timeout)
            # Update the request
            req.response = Response(resp)
        except Exception as e:
            log.error("Error fetching: %s\n\t%s" % (req, e))
            raise gevent.GreenletExit("error")
        finally:
            self.worker_finished.set()

        # Put response on pipeline queue
        self.out_queue.put(req)
        log.debug("Fetched: %s" % req)
        raise gevent.GreenletExit("success")

    def pipeline(self):
        log = logging.getLogger('crawly.pipeline')
        # Post processing: keep going until we hit StopIteration
        for req in self.out_queue:
            try:
                # Pass the request & response to the helper function to be processed
                self.process(req)
            except Exception:
                log.error("Error:\n%s" % traceback.format_exc())
        log.debug("Pipeline complete!")

    def process(self, req):
        resp = req.response
        root = lxml.html.fromstring(resp.data)
        item = Item()
        item['title'] = root.xpath("//title/text()")[0].strip()
        log.debug('Item: %s' % item)
        self.write_csv(item)

    def write_csv(self, item):
        # Left as a stub in the original; a minimal sketch, assuming Item is
        # dict-like with keys matching the DictWriter fieldnames from __init__.
        if self.csv_file is not None:
            self.csv_file.writerow(item)
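

# ---------------------------------------------------------------------------
# Example usage (an illustrative sketch, not part of the original module).
# The default process() only extracts the page title and never enqueues new
# URLs, so a crawl stops at the start URLs. One way to follow links is to
# subclass Crawly and call add_request() from an overridden process(), as
# sketched below. LinkCrawly, the start URL, and the output file name are
# hypothetical; adjust them for your own crawl.
class LinkCrawly(Crawly):
    def process(self, req):
        resp = req.response
        root = lxml.html.fromstring(resp.data)

        # Extract the title, as the base class does
        item = Item()
        titles = root.xpath("//title/text()")
        item['title'] = titles[0].strip() if titles else ''
        self.write_csv(item)

        # Enqueue every link on the page; add_request() de-duplicates, and
        # the scheduler skips URLs outside the allowed domains.
        for href in root.xpath("//a/@href"):
            self.add_request(urljoin(req.url, href))


if __name__ == '__main__':
    crawler = LinkCrawly(
        start_urls=['http://example.com/'],  # hypothetical start URL
        timeout=5,
        worker_count=10,
        csv_file='items.csv',                # hypothetical output file
    )
    crawler.start()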