From 285a69d0e2d0fcc2a3f769635831501c46722e33 Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Thu, 6 Jun 2013 06:44:42 +0600 Subject: [PATCH 01/14] WIP experiment: job scheduling is removed, more granular batches processing is added (they are deleted as soon as all their requests are processed, not when the spider finishes with a success). --- scrapylib/hcf.py | 217 +++++++++++++++++++++++------------------------ 1 file changed, 106 insertions(+), 111 deletions(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index 7e1f95d..6299ef9 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -27,21 +27,6 @@ The default value is provided by the python-hubstorage package. - HS_MAX_LINKS - Number of links to be read from the HCF, the default is 1000. - - HS_START_JOB_ENABLED - Enable whether to start a new job when the spider - finishes. The default is False - - HS_START_JOB_ON_REASON - This is a list of closing reasons, if the spider ends - with any of these reasons a new job will be started - for the same slot. The default is ['finished'] - - HS_START_JOB_NEW_PANEL - If True the jobs will be started in the new panel. - The default is False. - - HS_NUMBER_OF_SLOTS - This is the number of slots that the middleware will - use to store the new links. The default is 8. - The next keys can be defined in a Request meta in order to control the behavior of the HCF middleware: @@ -57,29 +42,29 @@ ``response.meta['hcf_params']['qdata']``. The spider can override the default slot assignation function by setting the -spider slot_callback method to a function with the following signature: +spider slot_callback method to a function with the following signature:: def slot_callback(request): ... return slot """ -import hashlib import logging -from collections import defaultdict -from datetime import datetime -from scrapinghub import Connection from scrapy import signals, log -from scrapy.exceptions import NotConfigured +from scrapy.exceptions import NotConfigured, DontCloseSpider from scrapy.http import Request from hubstorage import HubstorageClient -DEFAULT_MAX_LINKS = 1000 -DEFAULT_HS_NUMBER_OF_SLOTS = 8 +# class HcfStoreStrategy(object): +# def __init__(self, hubstorage_client): +# self.client = hubstorage_client +# class HcfMiddleware(object): + PRIVATE_INFO_KEY = '__hcf_info__' + def __init__(self, crawler): self.crawler = crawler @@ -88,30 +73,15 @@ def __init__(self, crawler): self.hs_projectid = self._get_config(crawler, "HS_PROJECTID") self.hs_frontier = self._get_config(crawler, "HS_FRONTIER") self.hs_consume_from_slot = self._get_config(crawler, "HS_CONSUME_FROM_SLOT") - try: - self.hs_number_of_slots = int(crawler.settings.get("HS_NUMBER_OF_SLOTS", DEFAULT_HS_NUMBER_OF_SLOTS)) - except ValueError: - self.hs_number_of_slots = DEFAULT_HS_NUMBER_OF_SLOTS - try: - self.hs_max_links = int(crawler.settings.get("HS_MAX_LINKS", DEFAULT_MAX_LINKS)) - except ValueError: - self.hs_max_links = DEFAULT_MAX_LINKS - self.hs_start_job_enabled = crawler.settings.get("HS_START_JOB_ENABLED", False) - self.hs_start_job_on_reason = crawler.settings.get("HS_START_JOB_ON_REASON", ['finished']) - self.hs_start_job_new_panel = crawler.settings.get("HS_START_JOB_NEW_PANEL", False) - - if not self.hs_start_job_new_panel: - conn = Connection(self.hs_auth) - self.oldpanel_project = conn[self.hs_projectid] self.hsclient = HubstorageClient(auth=self.hs_auth, endpoint=self.hs_endpoint) self.project = self.hsclient.get_project(self.hs_projectid) self.fclient = self.project.frontier - self.new_links_count = defaultdict(int) - self.batch_ids = [] + self.batches = {} crawler.signals.connect(self.close_spider, signals.spider_closed) + crawler.signals.connect(self.idle_spider, signals.spider_idle) # Make sure the logger for hubstorage.batchuploader is configured logging.basicConfig() @@ -125,20 +95,60 @@ def _get_config(self, crawler, key): def _msg(self, msg, level=log.INFO): log.msg('(HCF) %s' % msg, level) - def _start_job(self, spider): - self._msg("Starting new job for: %s" % spider.name) - if self.hs_start_job_new_panel: - jobid = self.hsclient.start_job(projectid=self.hs_projectid, - spider=spider.name) - else: - jobid = self.oldpanel_project.schedule(spider.name, slot=self.hs_consume_from_slot, - dummy=datetime.now()) - self._msg("New job started: %s" % jobid) - @classmethod def from_crawler(cls, crawler): return cls(crawler) + def process_spider_output(self, response, result, spider): + # XXX: or maybe use process_spider_input? + # or process_spider_exception? + if self.PRIVATE_INFO_KEY in response.meta: + batch_id, fp = response.meta[self.PRIVATE_INFO_KEY] + self.batches[batch_id].remove(fp) + + for item in result: + if not (isinstance(item, Request) and item.meta.get('use_hcf', False)): + yield item + continue + + if not self._could_be_enqueued(item): + yield item + continue + + self._enqueue_request(item, spider) + + def _could_be_enqueued(self, request): + """ + Return True if a request could be enqueued; + return False and log an error otherwise. + """ + if request.method != 'GET': + self._msg("'use_hcf' meta key is not supported " + "for non GET requests (%s)" % request.url, log.ERROR) + return False + + # TODO: more validation rules, + # e.g. for non-default callbacks and extra meta values. + + return True + + def _enqueue_request(self, request, spider): + """ Put request to HCF queue. """ + + slot_callback = getattr(spider, 'slot_callback', self._get_slot) + slot = slot_callback(request) + + hcf_params = request.meta.get('hcf_params') + fp = {'fp': request.url} + if hcf_params: + fp.update(hcf_params) + + # This is not necessarily a request to HCF because there is a batch + # uploader in python-hubstorage, so it is fine to send a single item + self.fclient.add(self.hs_frontier, slot, [fp]) + # self._msg("request enqueued to slot(%s)" % slot, log.DEBUG) + + def process_start_requests(self, start_requests, spider): self.hs_frontier = getattr(spider, 'hs_frontier', self.hs_frontier) @@ -154,89 +164,74 @@ def process_start_requests(self, start_requests, spider): # if there are no links in the hcf, use the start_requests # unless this is not the first job. - if not self.has_new_requests and not getattr(spider, 'dummy', None): + if not self.has_new_requests: self._msg('Using start_requests') for r in start_requests: yield r - def process_spider_output(self, response, result, spider): - slot_callback = getattr(spider, 'slot_callback', self._get_slot) - for item in result: - if isinstance(item, Request): - request = item - if request.meta.get('use_hcf', False): - if request.method == 'GET': # XXX: Only GET support for now. - slot = slot_callback(request) - hcf_params = request.meta.get('hcf_params') - fp = {'fp': request.url} - if hcf_params: - fp.update(hcf_params) - # Save the new links as soon as possible using - # the batch uploader - self.fclient.add(self.hs_frontier, slot, [fp]) - self.new_links_count[slot] += 1 - else: - self._msg("'use_hcf' meta key is not supported for non GET requests (%s)" % request.url, - log.ERROR) - yield request - else: - yield request - else: - yield item def close_spider(self, spider, reason): - # Only store the results if the spider finished normally, if it - # didn't finished properly there is not way to know whether all the url batches - # were processed and it is better not to delete them from the frontier - # (so they will be picked by another process). - if reason == 'finished': - self._save_new_links_count() - self._delete_processed_ids() - - # Close the frontier client in order to make sure that all the new links - # are stored. + # Close the frontier client in order to make sure that + # all the new links are stored. + self._delete_processed_batches() self.fclient.close() self.hsclient.close() - # If the reason is defined in the hs_start_job_on_reason list then start - # a new job right after this spider is finished. - if self.hs_start_job_enabled and reason in self.hs_start_job_on_reason: + def idle_spider(self, spider): + self.fclient.flush() + self._delete_processed_batches() - # Start the new job if this job had requests from the HCF or it - # was the first job. - if self.has_new_requests or not getattr(spider, 'dummy', None): - self._start_job(spider) + has_new_requests = False + for request in self._get_new_requests(): + self.crawler.engine.schedule(request, spider) + has_new_requests = True + + if has_new_requests: + raise DontCloseSpider() def _get_new_requests(self): """ Get a new batch of links from the HCF.""" num_batches = 0 num_links = 0 - for num_batches, batch in enumerate(self.fclient.read(self.hs_frontier, self.hs_consume_from_slot), 1): + + # TODO: hook + # e.g. it should be possible to use roundrobin itertools recipe + # for fetching batches from several slots + new_batches = self.fclient.read(self.hs_frontier, self.hs_consume_from_slot) + # === + + for num_batches, batch in enumerate(new_batches, 1): + assert batch['id'] not in self.batches + self.batches[batch['id']] = set(fp for fp, data in batch['requests']) + for fingerprint, data in batch['requests']: + # TODO: hook for custom Request instantiation + meta = { + self.PRIVATE_INFO_KEY: (batch['id'], fingerprint), + 'hcf_params': {'qdata': data}, + } + yield Request(url=fingerprint, meta=meta) + # === num_links += 1 - yield Request(url=fingerprint, meta={'hcf_params': {'qdata': data}}) - self.batch_ids.append(batch['id']) - if num_links >= self.hs_max_links: - break + self._msg('Read %d new batches from slot(%s)' % (num_batches, self.hs_consume_from_slot)) self._msg('Read %d new links from slot(%s)' % (num_links, self.hs_consume_from_slot)) - def _save_new_links_count(self): - """ Save the new extracted links into the HCF.""" - for slot, link_count in self.new_links_count.items(): - self._msg('Stored %d new links in slot(%s)' % (link_count, slot)) - self.new_links_count = defaultdict(list) + def _get_processed_batch_ids(self): + return [batch_id for batch_id in self.batches + if not self.batches[batch_id]] - def _delete_processed_ids(self): + def _delete_processed_batches(self): """ Delete in the HCF the ids of the processed batches.""" - self.fclient.delete(self.hs_frontier, self.hs_consume_from_slot, self.batch_ids) - self._msg('Deleted %d processed batches in slot(%s)' % (len(self.batch_ids), - self.hs_consume_from_slot)) - self.batch_ids = [] + print([len(self.batches[id_]) for id_ in self.batches]) + + ids = self._get_processed_batch_ids() + self.fclient.delete(self.hs_frontier, self.hs_consume_from_slot, ids) + self._msg('Deleted %d processed batches in slot(%s)' % ( + len(ids), self.hs_consume_from_slot)) + for batch_id in ids: + del self.batches[batch_id] def _get_slot(self, request): """ Determine to which slot should be saved the request.""" - md5 = hashlib.md5() - md5.update(request.url) - digest = md5.hexdigest() - return str(int(digest, 16) % self.hs_number_of_slots) + return '0' From 6f2f410b6e41398c79f1a2ca165f3071170785ed Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Fri, 7 Jun 2013 07:47:37 +0600 Subject: [PATCH 02/14] limit a number of maximum concurrent batches; use HCF for start_requests --- scrapylib/hcf.py | 136 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 107 insertions(+), 29 deletions(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index 6299ef9..a66b110 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -26,6 +26,8 @@ HS_ENDPOINT - URL to the API endpoint, i.e: http://localhost:8003. The default value is provided by the python-hubstorage package. + HS_MAX_CONCURRENT_BATCHES - maximum number of concurrently processed + batches. The defaut is 10. The next keys can be defined in a Request meta in order to control the behavior of the HCF middleware: @@ -50,6 +52,8 @@ def slot_callback(request): """ import logging +import collections +import itertools from scrapy import signals, log from scrapy.exceptions import NotConfigured, DontCloseSpider from scrapy.http import Request @@ -73,13 +77,29 @@ def __init__(self, crawler): self.hs_projectid = self._get_config(crawler, "HS_PROJECTID") self.hs_frontier = self._get_config(crawler, "HS_FRONTIER") self.hs_consume_from_slot = self._get_config(crawler, "HS_CONSUME_FROM_SLOT") + self.hs_max_concurrent_batches = int(crawler.settings.get('HS_MAX_CONCURRENT_BATCHES', 10)) self.hsclient = HubstorageClient(auth=self.hs_auth, endpoint=self.hs_endpoint) self.project = self.hsclient.get_project(self.hs_projectid) self.fclient = self.project.frontier + # For better introspection keep track of both done and scheduled requests. self.batches = {} + # It is not possible to limit a number of batches received from HCF; + # to limit a number of concurrently processed batches there is a buffer. + # Limiting a number of batches processed concurrently is important + # because in case of error we reprocess batches that are partially + # complete, and all concurrently processed batches could be partially + # complete because we don't enforce sequential processing order. + # For example, if we have 1000 batches and each of them + # have 99 requests done, 1 unfinished then in case of error we'll + # need to reprocess all 99000 finished requests. + # Another way to solve this is to reschedule unfinished requests to + # new batches and mark all existing as processed. + self.batches_buffer = collections.deque() + self.seen_batch_ids = set() + crawler.signals.connect(self.close_spider, signals.spider_closed) crawler.signals.connect(self.idle_spider, signals.spider_idle) @@ -104,20 +124,34 @@ def process_spider_output(self, response, result, spider): # or process_spider_exception? if self.PRIVATE_INFO_KEY in response.meta: batch_id, fp = response.meta[self.PRIVATE_INFO_KEY] - self.batches[batch_id].remove(fp) + done, todo = self.batches[batch_id] + done.add(fp) + todo.remove(fp) + + for non_hcf_item in self._hcf_process_spider_result(result, spider): + yield non_hcf_item + def _hcf_process_spider_result(self, result, spider): + """ + Put all applicable Requests from ``result`` iterator to a HCF queue, + yield other objects. + """ + num_enqueued = 0 for item in result: - if not (isinstance(item, Request) and item.meta.get('use_hcf', False)): + if not self._is_hcf_request(item) or not self._valid_hcf_request(item): yield item continue + self._enqueue_request(item, spider) + num_enqueued += 1 - if not self._could_be_enqueued(item): - yield item - continue + if num_enqueued: + self._msg("%d requests are put to queue" % num_enqueued) - self._enqueue_request(item, spider) + def _is_hcf_request(self, item): + """ Return if an item is a request intended to be stored in HCF queue """ + return isinstance(item, Request) and item.meta.get('use_hcf', False) - def _could_be_enqueued(self, request): + def _valid_hcf_request(self, request): """ Return True if a request could be enqueued; return False and log an error otherwise. @@ -148,7 +182,6 @@ def _enqueue_request(self, request, spider): self.fclient.add(self.hs_frontier, slot, [fp]) # self._msg("request enqueued to slot(%s)" % slot, log.DEBUG) - def process_start_requests(self, start_requests, spider): self.hs_frontier = getattr(spider, 'hs_frontier', self.hs_frontier) @@ -163,12 +196,10 @@ def process_start_requests(self, start_requests, spider): yield req # if there are no links in the hcf, use the start_requests - # unless this is not the first job. if not self.has_new_requests: self._msg('Using start_requests') - for r in start_requests: - yield r - + for non_hcf_item in self._hcf_process_spider_result(start_requests, spider): + yield non_hcf_item def close_spider(self, spider, reason): # Close the frontier client in order to make sure that @@ -191,47 +222,94 @@ def idle_spider(self, spider): def _get_new_requests(self): """ Get a new batch of links from the HCF.""" - num_batches = 0 num_links = 0 - - # TODO: hook - # e.g. it should be possible to use roundrobin itertools recipe - # for fetching batches from several slots - new_batches = self.fclient.read(self.hs_frontier, self.hs_consume_from_slot) - # === + num_batches = 0 + new_batches = self._get_new_batches(self.hs_max_concurrent_batches) for num_batches, batch in enumerate(new_batches, 1): + self._msg("incoming batch: len=%d, id=%s" % (len(batch['requests']), batch['id'])) + assert batch['id'] not in self.batches - self.batches[batch['id']] = set(fp for fp, data in batch['requests']) + self.batches[batch['id']] = set(), set(fp for fp, data in batch['requests']) for fingerprint, data in batch['requests']: - # TODO: hook for custom Request instantiation + # TODO: hook for custom Request instantiation ========= meta = { self.PRIVATE_INFO_KEY: (batch['id'], fingerprint), 'hcf_params': {'qdata': data}, } yield Request(url=fingerprint, meta=meta) - # === + # ====================================================== num_links += 1 - self._msg('Read %d new batches from slot(%s)' % (num_batches, self.hs_consume_from_slot)) - self._msg('Read %d new links from slot(%s)' % (num_links, self.hs_consume_from_slot)) + self._msg('Read %d new links from %d batches, slot(%s)' % (num_links, num_batches, self.hs_consume_from_slot)) + self._msg('Current batches: %s' % self._get_batch_sizes()) + + def _get_new_batches(self, max_batches): + """ + Return at most ``max_batches``, fetching them from HCF if necessary. + """ + buffer_size = len(self.batches_buffer) + self._msg("Buffered batches: %d" % buffer_size) + + if len(self.batches_buffer) >= max_batches: + self._msg("Buffer has enough batches, no need to go to HCF") + for i in range(max_batches): + yield self.batches_buffer.popleft() + else: + # TODO: hook ===================== + # e.g. it should be possible to use roundrobin itertools recipe + # for fetching batches from several slots + new_batches = self.fclient.read(self.hs_frontier, self.hs_consume_from_slot) + # ================================ + + # HCF could return already buffered batches; remove them + new_batches_iter = (b for b in new_batches if b['id'] not in self.seen_batch_ids) + + # yield buffered batches first + combined_batches = itertools.islice( + itertools.chain( + (self.batches_buffer.popleft() for i in range(buffer_size)), + new_batches_iter + ), + max_batches + ) + + num_consumed = 0 + for num_consumed, batch in enumerate(combined_batches, 1): + self.seen_batch_ids.add(batch['id']) + yield batch + + # XXX: new_batches_iter must be generator for this to work properly + self.batches_buffer.extend(new_batches_iter) + + self.seen_batch_ids.update(b['id'] for b in self.batches_buffer) + num_read = len(self.batches_buffer) + num_consumed - buffer_size + self._msg('Read %d new batches from slot(%s)' % (num_read, self.hs_consume_from_slot)) def _get_processed_batch_ids(self): - return [batch_id for batch_id in self.batches - if not self.batches[batch_id]] + return [batch_id for batch_id, (done, todo) in self.batches.iteritems() if not todo] + + def _get_batch_sizes(self): + return [(len(done), len(todo)) for _, (done, todo) in self.batches.iteritems()] def _delete_processed_batches(self): """ Delete in the HCF the ids of the processed batches.""" - print([len(self.batches[id_]) for id_ in self.batches]) + self._msg("Deleting batches: %r" % self._get_batch_sizes()) ids = self._get_processed_batch_ids() self.fclient.delete(self.hs_frontier, self.hs_consume_from_slot, ids) - self._msg('Deleted %d processed batches in slot(%s)' % ( - len(ids), self.hs_consume_from_slot)) for batch_id in ids: del self.batches[batch_id] + self._msg('Deleted %d processed batches in slot(%s)' % ( + len(ids), self.hs_consume_from_slot)) + self._msg('%d remaining batches with %d remaining requests (and %d processed requests)' % ( + len(self.batches), + sum(len(todo) for _, (done, todo) in self.batches.iteritems()), + sum(len(done) for _, (done, todo) in self.batches.iteritems()), + )) + def _get_slot(self, request): """ Determine to which slot should be saved the request.""" return '0' From 40df7f49d364c56788de545e2c4ca15e5ffb43d1 Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Fri, 7 Jun 2013 08:14:52 +0600 Subject: [PATCH 03/14] use the same slot for input and output by default --- scrapylib/hcf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index a66b110..183bc0a 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -312,4 +312,4 @@ def _delete_processed_batches(self): def _get_slot(self, request): """ Determine to which slot should be saved the request.""" - return '0' + return self.hs_consume_from_slot From 32b4a4a0ad7c81acc92d986a49351f49ad4debd8 Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Fri, 7 Jun 2013 21:37:46 +0600 Subject: [PATCH 04/14] Rename slot_callback to hs_slot_for_request --- scrapylib/hcf.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index 183bc0a..6abe926 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -46,9 +46,9 @@ The spider can override the default slot assignation function by setting the spider slot_callback method to a function with the following signature:: - def slot_callback(request): + def hs_slot_for_request(request): ... - return slot + return 'slot' """ import logging @@ -169,7 +169,7 @@ def _valid_hcf_request(self, request): def _enqueue_request(self, request, spider): """ Put request to HCF queue. """ - slot_callback = getattr(spider, 'slot_callback', self._get_slot) + slot_callback = getattr(spider, 'hs_slot_for_request', self._get_slot) slot = slot_callback(request) hcf_params = request.meta.get('hcf_params') From a634251a4af901521478f0c0639d150bb2dc2caa Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Sat, 8 Jun 2013 07:04:34 +0600 Subject: [PATCH 05/14] More experiments: * remove HS_FRONTIER and HS_CONSUME_FROM settings; * use spider name as frontier by default; * allow overriding of target slot and frontier using Request meta params; * custom request serialization/deserialization using spider methods; --- scrapylib/hcf.py | 190 ++++++++++++++++++++++++----------------------- 1 file changed, 97 insertions(+), 93 deletions(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index 6abe926..33a0576 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -15,11 +15,6 @@ HS_AUTH - API key HS_PROJECTID - Project ID in the panel. - HS_FRONTIER - Frontier name. - HS_CONSUME_FROM_SLOT - Slot from where the spider will read new URLs. - -Note that HS_FRONTIER and HS_SLOT can be overriden from inside a spider using -the spider attributes: "hs_frontier" and "hs_consume_from_slot" respectively. The next optional settings can be defined: @@ -29,6 +24,13 @@ HS_MAX_CONCURRENT_BATCHES - maximum number of concurrently processed batches. The defaut is 10. +By default, middleware will use spider name as HCF frontier and '0' as slot +both for getting new requests from HCF and putting requests to HCF. +Default values can be overriden from inside a spider using the +spider attributes: "hs_frontier" and "hs_slot" respectively. It is also +possible to override target frontier and slot using Request meta +('hcf_slot' and 'hcf_frontier' keys). + The next keys can be defined in a Request meta in order to control the behavior of the HCF middleware: @@ -40,15 +42,30 @@ fdata data to be stored along with the fingerprint in the fingerprint set p Priority - lower priority numbers are returned first. The default is 0 + hcf_slot - If present, this slot is used for storing request in the HCF. + hcf_frontier - If present, this frontier is used for storing request + in the HCF. + The value of 'qdata' parameter could be retrieved later using ``response.meta['hcf_params']['qdata']``. -The spider can override the default slot assignation function by setting the -spider slot_callback method to a function with the following signature:: +The spider can override how requests are serialized and deserialized +for HCF by providing ``hs_make_request`` and/or ``hs_serialize_request`` +methods with the following signatures:: + + def hs_make_request(self, fingerprint, data, batch_id): + # ... + return Request(..) - def hs_slot_for_request(request): - ... - return 'slot' + def hs_serialize_request(self, request): + # ... + return {...} + +This may be useful if your fingerprints are not URLs or you want to +customize the process for other reasons (e.g. to make use_hcf flag unnecessary). +If your ``hs_serialize_request`` decided not to serialize a request +(or can't serialize it) then return request unchanged - it will be scheduled +without HCF. """ import logging @@ -60,11 +77,6 @@ def hs_slot_for_request(request): from hubstorage import HubstorageClient -# class HcfStoreStrategy(object): -# def __init__(self, hubstorage_client): -# self.client = hubstorage_client -# - class HcfMiddleware(object): PRIVATE_INFO_KEY = '__hcf_info__' @@ -75,8 +87,6 @@ def __init__(self, crawler): self.hs_endpoint = crawler.settings.get("HS_ENDPOINT") self.hs_auth = self._get_config(crawler, "HS_AUTH") self.hs_projectid = self._get_config(crawler, "HS_PROJECTID") - self.hs_frontier = self._get_config(crawler, "HS_FRONTIER") - self.hs_consume_from_slot = self._get_config(crawler, "HS_CONSUME_FROM_SLOT") self.hs_max_concurrent_batches = int(crawler.settings.get('HS_MAX_CONCURRENT_BATCHES', 10)) self.hsclient = HubstorageClient(auth=self.hs_auth, endpoint=self.hs_endpoint) @@ -98,7 +108,7 @@ def __init__(self, crawler): # Another way to solve this is to reschedule unfinished requests to # new batches and mark all existing as processed. self.batches_buffer = collections.deque() - self.seen_batch_ids = set() + self.seen_batch_ids = set() # XXX: are batch ids globally unique? crawler.signals.connect(self.close_spider, signals.spider_closed) crawler.signals.connect(self.idle_spider, signals.spider_idle) @@ -119,9 +129,27 @@ def _msg(self, msg, level=log.INFO): def from_crawler(cls, crawler): return cls(crawler) + def process_start_requests(self, start_requests, spider): + # XXX: Running this middleware for several spiders concurrently + # is not supported; multiple input slots/frontiers are also unsupported + # (they complicate e.g. batch removing) + self.hs_consume_from_frontier = getattr(spider, 'hs_frontier', spider.name) + self.hs_consume_from_slot = getattr(spider, 'hs_slot', '0') + self._msg('Input frontier: %s' % self.hs_consume_from_frontier) + self._msg('Input slot: %s' % self.hs_consume_from_slot) + + self.has_new_requests = False + for req in self._get_new_requests(spider): + self.has_new_requests = True + yield req + + # if there are no links in the hcf, use the start_requests + if not self.has_new_requests: + self._msg('Using start_requests') + for non_hcf_item in self._hcf_process_spider_result(start_requests, spider): + yield non_hcf_item + def process_spider_output(self, response, result, spider): - # XXX: or maybe use process_spider_input? - # or process_spider_exception? if self.PRIVATE_INFO_KEY in response.meta: batch_id, fp = response.meta[self.PRIVATE_INFO_KEY] done, todo = self.batches[batch_id] @@ -136,70 +164,50 @@ def _hcf_process_spider_result(self, result, spider): Put all applicable Requests from ``result`` iterator to a HCF queue, yield other objects. """ + serialize = getattr(spider, 'hs_serialize_request', self._serialize_request) num_enqueued = 0 - for item in result: - if not self._is_hcf_request(item) or not self._valid_hcf_request(item): - yield item + for request in result: + if not isinstance(request, Request): # item or None + yield request continue - self._enqueue_request(item, spider) + + data = serialize(request) + if isinstance(data, Request): + # this is a standard non-HCF request or serialization failed + yield data + continue + + frontier, slot = self._get_output_hcf_path(request) + self.fclient.add(frontier, slot, [data]) num_enqueued += 1 if num_enqueued: self._msg("%d requests are put to queue" % num_enqueued) - def _is_hcf_request(self, item): - """ Return if an item is a request intended to be stored in HCF queue """ - return isinstance(item, Request) and item.meta.get('use_hcf', False) + def _get_output_hcf_path(self, request): + """ Determine to which frontier and slot should be saved the request. """ + frontier = request.meta.get('hcf_frontier', self.hs_consume_from_frontier) + slot = request.meta.get('hcf_slot', self.hs_consume_from_slot) + return frontier, slot + + def _serialize_request(self, request): + if not request.meta.get('use_hcf', False): + # standard request + return request - def _valid_hcf_request(self, request): - """ - Return True if a request could be enqueued; - return False and log an error otherwise. - """ if request.method != 'GET': self._msg("'use_hcf' meta key is not supported " "for non GET requests (%s)" % request.url, log.ERROR) - return False - - # TODO: more validation rules, - # e.g. for non-default callbacks and extra meta values. - - return True - - def _enqueue_request(self, request, spider): - """ Put request to HCF queue. """ - - slot_callback = getattr(spider, 'hs_slot_for_request', self._get_slot) - slot = slot_callback(request) + return request + # TODO: more validation rules? + # e.g. for non-default callbacks and extra meta values + # which are not supported by this default serialization function hcf_params = request.meta.get('hcf_params') - fp = {'fp': request.url} + data = {'fp': request.url} if hcf_params: - fp.update(hcf_params) - - # This is not necessarily a request to HCF because there is a batch - # uploader in python-hubstorage, so it is fine to send a single item - self.fclient.add(self.hs_frontier, slot, [fp]) - # self._msg("request enqueued to slot(%s)" % slot, log.DEBUG) - - def process_start_requests(self, start_requests, spider): - - self.hs_frontier = getattr(spider, 'hs_frontier', self.hs_frontier) - self._msg('Using HS_FRONTIER=%s' % self.hs_frontier) - - self.hs_consume_from_slot = getattr(spider, 'hs_consume_from_slot', self.hs_consume_from_slot) - self._msg('Using HS_CONSUME_FROM_SLOT=%s' % self.hs_consume_from_slot) - - self.has_new_requests = False - for req in self._get_new_requests(): - self.has_new_requests = True - yield req - - # if there are no links in the hcf, use the start_requests - if not self.has_new_requests: - self._msg('Using start_requests') - for non_hcf_item in self._hcf_process_spider_result(start_requests, spider): - yield non_hcf_item + data.update(hcf_params) + return data def close_spider(self, spider, reason): # Close the frontier client in order to make sure that @@ -213,38 +221,39 @@ def idle_spider(self, spider): self._delete_processed_batches() has_new_requests = False - for request in self._get_new_requests(): + for request in self._get_new_requests(spider): self.crawler.engine.schedule(request, spider) has_new_requests = True if has_new_requests: raise DontCloseSpider() - def _get_new_requests(self): + def _get_new_requests(self, spider): """ Get a new batch of links from the HCF.""" num_links = 0 num_batches = 0 - new_batches = self._get_new_batches(self.hs_max_concurrent_batches) + make_request = getattr(spider, 'hs_make_request', self._hs_make_request) + new_batches = self._get_new_batches(self.hs_max_concurrent_batches) for num_batches, batch in enumerate(new_batches, 1): self._msg("incoming batch: len=%d, id=%s" % (len(batch['requests']), batch['id'])) assert batch['id'] not in self.batches - self.batches[batch['id']] = set(), set(fp for fp, data in batch['requests']) + done, todo = set(), set(fp for fp, data in batch['requests']) + self.batches[batch['id']] = done, todo for fingerprint, data in batch['requests']: - # TODO: hook for custom Request instantiation ========= - meta = { - self.PRIVATE_INFO_KEY: (batch['id'], fingerprint), - 'hcf_params': {'qdata': data}, - } - yield Request(url=fingerprint, meta=meta) - # ====================================================== + request = make_request(fingerprint, data, batch['id']) + request.meta.setdefault(self.PRIVATE_INFO_KEY, (batch['id'], fingerprint)) + yield request num_links += 1 self._msg('Read %d new links from %d batches, slot(%s)' % (num_links, num_batches, self.hs_consume_from_slot)) self._msg('Current batches: %s' % self._get_batch_sizes()) + def _hs_make_request(self, fingerprint, data, batch_id): + return Request(url=fingerprint, meta={'hcf_params': {'qdata': data}}) + def _get_new_batches(self, max_batches): """ Return at most ``max_batches``, fetching them from HCF if necessary. @@ -260,7 +269,7 @@ def _get_new_batches(self, max_batches): # TODO: hook ===================== # e.g. it should be possible to use roundrobin itertools recipe # for fetching batches from several slots - new_batches = self.fclient.read(self.hs_frontier, self.hs_consume_from_slot) + new_batches = self.fclient.read(self.hs_consume_from_frontier, self.hs_consume_from_slot) # ================================ # HCF could return already buffered batches; remove them @@ -287,18 +296,11 @@ def _get_new_batches(self, max_batches): num_read = len(self.batches_buffer) + num_consumed - buffer_size self._msg('Read %d new batches from slot(%s)' % (num_read, self.hs_consume_from_slot)) - def _get_processed_batch_ids(self): - return [batch_id for batch_id, (done, todo) in self.batches.iteritems() if not todo] - - def _get_batch_sizes(self): - return [(len(done), len(todo)) for _, (done, todo) in self.batches.iteritems()] - def _delete_processed_batches(self): """ Delete in the HCF the ids of the processed batches.""" self._msg("Deleting batches: %r" % self._get_batch_sizes()) - ids = self._get_processed_batch_ids() - self.fclient.delete(self.hs_frontier, self.hs_consume_from_slot, ids) + self.fclient.delete(self.hs_consume_from_frontier, self.hs_consume_from_slot, ids) for batch_id in ids: del self.batches[batch_id] @@ -310,6 +312,8 @@ def _delete_processed_batches(self): sum(len(done) for _, (done, todo) in self.batches.iteritems()), )) - def _get_slot(self, request): - """ Determine to which slot should be saved the request.""" - return self.hs_consume_from_slot + def _get_processed_batch_ids(self): + return [batch_id for batch_id, (done, todo) in self.batches.iteritems() if not todo] + + def _get_batch_sizes(self): + return [(len(done), len(todo)) for _, (done, todo) in self.batches.iteritems()] From c0d007aa51101fd37dda19c5373f130f622e63c3 Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Sat, 8 Jun 2013 07:27:45 +0600 Subject: [PATCH 06/14] rename hs_make_request to hs_deserialize_request and change its signature for symmetry with hs_serialize_request --- scrapylib/hcf.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index 33a0576..8d7cbf1 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -53,13 +53,14 @@ for HCF by providing ``hs_make_request`` and/or ``hs_serialize_request`` methods with the following signatures:: - def hs_make_request(self, fingerprint, data, batch_id): + def hs_deserialize_request(self, hcf_params, batch_id): # ... - return Request(..) + # url = hcf_params['fp'] + # return Request(url) def hs_serialize_request(self, request): # ... - return {...} + # return {'fp': request.url, 'qdata': {...} } This may be useful if your fingerprints are not URLs or you want to customize the process for other reasons (e.g. to make use_hcf flag unnecessary). @@ -203,12 +204,16 @@ def _serialize_request(self, request): # e.g. for non-default callbacks and extra meta values # which are not supported by this default serialization function - hcf_params = request.meta.get('hcf_params') data = {'fp': request.url} - if hcf_params: - data.update(hcf_params) + data.update(request.meta.get('hcf_params', {})) return data + def _hs_deserialize_request(self, hcf_params, batch_id): + return Request( + url=hcf_params['fp'], + meta={'hcf_params': hcf_params} + ) + def close_spider(self, spider, reason): # Close the frontier client in order to make sure that # all the new links are stored. @@ -232,7 +237,7 @@ def _get_new_requests(self, spider): """ Get a new batch of links from the HCF.""" num_links = 0 num_batches = 0 - make_request = getattr(spider, 'hs_make_request', self._hs_make_request) + make_request = getattr(spider, 'hs_deserialize_request', self._hs_deserialize_request) new_batches = self._get_new_batches(self.hs_max_concurrent_batches) for num_batches, batch in enumerate(new_batches, 1): @@ -242,18 +247,15 @@ def _get_new_requests(self, spider): done, todo = set(), set(fp for fp, data in batch['requests']) self.batches[batch['id']] = done, todo - for fingerprint, data in batch['requests']: - request = make_request(fingerprint, data, batch['id']) - request.meta.setdefault(self.PRIVATE_INFO_KEY, (batch['id'], fingerprint)) + for fp, qdata in batch['requests']: + request = make_request({'fp': fp, 'qdata': qdata}, batch['id']) + request.meta.setdefault(self.PRIVATE_INFO_KEY, (batch['id'], fp)) yield request num_links += 1 self._msg('Read %d new links from %d batches, slot(%s)' % (num_links, num_batches, self.hs_consume_from_slot)) self._msg('Current batches: %s' % self._get_batch_sizes()) - def _hs_make_request(self, fingerprint, data, batch_id): - return Request(url=fingerprint, meta={'hcf_params': {'qdata': data}}) - def _get_new_batches(self, max_batches): """ Return at most ``max_batches``, fetching them from HCF if necessary. From c3f722528101fe15c126ebd8203612ebe9326dac Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Sat, 8 Jun 2013 07:48:45 +0600 Subject: [PATCH 07/14] hs_ vs hcf_ --- scrapylib/hcf.py | 50 +++++++++++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index 8d7cbf1..e0e39d4 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -21,13 +21,13 @@ HS_ENDPOINT - URL to the API endpoint, i.e: http://localhost:8003. The default value is provided by the python-hubstorage package. - HS_MAX_CONCURRENT_BATCHES - maximum number of concurrently processed + HCF_MAX_CONCURRENT_BATCHES - maximum number of concurrently processed batches. The defaut is 10. By default, middleware will use spider name as HCF frontier and '0' as slot both for getting new requests from HCF and putting requests to HCF. Default values can be overriden from inside a spider using the -spider attributes: "hs_frontier" and "hs_slot" respectively. It is also +spider attributes: "hcf_frontier" and "hcf_slot" respectively. It is also possible to override target frontier and slot using Request meta ('hcf_slot' and 'hcf_frontier' keys). @@ -50,21 +50,23 @@ ``response.meta['hcf_params']['qdata']``. The spider can override how requests are serialized and deserialized -for HCF by providing ``hs_make_request`` and/or ``hs_serialize_request`` +for HCF by providing ``hcf_make_request`` and/or ``hcf_serialize_request`` methods with the following signatures:: - def hs_deserialize_request(self, hcf_params, batch_id): + def hcf_deserialize_request(self, hcf_params, batch_id): # ... # url = hcf_params['fp'] # return Request(url) - def hs_serialize_request(self, request): + def hcf_serialize_request(self, request): + # if not request.meta.get('use_hcf', False): + # return request # ... # return {'fp': request.url, 'qdata': {...} } This may be useful if your fingerprints are not URLs or you want to -customize the process for other reasons (e.g. to make use_hcf flag unnecessary). -If your ``hs_serialize_request`` decided not to serialize a request +customize the process for other reasons (e.g. to make "use_hcf" flag unnecessary). +If your ``hcf_serialize_request`` decided not to serialize a request (or can't serialize it) then return request unchanged - it will be scheduled without HCF. @@ -88,7 +90,7 @@ def __init__(self, crawler): self.hs_endpoint = crawler.settings.get("HS_ENDPOINT") self.hs_auth = self._get_config(crawler, "HS_AUTH") self.hs_projectid = self._get_config(crawler, "HS_PROJECTID") - self.hs_max_concurrent_batches = int(crawler.settings.get('HS_MAX_CONCURRENT_BATCHES', 10)) + self.hcf_max_concurrent_batches = int(crawler.settings.get('HCF_MAX_CONCURRENT_BATCHES', 10)) self.hsclient = HubstorageClient(auth=self.hs_auth, endpoint=self.hs_endpoint) self.project = self.hsclient.get_project(self.hs_projectid) @@ -134,10 +136,10 @@ def process_start_requests(self, start_requests, spider): # XXX: Running this middleware for several spiders concurrently # is not supported; multiple input slots/frontiers are also unsupported # (they complicate e.g. batch removing) - self.hs_consume_from_frontier = getattr(spider, 'hs_frontier', spider.name) - self.hs_consume_from_slot = getattr(spider, 'hs_slot', '0') - self._msg('Input frontier: %s' % self.hs_consume_from_frontier) - self._msg('Input slot: %s' % self.hs_consume_from_slot) + self.consume_from_frontier = getattr(spider, 'hcf_frontier', spider.name) + self.consume_from_slot = getattr(spider, 'hcf_slot', '0') + self._msg('Input frontier: %s' % self.consume_from_frontier) + self._msg('Input slot: %s' % self.consume_from_slot) self.has_new_requests = False for req in self._get_new_requests(spider): @@ -165,7 +167,7 @@ def _hcf_process_spider_result(self, result, spider): Put all applicable Requests from ``result`` iterator to a HCF queue, yield other objects. """ - serialize = getattr(spider, 'hs_serialize_request', self._serialize_request) + serialize = getattr(spider, 'hcf_serialize_request', self._serialize_request) num_enqueued = 0 for request in result: if not isinstance(request, Request): # item or None @@ -187,8 +189,8 @@ def _hcf_process_spider_result(self, result, spider): def _get_output_hcf_path(self, request): """ Determine to which frontier and slot should be saved the request. """ - frontier = request.meta.get('hcf_frontier', self.hs_consume_from_frontier) - slot = request.meta.get('hcf_slot', self.hs_consume_from_slot) + frontier = request.meta.get('hcf_frontier', self.consume_from_frontier) + slot = request.meta.get('hcf_slot', self.consume_from_slot) return frontier, slot def _serialize_request(self, request): @@ -208,7 +210,7 @@ def _serialize_request(self, request): data.update(request.meta.get('hcf_params', {})) return data - def _hs_deserialize_request(self, hcf_params, batch_id): + def _deserialize_request(self, hcf_params, batch_id): return Request( url=hcf_params['fp'], meta={'hcf_params': hcf_params} @@ -237,9 +239,9 @@ def _get_new_requests(self, spider): """ Get a new batch of links from the HCF.""" num_links = 0 num_batches = 0 - make_request = getattr(spider, 'hs_deserialize_request', self._hs_deserialize_request) + deserialize_request = getattr(spider, 'hcf_deserialize_request', self._deserialize_request) - new_batches = self._get_new_batches(self.hs_max_concurrent_batches) + new_batches = self._get_new_batches(self.hcf_max_concurrent_batches) for num_batches, batch in enumerate(new_batches, 1): self._msg("incoming batch: len=%d, id=%s" % (len(batch['requests']), batch['id'])) @@ -248,12 +250,12 @@ def _get_new_requests(self, spider): self.batches[batch['id']] = done, todo for fp, qdata in batch['requests']: - request = make_request({'fp': fp, 'qdata': qdata}, batch['id']) + request = deserialize_request({'fp': fp, 'qdata': qdata}, batch['id']) request.meta.setdefault(self.PRIVATE_INFO_KEY, (batch['id'], fp)) yield request num_links += 1 - self._msg('Read %d new links from %d batches, slot(%s)' % (num_links, num_batches, self.hs_consume_from_slot)) + self._msg('Read %d new links from %d batches, slot(%s)' % (num_links, num_batches, self.consume_from_slot)) self._msg('Current batches: %s' % self._get_batch_sizes()) def _get_new_batches(self, max_batches): @@ -271,7 +273,7 @@ def _get_new_batches(self, max_batches): # TODO: hook ===================== # e.g. it should be possible to use roundrobin itertools recipe # for fetching batches from several slots - new_batches = self.fclient.read(self.hs_consume_from_frontier, self.hs_consume_from_slot) + new_batches = self.fclient.read(self.consume_from_frontier, self.consume_from_slot) # ================================ # HCF could return already buffered batches; remove them @@ -296,18 +298,18 @@ def _get_new_batches(self, max_batches): self.seen_batch_ids.update(b['id'] for b in self.batches_buffer) num_read = len(self.batches_buffer) + num_consumed - buffer_size - self._msg('Read %d new batches from slot(%s)' % (num_read, self.hs_consume_from_slot)) + self._msg('Read %d new batches from slot(%s)' % (num_read, self.consume_from_slot)) def _delete_processed_batches(self): """ Delete in the HCF the ids of the processed batches.""" self._msg("Deleting batches: %r" % self._get_batch_sizes()) ids = self._get_processed_batch_ids() - self.fclient.delete(self.hs_consume_from_frontier, self.hs_consume_from_slot, ids) + self.fclient.delete(self.consume_from_frontier, self.consume_from_slot, ids) for batch_id in ids: del self.batches[batch_id] self._msg('Deleted %d processed batches in slot(%s)' % ( - len(ids), self.hs_consume_from_slot)) + len(ids), self.consume_from_slot)) self._msg('%d remaining batches with %d remaining requests (and %d processed requests)' % ( len(self.batches), sum(len(todo) for _, (done, todo) in self.batches.iteritems()), From b1f4dcdc895cce21bab0044df936e136ad06434d Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Mon, 10 Jun 2013 20:20:22 +0600 Subject: [PATCH 08/14] Fixed an issue with untracked requests: all started batches are deleted in idle_spider --- scrapylib/hcf.py | 39 ++++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index e0e39d4..97406c9 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -153,14 +153,17 @@ def process_start_requests(self, start_requests, spider): yield non_hcf_item def process_spider_output(self, response, result, spider): - if self.PRIVATE_INFO_KEY in response.meta: - batch_id, fp = response.meta[self.PRIVATE_INFO_KEY] + self._mark_as_done(response.meta) + for non_hcf_item in self._hcf_process_spider_result(result, spider): + yield non_hcf_item + + def _mark_as_done(self, meta): + if self.PRIVATE_INFO_KEY in meta: + batch_id, fp = meta[self.PRIVATE_INFO_KEY] done, todo = self.batches[batch_id] done.add(fp) todo.remove(fp) - - for non_hcf_item in self._hcf_process_spider_result(result, spider): - yield non_hcf_item + self._msg('%s is removed from batch(%s)' % (fp, batch_id)) def _hcf_process_spider_result(self, result, spider): """ @@ -213,19 +216,28 @@ def _serialize_request(self, request): def _deserialize_request(self, hcf_params, batch_id): return Request( url=hcf_params['fp'], - meta={'hcf_params': hcf_params} + meta={'hcf_params': hcf_params}, ) def close_spider(self, spider, reason): + # When spider is closed, some of the scheduled requests + # might be not processed yet; _delete_processed_batches + # doesn't remove such requests. + self._delete_processed_batches() + # Close the frontier client in order to make sure that # all the new links are stored. - self._delete_processed_batches() self.fclient.close() self.hsclient.close() def idle_spider(self, spider): self.fclient.flush() - self._delete_processed_batches() + + # If spider entered idle state then all scheduled requests + # were somehow processed, so we may remove all requests from + # scheduled batches in HCF. It is hard to track all requests + # otherwise because of download errors, DNS errors, redirects, etc. + self._delete_started_batches() has_new_requests = False for request in self._get_new_requests(spider): @@ -300,9 +312,17 @@ def _get_new_batches(self, max_batches): num_read = len(self.batches_buffer) + num_consumed - buffer_size self._msg('Read %d new batches from slot(%s)' % (num_read, self.consume_from_slot)) + def _delete_started_batches(self): + """ Delete all started batches from HCF """ + self._msg("Deleting started batches: %r" % self._get_batch_sizes()) + ids = self.batches.keys() + self.fclient.delete(self.consume_from_frontier, self.consume_from_slot, ids) + for batch_id in ids: + del self.batches[batch_id] + def _delete_processed_batches(self): """ Delete in the HCF the ids of the processed batches.""" - self._msg("Deleting batches: %r" % self._get_batch_sizes()) + self._msg("Deleting processed batches: %r" % self._get_batch_sizes()) ids = self._get_processed_batch_ids() self.fclient.delete(self.consume_from_frontier, self.consume_from_slot, ids) for batch_id in ids: @@ -315,6 +335,7 @@ def _delete_processed_batches(self): sum(len(todo) for _, (done, todo) in self.batches.iteritems()), sum(len(done) for _, (done, todo) in self.batches.iteritems()), )) + # self._msg("remaining: %r" % [todo for _, (done, todo) in self.batches.iteritems()]) def _get_processed_batch_ids(self): return [batch_id for batch_id, (done, todo) in self.batches.iteritems() if not todo] From 5644ebbce4c8554768ac59b2b80fdeed52c7ec74 Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Mon, 10 Jun 2013 21:47:47 +0600 Subject: [PATCH 09/14] delete all started batches if spider finished normally --- scrapylib/hcf.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index 97406c9..f8e90cf 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -220,10 +220,15 @@ def _deserialize_request(self, hcf_params, batch_id): ) def close_spider(self, spider, reason): - # When spider is closed, some of the scheduled requests - # might be not processed yet; _delete_processed_batches - # doesn't remove such requests. - self._delete_processed_batches() + if reason == 'finished': + # When spider finished normally, all scheduled requests are + # somehow processed, so we can delete them from HCF. + self._delete_started_batches() + else: + # When spider is not finished normally, some of the + # scheduled requests might be not processed yet; + # _delete_processed_batches doesn't remove such requests. + self._delete_processed_batches() # Close the frontier client in order to make sure that # all the new links are stored. From e47f3779c0b94958408d7b1ded871b0bfcf406e9 Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Mon, 10 Jun 2013 21:48:48 +0600 Subject: [PATCH 10/14] set default HCF_MAX_CONCURRENT_BATCHES to 5 (for more frequent checkpoints) --- scrapylib/hcf.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index f8e90cf..81426cc 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -22,7 +22,7 @@ The default value is provided by the python-hubstorage package. HCF_MAX_CONCURRENT_BATCHES - maximum number of concurrently processed - batches. The defaut is 10. + batches. The defaut is 5. By default, middleware will use spider name as HCF frontier and '0' as slot both for getting new requests from HCF and putting requests to HCF. @@ -90,7 +90,7 @@ def __init__(self, crawler): self.hs_endpoint = crawler.settings.get("HS_ENDPOINT") self.hs_auth = self._get_config(crawler, "HS_AUTH") self.hs_projectid = self._get_config(crawler, "HS_PROJECTID") - self.hcf_max_concurrent_batches = int(crawler.settings.get('HCF_MAX_CONCURRENT_BATCHES', 10)) + self.hcf_max_concurrent_batches = int(crawler.settings.get('HCF_MAX_CONCURRENT_BATCHES', 5)) self.hsclient = HubstorageClient(auth=self.hs_auth, endpoint=self.hs_endpoint) self.project = self.hsclient.get_project(self.hs_projectid) From 5abfa2b56922a08874383802365fbb938fe71f88 Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Mon, 10 Jun 2013 21:49:02 +0600 Subject: [PATCH 11/14] minor tweaks --- scrapylib/hcf.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index 81426cc..8aab9b2 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -105,9 +105,11 @@ def __init__(self, crawler): # because in case of error we reprocess batches that are partially # complete, and all concurrently processed batches could be partially # complete because we don't enforce sequential processing order. + # # For example, if we have 1000 batches and each of them - # have 99 requests done, 1 unfinished then in case of error we'll + # has 99 requests done, 1 unfinished then in case of error we'll # need to reprocess all 99000 finished requests. + # # Another way to solve this is to reschedule unfinished requests to # new batches and mark all existing as processed. self.batches_buffer = collections.deque() @@ -322,8 +324,7 @@ def _delete_started_batches(self): self._msg("Deleting started batches: %r" % self._get_batch_sizes()) ids = self.batches.keys() self.fclient.delete(self.consume_from_frontier, self.consume_from_slot, ids) - for batch_id in ids: - del self.batches[batch_id] + self.batches.clear() def _delete_processed_batches(self): """ Delete in the HCF the ids of the processed batches.""" From 4512d4cfd87fe46bc143c3f23d39cf88abff0dcc Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Mon, 10 Jun 2013 21:56:00 +0600 Subject: [PATCH 12/14] less verbose output --- scrapylib/hcf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index 8aab9b2..c0650bb 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -165,7 +165,7 @@ def _mark_as_done(self, meta): done, todo = self.batches[batch_id] done.add(fp) todo.remove(fp) - self._msg('%s is removed from batch(%s)' % (fp, batch_id)) + self._msg('%s is removed from batch(%s)' % (fp, batch_id), log.DEBUG) def _hcf_process_spider_result(self, result, spider): """ From 95bac7e41109c4b813e11914aed12f60da45e555 Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Wed, 12 Jun 2013 02:55:02 +0600 Subject: [PATCH 13/14] more stats; less verbose log --- scrapylib/hcf.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index c0650bb..35b8703 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -190,7 +190,10 @@ def _hcf_process_spider_result(self, result, spider): num_enqueued += 1 if num_enqueued: - self._msg("%d requests are put to queue" % num_enqueued) + self._msg("%d requests are put to queue" % num_enqueued, log.DEBUG) + self.crawler.stats.inc_value('hcf/requests/enqueued', num_enqueued) + self.crawler.stats.inc_value('hcf/spider_results_with_hcf_requests') + def _get_output_hcf_path(self, request): """ Determine to which frontier and slot should be saved the request. """ @@ -262,7 +265,7 @@ def _get_new_requests(self, spider): new_batches = self._get_new_batches(self.hcf_max_concurrent_batches) for num_batches, batch in enumerate(new_batches, 1): - self._msg("incoming batch: len=%d, id=%s" % (len(batch['requests']), batch['id'])) + self._msg("incoming batch: len=%d, id=%s" % (len(batch['requests']), batch['id']), log.DEBUG) assert batch['id'] not in self.batches done, todo = set(), set(fp for fp, data in batch['requests']) @@ -274,6 +277,8 @@ def _get_new_requests(self, spider): yield request num_links += 1 + self.crawler.stats.inc_value('hcf/batches/fetched', num_batches) + self.crawler.stats.inc_value('hcf/requests/fetched', num_links) self._msg('Read %d new links from %d batches, slot(%s)' % (num_links, num_batches, self.consume_from_slot)) self._msg('Current batches: %s' % self._get_batch_sizes()) @@ -321,16 +326,25 @@ def _get_new_batches(self, max_batches): def _delete_started_batches(self): """ Delete all started batches from HCF """ + sizes = self._get_batch_sizes() self._msg("Deleting started batches: %r" % self._get_batch_sizes()) ids = self.batches.keys() self.fclient.delete(self.consume_from_frontier, self.consume_from_slot, ids) + self.crawler.stats.inc_value('hcf/batches/dequeued', len(ids)) + self.crawler.stats.inc_value('hcf/requests/dequeued', sum(done+todo for done, todo in sizes)) self.batches.clear() def _delete_processed_batches(self): """ Delete in the HCF the ids of the processed batches.""" - self._msg("Deleting processed batches: %r" % self._get_batch_sizes()) + sizes = self._get_batch_sizes() + self._msg("Deleting processed batches: %r" % sizes) ids = self._get_processed_batch_ids() self.fclient.delete(self.consume_from_frontier, self.consume_from_slot, ids) + + dequeued_requests_num = sum(len(self.batches[id_][0]) for id_ in ids) + self.crawler.stats.inc_value('hcf/batches/dequeued', len(ids)) + self.crawler.stats.inc_value('hcf/requests/dequeued', dequeued_requests_num) + for batch_id in ids: del self.batches[batch_id] From b6e2ca447125edf95136c08cec991016e82d7664 Mon Sep 17 00:00:00 2001 From: Mikhail Korobov Date: Tue, 2 Jul 2013 23:42:25 +0600 Subject: [PATCH 14/14] remove pointless self --- scrapylib/hcf.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scrapylib/hcf.py b/scrapylib/hcf.py index 35b8703..7e641a3 100644 --- a/scrapylib/hcf.py +++ b/scrapylib/hcf.py @@ -143,13 +143,13 @@ def process_start_requests(self, start_requests, spider): self._msg('Input frontier: %s' % self.consume_from_frontier) self._msg('Input slot: %s' % self.consume_from_slot) - self.has_new_requests = False + has_new_requests = False for req in self._get_new_requests(spider): - self.has_new_requests = True + has_new_requests = True yield req # if there are no links in the hcf, use the start_requests - if not self.has_new_requests: + if not has_new_requests: self._msg('Using start_requests') for non_hcf_item in self._hcf_process_spider_result(start_requests, spider): yield non_hcf_item