taroved/pol (mirror of https://github.com/taroved/pol)

commit 0385a09008, parent 2f2610fc2d
Alexandr Nesterenko, 2017-10-19 17:08:26 -04:00
2 changed files with 122 additions and 81 deletions

Changed file 1 of 2 (class Feed)

@@ -88,10 +88,10 @@ class Feed(object):
         base_url = w3lib.html.get_base_url(html, doc_url)
         return w3lib.url.urljoin_rfc(base_url, url).decode('utf-8')
 
-    def buildFeed(self, response, feed_config):
-        response.selector.remove_namespaces()
-        selector = response.selector
+    def buildFeed(self, selector, page_unicode, feed_config):
+        selector.remove_namespaces()
+        selector = selector
         tree = selector.root.getroottree()
 
         # get data from html
         items = []
@@ -110,7 +110,7 @@ class Feed(object):
                     if feed_config['required'][field_name]:
                         required_found += 1
                     if field_name == 'link':
-                        item['link'] = self._build_link(response.body_as_unicode(), feed_config['uri'], item[field_name])
+                        item['link'] = self._build_link(page_unicode, feed_config['uri'], item[field_name])
 
             if required_count == required_found:
                 items.append(item)
@ -145,7 +145,7 @@ class Feed(object):
) )
return [feed.writeString('utf-8'), len(items), new_post_cnt] return [feed.writeString('utf-8'), len(items), new_post_cnt]
def getFeedData(self, request, feed_id): def getFeedData(self, feed_id):
# get url, xpathes # get url, xpathes
feed = {} feed = {}
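The buildFeed change above decouples feed generation from scrapy's Response object: it now takes a bare Selector plus the page's unicode text, so a page loaded from disk can go through the same path as a downloaded one. A minimal, self-contained sketch of what this enables (the HTML below is illustrative, not from the repo):

    # Building a Selector straight from page text, no Response involved.
    from scrapy.selector import Selector

    page_unicode = u'<html><body><a href="/post/1">First post</a></body></html>'
    selector = Selector(text=page_unicode)
    selector.remove_namespaces()  # same call buildFeed now makes itself

    # Feed.buildFeed(selector, page_unicode, feed_config) consumes these two
    # values; here we only show the selector behaves like a response-backed one.
    print(selector.xpath('//a/@href').extract())  # [u'/post/1'] on Python 2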

Changed file 2 of 2 (classes Downloader, Site, Server)

@@ -4,6 +4,7 @@ from hashlib import md5
 import json
 import time, sys, traceback
 import re
+from urlparse import urlparse
 
 from lxml import etree
@@ -23,6 +24,7 @@ from scrapy.http.request import Request
 from scrapy.http import Headers
 from scrapy.responsetypes import responsetypes
 from scrapy.core.downloader.contextfactory import ScrapyClientContextFactory
+from scrapy.selector import Selector
 
 from pol.log import LogHandler
 from .feed import Feed
@@ -48,25 +50,25 @@ class Downloader(object):
             [self.html2json(e) for e in el.getchildren() if isinstance(e, etree.ElementBase)]
         ]
 
-    def _saveResponse(self, response, url, tree):
+    def _saveResponse(self, headers, url, tree):
         # save html for extended selectors
         file_name = '%s_%s' % (time.time(), md5(url).hexdigest())
         file_path = self.snapshot_dir + '/' + file_name
         with open(file_path, 'w') as f:
             f.write(url + '\n')
-            for k, v in response.headers.iteritems():
+            for k, v in headers.iteritems():
                 for vv in v:
                     f.write('%s: %s\n' % (k, vv))
             f.write('\n\n' + etree.tostring(tree, encoding='utf-8', method='html'))
         return file_name
 
-    def setBaseAndRemoveScriptsAndMore(self, response, url):
-        response.selector.remove_namespaces()
-        tree = response.selector.root.getroottree()
+    def setBaseAndRemoveScriptsAndMore(self, selector, headers, url):
+        selector.remove_namespaces()
+        tree = selector.root.getroottree()
 
-        file_name = self._saveResponse(response, url, tree)
+        file_name = self._saveResponse(headers, url, tree)
 
         # set base url to html document
         head = tree.xpath("//head")
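_saveResponse now takes the headers mapping directly instead of a whole Response. A side effect worth noting (our observation, not stated in the commit): it only needs an iteritems() yielding (key, list-of-values), which both scrapy's Headers and a plain Python 2 dict provide, and which is what lets the local-page path further down pass an empty {}. A small sketch, assuming a Python 2-era scrapy:

    # Both mappings satisfy what _saveResponse iterates over.
    from scrapy.http import Headers

    for hdrs in (Headers({'Content-Type': 'text/html'}), {}):
        for k, values in hdrs.iteritems():  # Headers normalizes values to lists
            for v in values:
                print('%s: %s' % (k, v))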
@@ -126,15 +128,18 @@ class Downloader(object):
     def error_html(self, msg):
         return "<html><body>%s</body></html" % msg.replace("\n", "<br/>\n")
 
-    def downloadError(self, error, request=None, url=None, response=None, feed_config=None):
+    def downloadError(self, error, request=None, url=None, response=None, feed_config=None, selector_defer=None):
         # read for details: https://stackoverflow.com/questions/29423986/twisted-giving-twisted-web-client-partialdownloaderror-200-ok
         if error.type is PartialDownloadError and error.value.status == '200':
             d = defer.Deferred()
             reactor.callLater(0, d.callback, error.value.response) # error.value.response is response_str
-            d.addCallback(self.downloadDone, request=request, response=response, feed_config=feed_config)
-            d.addErrback(self.downloadError, request=request, url=url, response=response, feed_config=feed_config)
+            d.addCallback(self.downloadDone, request=request, response=response, feed_config=feed_config, selector_defer=selector_defer)
+            d.addErrback(self.downloadError, request=request, url=url, response=response, feed_config=feed_config, selector_defer=selector_defer)
             return
 
+        if selector_defer:
+            selector_defer.errback(error)
+        else:
             if self.debug:
                 request.write('Downloader error: ' + error.getErrorMessage())
                 request.write('Traceback: ' + error.getTraceback())
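When a selector_defer is waiting, download failures are no longer rendered into the HTTP response; they are handed to the Deferred via errback, leaving error handling to the caller. A minimal, standalone sketch of that Twisted pattern (not code from this repo):

    # The one-shot Deferred contract used here: exactly one of
    # callback(selector) / errback(failure) fires.
    from twisted.internet import defer
    from twisted.python.failure import Failure

    def on_error(failure):
        print('download failed: %s' % failure.getErrorMessage())

    d = defer.Deferred()
    d.addErrback(on_error)
    d.errback(Failure(RuntimeError('simulated download error')))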
@@ -163,13 +168,13 @@ class Downloader(object):
             traceback.print_exc(file=sys.stdout)
 
-    def downloadStarted(self, response, request, url, feed_config):
+    def downloadStarted(self, response, request, url, feed_config, selector_defer):
         d = readBody(response)
-        d.addCallback(self.downloadDone, request=request, response=response, feed_config=feed_config)
-        d.addErrback(self.downloadError, request=request, url=url, response=response, feed_config=feed_config)
+        d.addCallback(self.downloadDone, request=request, response=response, feed_config=feed_config, selector_defer=selector_defer)
+        d.addErrback(self.downloadError, request=request, url=url, response=response, feed_config=feed_config, selector_defer=selector_defer)
         return response
 
-    def downloadDone(self, response_str, request, response, feed_config):
+    def downloadDone(self, response_str, request, response, feed_config, selector_defer):
         url = response.request.absoluteURI
         print('Response <%s> ready (%s bytes)' % (url, len(response_str)))
@@ -178,22 +183,31 @@ class Downloader(object):
         response = HttpCompressionMiddleware().process_response(Request(url), response, None)
         response = DecompressionMiddleware().process_response(None, response, None)
 
+        if selector_defer:
+            selector_defer.callback(response.selector)
+        else:
             if (isinstance(response, TextResponse)):
                 ip = request.getHeader('x-real-ip') or request.client.host
+                response_str = self.prepare_response_str(response.selector, response.headers, response.body_as_unicode(), url, feed_config, ip)
                 if feed_config:
-                    [response_str, post_cnt, new_post_cnt] = self.feed.buildFeed(response, feed_config)
                     request.setHeader(b"Content-Type", b'text/xml; charset=utf-8')
-                    if self.stat_tool:
-                        self.stat_tool.trace(ip=ip, feed_id=feed_config['id'], post_cnt=post_cnt, new_post_cnt=new_post_cnt)
-                else:
-                    response_str, file_name = self.setBaseAndRemoveScriptsAndMore(response, url)
-                    if self.stat_tool:
-                        self.stat_tool.trace(ip=ip, feed_id=0, post_cnt=0, new_post_cnt=0, url=url)
 
             request.write(response_str)
             request.finish()
             self.run_memon()
 
+    def prepare_response_str(self, selector, headers, page_unicode, url, feed_config, ip=None):
+        if feed_config:
+            [response_str, post_cnt, new_post_cnt] = self.feed.buildFeed(selector, page_unicode, feed_config)
+            if self.stat_tool:
+                self.stat_tool.trace(ip=ip, feed_id=feed_config['id'], post_cnt=post_cnt, new_post_cnt=new_post_cnt)
+        else:
+            response_str, file_name = self.setBaseAndRemoveScriptsAndMore(selector, headers, url)
+            if self.stat_tool:
+                self.stat_tool.trace(ip=ip, feed_id=0, post_cnt=0, new_post_cnt=0, url=url)
+        return response_str
+
     def run_memon(self):
         if self.memon:
             d = defer.Deferred()
@@ -216,7 +230,17 @@ class Site(resource.Resource):
         self.feed = Feed(db_creds)
         self.downloader = Downloader(self.feed, debug, snapshot_dir, stat_tool, memon)
 
-    def startRequest(self, request, url, feed_config = None):
+    def startRequest(self, request, url, feed_config = None, selector_defer=None):
+        response_str = self.tryLocalPage(url)
+        if response_str:
+            response_str = response_str.decode('utf-8')
+            selector = Selector(text=response_str)
+            response_str = self.downloader.prepare_response_str(selector, {}, response_str, url, feed_config)
+            request.setHeader(b"Content-Type", b'text/xml; charset=utf-8')
+            request.write(response_str)
+            request.finish()
+            print('Request <GET %s> local' % (url,))
+        else:
             agent = BrowserLikeRedirectAgent(
                 Agent(reactor,
                     contextFactory=ScrapyClientContextFactory(), # skip certificate verification
@@ -236,8 +260,18 @@ class Site(resource.Resource):
                 None
             )
             print('Request <GET %s> started' % (url,))
-        d.addCallback(self.downloader.downloadStarted, request=request, url=url, feed_config=feed_config)
-        d.addErrback(self.downloader.downloadError, request=request, url=url, feed_config=feed_config)
+            d.addCallback(self.downloader.downloadStarted, request=request, url=url, feed_config=feed_config, selector_defer=selector_defer)
+            d.addErrback(self.downloader.downloadError, request=request, url=url, feed_config=feed_config, selector_defer=selector_defer)
+
+    def tryLocalPage(self, url):
+        m = md5(url).hexdigest()
+        domain = urlparse(url).netloc
+        try:
+            with open('/home/taroved/pages/' + m + '.' + domain) as f:
+                return f.read()
+        except IOError:
+            return None
+        return None
 
     def render_GET(self, request):
         '''
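tryLocalPage looks for a pre-fetched snapshot named after the URL's md5 digest plus its host, under a directory hard-coded to the author's home. A small sketch of the naming scheme it expects (the URL below is illustrative):

    # How the snapshot filename is derived (Python 2, matching the code above).
    from hashlib import md5
    from urlparse import urlparse

    url = 'http://example.com/blog'
    file_name = md5(url).hexdigest() + '.' + urlparse(url).netloc
    print(file_name)  # '<32-hex-digest>.example.com'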
@@ -257,7 +291,7 @@ class Site(resource.Resource):
             request.setHeader('Retry-After', str(time_left) + ' seconds')
             return 'Too Many Requests. Retry after %s seconds' % (str(time_left))
         else:
-            res = self.feed.getFeedData(request, feed_id)
+            res = self.feed.getFeedData(feed_id)
 
         if isinstance(res, basestring): # error message
             return res
@@ -283,6 +317,13 @@ class Server(object):
         self.log_handler = LogHandler()
 
+        self.site = Site(self.db_creds, self.snapshot_dir, self.user_agent, self.debug, self.limiter, self.memon, self.stat_tool)
+
+    def requestSelector(self, url=None, feed_config=None):
+        d = defer.Deferred()
+        self.site.startRequest(None, url, feed_config=feed_config, selector_defer=d)
+        return d
+
     def run(self):
-        endpoints.serverFromString(reactor, "tcp:%s" % self.port).listen(server.Site(Site(self.db_creds, self.snapshot_dir, self.user_agent, self.debug, self.limiter, self.memon, self.stat_tool)))
+        endpoints.serverFromString(reactor, "tcp:%s" % self.port).listen(server.Site(self.site))
         reactor.run()