mirror of
https://github.com/taroved/pol
synced 2025-05-15 13:50:09 -07:00
lib start
This commit is contained in:
parent
89b9ffb78d
commit
6e9884e34a
@ -5,4 +5,4 @@ from settings import DATABASES, SNAPSHOT_DIR, DOWNLOADER_USER_AGENT, DEBUG
|
||||
|
||||
port = sys.argv[1] if len(sys.argv) >= 2 else 1234
|
||||
|
||||
Server(port, None, SNAPSHOT_DIR, DOWNLOADER_USER_AGENT).run() # DATABASES['default']
|
||||
Server(port, DATABASES['default'], SNAPSHOT_DIR, DOWNLOADER_USER_AGENT).run()
|
||||
|
186
feed.py
186
feed.py
@ -1,186 +0,0 @@
|
||||
import w3lib.url
|
||||
import w3lib.html
|
||||
|
||||
from lxml import etree
|
||||
import re, sys
|
||||
from hashlib import md5
|
||||
|
||||
from feedgenerator import Rss201rev2Feed, Enclosure
|
||||
import datetime
|
||||
|
||||
import MySQLdb
|
||||
from contextlib import closing
|
||||
from settings import DATABASES, DOWNLOADER_USER_AGENT
|
||||
from twisted.logger import Logger
|
||||
|
||||
|
||||
log = Logger()
|
||||
|
||||
url_hash_regexp = re.compile('(#.*)?$')
|
||||
|
||||
POST_TIME_DISTANCE = 15 # minutes, RSS Feed Reader skip same titles created in 10 min interval
|
||||
|
||||
FIELD_IDS = {'title': 1, 'description': 2, 'link': 3}
|
||||
|
||||
def save_post(conn, created, feed_id, post_fields):
|
||||
with conn as cur:
|
||||
cur.execute("""insert into frontend_post (md5sum, created, feed_id)
|
||||
values (%s, %s, %s)""", (post_fields['md5'], created, feed_id))
|
||||
|
||||
post_id = conn.insert_id()
|
||||
for key in ['title', 'description', 'title_link']:
|
||||
if key in post_fields:
|
||||
cur.execute("""insert into frontend_postfield (field_id, post_id, `text`)
|
||||
values (%s, %s, %s)""", (FIELD_IDS[key], post_id, post_fields[key].encode('utf-8')))
|
||||
log.info('Post saved id:{id!r}', id=post_id)
|
||||
|
||||
def fill_time(feed_id, items):
|
||||
if not items:
|
||||
return 0
|
||||
|
||||
new_post_cnt = 0
|
||||
for item in items:
|
||||
#create md5
|
||||
h = md5('')
|
||||
for key in ['title', 'description', 'link']:
|
||||
if key in item:
|
||||
h.update(item[key].encode('utf-8'))
|
||||
item['md5'] = h.hexdigest()
|
||||
|
||||
#fetch dates from db
|
||||
fetched_dates = {}
|
||||
with closing(get_conn()) as conn:
|
||||
with conn as cur:
|
||||
quoted_hashes = ','.join(["'%s'" % (i['md5']) for i in items])
|
||||
|
||||
cur.execute("""select p.md5sum, p.created, p.id
|
||||
from frontend_post p
|
||||
where p.md5sum in (%s)
|
||||
and p.feed_id=%s""" % (quoted_hashes, feed_id,))
|
||||
rows = cur.fetchall()
|
||||
log.debug('Selected {count!r} posts', count=len(rows))
|
||||
for row in rows:
|
||||
md5hash = row[0]
|
||||
created = row[1]
|
||||
post_id = row[2]
|
||||
fetched_dates[md5hash] = created
|
||||
|
||||
cur_time = datetime.datetime.utcnow()
|
||||
for item in items:
|
||||
if item['md5'] in fetched_dates:
|
||||
item['time'] = fetched_dates[item['md5']]
|
||||
else:
|
||||
item['time'] = cur_time
|
||||
save_post(conn, cur_time, feed_id, item)
|
||||
new_post_cnt += 1
|
||||
cur_time -= datetime.timedelta(minutes=POST_TIME_DISTANCE)
|
||||
return new_post_cnt
|
||||
|
||||
def decode(text, encoding): # it's strange but true
|
||||
if isinstance(text, unicode):
|
||||
return text
|
||||
else:
|
||||
return text.decode(encoding)
|
||||
|
||||
def element_to_unicode(element, encoding):
|
||||
if isinstance(element, basestring): # attribute
|
||||
return decode(element, encoding)
|
||||
|
||||
s = [decode(element.text, encoding)] if element.text else []
|
||||
for sub_element in element:
|
||||
s.append(decode(etree.tostring(sub_element), encoding))
|
||||
return u''.join(s)
|
||||
|
||||
def _build_link(html, doc_url, url):
|
||||
base_url = w3lib.html.get_base_url(html, doc_url)
|
||||
return w3lib.url.urljoin_rfc(base_url, url).decode('utf-8')
|
||||
|
||||
def buildFeed(response, feed_config):
|
||||
response.selector.remove_namespaces()
|
||||
|
||||
selector = response.selector
|
||||
tree = selector.root.getroottree()
|
||||
# get data from html
|
||||
items = []
|
||||
for node in selector.xpath(feed_config['xpath']):
|
||||
item = {}
|
||||
required_count = 0
|
||||
required_found = 0
|
||||
for field_name in ['title', 'description', 'link']:
|
||||
if field_name in feed_config['fields']:
|
||||
if feed_config['required'][field_name]:
|
||||
required_count += 1
|
||||
|
||||
extracted = node.xpath(feed_config['fields'][field_name]).extract()
|
||||
if extracted:
|
||||
item[field_name] = u''.join(extracted)
|
||||
if feed_config['required'][field_name]:
|
||||
required_found += 1
|
||||
if field_name == 'link':
|
||||
item['link'] = _build_link(response.body_as_unicode(), feed_config['uri'], item[field_name])
|
||||
|
||||
if required_count == required_found:
|
||||
items.append(item)
|
||||
|
||||
title = selector.xpath('//title/text()').extract()
|
||||
|
||||
#build feed
|
||||
feed = Rss201rev2Feed(
|
||||
title = title[0] if title else 'Polite Pol: ' + feed_config['uri'],
|
||||
link=feed_config['uri'],
|
||||
description="Generated by PolitePol.com.\n"+\
|
||||
"Source page url: " + feed_config['uri'],
|
||||
language="en",
|
||||
)
|
||||
new_post_cnt = fill_time(feed_config['id'], items)
|
||||
|
||||
for item in items:
|
||||
title = item['title'] if 'title' in item else ''
|
||||
desc = item['description'] if 'description' in item else ''
|
||||
time = item['time'] if 'time' in item else datetime.datetime.utcnow()
|
||||
if 'link' in item:
|
||||
link = item['link']
|
||||
else:
|
||||
link = url_hash_regexp.sub('#' + md5((title+desc).encode('utf-8')).hexdigest(), feed_config['uri'])
|
||||
feed.add_item(
|
||||
title = title,
|
||||
link = link,
|
||||
unique_id = link,
|
||||
description = desc,
|
||||
#enclosure=Enclosure(fields[4], "32000", "image/jpeg") if 4 in fields else None, #"Image"
|
||||
pubdate = time
|
||||
)
|
||||
return [feed.writeString('utf-8'), len(items), new_post_cnt]
|
||||
|
||||
def getFeedData(request, feed_id):
|
||||
# get url, xpathes
|
||||
feed = {}
|
||||
|
||||
with closing(get_conn()) as conn:
|
||||
with conn as cur:
|
||||
cur.execute("""select f.uri, f.xpath, fi.name, ff.xpath, fi.required from frontend_feed f
|
||||
right join frontend_feedfield ff on ff.feed_id=f.id
|
||||
left join frontend_field fi on fi.id=ff.field_id
|
||||
where f.id=%s""", (feed_id,))
|
||||
rows = cur.fetchall()
|
||||
|
||||
for row in rows:
|
||||
if not feed:
|
||||
feed['id'] = feed_id
|
||||
feed['uri'] = row[0]
|
||||
feed['xpath'] = row[1]
|
||||
feed['fields'] = {}
|
||||
feed['required'] = {}
|
||||
feed['fields'][row[2]] = row[3]
|
||||
feed['required'][row[2]] = row[4]
|
||||
|
||||
if feed:
|
||||
return [feed['uri'], feed]
|
||||
else:
|
||||
return 'Feed generator error: config of feed is empty'
|
||||
|
||||
def get_conn():
|
||||
creds = DATABASES['default']
|
||||
db = MySQLdb.connect(host=creds['HOST'], port=int(creds['PORT']), user=creds['USER'], passwd=creds['PASSWORD'], db=creds['NAME'], init_command='SET NAMES utf8mb4')
|
||||
db.autocommit(True)
|
||||
return db
|
10
pol/feed.py
10
pol/feed.py
@ -39,7 +39,7 @@ class Feed(object):
|
||||
for key in ['title', 'description', 'title_link']:
|
||||
if key in post_fields:
|
||||
cur.execute("""insert into frontend_postfield (field_id, post_id, `text`)
|
||||
values (%s, %s, %s)""", (FIELD_IDS[key], post_id, post_fields[key].encode('utf-8')))
|
||||
values (%s, %s, %s)""", (self.FIELD_IDS[key], post_id, post_fields[key].encode('utf-8')))
|
||||
log.info('Post saved id:{id!r}', id=post_id)
|
||||
|
||||
def fill_time(self, feed_id, items):
|
||||
@ -66,7 +66,7 @@ class Feed(object):
|
||||
where p.md5sum in (%s)
|
||||
and p.feed_id=%s""" % (quoted_hashes, feed_id,))
|
||||
rows = cur.fetchall()
|
||||
self.log.debug('Selected {count!r} posts', count=len(rows))
|
||||
log.debug('Selected {count!r} posts', count=len(rows))
|
||||
for row in rows:
|
||||
md5hash = row[0]
|
||||
created = row[1]
|
||||
@ -81,7 +81,7 @@ class Feed(object):
|
||||
item['time'] = cur_time
|
||||
self.save_post(conn, cur_time, feed_id, item)
|
||||
new_post_cnt += 1
|
||||
cur_time -= datetime.timedelta(minutes=POST_TIME_DISTANCE)
|
||||
cur_time -= datetime.timedelta(minutes=self.POST_TIME_DISTANCE)
|
||||
return new_post_cnt
|
||||
|
||||
def _build_link(self, html, doc_url, url):
|
||||
@ -110,7 +110,7 @@ class Feed(object):
|
||||
if feed_config['required'][field_name]:
|
||||
required_found += 1
|
||||
if field_name == 'link':
|
||||
item['link'] = _build_link(response.body_as_unicode(), feed_config['uri'], item[field_name])
|
||||
item['link'] = self._build_link(response.body_as_unicode(), feed_config['uri'], item[field_name])
|
||||
|
||||
if required_count == required_found:
|
||||
items.append(item)
|
||||
@ -125,7 +125,7 @@ class Feed(object):
|
||||
"Source page url: " + feed_config['uri'],
|
||||
language="en",
|
||||
)
|
||||
new_post_cnt = fill_time(feed_config['id'], items)
|
||||
new_post_cnt = self.fill_time(feed_config['id'], items)
|
||||
|
||||
for item in items:
|
||||
title = item['title'] if 'title' in item else ''
|
||||
|
@ -16,13 +16,13 @@ class bcolors:
|
||||
class LogHandler(object):
|
||||
"""Handler of twisted log meaasges"""
|
||||
|
||||
def __init__(self, stat_tool=None):
|
||||
self.stat_tool = stat_tool
|
||||
def __init__(self):
|
||||
#import pdb;pdb.set_trace()
|
||||
# requred, discardBuffer gets rid of the LimitedHistoryLogObserver, redirectStandardIO will loop print action
|
||||
globalLogBeginner.beginLoggingTo([self.print_log], discardBuffer=True, redirectStandardIO=False)
|
||||
|
||||
|
||||
def print_log(event):
|
||||
def print_log(self, event):
|
||||
if 'isError' in event and event['isError']:
|
||||
sys.stdout.write(bcolors.FAIL + formatEventAsClassicLogText(event) + bcolors.ENDC)
|
||||
sys.stderr.write(formatEventAsClassicLogText(event))
|
||||
|
@ -34,33 +34,39 @@ log = Logger()
|
||||
|
||||
class Downloader(object):
|
||||
|
||||
def __init__(self, debug, stat_tool=None, mem_mon=None, limiter=None):
|
||||
def __init__(self, feed, debug, snapshot_dir='/tmp', stat_tool=None, mem_mon=None):
|
||||
self.feed = feed
|
||||
self.debug = debug
|
||||
self.snapshot_dir = snapshot_dir
|
||||
self.stat_tool = stat_tool
|
||||
self.mem_mon = mem_mon
|
||||
self.limiter = limiter
|
||||
|
||||
def html2json(self, el):
|
||||
return [
|
||||
el.tag,
|
||||
{"tag-id": el.attrib["tag-id"]},
|
||||
[html2json(e) for e in el.getchildren() if isinstance(e, etree.ElementBase)]
|
||||
[self.html2json(e) for e in el.getchildren() if isinstance(e, etree.ElementBase)]
|
||||
]
|
||||
|
||||
def setBaseAndRemoveScriptsAndMore(self, response, url):
|
||||
response.selector.remove_namespaces()
|
||||
|
||||
tree = response.selector.root.getroottree()
|
||||
|
||||
def _saveResponse(self, response, url, tree):
|
||||
# save html for extended selectors
|
||||
file_name = '%s_%s' % (time.time(), md5(url).hexdigest())
|
||||
file_path = SNAPSHOT_DIR + '/' + file_name
|
||||
file_path = self.snapshot_dir + '/' + file_name
|
||||
with open(file_path, 'w') as f:
|
||||
f.write(url + '\n')
|
||||
for k, v in response.headers.iteritems():
|
||||
for vv in v:
|
||||
f.write('%s: %s\n' % (k, vv))
|
||||
f.write('\n\n' + etree.tostring(tree, encoding='utf-8', method='html'))
|
||||
return file_name
|
||||
|
||||
|
||||
def setBaseAndRemoveScriptsAndMore(self, response, url):
|
||||
response.selector.remove_namespaces()
|
||||
|
||||
tree = response.selector.root.getroottree()
|
||||
|
||||
file_name = self._saveResponse(response, url, tree)
|
||||
|
||||
# set base url to html document
|
||||
head = tree.xpath("//head")
|
||||
@ -125,15 +131,15 @@ class Downloader(object):
|
||||
if error.type is PartialDownloadError and error.value.status == '200':
|
||||
d = defer.Deferred()
|
||||
reactor.callLater(0, d.callback, error.value.response) # error.value.response is response_str
|
||||
d.addCallback(downloadDone, request=request, response=response, feed_config=feed_config)
|
||||
d.addErrback(downloadError, request=request, url=url, response=response, feed_config=feed_config)
|
||||
d.addCallback(self.downloadDone, request=request, response=response, feed_config=feed_config)
|
||||
d.addErrback(self.downloadError, request=request, url=url, response=response, feed_config=feed_config)
|
||||
return
|
||||
|
||||
if self.debug:
|
||||
request.write('Downloader error: ' + error.getErrorMessage())
|
||||
request.write('Traceback: ' + error.getTraceback())
|
||||
else:
|
||||
request.write(self.error_html('<h1>PolitePol says: "Something wrong"</h1> <p><b>Try to refresh page or contact us by email: politepol.com@gmail.com</b>\n(Help us to improve our service with your feedback)</p> <p><i>Scary mantra: %s</i></p>' % escape(error.getErrorMessage())))
|
||||
request.write(self.error_html('<h1>PolitePol says: "Something wrong"</h1> <p><b>Try to refresh page or contact us by email: <a href="mailto:politepol.com@gmail.com">politepol.com@gmail.com</a></b>\n(Help us to improve our service with your feedback)</p> <p><i>Scary mantra: %s</i></p>' % escape(error.getErrorMessage())))
|
||||
sys.stderr.write('\n'.join([str(datetime.utcnow()), request.uri, url, 'Downloader error: ' + error.getErrorMessage(), 'Traceback: ' + error.getTraceback()]))
|
||||
request.finish()
|
||||
|
||||
@ -157,17 +163,17 @@ class Downloader(object):
|
||||
traceback.print_exc(file=sys.stdout)
|
||||
|
||||
|
||||
def downloadStarted(response, request, url, feed_config):
|
||||
def downloadStarted(self, response, request, url, feed_config):
|
||||
d = readBody(response)
|
||||
d.addCallback(downloadDone, request=request, response=response, feed_config=feed_config)
|
||||
d.addErrback(downloadError, request=request, url=url, response=response, feed_config=feed_config)
|
||||
d.addCallback(self.downloadDone, request=request, response=response, feed_config=feed_config)
|
||||
d.addErrback(self.downloadError, request=request, url=url, response=response, feed_config=feed_config)
|
||||
return response
|
||||
|
||||
def downloadDone(response_str, request, response, feed_config):
|
||||
def downloadDone(self, response_str, request, response, feed_config):
|
||||
url = response.request.absoluteURI
|
||||
|
||||
print('Response <%s> ready (%s bytes)' % (url, len(response_str)))
|
||||
response = buildScrapyResponse(response, response_str, url)
|
||||
response = self.buildScrapyResponse(response, response_str, url)
|
||||
|
||||
response = HttpCompressionMiddleware().process_response(Request(url), response, None)
|
||||
response = DecompressionMiddleware().process_response(None, response, None)
|
||||
@ -177,36 +183,22 @@ class Downloader(object):
|
||||
if feed_config:
|
||||
[response_str, post_cnt, new_post_cnt] = self.feed.buildFeed(response, feed_config)
|
||||
request.setHeader(b"Content-Type", b'text/xml; charset=utf-8')
|
||||
log.info('Stat: ip={request.ip} feed_id={request.feed_id} post_cnt={request.post_cnt} new_post_cnt={request.new_post_cnt}', request=RequestStat(
|
||||
ip=ip,
|
||||
feed_id=feed_config['id'],
|
||||
post_cnt=post_cnt,
|
||||
new_post_cnt=new_post_cnt
|
||||
),
|
||||
stat=True
|
||||
)
|
||||
if self.stat_tool:
|
||||
self.stat_tool.trace(ip=ip, feed_id=feed_config['id'], post_cnt=post_cnt, new_post_cnt=new_post_cnt)
|
||||
else:
|
||||
response_str, file_name = setBaseAndRemoveScriptsAndMore(response, url)
|
||||
log.info('Stat: ip={request.ip} url={request.url}', request=RequestStat(
|
||||
ip=ip,
|
||||
feed_id=0,
|
||||
post_cnt=0,
|
||||
new_post_cnt=0,
|
||||
url=url
|
||||
),
|
||||
stat=True
|
||||
)
|
||||
response_str, file_name = self.setBaseAndRemoveScriptsAndMore(response, url)
|
||||
if self.stat_tool:
|
||||
self.stat_tool.trace(ip=ip, feed_id=0, post_cnt=0, new_post_cnt=0, url=url)
|
||||
|
||||
request.write(response_str)
|
||||
request.finish()
|
||||
run_mem_mon()
|
||||
self.run_mem_mon()
|
||||
|
||||
def run_mem_mon():
|
||||
global mem_mon
|
||||
if mem_mon:
|
||||
def run_mem_mon(self):
|
||||
if self.mem_mon:
|
||||
d = defer.Deferred()
|
||||
reactor.callLater(0, d.callback, None)
|
||||
d.addCallback(mem_mon.show_diff)
|
||||
d.addCallback(self.mem_mon.show_diff)
|
||||
d.addErrback(lambda err: print("Memory Monitor error: %s\nPGC traceback: %s" % (err.getErrorMessage(), err.getTraceback())))
|
||||
|
||||
|
||||
@ -215,14 +207,14 @@ class Site(resource.Resource):
|
||||
|
||||
feed_regexp = re.compile('^/feed1?/(\d{1,10})$')
|
||||
|
||||
def __init__(self, db_creds, snapshot_dir, user_agent, debug=False, limiter=None):
|
||||
def __init__(self, db_creds, snapshot_dir, user_agent, debug=False, limiter=None, mem_mon=None, stat_tool=None):
|
||||
self.db_creds = db_creds
|
||||
self.snapshot_dir = snapshot_dir
|
||||
self.user_agent = user_agent
|
||||
self.limiter = limiter
|
||||
|
||||
self.downloader = Downloader(debug)
|
||||
self.feed = Feed(db_creds)
|
||||
self.downloader = Downloader(self.feed, debug, snapshot_dir, stat_tool, mem_mon)
|
||||
|
||||
def startRequest(self, request, url, feed_config = None):
|
||||
agent = BrowserLikeRedirectAgent(
|
||||
@ -279,20 +271,17 @@ class Site(resource.Resource):
|
||||
|
||||
class Server(object):
|
||||
|
||||
def __init__(self, port, db_creds, snapshot_dir, user_agent, debug=False, limiter=None):
|
||||
def __init__(self, port, db_creds, snapshot_dir, user_agent, debug=False, limiter=None, mem_mon=None):
|
||||
self.port = port
|
||||
self.db_creds = db_creds
|
||||
self.snapshot_dir = snapshot_dir
|
||||
self.user_agent = user_agent
|
||||
self.debug = debug
|
||||
self.limiter = limiter
|
||||
self.mem_mon = mem_mon
|
||||
|
||||
self.log_handler = LogHandler()
|
||||
|
||||
def setMemMonitor(self, _mem_mon=None):
|
||||
global mem_mon
|
||||
mem_mon = _mem_mon
|
||||
|
||||
def run(self):
|
||||
endpoints.serverFromString(reactor, "tcp:%s" % self.port).listen(server.Site(Site(self.db_creds, self.snapshot_dir, self.user_agent, self.debug, self.limiter)))
|
||||
endpoints.serverFromString(reactor, "tcp:%s" % self.port).listen(server.Site(Site(self.db_creds, self.snapshot_dir, self.user_agent, self.debug, self.limiter, self.mem_mon)))
|
||||
reactor.run()
|
Loading…
x
Reference in New Issue
Block a user