mirror of
https://github.com/taroved/pol
synced 2025-05-28 03:50:08 -07:00
small fixes
This commit is contained in:
parent
6f97665b68
commit
d90386117b
145
pol/client.py
Normal file
145
pol/client.py
Normal file
@ -0,0 +1,145 @@
|
||||
from __future__ import division, absolute_import
|
||||
|
||||
import warnings
|
||||
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.internet import defer, protocol, reactor
|
||||
from twisted.web._newclient import (
|
||||
HTTP11ClientProtocol,
|
||||
PotentialDataLoss,
|
||||
Request,
|
||||
RequestGenerationFailed,
|
||||
RequestNotSent,
|
||||
RequestTransmissionFailed,
|
||||
Response,
|
||||
ResponseDone,
|
||||
ResponseFailed,
|
||||
ResponseNeverReceived,
|
||||
_WrapperException,
|
||||
)
|
||||
from twisted.web.client import PartialDownloadError
|
||||
|
||||
IGNORE_SIZE = 0
|
||||
|
||||
class _PpReadBodyProtocol(protocol.Protocol):
    """
    Body-collecting protocol with an optional size cap.

    Accumulates every chunk delivered by the transport and, once the
    connection closes, fires ``deferred`` with the joined bytes (or an
    appropriate failure).  When ``max_size`` is not ``IGNORE_SIZE`` and the
    accumulated byte count exceeds it, the producer is asked to stop.

    This is a helper for L{IResponse.deliverBody}, which collects the body
    and fires a deferred with it.

    @ivar deferred: See L{__init__}.
    @ivar status: See L{__init__}.
    @ivar message: See L{__init__}.
    @ivar max_size: See L{__init__}.

    @ivar dataBuffer: list of byte-strings received
    @type dataBuffer: L{list} of L{bytes}

    @ivar buffer_size: total number of body bytes accumulated so far
    @type buffer_size: L{int}
    """

    def __init__(self, status, message, deferred, max_size):
        """
        @param status: Status of L{IResponse}
        @type status: L{int}

        @param message: Message of L{IResponse}
        @type message: L{bytes}

        @param deferred: deferred to fire when response is complete
        @type deferred: L{Deferred} firing with L{bytes}

        @param max_size: maximum body size in bytes; C{IGNORE_SIZE}
            disables the limit
        @type max_size: L{int}
        """
        self.status = status
        self.message = message
        self.deferred = deferred
        self.max_size = max_size
        self.dataBuffer = []
        self.buffer_size = 0

    def dataReceived(self, data):
        """
        Accumulate a chunk; if the size cap is active and now exceeded,
        ask the transport to stop producing.
        """
        self.dataBuffer.append(data)
        self.buffer_size += len(data)
        capped = self.max_size != IGNORE_SIZE
        if capped and self.buffer_size > self.max_size:
            # stopProducing rather than abortConnection, per
            # https://twistedmatrix.com/trac/ticket/8227
            self.transport.stopProducing()

    def connectionLost(self, reason):
        """
        Resolve the waiting L{Deferred}: the full body on L{ResponseDone},
        a L{PartialDownloadError} carrying the partial body on
        L{PotentialDataLoss}, and the raw failure otherwise.
        """
        body = b''.join(self.dataBuffer)
        if reason.check(ResponseDone):
            self.deferred.callback(body)
            return
        if reason.check(PotentialDataLoss):
            self.deferred.errback(
                PartialDownloadError(self.status, self.message, body))
            return
        self.deferred.errback(reason)
|
||||
|
||||
|
||||
|
||||
def ppReadBody(response, max_size):
    """
    Get the body of an L{IResponse} and return it as a byte string, failing
    with L{ResponseIsTooBig} when the body exceeds C{max_size} bytes.

    This is a helper function for clients that don't want to incrementally
    receive the body of an HTTP response.

    @param response: The HTTP response for which the body will be read.
    @type response: L{IResponse} provider

    @param max_size: Maximum allowed body size in bytes; pass C{IGNORE_SIZE}
        to disable the limit.
    @type max_size: L{int}

    @return: A L{Deferred} which will fire with the body of the response.
        Cancelling it will close the connection to the server immediately.
    """
    def cancel(deferred):
        """
        Cancel a L{ppReadBody} call, close the connection to the HTTP server
        immediately, if it is still open.

        @param deferred: The cancelled L{defer.Deferred}.
        """
        abort = getAbort()
        if abort is not None:
            abort()

    d = defer.Deferred(cancel)
    protocol = _PpReadBodyProtocol(response.code, response.phrase, d,
                                   max_size=max_size)

    def getAbort():
        return getattr(protocol.transport, 'abortConnection', None)

    response.deliverBody(protocol)

    if protocol.transport is not None and getAbort() is None:
        warnings.warn(
            'Using readBody with a transport that does not have an '
            'abortConnection method',
            category=DeprecationWarning,
            stacklevel=2)

    def respFailed(fail):
        # When the protocol exceeds the cap it calls stopProducing(), and the
        # connection teardown surfaces here as ResponseFailed -- or, if no
        # complete response was ever received, its ResponseNeverReceived
        # subclass.  Use Failure.check() so subclasses match too; the former
        # ``fail.type is ResponseFailed`` identity test let
        # ResponseNeverReceived pass through as a generic failure instead of
        # being converted to ResponseIsTooBig.
        too_big = max_size != IGNORE_SIZE and protocol.buffer_size > max_size
        if too_big and fail.check(ResponseFailed) is not None:
            converted = defer.Deferred()
            # Errback on the next reactor turn so the conversion fires
            # outside the teardown of the aborted connection.
            reactor.callLater(
                0, converted.errback,
                ResponseIsTooBig('Response is too big', max_size))
            return converted
        else:
            return fail

    d.addErrback(respFailed)

    return d
|
||||
|
||||
|
||||
class ResponseIsTooBig(Exception):
    """
    Raised when an HTTP response body exceeds the configured limit.

    @ivar max_size: Max length for response in bytes
    """

    def __init__(self, reason, max_size):
        # Keep both values in args so the exception reprs/pickles normally.
        super(ResponseIsTooBig, self).__init__(reason, max_size)
        self.max_size = max_size
|
@ -117,7 +117,7 @@ class Feed(object):
|
||||
title = title[0] if title else 'Polite Pol: ' + feed_config['uri'],
|
||||
link=feed_config['uri'],
|
||||
description="Generated by PolitePol.com.\n"+\
|
||||
"Source page url: " + feed_config['uri'],
|
||||
"Source page: " + feed_config['uri'],
|
||||
language="en",
|
||||
)
|
||||
new_post_cnt = self.fill_time(feed_config['id'], items)
|
||||
|
@ -10,7 +10,7 @@ from lxml import etree
|
||||
|
||||
from twisted.web import server, resource
|
||||
from twisted.internet import reactor, endpoints, defer
|
||||
from twisted.web.client import Agent, BrowserLikeRedirectAgent, readBody, PartialDownloadError, HTTPConnectionPool
|
||||
from twisted.web.client import Agent, BrowserLikeRedirectAgent, PartialDownloadError, HTTPConnectionPool
|
||||
from twisted.web.server import NOT_DONE_YET
|
||||
from twisted.web.http_headers import Headers
|
||||
from twisted.web.http import INTERNAL_SERVER_ERROR
|
||||
@ -29,6 +29,7 @@ from scrapy.selector import Selector
|
||||
|
||||
from pol.log import LogHandler
|
||||
from .feed import Feed
|
||||
from .client import ppReadBody, IGNORE_SIZE
|
||||
|
||||
from twisted.logger import Logger
|
||||
|
||||
@ -38,7 +39,7 @@ log = Logger()
|
||||
class Downloader(object):
|
||||
|
||||
def __init__(self, feed, debug, snapshot_dir, stat_tool, memon, request,
|
||||
url, feed_config, selector_defer, sanitize):
|
||||
url, feed_config, selector_defer, sanitize, max_size):
|
||||
self.feed = feed
|
||||
self.debug = debug
|
||||
self.snapshot_dir = snapshot_dir
|
||||
@ -49,6 +50,7 @@ class Downloader(object):
|
||||
self.feed_config=feed_config
|
||||
self.selector_defer = selector_defer
|
||||
self.sanitize = sanitize
|
||||
self.max_size = max_size
|
||||
|
||||
def html2json(self, el):
|
||||
return [
|
||||
@ -188,7 +190,7 @@ class Downloader(object):
|
||||
def downloadStarted(self, response):
|
||||
self.response = response
|
||||
|
||||
d = readBody(response)
|
||||
d = ppReadBody(response, self.max_size)
|
||||
d.addCallback(self.downloadDone)
|
||||
d.addErrback(self.downloadError)
|
||||
return response
|
||||
@ -256,7 +258,7 @@ class Site(resource.Resource):
|
||||
feed_regexp = re.compile(b'^/feed/(\d{1,10})')
|
||||
|
||||
|
||||
def __init__(self, db_creds, snapshot_dir, user_agent, debug=False, limiter=None, memon=None, stat_tool=None, prefetch_dir=None, feed=None, downloadercls=None):
|
||||
def __init__(self, db_creds, snapshot_dir, user_agent, debug=False, limiter=None, memon=None, stat_tool=None, prefetch_dir=None, feed=None, downloadercls=None, max_size=IGNORE_SIZE):
|
||||
self.db_creds = db_creds
|
||||
self.snapshot_dir = snapshot_dir
|
||||
self.user_agent = user_agent
|
||||
@ -267,12 +269,13 @@ class Site(resource.Resource):
|
||||
self.debug = debug
|
||||
self.stat_tool = stat_tool
|
||||
self.memon= memon
|
||||
self.max_size = max_size
|
||||
self.downloadercls = downloadercls or Downloader
|
||||
|
||||
def startRequest(self, request, url, feed_config = None, selector_defer=None, sanitize=False):
|
||||
downloader = self.downloadercls(self.feed, self.debug, self.snapshot_dir, self.stat_tool, self.memon,
|
||||
request=request, url=url, feed_config=feed_config,
|
||||
selector_defer=selector_defer, sanitize=sanitize)
|
||||
selector_defer=selector_defer, sanitize=sanitize, max_size=self.max_size)
|
||||
|
||||
sresponse = self.tryLocalPage(url)
|
||||
if sresponse:
|
||||
@ -348,7 +351,7 @@ class Site(resource.Resource):
|
||||
|
||||
class Server(object):
|
||||
|
||||
def __init__(self, port, db_creds, snapshot_dir, user_agent, debug=False, limiter=None, memon=None, stat_tool=None, prefetch_dir=None, feed=None, sitecls=None, downloadercls=None):
|
||||
def __init__(self, port, db_creds, snapshot_dir, user_agent, debug=False, limiter=None, memon=None, stat_tool=None, prefetch_dir=None, feed=None, sitecls=None, downloadercls=None, max_size=IGNORE_SIZE):
|
||||
self.port = port
|
||||
self.db_creds = db_creds
|
||||
self.snapshot_dir = snapshot_dir
|
||||
@ -364,7 +367,7 @@ class Server(object):
|
||||
if not sitecls:
|
||||
sitecls = Site
|
||||
|
||||
self.site = sitecls(self.db_creds, self.snapshot_dir, self.user_agent, self.debug, self.limiter, self.memon, self.stat_tool, self.prefetch_dir, feed, downloadercls=downloadercls)
|
||||
self.site = sitecls(self.db_creds, self.snapshot_dir, self.user_agent, self.debug, self.limiter, self.memon, self.stat_tool, self.prefetch_dir, feed, downloadercls=downloadercls, max_size=max_size)
|
||||
|
||||
def requestSelector(self, url=None, feed_config=None):
|
||||
d = defer.Deferred()
|
||||
|
Loading…
x
Reference in New Issue
Block a user