mirror of
https://github.com/taroved/pol
synced 2025-05-21 08:30:24 -07:00
split to modules
This commit is contained in:
parent
fcc773d434
commit
b3c58981d0
366
downloader.py
366
downloader.py
@ -1,368 +1,6 @@
|
|||||||
from __future__ import print_function
|
from pol import Server
|
||||||
import json
|
|
||||||
import time, sys, traceback
|
|
||||||
from hashlib import md5
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
from twisted.logger import globalLogBeginner, formatEventAsClassicLogText, Logger
|
|
||||||
from twisted.web import server, resource
|
|
||||||
from twisted.internet import reactor, endpoints, defer
|
|
||||||
from twisted.web.client import Agent, BrowserLikeRedirectAgent, readBody, PartialDownloadError, HTTPConnectionPool
|
|
||||||
from twisted.web.server import NOT_DONE_YET
|
|
||||||
from twisted.web.http_headers import Headers
|
|
||||||
from twisted.web.html import escape
|
|
||||||
twisted_headers = Headers
|
|
||||||
|
|
||||||
from scrapy.http.response.text import TextResponse
|
|
||||||
from scrapy.downloadermiddlewares.httpcompression import HttpCompressionMiddleware
|
|
||||||
from scrapy.downloadermiddlewares.decompression import DecompressionMiddleware
|
|
||||||
from scrapy.selector import Selector
|
|
||||||
|
|
||||||
from scrapy.http.request import Request
|
|
||||||
from scrapy.http import Headers
|
|
||||||
from scrapy.responsetypes import responsetypes
|
|
||||||
from scrapy.core.downloader.contextfactory import ScrapyClientContextFactory
|
|
||||||
|
|
||||||
from lxml import etree
|
|
||||||
import re
|
|
||||||
|
|
||||||
from feed import getFeedData, buildFeed, get_conn
|
|
||||||
from contextlib import closing
|
|
||||||
|
|
||||||
from settings import DOWNLOADER_USER_AGENT, FEED_REQUEST_PERIOD_LIMIT, DEBUG, SNAPSHOT_DIR
|
|
||||||
|
|
||||||
|
|
||||||
class bcolors:
|
|
||||||
HEADER = '\033[95m'
|
|
||||||
OKBLUE = '\033[94m'
|
|
||||||
OKGREEN = '\033[92m'
|
|
||||||
WARNING = '\033[93m'
|
|
||||||
FAIL = '\033[91m'
|
|
||||||
ENDC = '\033[0m'
|
|
||||||
BOLD = '\033[1m'
|
|
||||||
UNDERLINE = '\033[4m'
|
|
||||||
|
|
||||||
class RequestStat:
|
|
||||||
def __init__(self, ip, feed_id, post_cnt, new_post_cnt, url=None, ex_msg=None, ex_callstack=None):
|
|
||||||
self.ip = ip
|
|
||||||
self.feed_id = feed_id
|
|
||||||
self.post_cnt = post_cnt
|
|
||||||
self.new_post_cnt = new_post_cnt
|
|
||||||
self.url = url
|
|
||||||
self.ex_msg = ex_msg
|
|
||||||
self.ex_callstack = ex_callstack
|
|
||||||
|
|
||||||
def get_ip_id(ip, cur):
|
|
||||||
#import pdb;pdb.set_trace()
|
|
||||||
cur.execute("""select id from ips where address=%s""", (ip,))
|
|
||||||
ip_id = cur.fetchone()
|
|
||||||
if not ip_id:
|
|
||||||
cur.execute("insert into ips (address) values (%s)", (ip,))
|
|
||||||
ip_id = cur.lastrowid
|
|
||||||
return ip_id
|
|
||||||
|
|
||||||
|
|
||||||
def save_stat(stat):
|
|
||||||
try:
|
|
||||||
with closing(get_conn()) as conn:
|
|
||||||
with conn as cur:
|
|
||||||
ip_id = get_ip_id(stat.ip, cur)
|
|
||||||
cur.execute("""insert into requests (ip_id, feed_id, post_cnt, new_post_cnt)
|
|
||||||
values (%s, %s, %s, %s)""", (ip_id, stat.feed_id, stat.post_cnt, stat.new_post_cnt))
|
|
||||||
stat_id = cur.lastrowid
|
|
||||||
if not stat.feed_id:
|
|
||||||
cur.execute("insert into request_urls (url, request_id) values (%s, %s)", (stat.url.encode('utf-8')[:2000], stat_id))
|
|
||||||
|
|
||||||
if stat.ex_msg:
|
|
||||||
cur.execute("""insert into request_fails (request_id, ex_msg, ex_callstack)
|
|
||||||
values (%s, %s, %s)""", (stat_id, stat.ex_msg[0:2000], stat.ex_callstack[:2000]))
|
|
||||||
except:
|
|
||||||
traceback.print_exc(file=sys.stdout)
|
|
||||||
|
|
||||||
def print_log(event):
|
|
||||||
if 'isError' in event and event['isError']:
|
|
||||||
sys.stdout.write(bcolors.FAIL + formatEventAsClassicLogText(event) + bcolors.ENDC)
|
|
||||||
sys.stderr.write(formatEventAsClassicLogText(event))
|
|
||||||
sys.stderr.flush()
|
|
||||||
else:
|
|
||||||
if 'stat' in event and event['stat']:
|
|
||||||
save_stat(event['request'])
|
|
||||||
sys.stdout.write(formatEventAsClassicLogText(event))
|
|
||||||
sys.stdout.flush()
|
|
||||||
|
|
||||||
globalLogBeginner.beginLoggingTo([print_log], discardBuffer=True, redirectStandardIO=False) # requred, discardBuffer gets rid of the LimitedHistoryLogObserver, redirectStandardIO will loop print action
|
|
||||||
|
|
||||||
log = Logger()
|
|
||||||
|
|
||||||
if FEED_REQUEST_PERIOD_LIMIT:
|
|
||||||
import redis
|
|
||||||
|
|
||||||
def check_feed_request_time_limit(url):
|
|
||||||
if FEED_REQUEST_PERIOD_LIMIT:
|
|
||||||
r = redis.StrictRedis(host='localhost', port=6379, db=0)
|
|
||||||
previous_timestamp = r.get(url)
|
|
||||||
if previous_timestamp:
|
|
||||||
previous_timestamp = int(r.get(url))
|
|
||||||
time_passed = int(time.time()) - previous_timestamp
|
|
||||||
if time_passed <= FEED_REQUEST_PERIOD_LIMIT:
|
|
||||||
# time left to wait
|
|
||||||
return FEED_REQUEST_PERIOD_LIMIT - time_passed
|
|
||||||
r.set(url, int(time.time()))
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
#pool = HTTPConnectionPool(reactor, persistent=False)
|
|
||||||
#pool.cachedConnectionTimeout = 3
|
|
||||||
|
|
||||||
agent = BrowserLikeRedirectAgent(
|
|
||||||
Agent(reactor,
|
|
||||||
contextFactory=ScrapyClientContextFactory(), # skip certificate verification
|
|
||||||
connectTimeout=10),
|
|
||||||
#pool=pool),
|
|
||||||
redirectLimit=5
|
|
||||||
)
|
|
||||||
|
|
||||||
def html2json(el):
|
|
||||||
return [
|
|
||||||
el.tag,
|
|
||||||
{"tag-id": el.attrib["tag-id"]},
|
|
||||||
[html2json(e) for e in el.getchildren() if isinstance(e, etree.ElementBase)]
|
|
||||||
]
|
|
||||||
|
|
||||||
def setBaseAndRemoveScriptsAndMore(response, url):
|
|
||||||
response.selector.remove_namespaces()
|
|
||||||
|
|
||||||
tree = response.selector.root.getroottree()
|
|
||||||
|
|
||||||
# save html for extended selectors
|
|
||||||
file_name = '%s_%s' % (time.time(), md5(url).hexdigest())
|
|
||||||
file_path = SNAPSHOT_DIR + '/' + file_name
|
|
||||||
with open(file_path, 'w') as f:
|
|
||||||
f.write(url + '\n')
|
|
||||||
for k, v in response.headers.iteritems():
|
|
||||||
for vv in v:
|
|
||||||
f.write('%s: %s\n' % (k, vv))
|
|
||||||
f.write('\n\n' + etree.tostring(tree, encoding='utf-8', method='html'))
|
|
||||||
|
|
||||||
# set base url to html document
|
|
||||||
head = tree.xpath("//head")
|
|
||||||
if head:
|
|
||||||
head = head[0]
|
|
||||||
base = head.xpath("./base")
|
|
||||||
if base:
|
|
||||||
base = base[0]
|
|
||||||
else:
|
|
||||||
base = etree.Element("base")
|
|
||||||
head.insert(0, base)
|
|
||||||
base.set('href', url)
|
|
||||||
|
|
||||||
i = 1
|
|
||||||
for bad in tree.xpath("//*"):
|
|
||||||
# remove scripts
|
|
||||||
if bad.tag == 'script':
|
|
||||||
bad.getparent().remove(bad)
|
|
||||||
else:
|
|
||||||
# set tag-id attribute
|
|
||||||
bad.attrib['tag-id'] = str(i)
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
# sanitize anchors
|
|
||||||
if bad.tag == 'a' and 'href' in bad.attrib:
|
|
||||||
bad.attrib['origin-href'] = bad.attrib['href']
|
|
||||||
del bad.attrib['href']
|
|
||||||
|
|
||||||
# remove html events
|
|
||||||
for attr in bad.attrib:
|
|
||||||
if attr.startswith('on'):
|
|
||||||
del bad.attrib[attr]
|
|
||||||
|
|
||||||
# sanitize forms
|
|
||||||
if bad.tag == 'form':
|
|
||||||
bad.attrib['onsubmit'] = "return false"
|
|
||||||
|
|
||||||
body = tree.xpath("//body")
|
|
||||||
if body:
|
|
||||||
# append html2json js object
|
|
||||||
jsobj = html2json(tree.getroot())
|
|
||||||
script = etree.Element('script', {'type': 'text/javascript'})
|
|
||||||
script.text = '\n'.join((
|
|
||||||
'var html2json = ' + json.dumps(jsobj) + ';',
|
|
||||||
'var snapshot_time = "' + file_name + '";'
|
|
||||||
))
|
|
||||||
body[0].append(script)
|
|
||||||
|
|
||||||
return (etree.tostring(tree, method='html'), file_name)
|
|
||||||
|
|
||||||
def buildScrapyResponse(response, body, url):
|
|
||||||
status = response.code
|
|
||||||
headers = Headers({k:','.join(v) for k,v in response.headers.getAllRawHeaders()})
|
|
||||||
respcls = responsetypes.from_args(headers=headers, url=url)
|
|
||||||
return respcls(url=url, status=status, headers=headers, body=body)
|
|
||||||
|
|
||||||
def buildScrapyRequest(url):
|
|
||||||
return Request(url)
|
|
||||||
|
|
||||||
def downloadStarted(response, request, url, feed_config):
|
|
||||||
d = readBody(response)
|
|
||||||
d.addCallback(downloadDone, request=request, response=response, feed_config=feed_config)
|
|
||||||
d.addErrback(downloadError, request=request, url=url, response=response, feed_config=feed_config)
|
|
||||||
return response
|
|
||||||
|
|
||||||
def downloadDone(response_str, request, response, feed_config):
|
|
||||||
url = response.request.absoluteURI
|
|
||||||
|
|
||||||
print('Response <%s> ready (%s bytes)' % (url, len(response_str)))
|
|
||||||
response = buildScrapyResponse(response, response_str, url)
|
|
||||||
|
|
||||||
response = HttpCompressionMiddleware().process_response(Request(url), response, None)
|
|
||||||
response = DecompressionMiddleware().process_response(None, response, None)
|
|
||||||
|
|
||||||
if (isinstance(response, TextResponse)):
|
|
||||||
ip = request.getHeader('x-real-ip') or request.client.host
|
|
||||||
if feed_config:
|
|
||||||
[response_str, post_cnt, new_post_cnt] = buildFeed(response, feed_config)
|
|
||||||
request.setHeader(b"Content-Type", b'text/xml; charset=utf-8')
|
|
||||||
log.info('Stat: ip={request.ip} feed_id={request.feed_id} post_cnt={request.post_cnt} new_post_cnt={request.new_post_cnt}', request=RequestStat(
|
|
||||||
ip=ip,
|
|
||||||
feed_id=feed_config['id'],
|
|
||||||
post_cnt=post_cnt,
|
|
||||||
new_post_cnt=new_post_cnt
|
|
||||||
),
|
|
||||||
stat=True
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
response_str, file_name = setBaseAndRemoveScriptsAndMore(response, url)
|
|
||||||
log.info('Stat: ip={request.ip} url={request.url}', request=RequestStat(
|
|
||||||
ip=ip,
|
|
||||||
feed_id=0,
|
|
||||||
post_cnt=0,
|
|
||||||
new_post_cnt=0,
|
|
||||||
url=url
|
|
||||||
),
|
|
||||||
stat=True
|
|
||||||
)
|
|
||||||
|
|
||||||
request.write(response_str)
|
|
||||||
request.finish()
|
|
||||||
run_pgc()
|
|
||||||
|
|
||||||
from pympler import tracker
|
|
||||||
import gc
|
|
||||||
|
|
||||||
tr = tracker.SummaryTracker()
|
|
||||||
MON_PERIOD_SECONDS = 10 * 60 # 1 hours
|
|
||||||
mon_time = None
|
|
||||||
def mon(none):
|
|
||||||
global mon_time
|
|
||||||
tm = int(time.time())
|
|
||||||
if not mon_time or tm - mon_time >= MON_PERIOD_SECONDS:
|
|
||||||
#global pool
|
|
||||||
#pool.closeCachedConnections()
|
|
||||||
gc.collect()
|
|
||||||
global tr
|
|
||||||
for line in tr.format_diff():
|
|
||||||
log.info(line)
|
|
||||||
mon_time = tm
|
|
||||||
|
|
||||||
def run_pgc():
|
|
||||||
d = defer.Deferred()
|
|
||||||
reactor.callLater(0, d.callback, None)
|
|
||||||
d.addCallback(mon)
|
|
||||||
d.addErrback(lambda err: print("PGC error: %s\nPGC traceback: %s" % (err.getErrorMessage(), err.getTraceback())))
|
|
||||||
|
|
||||||
def error_html(msg):
|
|
||||||
return "<html><body>%s</body></html" % escape(msg).replace("\n", "<br/>\n")
|
|
||||||
|
|
||||||
def downloadError(error, request=None, url=None, response=None, feed_config=None):
|
|
||||||
# read for details: https://stackoverflow.com/questions/29423986/twisted-giving-twisted-web-client-partialdownloaderror-200-ok
|
|
||||||
if error.type is PartialDownloadError and error.value.status == '200':
|
|
||||||
d = defer.Deferred()
|
|
||||||
reactor.callLater(0, d.callback, error.value.response) # error.value.response is response_str
|
|
||||||
d.addCallback(downloadDone, request=request, response=response, feed_config=feed_config)
|
|
||||||
d.addErrback(downloadError, request=request, url=url, response=response, feed_config=feed_config)
|
|
||||||
return
|
|
||||||
|
|
||||||
if DEBUG:
|
|
||||||
request.write('Downloader error: ' + error.getErrorMessage())
|
|
||||||
request.write('Traceback: ' + error.getTraceback())
|
|
||||||
else:
|
|
||||||
request.write(error_html('Something wrong. Contact us by email: politepol.com@gmail.com \n Scary mantra: ' + error.getErrorMessage()))
|
|
||||||
sys.stderr.write('\n'.join([str(datetime.utcnow()), request.uri, url, 'Downloader error: ' + error.getErrorMessage(), 'Traceback: ' + error.getTraceback()]))
|
|
||||||
request.finish()
|
|
||||||
|
|
||||||
try:
|
|
||||||
feed_id = feed_config and feed_config['id']
|
|
||||||
s_url = None
|
|
||||||
if not feed_id:
|
|
||||||
feed_id = 0
|
|
||||||
s_url = url
|
|
||||||
log.info('Stat: ip={request.ip} feed_id={request.feed_id} url="{request.url}" error="{request.ex_msg}"', request=RequestStat(
|
|
||||||
ip = request.getHeader('x-real-ip') or request.client.host,
|
|
||||||
feed_id = feed_id,
|
|
||||||
post_cnt=0,
|
|
||||||
new_post_cnt=0,
|
|
||||||
url=s_url,
|
|
||||||
ex_msg=error.getErrorMessage(),
|
|
||||||
ex_callstack=error.getTraceback()
|
|
||||||
),
|
|
||||||
stat=True
|
|
||||||
)
|
|
||||||
except:
|
|
||||||
traceback.print_exc(file=sys.stdout)
|
|
||||||
|
|
||||||
|
|
||||||
class Downloader(resource.Resource):
|
|
||||||
isLeaf = True
|
|
||||||
|
|
||||||
feed_regexp = re.compile('^/feed1?/(\d{1,10})$')
|
|
||||||
|
|
||||||
def startRequest(self, request, url, feed_config = None):
|
|
||||||
d = agent.request(
|
|
||||||
'GET',
|
|
||||||
url,
|
|
||||||
twisted_headers({
|
|
||||||
'Accept': ['text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'],
|
|
||||||
'Accept-Encoding': ['gzip, deflate, sdch'],
|
|
||||||
'User-Agent': [DOWNLOADER_USER_AGENT]
|
|
||||||
}),
|
|
||||||
None
|
|
||||||
)
|
|
||||||
print('Request <GET %s> started' % (url,))
|
|
||||||
d.addCallback(downloadStarted, request=request, url=url, feed_config=feed_config)
|
|
||||||
d.addErrback(downloadError, request=request, url=url)
|
|
||||||
|
|
||||||
def render_GET(self, request):
|
|
||||||
'''
|
|
||||||
Render page for frontend or RSS feed
|
|
||||||
'''
|
|
||||||
if 'url' in request.args: # page for frontend
|
|
||||||
url = request.args['url'][0]
|
|
||||||
|
|
||||||
self.startRequest(request, url)
|
|
||||||
return NOT_DONE_YET
|
|
||||||
elif self.feed_regexp.match(request.uri) is not None: # feed
|
|
||||||
feed_id = self.feed_regexp.match(request.uri).groups()[0]
|
|
||||||
|
|
||||||
time_left = check_feed_request_time_limit(request.uri)
|
|
||||||
if time_left:
|
|
||||||
request.setResponseCode(429)
|
|
||||||
request.setHeader('Retry-After', str(time_left) + ' seconds')
|
|
||||||
return 'Too Many Requests. Retry after %s seconds' % (str(time_left))
|
|
||||||
else:
|
|
||||||
res = getFeedData(request, feed_id)
|
|
||||||
|
|
||||||
if isinstance(res, basestring): # error message
|
|
||||||
return res
|
|
||||||
|
|
||||||
url, feed_config = res
|
|
||||||
self.startRequest(request, url, feed_config)
|
|
||||||
return NOT_DONE_YET
|
|
||||||
else: # neither page and feed
|
|
||||||
return 'Url is required'
|
|
||||||
|
|
||||||
|
|
||||||
port = sys.argv[1] if len(sys.argv) >= 2 else 1234
|
port = sys.argv[1] if len(sys.argv) >= 2 else 1234
|
||||||
|
|
||||||
endpoints.serverFromString(reactor, "tcp:%s" % port).listen(server.Site(Downloader()))
|
Server(port, DATABASES['default'], SNAPSHOT_DIR, DOWNLOADER_USER_AGENT, DEBUG).run()
|
||||||
print('Server starting at http://localhost:%s' % port)
|
|
||||||
reactor.run()
|
|
||||||
|
7
pol/db.py
Executable file
7
pol/db.py
Executable file
@ -0,0 +1,7 @@
|
|||||||
|
import MySQLdb
|
||||||
|
|
||||||
|
|
||||||
|
def get_conn(creds):
|
||||||
|
db = MySQLdb.connect(host=creds['HOST'], port=int(creds['PORT']), user=creds['USER'], passwd=creds['PASSWORD'], db=creds['NAME'], init_command='SET NAMES utf8mb4')
|
||||||
|
db.autocommit(True)
|
||||||
|
return db
|
172
pol/feed.py
Executable file
172
pol/feed.py
Executable file
@ -0,0 +1,172 @@
|
|||||||
|
import w3lib.url
|
||||||
|
import w3lib.html
|
||||||
|
|
||||||
|
from lxml import etree
|
||||||
|
import re, sys
|
||||||
|
from hashlib import md5
|
||||||
|
|
||||||
|
from feedgenerator import Rss201rev2Feed, Enclosure
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
import MySQLdb
|
||||||
|
from contextlib import closing
|
||||||
|
from settings import DATABASES, DOWNLOADER_USER_AGENT
|
||||||
|
from twisted.logger import Logger
|
||||||
|
|
||||||
|
|
||||||
|
log = Logger()
|
||||||
|
|
||||||
|
class Feed(object):
|
||||||
|
|
||||||
|
url_hash_regexp = re.compile('(#.*)?$')
|
||||||
|
|
||||||
|
POST_TIME_DISTANCE = 15 # minutes, RSS Feed Reader skip same titles created in 10 min interval
|
||||||
|
|
||||||
|
FIELD_IDS = {'title': 1, 'description': 2, 'link': 3}
|
||||||
|
|
||||||
|
def __init__(self, db_creds, log):
|
||||||
|
self.db_creds = db_creds
|
||||||
|
self.log = log
|
||||||
|
|
||||||
|
|
||||||
|
def save_post(self, conn, created, feed_id, post_fields):
|
||||||
|
with conn as cur:
|
||||||
|
cur.execute("""insert into frontend_post (md5sum, created, feed_id)
|
||||||
|
values (%s, %s, %s)""", (post_fields['md5'], created, feed_id))
|
||||||
|
|
||||||
|
post_id = conn.insert_id()
|
||||||
|
for key in ['title', 'description', 'title_link']:
|
||||||
|
if key in post_fields:
|
||||||
|
cur.execute("""insert into frontend_postfield (field_id, post_id, `text`)
|
||||||
|
values (%s, %s, %s)""", (FIELD_IDS[key], post_id, post_fields[key].encode('utf-8')))
|
||||||
|
log.info('Post saved id:{id!r}', id=post_id)
|
||||||
|
|
||||||
|
def fill_time(self, feed_id, items):
|
||||||
|
if not items:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
new_post_cnt = 0
|
||||||
|
for item in items:
|
||||||
|
#create md5
|
||||||
|
h = md5('')
|
||||||
|
for key in ['title', 'description', 'link']:
|
||||||
|
if key in item:
|
||||||
|
h.update(item[key].encode('utf-8'))
|
||||||
|
item['md5'] = h.hexdigest()
|
||||||
|
|
||||||
|
#fetch dates from db
|
||||||
|
fetched_dates = {}
|
||||||
|
with closing(get_conn(self.db_creds)) as conn:
|
||||||
|
with conn as cur:
|
||||||
|
quoted_hashes = ','.join(["'%s'" % (i['md5']) for i in items])
|
||||||
|
|
||||||
|
cur.execute("""select p.md5sum, p.created, p.id
|
||||||
|
from frontend_post p
|
||||||
|
where p.md5sum in (%s)
|
||||||
|
and p.feed_id=%s""" % (quoted_hashes, feed_id,))
|
||||||
|
rows = cur.fetchall()
|
||||||
|
self.log.debug('Selected {count!r} posts', count=len(rows))
|
||||||
|
for row in rows:
|
||||||
|
md5hash = row[0]
|
||||||
|
created = row[1]
|
||||||
|
post_id = row[2]
|
||||||
|
fetched_dates[md5hash] = created
|
||||||
|
|
||||||
|
cur_time = datetime.datetime.utcnow()
|
||||||
|
for item in items:
|
||||||
|
if item['md5'] in fetched_dates:
|
||||||
|
item['time'] = fetched_dates[item['md5']]
|
||||||
|
else:
|
||||||
|
item['time'] = cur_time
|
||||||
|
self.save_post(conn, cur_time, feed_id, item)
|
||||||
|
new_post_cnt += 1
|
||||||
|
cur_time -= datetime.timedelta(minutes=POST_TIME_DISTANCE)
|
||||||
|
return new_post_cnt
|
||||||
|
|
||||||
|
def _build_link(self, html, doc_url, url):
|
||||||
|
base_url = w3lib.html.get_base_url(html, doc_url)
|
||||||
|
return w3lib.url.urljoin_rfc(base_url, url).decode('utf-8')
|
||||||
|
|
||||||
|
def buildFeed(self, response, feed_config):
|
||||||
|
response.selector.remove_namespaces()
|
||||||
|
|
||||||
|
selector = response.selector
|
||||||
|
tree = selector.root.getroottree()
|
||||||
|
# get data from html
|
||||||
|
items = []
|
||||||
|
for node in selector.xpath(feed_config['xpath']):
|
||||||
|
item = {}
|
||||||
|
required_count = 0
|
||||||
|
required_found = 0
|
||||||
|
for field_name in ['title', 'description', 'link']:
|
||||||
|
if field_name in feed_config['fields']:
|
||||||
|
if feed_config['required'][field_name]:
|
||||||
|
required_count += 1
|
||||||
|
|
||||||
|
extracted = node.xpath(feed_config['fields'][field_name]).extract()
|
||||||
|
if extracted:
|
||||||
|
item[field_name] = u''.join(extracted)
|
||||||
|
if feed_config['required'][field_name]:
|
||||||
|
required_found += 1
|
||||||
|
if field_name == 'link':
|
||||||
|
item['link'] = _build_link(response.body_as_unicode(), feed_config['uri'], item[field_name])
|
||||||
|
|
||||||
|
if required_count == required_found:
|
||||||
|
items.append(item)
|
||||||
|
|
||||||
|
title = selector.xpath('//title/text()').extract()
|
||||||
|
|
||||||
|
#build feed
|
||||||
|
feed = Rss201rev2Feed(
|
||||||
|
title = title[0] if title else 'Polite Pol: ' + feed_config['uri'],
|
||||||
|
link=feed_config['uri'],
|
||||||
|
description="Generated by PolitePol.com.\n"+\
|
||||||
|
"Source page url: " + feed_config['uri'],
|
||||||
|
language="en",
|
||||||
|
)
|
||||||
|
new_post_cnt = fill_time(feed_config['id'], items)
|
||||||
|
|
||||||
|
for item in items:
|
||||||
|
title = item['title'] if 'title' in item else ''
|
||||||
|
desc = item['description'] if 'description' in item else ''
|
||||||
|
time = item['time'] if 'time' in item else datetime.datetime.utcnow()
|
||||||
|
if 'link' in item:
|
||||||
|
link = item['link']
|
||||||
|
else:
|
||||||
|
link = url_hash_regexp.sub('#' + md5((title+desc).encode('utf-8')).hexdigest(), feed_config['uri'])
|
||||||
|
feed.add_item(
|
||||||
|
title = title,
|
||||||
|
link = link,
|
||||||
|
unique_id = link,
|
||||||
|
description = desc,
|
||||||
|
#enclosure=Enclosure(fields[4], "32000", "image/jpeg") if 4 in fields else None, #"Image"
|
||||||
|
pubdate = time
|
||||||
|
)
|
||||||
|
return [feed.writeString('utf-8'), len(items), new_post_cnt]
|
||||||
|
|
||||||
|
def getFeedData(self, request, feed_id):
|
||||||
|
# get url, xpathes
|
||||||
|
feed = {}
|
||||||
|
|
||||||
|
with closing(get_conn(self.db_creds)) as conn:
|
||||||
|
with conn as cur:
|
||||||
|
cur.execute("""select f.uri, f.xpath, fi.name, ff.xpath, fi.required from frontend_feed f
|
||||||
|
right join frontend_feedfield ff on ff.feed_id=f.id
|
||||||
|
left join frontend_field fi on fi.id=ff.field_id
|
||||||
|
where f.id=%s""", (feed_id,))
|
||||||
|
rows = cur.fetchall()
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
if not feed:
|
||||||
|
feed['id'] = feed_id
|
||||||
|
feed['uri'] = row[0]
|
||||||
|
feed['xpath'] = row[1]
|
||||||
|
feed['fields'] = {}
|
||||||
|
feed['required'] = {}
|
||||||
|
feed['fields'][row[2]] = row[3]
|
||||||
|
feed['required'][row[2]] = row[4]
|
||||||
|
|
||||||
|
if feed:
|
||||||
|
return [feed['uri'], feed]
|
||||||
|
else:
|
||||||
|
return 'Feed generator error: config of feed is empty'
|
33
pol/log.py
Executable file
33
pol/log.py
Executable file
@ -0,0 +1,33 @@
|
|||||||
|
import sys
|
||||||
|
from twisted.logger import globalLogBeginner, formatEventAsClassicLogText
|
||||||
|
|
||||||
|
|
||||||
|
class bcolors:
|
||||||
|
HEADER = '\033[95m'
|
||||||
|
OKBLUE = '\033[94m'
|
||||||
|
OKGREEN = '\033[92m'
|
||||||
|
WARNING = '\033[93m'
|
||||||
|
FAIL = '\033[91m'
|
||||||
|
ENDC = '\033[0m'
|
||||||
|
BOLD = '\033[1m'
|
||||||
|
UNDERLINE = '\033[4m'
|
||||||
|
|
||||||
|
|
||||||
|
class LogHandler(object):
|
||||||
|
"""Handler of twisted log meaasges"""
|
||||||
|
|
||||||
|
def __init__(self, stat_tool=None):
|
||||||
|
self.stat_tool = stat_tool
|
||||||
|
# requred, discardBuffer gets rid of the LimitedHistoryLogObserver, redirectStandardIO will loop print action
|
||||||
|
globalLogBeginner.beginLoggingTo([self.print_log], discardBuffer=True, redirectStandardIO=False)
|
||||||
|
|
||||||
|
|
||||||
|
def print_log(event):
|
||||||
|
if 'isError' in event and event['isError']:
|
||||||
|
sys.stdout.write(bcolors.FAIL + formatEventAsClassicLogText(event) + bcolors.ENDC)
|
||||||
|
sys.stderr.write(formatEventAsClassicLogText(event))
|
||||||
|
sys.stderr.flush()
|
||||||
|
else:
|
||||||
|
sys.stdout.write(formatEventAsClassicLogText(event))
|
||||||
|
sys.stdout.flush()
|
||||||
|
|
21
pol/memmon.py
Executable file
21
pol/memmon.py
Executable file
@ -0,0 +1,21 @@
|
|||||||
|
import time
|
||||||
|
from pympler import tracker
|
||||||
|
import gc
|
||||||
|
|
||||||
|
|
||||||
|
class Monitor(object):
|
||||||
|
|
||||||
|
prev_time = None
|
||||||
|
|
||||||
|
def __init__(self, period_second=10 * 60, log=None):
|
||||||
|
self.period_second = period_second
|
||||||
|
self.log = log
|
||||||
|
self.tr = tracker.SummaryTracker()
|
||||||
|
|
||||||
|
def show_diff(none):
|
||||||
|
tm = int(time.time())
|
||||||
|
if not self.prev_time or tm - prev_time >= self.period_second:
|
||||||
|
gc.collect()
|
||||||
|
for line in tr.format_diff():
|
||||||
|
self.log.info(line)
|
||||||
|
self.prev_time = tm
|
286
pol/server.py
Executable file
286
pol/server.py
Executable file
@ -0,0 +1,286 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from hashlib import md5
|
||||||
|
import json
|
||||||
|
import time, sys, traceback
|
||||||
|
import re
|
||||||
|
|
||||||
|
from lxml import etree
|
||||||
|
|
||||||
|
from twisted.web import server, resource
|
||||||
|
from twisted.internet import reactor, endpoints,
|
||||||
|
from twisted.web.client import Agent, BrowserLikeRedirectAgent, readBody, PartialDownloadError, HTTPConnectionPool
|
||||||
|
from twisted.web.server import
|
||||||
|
from twisted.web.http_headers import Headers
|
||||||
|
from twisted.web.html import escape
|
||||||
|
twisted_headers = Headers
|
||||||
|
from twisted.logger import Logger
|
||||||
|
|
||||||
|
from scrapy.http.response.text import TextResponse
|
||||||
|
from scrapy.downloadermiddlewares.httpcompression import HttpCompressionMiddleware
|
||||||
|
from scrapy.downloadermiddlewares.decompression import DecompressionMiddleware
|
||||||
|
from scrapy.http.request import Request
|
||||||
|
from scrapy.http import Headers
|
||||||
|
from scrapy.responsetypes import responsetypes
|
||||||
|
from scrapy.core.downloader.contextfactory import ScrapyClientContextFactory
|
||||||
|
|
||||||
|
from .feed import Feed
|
||||||
|
|
||||||
|
class Downloader(object):
|
||||||
|
|
||||||
|
def __init__(self, debug, stat_tool=None, mem_mon=None):
|
||||||
|
self.debug = debug
|
||||||
|
self.stat_tool = stat_tool
|
||||||
|
self.mem_mon = mem_mon
|
||||||
|
|
||||||
|
def html2json(self, el):
|
||||||
|
return [
|
||||||
|
el.tag,
|
||||||
|
{"tag-id": el.attrib["tag-id"]},
|
||||||
|
[html2json(e) for e in el.getchildren() if isinstance(e, etree.ElementBase)]
|
||||||
|
]
|
||||||
|
|
||||||
|
def setBaseAndRemoveScriptsAndMore(self, response, url):
|
||||||
|
response.selector.remove_namespaces()
|
||||||
|
|
||||||
|
tree = response.selector.root.getroottree()
|
||||||
|
|
||||||
|
# save html for extended selectors
|
||||||
|
file_name = '%s_%s' % (time.time(), md5(url).hexdigest())
|
||||||
|
file_path = SNAPSHOT_DIR + '/' + file_name
|
||||||
|
with open(file_path, 'w') as f:
|
||||||
|
f.write(url + '\n')
|
||||||
|
for k, v in response.headers.iteritems():
|
||||||
|
for vv in v:
|
||||||
|
f.write('%s: %s\n' % (k, vv))
|
||||||
|
f.write('\n\n' + etree.tostring(tree, encoding='utf-8', method='html'))
|
||||||
|
|
||||||
|
# set base url to html document
|
||||||
|
head = tree.xpath("//head")
|
||||||
|
if head:
|
||||||
|
head = head[0]
|
||||||
|
base = head.xpath("./base")
|
||||||
|
if base:
|
||||||
|
base = base[0]
|
||||||
|
else:
|
||||||
|
base = etree.Element("base")
|
||||||
|
head.insert(0, base)
|
||||||
|
base.set('href', url)
|
||||||
|
|
||||||
|
i = 1
|
||||||
|
for bad in tree.xpath("//*"):
|
||||||
|
# remove scripts
|
||||||
|
if bad.tag == 'script':
|
||||||
|
bad.getparent().remove(bad)
|
||||||
|
else:
|
||||||
|
# set tag-id attribute
|
||||||
|
bad.attrib['tag-id'] = str(i)
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
# sanitize anchors
|
||||||
|
if bad.tag == 'a' and 'href' in bad.attrib:
|
||||||
|
bad.attrib['origin-href'] = bad.attrib['href']
|
||||||
|
del bad.attrib['href']
|
||||||
|
|
||||||
|
# remove html events
|
||||||
|
for attr in bad.attrib:
|
||||||
|
if attr.startswith('on'):
|
||||||
|
del bad.attrib[attr]
|
||||||
|
|
||||||
|
# sanitize forms
|
||||||
|
if bad.tag == 'form':
|
||||||
|
bad.attrib['onsubmit'] = "return false"
|
||||||
|
|
||||||
|
body = tree.xpath("//body")
|
||||||
|
if body:
|
||||||
|
# append html2json js object
|
||||||
|
jsobj = self.html2json(tree.getroot())
|
||||||
|
script = etree.Element('script', {'type': 'text/javascript'})
|
||||||
|
script.text = '\n'.join((
|
||||||
|
'var html2json = ' + json.dumps(jsobj) + ';',
|
||||||
|
'var snapshot_time = "' + file_name + '";'
|
||||||
|
))
|
||||||
|
body[0].append(script)
|
||||||
|
|
||||||
|
return (etree.tostring(tree, method='html'), file_name)
|
||||||
|
|
||||||
|
def buildScrapyResponse(self, response, body, url):
|
||||||
|
status = response.code
|
||||||
|
headers = Headers({k:','.join(v) for k,v in response.headers.getAllRawHeaders()})
|
||||||
|
respcls = responsetypes.from_args(headers=headers, url=url)
|
||||||
|
return respcls(url=url, status=status, headers=headers, body=body)
|
||||||
|
|
||||||
|
def error_html(self, msg):
|
||||||
|
return "<html><body>%s</body></html" % escape(msg).replace("\n", "<br/>\n")
|
||||||
|
|
||||||
|
def downloadError(self, error, request=None, url=None, response=None, feed_config=None):
|
||||||
|
# read for details: https://stackoverflow.com/questions/29423986/twisted-giving-twisted-web-client-partialdownloaderror-200-ok
|
||||||
|
if error.type is PartialDownloadError and error.value.status == '200':
|
||||||
|
d = defer.Deferred()
|
||||||
|
reactor.callLater(0, d.callback, error.value.response) # error.value.response is response_str
|
||||||
|
d.addCallback(downloadDone, request=request, response=response, feed_config=feed_config)
|
||||||
|
d.addErrback(downloadError, request=request, url=url, response=response, feed_config=feed_config)
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.debug:
|
||||||
|
request.write('Downloader error: ' + error.getErrorMessage())
|
||||||
|
request.write('Traceback: ' + error.getTraceback())
|
||||||
|
else:
|
||||||
|
request.write(self.error_html('Something wrong. Contact us by email: politepol.com@gmail.com \n Scary mantra: ' + error.getErrorMessage()))
|
||||||
|
sys.stderr.write('\n'.join([str(datetime.utcnow()), request.uri, url, 'Downloader error: ' + error.getErrorMessage(), 'Traceback: ' + error.getTraceback()]))
|
||||||
|
request.finish()
|
||||||
|
|
||||||
|
try:
|
||||||
|
feed_id = feed_config and feed_config['id']
|
||||||
|
s_url = None
|
||||||
|
if not feed_id:
|
||||||
|
feed_id = 0
|
||||||
|
s_url = url
|
||||||
|
log.info('Stat: ip={request.ip} feed_id={request.feed_id} url="{request.url}" error="{request.ex_msg}"', request=RequestStat(
|
||||||
|
ip = request.getHeader('x-real-ip') or request.client.host,
|
||||||
|
feed_id = feed_id,
|
||||||
|
post_cnt=0,
|
||||||
|
new_post_cnt=0,
|
||||||
|
url=s_url,
|
||||||
|
ex_msg=error.getErrorMessage(),
|
||||||
|
ex_callstack=error.getTraceback()
|
||||||
|
),
|
||||||
|
stat=True
|
||||||
|
)
|
||||||
|
except:
|
||||||
|
traceback.print_exc(file=sys.stdout)
|
||||||
|
|
||||||
|
|
||||||
|
def downloadStarted(response, request, url, feed_config):
|
||||||
|
d = readBody(response)
|
||||||
|
d.addCallback(downloadDone, request=request, response=response, feed_config=feed_config)
|
||||||
|
d.addErrback(downloadError, request=request, url=url, response=response, feed_config=feed_config)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def downloadDone(response_str, request, response, feed_config):
|
||||||
|
url = response.request.absoluteURI
|
||||||
|
|
||||||
|
print('Response <%s> ready (%s bytes)' % (url, len(response_str)))
|
||||||
|
response = buildScrapyResponse(response, response_str, url)
|
||||||
|
|
||||||
|
response = HttpCompressionMiddleware().process_response(Request(url), response, None)
|
||||||
|
response = DecompressionMiddleware().process_response(None, response, None)
|
||||||
|
|
||||||
|
if (isinstance(response, TextResponse)):
|
||||||
|
ip = request.getHeader('x-real-ip') or request.client.host
|
||||||
|
if feed_config:
|
||||||
|
[response_str, post_cnt, new_post_cnt] = self.feed.buildFeed(response, feed_config)
|
||||||
|
request.setHeader(b"Content-Type", b'text/xml; charset=utf-8')
|
||||||
|
log.info('Stat: ip={request.ip} feed_id={request.feed_id} post_cnt={request.post_cnt} new_post_cnt={request.new_post_cnt}', request=RequestStat(
|
||||||
|
ip=ip,
|
||||||
|
feed_id=feed_config['id'],
|
||||||
|
post_cnt=post_cnt,
|
||||||
|
new_post_cnt=new_post_cnt
|
||||||
|
),
|
||||||
|
stat=True
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
response_str, file_name = setBaseAndRemoveScriptsAndMore(response, url)
|
||||||
|
log.info('Stat: ip={request.ip} url={request.url}', request=RequestStat(
|
||||||
|
ip=ip,
|
||||||
|
feed_id=0,
|
||||||
|
post_cnt=0,
|
||||||
|
new_post_cnt=0,
|
||||||
|
url=url
|
||||||
|
),
|
||||||
|
stat=True
|
||||||
|
)
|
||||||
|
|
||||||
|
request.write(response_str)
|
||||||
|
request.finish()
|
||||||
|
run_mem_mon()
|
||||||
|
|
||||||
|
def run_mem_mon():
|
||||||
|
global mem_mon
|
||||||
|
if mem_mon:
|
||||||
|
d = defer.Deferred()
|
||||||
|
reactor.callLater(0, d.callback, None)
|
||||||
|
d.addCallback(mem_mon.show_diff)
|
||||||
|
d.addErrback(lambda err: print("Memory Monitor error: %s\nPGC traceback: %s" % (err.getErrorMessage(), err.getTraceback())))
|
||||||
|
|
||||||
|
|
||||||
|
class Site(resource.Resource):
|
||||||
|
isLeaf = True
|
||||||
|
|
||||||
|
feed_regexp = re.compile('^/feed1?/(\d{1,10})$')
|
||||||
|
|
||||||
|
def __init__(self, db_creds, snapshot_dir, user_agent, debug):
|
||||||
|
self.db_creds = db_creds
|
||||||
|
self.snapshot_dir = snapshot_dir
|
||||||
|
self.user_agent = user_agent
|
||||||
|
|
||||||
|
self.downloader = Downloader(debug)
|
||||||
|
self.feed = Feed(db_creds, log)
|
||||||
|
|
||||||
|
def startRequest(self, request, url, feed_config = None):
|
||||||
|
agent = BrowserLikeRedirectAgent(
|
||||||
|
Agent(reactor,
|
||||||
|
contextFactory=ScrapyClientContextFactory(), # skip certificate verification
|
||||||
|
connectTimeout=10),
|
||||||
|
#pool=pool),
|
||||||
|
redirectLimit=5
|
||||||
|
)
|
||||||
|
|
||||||
|
d = agent.request(
|
||||||
|
'GET',
|
||||||
|
url,
|
||||||
|
twisted_headers({
|
||||||
|
'Accept': ['text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'],
|
||||||
|
'Accept-Encoding': ['gzip, deflate, sdch'],
|
||||||
|
'User-Agent': [self.user_agent]
|
||||||
|
}),
|
||||||
|
None
|
||||||
|
)
|
||||||
|
print('Request <GET %s> started' % (url,))
|
||||||
|
d.addCallback(self.downloader.downloadStarted, request=request, url=url, feed_config=feed_config)
|
||||||
|
d.addErrback(self.downloader.downloadError, request=request, url=url)
|
||||||
|
|
||||||
|
def render_GET(self, request):
|
||||||
|
'''
|
||||||
|
Render page for frontend or RSS feed
|
||||||
|
'''
|
||||||
|
if 'url' in request.args: # page for frontend
|
||||||
|
url = request.args['url'][0]
|
||||||
|
|
||||||
|
self.startRequest(request, url)
|
||||||
|
return NOT_DONE_YET
|
||||||
|
elif self.feed_regexp.match(request.uri) is not None: # feed
|
||||||
|
feed_id = self.feed_regexp.match(request.uri).groups()[0]
|
||||||
|
|
||||||
|
time_left = check_feed_request_time_limit(request.uri)
|
||||||
|
if time_left:
|
||||||
|
request.setResponseCode(429)
|
||||||
|
request.setHeader('Retry-After', str(time_left) + ' seconds')
|
||||||
|
return 'Too Many Requests. Retry after %s seconds' % (str(time_left))
|
||||||
|
else:
|
||||||
|
res = self.feed.getFeedData(request, feed_id)
|
||||||
|
|
||||||
|
if isinstance(res, basestring): # error message
|
||||||
|
return res
|
||||||
|
|
||||||
|
url, feed_config = res
|
||||||
|
self.startRequest(request, url, feed_config)
|
||||||
|
return NOT_DONE_YET
|
||||||
|
else: # neither page and feed
|
||||||
|
return 'Url is required'
|
||||||
|
|
||||||
|
|
||||||
|
class Server(object):
|
||||||
|
|
||||||
|
def __init__(self, port, db_creds, snapshot_dir, user_agent, debug):
|
||||||
|
self.port = port
|
||||||
|
self.db_creds = db_creds
|
||||||
|
self.snapshot_dir = snapshot_dir
|
||||||
|
self.user_agent = user_agent
|
||||||
|
|
||||||
|
def setMemMonitor(_mem_mon=None)
|
||||||
|
global mem_mon
|
||||||
|
mem_mon = _mem_mon
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
endpoints.serverFromString(reactor, "tcp:%s" % self.port).listen(server.Site(Site(self.db_creds, self.snapshot_dir, self.user_agent, self.debug)))
|
||||||
|
reactor.run()
|
Loading…
x
Reference in New Issue
Block a user