diff --git a/downloader.py b/downloader.py
index 02aaa1f..5a58e56 100644
--- a/downloader.py
+++ b/downloader.py
@@ -1,368 +1,6 @@
-from __future__ import print_function
-import json
-import time, sys, traceback
-from hashlib import md5
-from datetime import datetime
-
-from twisted.logger import globalLogBeginner, formatEventAsClassicLogText, Logger
-from twisted.web import server, resource
-from twisted.internet import reactor, endpoints, defer
-from twisted.web.client import Agent, BrowserLikeRedirectAgent, readBody, PartialDownloadError, HTTPConnectionPool
-from twisted.web.server import NOT_DONE_YET
-from twisted.web.http_headers import Headers
-from twisted.web.html import escape
-twisted_headers = Headers
-
-from scrapy.http.response.text import TextResponse
-from scrapy.downloadermiddlewares.httpcompression import HttpCompressionMiddleware
-from scrapy.downloadermiddlewares.decompression import DecompressionMiddleware
-from scrapy.selector import Selector
-
-from scrapy.http.request import Request
-from scrapy.http import Headers
-from scrapy.responsetypes import responsetypes
-from scrapy.core.downloader.contextfactory import ScrapyClientContextFactory
-
-from lxml import etree
-import re
-
-from feed import getFeedData, buildFeed, get_conn
-from contextlib import closing
-
-from settings import DOWNLOADER_USER_AGENT, FEED_REQUEST_PERIOD_LIMIT, DEBUG, SNAPSHOT_DIR
-
-
-class bcolors:
-    HEADER = '\033[95m'
-    OKBLUE = '\033[94m'
-    OKGREEN = '\033[92m'
-    WARNING = '\033[93m'
-    FAIL = '\033[91m'
-    ENDC = '\033[0m'
-    BOLD = '\033[1m'
-    UNDERLINE = '\033[4m'
-
-class RequestStat:
-    def __init__(self, ip, feed_id, post_cnt, new_post_cnt, url=None, ex_msg=None, ex_callstack=None):
-        self.ip = ip
-        self.feed_id = feed_id
-        self.post_cnt = post_cnt
-        self.new_post_cnt = new_post_cnt
-        self.url = url
-        self.ex_msg = ex_msg
-        self.ex_callstack = ex_callstack
-
-def get_ip_id(ip, cur):
-    #import pdb;pdb.set_trace()
-    cur.execute("""select id from ips where address=%s""", (ip,))
-    ip_id = cur.fetchone()
-    if not ip_id:
-        cur.execute("insert into ips (address) values (%s)", (ip,))
-        ip_id = cur.lastrowid
-    return ip_id
-
-
-def save_stat(stat):
-    try:
-        with closing(get_conn()) as conn:
-            with conn as cur:
-                ip_id = get_ip_id(stat.ip, cur)
-                cur.execute("""insert into requests (ip_id, feed_id, post_cnt, new_post_cnt)
-                               values (%s, %s, %s, %s)""", (ip_id, stat.feed_id, stat.post_cnt, stat.new_post_cnt))
-                stat_id = cur.lastrowid
-                if not stat.feed_id:
-                    cur.execute("insert into request_urls (url, request_id) values (%s, %s)", (stat.url.encode('utf-8')[:2000], stat_id))
-
-                if stat.ex_msg:
-                    cur.execute("""insert into request_fails (request_id, ex_msg, ex_callstack)
-                                   values (%s, %s, %s)""", (stat_id, stat.ex_msg[0:2000], stat.ex_callstack[:2000]))
-    except:
-        traceback.print_exc(file=sys.stdout)
-
-def print_log(event):
-    if 'isError' in event and event['isError']:
-        sys.stdout.write(bcolors.FAIL + formatEventAsClassicLogText(event) + bcolors.ENDC)
-        sys.stderr.write(formatEventAsClassicLogText(event))
-        sys.stderr.flush()
-    else:
-        if 'stat' in event and event['stat']:
-            save_stat(event['request'])
-        sys.stdout.write(formatEventAsClassicLogText(event))
-        sys.stdout.flush()
-
-globalLogBeginner.beginLoggingTo([print_log], discardBuffer=True, redirectStandardIO=False) # requred, discardBuffer gets rid of the LimitedHistoryLogObserver, redirectStandardIO will loop print action
-
-log = Logger()
-
-if FEED_REQUEST_PERIOD_LIMIT:
-    import redis
-
-def check_feed_request_time_limit(url):
-    if FEED_REQUEST_PERIOD_LIMIT:
-        r = redis.StrictRedis(host='localhost', port=6379, db=0)
-        previous_timestamp = r.get(url)
-        if previous_timestamp:
-            previous_timestamp = int(r.get(url))
-            time_passed = int(time.time()) - previous_timestamp
-            if time_passed <= FEED_REQUEST_PERIOD_LIMIT:
-                # time left to wait
-                return FEED_REQUEST_PERIOD_LIMIT - time_passed
-        r.set(url, int(time.time()))
-    return 0
-
-
-#pool = HTTPConnectionPool(reactor, persistent=False)
-#pool.cachedConnectionTimeout = 3
-
-agent = BrowserLikeRedirectAgent(
-    Agent(reactor,
-        contextFactory=ScrapyClientContextFactory(), # skip certificate verification
-        connectTimeout=10),
-        #pool=pool),
-    redirectLimit=5
-    )
-
-def html2json(el):
-    return [
-        el.tag,
-        {"tag-id": el.attrib["tag-id"]},
-        [html2json(e) for e in el.getchildren() if isinstance(e, etree.ElementBase)]
-    ]
-
-def setBaseAndRemoveScriptsAndMore(response, url):
-    response.selector.remove_namespaces()
-
-    tree = response.selector.root.getroottree()
-
-    # save html for extended selectors
-    file_name = '%s_%s' % (time.time(), md5(url).hexdigest())
-    file_path = SNAPSHOT_DIR + '/' + file_name
-    with open(file_path, 'w') as f:
-        f.write(url + '\n')
-        for k, v in response.headers.iteritems():
-            for vv in v:
-                f.write('%s: %s\n' % (k, vv))
-        f.write('\n\n' + etree.tostring(tree, encoding='utf-8', method='html'))
-
-    # set base url to html document
-    head = tree.xpath("//head")
-    if head:
-        head = head[0]
-        base = head.xpath("./base")
-        if base:
-            base = base[0]
-        else:
-            base = etree.Element("base")
-            head.insert(0, base)
-        base.set('href', url)
-
-    i = 1
-    for bad in tree.xpath("//*"):
-        # remove scripts
-        if bad.tag == 'script':
-            bad.getparent().remove(bad)
-        else:
-            # set tag-id attribute
-            bad.attrib['tag-id'] = str(i)
-            i += 1
-
-            # sanitize anchors
-            if bad.tag == 'a' and 'href' in bad.attrib:
-                bad.attrib['origin-href'] = bad.attrib['href']
-                del bad.attrib['href']
-
-            # remove html events
-            for attr in bad.attrib:
-                if attr.startswith('on'):
-                    del bad.attrib[attr]
-
-            # sanitize forms
-            if bad.tag == 'form':
-                bad.attrib['onsubmit'] = "return false"
-
-    body = tree.xpath("//body")
-    if body:
-        # append html2json js object
-        jsobj = html2json(tree.getroot())
-        script = etree.Element('script', {'type': 'text/javascript'})
-        script.text = '\n'.join((
-            'var html2json = ' + json.dumps(jsobj) + ';',
-            'var snapshot_time = "' + file_name + '";'
-        ))
-        body[0].append(script)
-
-    return (etree.tostring(tree, method='html'), file_name)
-
-def buildScrapyResponse(response, body, url):
-    status = response.code
-    headers = Headers({k:','.join(v) for k,v in response.headers.getAllRawHeaders()})
-    respcls = responsetypes.from_args(headers=headers, url=url)
-    return respcls(url=url, status=status, headers=headers, body=body)
-
-def buildScrapyRequest(url):
-    return Request(url)
-
-def downloadStarted(response, request, url, feed_config):
-    d = readBody(response)
-    d.addCallback(downloadDone, request=request, response=response, feed_config=feed_config)
-    d.addErrback(downloadError, request=request, url=url, response=response, feed_config=feed_config)
-    return response
-
-def downloadDone(response_str, request, response, feed_config):
-    url = response.request.absoluteURI
-
-    print('Response <%s> ready (%s bytes)' % (url, len(response_str)))
-    response = buildScrapyResponse(response, response_str, url)
-
-    response = HttpCompressionMiddleware().process_response(Request(url), response, None)
-    response = DecompressionMiddleware().process_response(None, response, None)
-
-    if (isinstance(response, TextResponse)):
-        ip = request.getHeader('x-real-ip') or request.client.host
-        if feed_config:
-            [response_str, post_cnt, new_post_cnt] = buildFeed(response, feed_config)
-            request.setHeader(b"Content-Type", b'text/xml; charset=utf-8')
-            log.info('Stat: ip={request.ip} feed_id={request.feed_id} post_cnt={request.post_cnt} new_post_cnt={request.new_post_cnt}', request=RequestStat(
-                    ip=ip,
-                    feed_id=feed_config['id'],
-                    post_cnt=post_cnt,
-                    new_post_cnt=new_post_cnt
-                ),
-                stat=True
-            )
-        else:
-            response_str, file_name = setBaseAndRemoveScriptsAndMore(response, url)
-            log.info('Stat: ip={request.ip} url={request.url}', request=RequestStat(
-                    ip=ip,
-                    feed_id=0,
-                    post_cnt=0,
-                    new_post_cnt=0,
-                    url=url
-                ),
-                stat=True
-            )
-
-    request.write(response_str)
-    request.finish()
-    run_pgc()
-
-from pympler import tracker
-import gc
-
-tr = tracker.SummaryTracker()
-MON_PERIOD_SECONDS = 10 * 60 # 1 hours
-mon_time = None
-def mon(none):
-    global mon_time
-    tm = int(time.time())
-    if not mon_time or tm - mon_time >= MON_PERIOD_SECONDS:
-        #global pool
-        #pool.closeCachedConnections()
-        gc.collect()
-        global tr
-        for line in tr.format_diff():
-            log.info(line)
-        mon_time = tm
-
-def run_pgc():
-    d = defer.Deferred()
-    reactor.callLater(0, d.callback, None)
-    d.addCallback(mon)
-    d.addErrback(lambda err: print("PGC error: %s\nPGC traceback: %s" % (err.getErrorMessage(), err.getTraceback())))
-
-def error_html(msg):
-    return "%s\n")
-
-def downloadError(error, request=None, url=None, response=None, feed_config=None):
-    # read for details: https://stackoverflow.com/questions/29423986/twisted-giving-twisted-web-client-partialdownloaderror-200-ok
-    if error.type is PartialDownloadError and error.value.status == '200':
-        d = defer.Deferred()
-        reactor.callLater(0, d.callback, error.value.response) # error.value.response is response_str
-        d.addCallback(downloadDone, request=request, response=response, feed_config=feed_config)
-        d.addErrback(downloadError, request=request, url=url, response=response, feed_config=feed_config)
-        return
-
-    if DEBUG:
-        request.write('Downloader error: ' + error.getErrorMessage())
-        request.write('Traceback: ' + error.getTraceback())
-    else:
-        request.write(error_html('Something wrong. Contact us by email: politepol.com@gmail.com \n Scary mantra: ' + error.getErrorMessage()))
-        sys.stderr.write('\n'.join([str(datetime.utcnow()), request.uri, url, 'Downloader error: ' + error.getErrorMessage(), 'Traceback: ' + error.getTraceback()]))
-    request.finish()
-
-    try:
-        feed_id = feed_config and feed_config['id']
-        s_url = None
-        if not feed_id:
-            feed_id = 0
-            s_url = url
-        log.info('Stat: ip={request.ip} feed_id={request.feed_id} url="{request.url}" error="{request.ex_msg}"', request=RequestStat(
-                ip = request.getHeader('x-real-ip') or request.client.host,
-                feed_id = feed_id,
-                post_cnt=0,
-                new_post_cnt=0,
-                url=s_url,
-                ex_msg=error.getErrorMessage(),
-                ex_callstack=error.getTraceback()
-            ),
-            stat=True
-        )
-    except:
-        traceback.print_exc(file=sys.stdout)
-
-
-class Downloader(resource.Resource):
-    isLeaf = True
-
-    feed_regexp = re.compile('^/feed1?/(\d{1,10})$')
-
-    def startRequest(self, request, url, feed_config = None):
-        d = agent.request(
-            'GET',
-            url,
-            twisted_headers({
-                'Accept': ['text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'],
-                'Accept-Encoding': ['gzip, deflate, sdch'],
-                'User-Agent': [DOWNLOADER_USER_AGENT]
-            }),
-            None
-        )
-        print('Request started' % (url,))
-        d.addCallback(downloadStarted, request=request, url=url, feed_config=feed_config)
-        d.addErrback(downloadError, request=request, url=url)
-
-    def render_GET(self, request):
-        '''
-        Render page for frontend or RSS feed
-        '''
-        if 'url' in request.args: # page for frontend
-            url = request.args['url'][0]
-
-            self.startRequest(request, url)
-            return NOT_DONE_YET
-        elif self.feed_regexp.match(request.uri) is not None: # feed
-            feed_id = self.feed_regexp.match(request.uri).groups()[0]
-
-            time_left = check_feed_request_time_limit(request.uri)
-            if time_left:
-                request.setResponseCode(429)
-                request.setHeader('Retry-After', str(time_left) + ' seconds')
-                return 'Too Many Requests. Retry after %s seconds' % (str(time_left))
-            else:
-                res = getFeedData(request, feed_id)
-
-                if isinstance(res, basestring): # error message
-                    return res
-
-                url, feed_config = res
-                self.startRequest(request, url, feed_config)
-                return NOT_DONE_YET
-        else: # neither page and feed
-            return 'Url is required'
+import sys
+
+from pol import Server
+from settings import DATABASES, SNAPSHOT_DIR, DOWNLOADER_USER_AGENT, DEBUG
 
 port = sys.argv[1] if len(sys.argv) >= 2 else 1234
-endpoints.serverFromString(reactor, "tcp:%s" % port).listen(server.Site(Downloader()))
-print('Server starting at http://localhost:%s' % port)
-reactor.run()
\ No newline at end of file
+Server(port, DATABASES['default'], SNAPSHOT_DIR, DOWNLOADER_USER_AGENT, DEBUG).run()
diff --git a/pol/db.py b/pol/db.py
new file mode 100755
index 0000000..947015b
--- /dev/null
+++ b/pol/db.py
@@ -0,0 +1,7 @@
+import MySQLdb
+
+
+def get_conn(creds):
+    db = MySQLdb.connect(host=creds['HOST'], port=int(creds['PORT']), user=creds['USER'], passwd=creds['PASSWORD'], db=creds['NAME'], init_command='SET NAMES utf8mb4')
+    db.autocommit(True)
+    return db
diff --git a/pol/feed.py b/pol/feed.py
new file mode 100755
index 0000000..dc8ab1d
--- /dev/null
+++ b/pol/feed.py
@@ -0,0 +1,172 @@
+import w3lib.url
+import w3lib.html
+
+from lxml import etree
+import re, sys
+from hashlib import md5
+
+from feedgenerator import Rss201rev2Feed, Enclosure
+import datetime
+
+import MySQLdb
+from contextlib import closing
+from settings import DATABASES, DOWNLOADER_USER_AGENT
+from twisted.logger import Logger
+
+from .db import get_conn
+
+
+log = Logger()
+
+class Feed(object):
+
+    url_hash_regexp = re.compile('(#.*)?$')
+
+    POST_TIME_DISTANCE = 15 # minutes, RSS Feed Reader skips same titles created in a 10 min interval
+
+    FIELD_IDS = {'title': 1, 'description': 2, 'link': 3}
+
+    def __init__(self, db_creds, log):
+        self.db_creds = db_creds
+        self.log = log
+
+
+    def save_post(self, conn, created, feed_id, post_fields):
+        with conn as cur:
+            cur.execute("""insert into frontend_post (md5sum, created, feed_id)
+                           values (%s, %s, %s)""", (post_fields['md5'], created, feed_id))
+
+            post_id = conn.insert_id()
+            for key in ['title', 'description', 'title_link']:
+                if key in post_fields:
+                    cur.execute("""insert into frontend_postfield (field_id, post_id, `text`)
+                                   values (%s, %s, %s)""", (self.FIELD_IDS[key], post_id, post_fields[key].encode('utf-8')))
+            self.log.info('Post saved id:{id!r}', id=post_id)
+
+    def fill_time(self, feed_id, items):
+        if not items:
+            return 0
+
+        new_post_cnt = 0
+        for item in items:
+            #create md5
+            h = md5('')
+            for key in ['title', 'description', 'link']:
+                if key in item:
+                    h.update(item[key].encode('utf-8'))
+            item['md5'] = h.hexdigest()
+
+        #fetch dates from db
+        fetched_dates = {}
+        with closing(get_conn(self.db_creds)) as conn:
+            with conn as cur:
+                quoted_hashes = ','.join(["'%s'" % (i['md5']) for i in items])
+
+                cur.execute("""select p.md5sum, p.created, p.id
+                               from frontend_post p
+                               where p.md5sum in (%s)
+                               and p.feed_id=%s""" % (quoted_hashes, feed_id,))
+                rows = cur.fetchall()
+                self.log.debug('Selected {count!r} posts', count=len(rows))
+                for row in rows:
+                    md5hash = row[0]
+                    created = row[1]
+                    post_id = row[2]
+                    fetched_dates[md5hash] = created
+
+            cur_time = datetime.datetime.utcnow()
+            for item in items:
+                if item['md5'] in fetched_dates:
+                    item['time'] = fetched_dates[item['md5']]
+                else:
+                    item['time'] = cur_time
+                    self.save_post(conn, cur_time, feed_id, item)
+                    new_post_cnt += 1
+                    cur_time -= datetime.timedelta(minutes=self.POST_TIME_DISTANCE)
+        return new_post_cnt
+
+    def _build_link(self, html, doc_url, url):
+        base_url = w3lib.html.get_base_url(html, doc_url)
+        return w3lib.url.urljoin_rfc(base_url, url).decode('utf-8')
+
+    def buildFeed(self, response, feed_config):
+        response.selector.remove_namespaces()
+
+        selector = response.selector
+        tree = selector.root.getroottree()
+        # get data from html
+        items = []
+        for node in selector.xpath(feed_config['xpath']):
+            item = {}
+            required_count = 0
+            required_found = 0
+            for field_name in ['title', 'description', 'link']:
+                if field_name in feed_config['fields']:
+                    if feed_config['required'][field_name]:
+                        required_count += 1
+
+                    extracted = node.xpath(feed_config['fields'][field_name]).extract()
+                    if extracted:
+                        item[field_name] = u''.join(extracted)
+                        if feed_config['required'][field_name]:
+                            required_found += 1
+                        if field_name == 'link':
+                            item['link'] = self._build_link(response.body_as_unicode(), feed_config['uri'], item[field_name])
+
+            if required_count == required_found:
+                items.append(item)
+
+        title = selector.xpath('//title/text()').extract()
+
+        #build feed
+        feed = Rss201rev2Feed(
+            title = title[0] if title else 'Polite Pol: ' + feed_config['uri'],
+            link=feed_config['uri'],
+            description="Generated by PolitePol.com.\n"+\
+                "Source page url: " + feed_config['uri'],
+            language="en",
+        )
+        new_post_cnt = self.fill_time(feed_config['id'], items)
+
+        for item in items:
+            title = item['title'] if 'title' in item else ''
+            desc = item['description'] if 'description' in item else ''
+            time = item['time'] if 'time' in item else datetime.datetime.utcnow()
+            if 'link' in item:
+                link = item['link']
+            else:
+                link = self.url_hash_regexp.sub('#' + md5((title+desc).encode('utf-8')).hexdigest(), feed_config['uri'])
+            feed.add_item(
+                title = title,
+                link = link,
+                unique_id = link,
+                description = desc,
+                #enclosure=Enclosure(fields[4], "32000", "image/jpeg") if 4 in fields else None, #"Image"
+                pubdate = time
+            )
+        return [feed.writeString('utf-8'), len(items), new_post_cnt]
+
+    def getFeedData(self, request, feed_id):
+        # get url, xpathes
+        feed = {}
+
+        with closing(get_conn(self.db_creds)) as conn:
+            with conn as cur:
+                cur.execute("""select f.uri, f.xpath, fi.name, ff.xpath, fi.required from frontend_feed f
+                               right join frontend_feedfield ff on ff.feed_id=f.id
+                               left join frontend_field fi on fi.id=ff.field_id
+                               where f.id=%s""", (feed_id,))
+                rows = cur.fetchall()
+
+                for row in rows:
+                    if not feed:
+                        feed['id'] = feed_id
+                        feed['uri'] = row[0]
+                        feed['xpath'] = row[1]
+                        feed['fields'] = {}
+                        feed['required'] = {}
+                    feed['fields'][row[2]] = row[3]
+                    feed['required'][row[2]] = row[4]
+
+        if feed:
+            return [feed['uri'], feed]
+        else:
+            return 'Feed generator error: config of feed is empty'
diff --git a/pol/log.py b/pol/log.py
new file mode 100755
index 0000000..2d79f12
--- /dev/null
+++ b/pol/log.py
@@ -0,0 +1,33 @@
+import sys
+from twisted.logger import globalLogBeginner, formatEventAsClassicLogText
+
+
+class bcolors:
+    HEADER = '\033[95m'
+    OKBLUE = '\033[94m'
+    OKGREEN = '\033[92m'
+    WARNING = '\033[93m'
+    FAIL = '\033[91m'
+    ENDC = '\033[0m'
+    BOLD = '\033[1m'
+    UNDERLINE = '\033[4m'
+
+
+class LogHandler(object):
+    """Handler of twisted log messages"""
+
+    def __init__(self, stat_tool=None):
+        self.stat_tool = stat_tool
+        # required; discardBuffer gets rid of the LimitedHistoryLogObserver, redirectStandardIO would loop the print action
+        globalLogBeginner.beginLoggingTo([self.print_log], discardBuffer=True, redirectStandardIO=False)
+
+
+    def print_log(self, event):
+        if 'isError' in event and event['isError']:
+            sys.stdout.write(bcolors.FAIL + formatEventAsClassicLogText(event) + bcolors.ENDC)
+            sys.stderr.write(formatEventAsClassicLogText(event))
+            sys.stderr.flush()
+        else:
+            sys.stdout.write(formatEventAsClassicLogText(event))
+            sys.stdout.flush()
+
diff --git a/pol/memmon.py b/pol/memmon.py
new file mode 100755
index 0000000..236918a
--- /dev/null
+++ b/pol/memmon.py
@@ -0,0 +1,21 @@
+import time
+from pympler import tracker
+import gc
+
+
+class Monitor(object):
+
+    prev_time = None
+
+    def __init__(self, period_second=10 * 60, log=None):
+        self.period_second = period_second
+        self.log = log
+        self.tr = tracker.SummaryTracker()
+
+    def show_diff(self, _result=None):
+        tm = int(time.time())
+        if not self.prev_time or tm - self.prev_time >= self.period_second:
+            gc.collect()
+            for line in self.tr.format_diff():
+                self.log.info(line)
+            self.prev_time = tm
diff --git a/pol/server.py b/pol/server.py
new file mode 100755
index 0000000..6415c06
--- /dev/null
+++ b/pol/server.py
@@ -0,0 +1,286 @@
+from datetime import datetime
+from hashlib import md5
+import json
+import time, sys, traceback
+import re
+
+from lxml import etree
+
+from twisted.web import server, resource
+from twisted.internet import reactor, endpoints, defer
+from twisted.web.client import Agent, BrowserLikeRedirectAgent, readBody, PartialDownloadError, HTTPConnectionPool
+from twisted.web.server import NOT_DONE_YET
+from twisted.web.http_headers import Headers
+from twisted.web.html import escape
+twisted_headers = Headers
+from twisted.logger import Logger
+
+from scrapy.http.response.text import TextResponse
+from scrapy.downloadermiddlewares.httpcompression import HttpCompressionMiddleware
+from scrapy.downloadermiddlewares.decompression import DecompressionMiddleware
+from scrapy.http.request import Request
+from scrapy.http import Headers
+from scrapy.responsetypes import responsetypes
+from scrapy.core.downloader.contextfactory import ScrapyClientContextFactory
+
+from settings import FEED_REQUEST_PERIOD_LIMIT
+
+from .feed import Feed
+
+
+log = Logger()
+
+if FEED_REQUEST_PERIOD_LIMIT:
+    import redis
+
+
+class RequestStat(object):
+    def __init__(self, ip, feed_id, post_cnt, new_post_cnt, url=None, ex_msg=None, ex_callstack=None):
+        self.ip = ip
+        self.feed_id = feed_id
+        self.post_cnt = post_cnt
+        self.new_post_cnt = new_post_cnt
+        self.url = url
+        self.ex_msg = ex_msg
+        self.ex_callstack = ex_callstack
+
+
+def check_feed_request_time_limit(url):
+    if FEED_REQUEST_PERIOD_LIMIT:
+        r = redis.StrictRedis(host='localhost', port=6379, db=0)
+        previous_timestamp = r.get(url)
+        if previous_timestamp:
+            previous_timestamp = int(previous_timestamp)
+            time_passed = int(time.time()) - previous_timestamp
+            if time_passed <= FEED_REQUEST_PERIOD_LIMIT:
+                # time left to wait
+                return FEED_REQUEST_PERIOD_LIMIT - time_passed
+        r.set(url, int(time.time()))
+    return 0
+
+
+class Downloader(object):
+
+    def __init__(self, debug, snapshot_dir, feed, stat_tool=None, mem_mon=None):
+        self.debug = debug
+        self.snapshot_dir = snapshot_dir
+        self.feed = feed
+        self.stat_tool = stat_tool
+        self.mem_mon = mem_mon
+
+    def html2json(self, el):
+        return [
+            el.tag,
+            {"tag-id": el.attrib["tag-id"]},
+            [self.html2json(e) for e in el.getchildren() if isinstance(e, etree.ElementBase)]
+        ]
+
+    def setBaseAndRemoveScriptsAndMore(self, response, url):
+        response.selector.remove_namespaces()
+
+        tree = response.selector.root.getroottree()
+
+        # save html for extended selectors
+        file_name = '%s_%s' % (time.time(), md5(url).hexdigest())
+        file_path = self.snapshot_dir + '/' + file_name
+        with open(file_path, 'w') as f:
+            f.write(url + '\n')
+            for k, v in response.headers.iteritems():
+                for vv in v:
+                    f.write('%s: %s\n' % (k, vv))
+            f.write('\n\n' + etree.tostring(tree, encoding='utf-8', method='html'))
+
+        # set base url to html document
+        head = tree.xpath("//head")
+        if head:
+            head = head[0]
+            base = head.xpath("./base")
+            if base:
+                base = base[0]
+            else:
+                base = etree.Element("base")
+                head.insert(0, base)
+            base.set('href', url)
+
+        i = 1
+        for bad in tree.xpath("//*"):
+            # remove scripts
+            if bad.tag == 'script':
+                bad.getparent().remove(bad)
+            else:
+                # set tag-id attribute
+                bad.attrib['tag-id'] = str(i)
+                i += 1
+
+                # sanitize anchors
+                if bad.tag == 'a' and 'href' in bad.attrib:
+                    bad.attrib['origin-href'] = bad.attrib['href']
+                    del bad.attrib['href']
+
+                # remove html events
+                for attr in bad.attrib:
+                    if attr.startswith('on'):
+                        del bad.attrib[attr]
+
+                # sanitize forms
+                if bad.tag == 'form':
+                    bad.attrib['onsubmit'] = "return false"
+
+        body = tree.xpath("//body")
+        if body:
+            # append html2json js object
+            jsobj = self.html2json(tree.getroot())
+            script = etree.Element('script', {'type': 'text/javascript'})
+            script.text = '\n'.join((
+                'var html2json = ' + json.dumps(jsobj) + ';',
+                'var snapshot_time = "' + file_name + '";'
+            ))
+            body[0].append(script)
+
+        return (etree.tostring(tree, method='html'), file_name)
+
+    def buildScrapyResponse(self, response, body, url):
+        status = response.code
+        headers = Headers({k:','.join(v) for k,v in response.headers.getAllRawHeaders()})
+        respcls = responsetypes.from_args(headers=headers, url=url)
+        return respcls(url=url, status=status, headers=headers, body=body)
+
+    def error_html(self, msg):
+        return "<html><body>%s</body></html>" % escape(msg)
+
+    def downloadError(self, error, request=None, url=None, response=None, feed_config=None):
+        # read for details: https://stackoverflow.com/questions/29423986/twisted-giving-twisted-web-client-partialdownloaderror-200-ok
+        if error.type is PartialDownloadError and error.value.status == '200':
+            d = defer.Deferred()
+            reactor.callLater(0, d.callback, error.value.response) # error.value.response is response_str
+            d.addCallback(self.downloadDone, request=request, response=response, feed_config=feed_config)
+            d.addErrback(self.downloadError, request=request, url=url, response=response, feed_config=feed_config)
+            return
+
+        if self.debug:
+            request.write('Downloader error: ' + error.getErrorMessage())
+            request.write('Traceback: ' + error.getTraceback())
+        else:
+            request.write(self.error_html('Something wrong. Contact us by email: politepol.com@gmail.com \n Scary mantra: ' + error.getErrorMessage()))
+            sys.stderr.write('\n'.join([str(datetime.utcnow()), request.uri, url, 'Downloader error: ' + error.getErrorMessage(), 'Traceback: ' + error.getTraceback()]))
+        request.finish()
+
+        try:
+            feed_id = feed_config and feed_config['id']
+            s_url = None
+            if not feed_id:
+                feed_id = 0
+                s_url = url
+            log.info('Stat: ip={request.ip} feed_id={request.feed_id} url="{request.url}" error="{request.ex_msg}"', request=RequestStat(
+                    ip = request.getHeader('x-real-ip') or request.client.host,
+                    feed_id = feed_id,
+                    post_cnt=0,
+                    new_post_cnt=0,
+                    url=s_url,
+                    ex_msg=error.getErrorMessage(),
+                    ex_callstack=error.getTraceback()
+                ),
+                stat=True
+            )
+        except:
+            traceback.print_exc(file=sys.stdout)
+
+
+    def downloadStarted(self, response, request, url, feed_config):
+        d = readBody(response)
+        d.addCallback(self.downloadDone, request=request, response=response, feed_config=feed_config)
+        d.addErrback(self.downloadError, request=request, url=url, response=response, feed_config=feed_config)
+        return response
+
+    def downloadDone(self, response_str, request, response, feed_config):
+        url = response.request.absoluteURI
+
+        print('Response <%s> ready (%s bytes)' % (url, len(response_str)))
+        response = self.buildScrapyResponse(response, response_str, url)
+
+        response = HttpCompressionMiddleware().process_response(Request(url), response, None)
+        response = DecompressionMiddleware().process_response(None, response, None)
+
+        if (isinstance(response, TextResponse)):
+            ip = request.getHeader('x-real-ip') or request.client.host
+            if feed_config:
+                [response_str, post_cnt, new_post_cnt] = self.feed.buildFeed(response, feed_config)
+                request.setHeader(b"Content-Type", b'text/xml; charset=utf-8')
+                log.info('Stat: ip={request.ip} feed_id={request.feed_id} post_cnt={request.post_cnt} new_post_cnt={request.new_post_cnt}', request=RequestStat(
+                        ip=ip,
+                        feed_id=feed_config['id'],
+                        post_cnt=post_cnt,
+                        new_post_cnt=new_post_cnt
+                    ),
+                    stat=True
+                )
+            else:
+                response_str, file_name = self.setBaseAndRemoveScriptsAndMore(response, url)
+                log.info('Stat: ip={request.ip} url={request.url}', request=RequestStat(
+                        ip=ip,
+                        feed_id=0,
+                        post_cnt=0,
+                        new_post_cnt=0,
+                        url=url
+                    ),
+                    stat=True
+                )
+
+        request.write(response_str)
+        request.finish()
+        self.run_mem_mon()
+
+    def run_mem_mon(self):
+        if self.mem_mon:
+            d = defer.Deferred()
+            reactor.callLater(0, d.callback, None)
+            d.addCallback(self.mem_mon.show_diff)
+            d.addErrback(lambda err: print("Memory Monitor error: %s\nMemory Monitor traceback: %s" % (err.getErrorMessage(), err.getTraceback())))
+
+
+class Site(resource.Resource):
+    isLeaf = True
+
+    feed_regexp = re.compile('^/feed1?/(\d{1,10})$')
+
+    def __init__(self, db_creds, snapshot_dir, user_agent, debug, mem_mon=None):
+        self.db_creds = db_creds
+        self.snapshot_dir = snapshot_dir
+        self.user_agent = user_agent
+
+        self.feed = Feed(db_creds, log)
+        self.downloader = Downloader(debug, snapshot_dir, self.feed, mem_mon=mem_mon)
+
+    def startRequest(self, request, url, feed_config = None):
+        agent = BrowserLikeRedirectAgent(
+            Agent(reactor,
+                contextFactory=ScrapyClientContextFactory(), # skip certificate verification
+                connectTimeout=10),
+                #pool=pool),
+            redirectLimit=5
+            )
+
+        d = agent.request(
+            'GET',
+            url,
+            twisted_headers({
+                'Accept': ['text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'],
+                'Accept-Encoding': ['gzip, deflate, sdch'],
+                'User-Agent': [self.user_agent]
+            }),
+            None
+        )
+        print('Request started %s' % (url,))
+        d.addCallback(self.downloader.downloadStarted, request=request, url=url, feed_config=feed_config)
+        d.addErrback(self.downloader.downloadError, request=request, url=url)
+
+    def render_GET(self, request):
+        '''
+        Render page for frontend or RSS feed
+        '''
+        if 'url' in request.args: # page for frontend
+            url = request.args['url'][0]
+
+            self.startRequest(request, url)
+            return NOT_DONE_YET
+        elif self.feed_regexp.match(request.uri) is not None: # feed
+            feed_id = self.feed_regexp.match(request.uri).groups()[0]
+
+            time_left = check_feed_request_time_limit(request.uri)
+            if time_left:
+                request.setResponseCode(429)
+                request.setHeader('Retry-After', str(time_left) + ' seconds')
+                return 'Too Many Requests. Retry after %s seconds' % (str(time_left))
+            else:
+                res = self.feed.getFeedData(request, feed_id)
+
+                if isinstance(res, basestring): # error message
+                    return res
+
+                url, feed_config = res
+                self.startRequest(request, url, feed_config)
+                return NOT_DONE_YET
+        else: # neither page nor feed
+            return 'Url is required'
+
+
+class Server(object):
+
+    def __init__(self, port, db_creds, snapshot_dir, user_agent, debug):
+        self.port = port
+        self.db_creds = db_creds
+        self.snapshot_dir = snapshot_dir
+        self.user_agent = user_agent
+        self.debug = debug
+        self.mem_mon = None
+
+    def setMemMonitor(self, mem_mon=None):
+        self.mem_mon = mem_mon
+
+    def run(self):
+        endpoints.serverFromString(reactor, "tcp:%s" % self.port).listen(server.Site(Site(self.db_creds, self.snapshot_dir, self.user_agent, self.debug, self.mem_mon)))
+        reactor.run()
\ No newline at end of file