diff --git a/downloader.py b/downloader.py
index 52d9ae6..fd125b9 100644
--- a/downloader.py
+++ b/downloader.py
@@ -5,4 +5,4 @@ from settings import DATABASES, SNAPSHOT_DIR, DOWNLOADER_USER_AGENT, DEBUG
 
 port = sys.argv[1] if len(sys.argv) >= 2 else 1234
 
-Server(port, None, SNAPSHOT_DIR, DOWNLOADER_USER_AGENT).run() # DATABASES['default']
+Server(port, DATABASES['default'], SNAPSHOT_DIR, DOWNLOADER_USER_AGENT).run()
diff --git a/feed.py b/feed.py
deleted file mode 100644
index 7de32ce..0000000
--- a/feed.py
+++ /dev/null
@@ -1,186 +0,0 @@
-import w3lib.url
-import w3lib.html
-
-from lxml import etree
-import re, sys
-from hashlib import md5
-
-from feedgenerator import Rss201rev2Feed, Enclosure
-import datetime
-
-import MySQLdb
-from contextlib import closing
-from settings import DATABASES, DOWNLOADER_USER_AGENT
-from twisted.logger import Logger
-
-
-log = Logger()
-
-url_hash_regexp = re.compile('(#.*)?$')
-
-POST_TIME_DISTANCE = 15 # minutes, RSS Feed Reader skip same titles created in 10 min interval
-
-FIELD_IDS = {'title': 1, 'description': 2, 'link': 3}
-
-def save_post(conn, created, feed_id, post_fields):
-    with conn as cur:
-        cur.execute("""insert into frontend_post (md5sum, created, feed_id)
-                       values (%s, %s, %s)""", (post_fields['md5'], created, feed_id))
-
-        post_id = conn.insert_id()
-        for key in ['title', 'description', 'title_link']:
-            if key in post_fields:
-                cur.execute("""insert into frontend_postfield (field_id, post_id, `text`)
-                               values (%s, %s, %s)""", (FIELD_IDS[key], post_id, post_fields[key].encode('utf-8')))
-        log.info('Post saved id:{id!r}', id=post_id)
-
-def fill_time(feed_id, items):
-    if not items:
-        return 0
-
-    new_post_cnt = 0
-    for item in items:
-        #create md5
-        h = md5('')
-        for key in ['title', 'description', 'link']:
-            if key in item:
-                h.update(item[key].encode('utf-8'))
-        item['md5'] = h.hexdigest()
-
-    #fetch dates from db
-    fetched_dates = {}
-    with closing(get_conn()) as conn:
-        with conn as cur:
-            quoted_hashes = ','.join(["'%s'" % (i['md5']) for i in items])
-
-            cur.execute("""select p.md5sum, p.created, p.id
-                           from frontend_post p
-                           where p.md5sum in (%s)
-                           and p.feed_id=%s""" % (quoted_hashes, feed_id,))
-            rows = cur.fetchall()
-            log.debug('Selected {count!r} posts', count=len(rows))
-            for row in rows:
-                md5hash = row[0]
-                created = row[1]
-                post_id = row[2]
-                fetched_dates[md5hash] = created
-
-        cur_time = datetime.datetime.utcnow()
-        for item in items:
-            if item['md5'] in fetched_dates:
-                item['time'] = fetched_dates[item['md5']]
-            else:
-                item['time'] = cur_time
-                save_post(conn, cur_time, feed_id, item)
-                new_post_cnt += 1
-            cur_time -= datetime.timedelta(minutes=POST_TIME_DISTANCE)
-    return new_post_cnt
-
-def decode(text, encoding): # it's strange but true
-    if isinstance(text, unicode):
-        return text
-    else:
-        return text.decode(encoding)
-
-def element_to_unicode(element, encoding):
-    if isinstance(element, basestring): # attribute
-        return decode(element, encoding)
-
-    s = [decode(element.text, encoding)] if element.text else []
-    for sub_element in element:
-        s.append(decode(etree.tostring(sub_element), encoding))
-    return u''.join(s)
-
-def _build_link(html, doc_url, url):
-    base_url = w3lib.html.get_base_url(html, doc_url)
-    return w3lib.url.urljoin_rfc(base_url, url).decode('utf-8')
-
-def buildFeed(response, feed_config):
-    response.selector.remove_namespaces()
-
-    selector = response.selector
-    tree = selector.root.getroottree()
-    # get data from html
-    items = []
-    for node in selector.xpath(feed_config['xpath']):
-        item = {}
-        required_count = 0
-        required_found = 0
-        for field_name in ['title', 'description', 'link']:
-            if field_name in feed_config['fields']:
-                if feed_config['required'][field_name]:
-                    required_count += 1
-
-                extracted = node.xpath(feed_config['fields'][field_name]).extract()
-                if extracted:
-                    item[field_name] = u''.join(extracted)
-                    if feed_config['required'][field_name]:
-                        required_found += 1
-                    if field_name == 'link':
-                        item['link'] = _build_link(response.body_as_unicode(), feed_config['uri'], item[field_name])
-
-        if required_count == required_found:
-            items.append(item)
-
-    title = selector.xpath('//title/text()').extract()
-
-    #build feed
-    feed = Rss201rev2Feed(
-        title = title[0] if title else 'Polite Pol: ' + feed_config['uri'],
-        link=feed_config['uri'],
-        description="Generated by PolitePol.com.\n"+\
-            "Source page url: " + feed_config['uri'],
-        language="en",
-    )
-    new_post_cnt = fill_time(feed_config['id'], items)
-
-    for item in items:
-        title = item['title'] if 'title' in item else ''
-        desc = item['description'] if 'description' in item else ''
-        time = item['time'] if 'time' in item else datetime.datetime.utcnow()
-        if 'link' in item:
-            link = item['link']
-        else:
-            link = url_hash_regexp.sub('#' + md5((title+desc).encode('utf-8')).hexdigest(), feed_config['uri'])
-        feed.add_item(
-            title = title,
-            link = link,
-            unique_id = link,
-            description = desc,
-            #enclosure=Enclosure(fields[4], "32000", "image/jpeg") if 4 in fields else None, #"Image"
-            pubdate = time
-        )
-    return [feed.writeString('utf-8'), len(items), new_post_cnt]
-
-def getFeedData(request, feed_id):
-    # get url, xpathes
-    feed = {}
-
-    with closing(get_conn()) as conn:
-        with conn as cur:
-            cur.execute("""select f.uri, f.xpath, fi.name, ff.xpath, fi.required from frontend_feed f
-                           right join frontend_feedfield ff on ff.feed_id=f.id
-                           left join frontend_field fi on fi.id=ff.field_id
-                           where f.id=%s""", (feed_id,))
-            rows = cur.fetchall()
-
-            for row in rows:
-                if not feed:
-                    feed['id'] = feed_id
-                    feed['uri'] = row[0]
-                    feed['xpath'] = row[1]
-                    feed['fields'] = {}
-                    feed['required'] = {}
-                feed['fields'][row[2]] = row[3]
-                feed['required'][row[2]] = row[4]
-
-    if feed:
-        return [feed['uri'], feed]
-    else:
-        return 'Feed generator error: config of feed is empty'
-
-def get_conn():
-    creds = DATABASES['default']
-    db = MySQLdb.connect(host=creds['HOST'], port=int(creds['PORT']), user=creds['USER'], passwd=creds['PASSWORD'], db=creds['NAME'], init_command='SET NAMES utf8mb4')
-    db.autocommit(True)
-    return db
diff --git a/pol/feed.py b/pol/feed.py
index 7273073..be1ba6f 100755
--- a/pol/feed.py
+++ b/pol/feed.py
@@ -39,7 +39,7 @@ class Feed(object):
             for key in ['title', 'description', 'title_link']:
                 if key in post_fields:
                     cur.execute("""insert into frontend_postfield (field_id, post_id, `text`)
-                                   values (%s, %s, %s)""", (FIELD_IDS[key], post_id, post_fields[key].encode('utf-8')))
+                                   values (%s, %s, %s)""", (self.FIELD_IDS[key], post_id, post_fields[key].encode('utf-8')))
             log.info('Post saved id:{id!r}', id=post_id)
 
     def fill_time(self, feed_id, items):
@@ -66,7 +66,7 @@ class Feed(object):
                               where p.md5sum in (%s)
                               and p.feed_id=%s""" % (quoted_hashes, feed_id,))
                 rows = cur.fetchall()
-                self.log.debug('Selected {count!r} posts', count=len(rows))
+                log.debug('Selected {count!r} posts', count=len(rows))
                 for row in rows:
                     md5hash = row[0]
                     created = row[1]
@@ -81,7 +81,7 @@ class Feed(object):
                     item['time'] = cur_time
                     self.save_post(conn, cur_time, feed_id, item)
                     new_post_cnt += 1
-                cur_time -= datetime.timedelta(minutes=POST_TIME_DISTANCE)
+                cur_time -= datetime.timedelta(minutes=self.POST_TIME_DISTANCE)
         return new_post_cnt
 
     def _build_link(self, html, doc_url, url):
@@ -110,7 +110,7 @@ class Feed(object):
                         if feed_config['required'][field_name]:
                             required_found += 1
                         if field_name == 'link':
-                            item['link'] = _build_link(response.body_as_unicode(), feed_config['uri'], item[field_name])
+                            item['link'] = self._build_link(response.body_as_unicode(), feed_config['uri'], item[field_name])
 
             if required_count == required_found:
                 items.append(item)
@@ -125,7 +125,7 @@ class Feed(object):
                 "Source page url: " + feed_config['uri'],
             language="en",
         )
-        new_post_cnt = fill_time(feed_config['id'], items)
+        new_post_cnt = self.fill_time(feed_config['id'], items)
 
         for item in items:
             title = item['title'] if 'title' in item else ''
diff --git a/pol/log.py b/pol/log.py
index f039a62..1d07303 100755
--- a/pol/log.py
+++ b/pol/log.py
@@ -16,13 +16,13 @@ class bcolors:
 
 class LogHandler(object):
     """Handler of twisted log meaasges"""
-    def __init__(self, stat_tool=None):
-        self.stat_tool = stat_tool
+    def __init__(self):
+        #import pdb;pdb.set_trace()
 
         # requred, discardBuffer gets rid of the LimitedHistoryLogObserver, redirectStandardIO will loop print action
        globalLogBeginner.beginLoggingTo([self.print_log], discardBuffer=True, redirectStandardIO=False)
 
-    def print_log(event):
+    def print_log(self, event):
         if 'isError' in event and event['isError']:
             sys.stdout.write(bcolors.FAIL + formatEventAsClassicLogText(event) + bcolors.ENDC)
             sys.stderr.write(formatEventAsClassicLogText(event))
diff --git a/pol/server.py b/pol/server.py
index 400f8b3..6768ddb 100755
--- a/pol/server.py
+++ b/pol/server.py
@@ -34,33 +34,39 @@ log = Logger()
 
 class Downloader(object):
 
-    def __init__(self, debug, stat_tool=None, mem_mon=None, limiter=None):
+    def __init__(self, feed, debug, snapshot_dir='/tmp', stat_tool=None, mem_mon=None):
+        self.feed = feed
         self.debug = debug
+        self.snapshot_dir = snapshot_dir
         self.stat_tool = stat_tool
         self.mem_mon = mem_mon
-        self.limiter = limiter
 
     def html2json(self, el):
         return [
             el.tag,
             {"tag-id": el.attrib["tag-id"]},
-            [html2json(e) for e in el.getchildren() if isinstance(e, etree.ElementBase)]
+            [self.html2json(e) for e in el.getchildren() if isinstance(e, etree.ElementBase)]
         ]
 
-    def setBaseAndRemoveScriptsAndMore(self, response, url):
-        response.selector.remove_namespaces()
-
-        tree = response.selector.root.getroottree()
-
+    def _saveResponse(self, response, url, tree):
         # save html for extended selectors
         file_name = '%s_%s' % (time.time(), md5(url).hexdigest())
-        file_path = SNAPSHOT_DIR + '/' + file_name
+        file_path = self.snapshot_dir + '/' + file_name
         with open(file_path, 'w') as f:
             f.write(url + '\n')
             for k, v in response.headers.iteritems():
                 for vv in v:
                     f.write('%s: %s\n' % (k, vv))
             f.write('\n\n' + etree.tostring(tree, encoding='utf-8', method='html'))
+        return file_name
+
+
+    def setBaseAndRemoveScriptsAndMore(self, response, url):
+        response.selector.remove_namespaces()
+
+        tree = response.selector.root.getroottree()
+
+        file_name = self._saveResponse(response, url, tree)
 
         # set base url to html document
         head = tree.xpath("//head")
@@ -125,15 +131,15 @@ class Downloader(object):
         if error.type is PartialDownloadError and error.value.status == '200':
             d = defer.Deferred()
             reactor.callLater(0, d.callback, error.value.response) # error.value.response is response_str
-            d.addCallback(downloadDone, request=request, response=response, feed_config=feed_config)
-            d.addErrback(downloadError, request=request, url=url, response=response, feed_config=feed_config)
+            d.addCallback(self.downloadDone, request=request, response=response, feed_config=feed_config)
+            d.addErrback(self.downloadError, request=request, url=url, response=response, feed_config=feed_config)
             return
 
         if self.debug:
             request.write('Downloader error: ' + error.getErrorMessage())
             request.write('Traceback: ' + error.getTraceback())
         else:
-            request.write(self.error_html('PolitePol says: "Something wrong" Try to refresh page or contact us by email: politepol.com@gmail.com\n(Help us to improve our service with your feedback) Scary mantra: %s' % escape(error.getErrorMessage())))
+            request.write(self.error_html('PolitePol says: "Something wrong" Try to refresh page or contact us by email: politepol.com@gmail.com\n(Help us to improve our service with your feedback) Scary mantra: %s' % escape(error.getErrorMessage())))
 
         sys.stderr.write('\n'.join([str(datetime.utcnow()), request.uri, url, 'Downloader error: ' + error.getErrorMessage(), 'Traceback: ' + error.getTraceback()]))
 
         request.finish()
@@ -157,17 +163,17 @@ class Downloader(object):
 
             traceback.print_exc(file=sys.stdout)
 
-    def downloadStarted(response, request, url, feed_config):
+    def downloadStarted(self, response, request, url, feed_config):
         d = readBody(response)
-        d.addCallback(downloadDone, request=request, response=response, feed_config=feed_config)
-        d.addErrback(downloadError, request=request, url=url, response=response, feed_config=feed_config)
+        d.addCallback(self.downloadDone, request=request, response=response, feed_config=feed_config)
+        d.addErrback(self.downloadError, request=request, url=url, response=response, feed_config=feed_config)
         return response
 
-    def downloadDone(response_str, request, response, feed_config):
+    def downloadDone(self, response_str, request, response, feed_config):
         url = response.request.absoluteURI
 
         print('Response <%s> ready (%s bytes)' % (url, len(response_str)))
-        response = buildScrapyResponse(response, response_str, url)
+        response = self.buildScrapyResponse(response, response_str, url)
 
         response = HttpCompressionMiddleware().process_response(Request(url), response, None)
         response = DecompressionMiddleware().process_response(None, response, None)
@@ -177,36 +183,22 @@ class Downloader(object):
 
         if feed_config:
             [response_str, post_cnt, new_post_cnt] = self.feed.buildFeed(response, feed_config)
             request.setHeader(b"Content-Type", b'text/xml; charset=utf-8')
-            log.info('Stat: ip={request.ip} feed_id={request.feed_id} post_cnt={request.post_cnt} new_post_cnt={request.new_post_cnt}', request=RequestStat(
-                    ip=ip,
-                    feed_id=feed_config['id'],
-                    post_cnt=post_cnt,
-                    new_post_cnt=new_post_cnt
-                ),
-                stat=True
-            )
+            if self.stat_tool:
+                self.stat_tool.trace(ip=ip, feed_id=feed_config['id'], post_cnt=post_cnt, new_post_cnt=new_post_cnt)
         else:
-            response_str, file_name = setBaseAndRemoveScriptsAndMore(response, url)
-            log.info('Stat: ip={request.ip} url={request.url}', request=RequestStat(
-                    ip=ip,
-                    feed_id=0,
-                    post_cnt=0,
-                    new_post_cnt=0,
-                    url=url
-                ),
-                stat=True
-            )
+            response_str, file_name = self.setBaseAndRemoveScriptsAndMore(response, url)
+            if self.stat_tool:
+                self.stat_tool.trace(ip=ip, feed_id=0, post_cnt=0, new_post_cnt=0, url=url)
 
         request.write(response_str)
         request.finish()
 
-        run_mem_mon()
+        self.run_mem_mon()
 
-    def run_mem_mon():
-        global mem_mon
-        if mem_mon:
+    def run_mem_mon(self):
+        if self.mem_mon:
             d = defer.Deferred()
             reactor.callLater(0, d.callback, None)
-            d.addCallback(mem_mon.show_diff)
+            d.addCallback(self.mem_mon.show_diff)
             d.addErrback(lambda err: print("Memory Monitor error: %s\nPGC traceback: %s" % (err.getErrorMessage(), err.getTraceback())))
 
 
@@ -215,14 +207,14 @@ class Site(resource.Resource):
 
     feed_regexp = re.compile('^/feed1?/(\d{1,10})$')
 
-    def __init__(self, db_creds, snapshot_dir, user_agent, debug=False, limiter=None):
+    def __init__(self, db_creds, snapshot_dir, user_agent, debug=False, limiter=None, mem_mon=None, stat_tool=None):
         self.db_creds = db_creds
         self.snapshot_dir = snapshot_dir
         self.user_agent = user_agent
         self.limiter = limiter
 
-        self.downloader = Downloader(debug)
         self.feed = Feed(db_creds)
+        self.downloader = Downloader(self.feed, debug, snapshot_dir, stat_tool, mem_mon)
 
     def startRequest(self, request, url, feed_config = None):
         agent = BrowserLikeRedirectAgent(
@@ -279,20 +271,17 @@ class Server(object):
-    def __init__(self, port, db_creds, snapshot_dir, user_agent, debug=False, limiter=None):
+    def __init__(self, port, db_creds, snapshot_dir, user_agent, debug=False, limiter=None, mem_mon=None):
         self.port = port
         self.db_creds = db_creds
         self.snapshot_dir = snapshot_dir
         self.user_agent = user_agent
         self.debug = debug
         self.limiter = limiter
+        self.mem_mon = mem_mon
 
         self.log_handler = LogHandler()
 
-    def setMemMonitor(self, _mem_mon=None):
-        global mem_mon
-        mem_mon = _mem_mon
-
     def run(self):
-        endpoints.serverFromString(reactor, "tcp:%s" % self.port).listen(server.Site(Site(self.db_creds, self.snapshot_dir, self.user_agent, self.debug, self.limiter)))
+        endpoints.serverFromString(reactor, "tcp:%s" % self.port).listen(server.Site(Site(self.db_creds, self.snapshot_dir, self.user_agent, self.debug, self.limiter, self.mem_mon)))
         reactor.run()
\ No newline at end of file
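
Note for reviewers: a minimal sketch, not part of the patch, of how the refactored entry point can be wired after this change. Only the Server(...) signature and the settings names come from the diff above; the "from pol.server import Server" import path and passing a memory monitor instance are assumptions for illustration.

    import sys

    from settings import DATABASES, SNAPSHOT_DIR, DOWNLOADER_USER_AGENT, DEBUG
    from pol.server import Server  # assumed import path, matching pol/server.py in this diff

    # Same CLI convention as downloader.py: optional port argument, default 1234.
    port = sys.argv[1] if len(sys.argv) >= 2 else 1234

    # Signature introduced in pol/server.py:
    #   Server(port, db_creds, snapshot_dir, user_agent, debug=False, limiter=None, mem_mon=None)
    # mem_mon is optional; any object exposing show_diff() (hypothetical here) would be called
    # via Downloader.run_mem_mon() after each request.
    Server(port, DATABASES['default'], SNAPSHOT_DIR, DOWNLOADER_USER_AGENT,
           debug=DEBUG, mem_mon=None).run()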