"""Provides a custom :class:`~.Thread` to mount and interact with :class:`~.HTTPAdapter`\s.
Multiple :class:`~.Thread`\s are driven by a single :class:`~.Driver`.
Each :class:`~.Thread` is responsible for:
1. Creating and maintaining a :class:`~requests.Session` object.
2. Mounting a custom adapter (i.e. our :class:`~.HTTPAdapter`) to the :class:`~requests.Session`.
3. Extracting cookies from outgoing :class:`~requests.Request`\s into a cookie jar.
4. Optionally evaluating statements embedded in :class:`~requests.Request`\s.
5. Maintaining a :obj:`work_queue` of :class:`~requests.Request`\s or callables to execute.
"""
import logging
import re
import requests
import threading
from copy import copy
from six.moves import urllib_parse as urlparse
from requests.exceptions import RequestException
from requests.packages.urllib3.exceptions import HTTPError
from .adapter import HTTPAdapter
# Convenience imports; for use by evaluated statements
import base64, binascii, datetime, hashlib, json, math, random, time
_LOG = logging.getLogger(__name__)
class InvalidWorkItem(Exception):
    """Raised when a work item is neither a callable nor a :class:`requests.Request`."""
class Thread(threading.Thread):
    r"""
    A worker thread that acts as the middleman between :class:`~.Driver`\s
    and :class:`~.HTTPAdapter`\s.

    Parameters
    ------------
    work_queue : list
        A list of work items for this class to process when :meth:`~.Thread.run` is executed.
        A non-list value is wrapped into a single-element list.
        Work items *must* be a :class:`requests.Request` or a callable function:

        - :class:`~requests.Request`\s are sent and their :class:`~requests.Response`\s
          are appended to the :attr:`all_responses` list.
        - Functions will be executed with the :class:`~.Thread` instance as the
          only parameter (i.e. :obj:`self`). Functions are encouraged to inspect the
          :class:`~.Thread` state, process any responses, and to modify
          :attr:`work_queue` to add or change work as necessary.
    race_args : dict
        Arguments for this class and its child :class:`~.BaseConnection`\s.
        Entries relevant to this class are:

        thread_num : int
            The thread number associated with this :class:`~.Thread` and its children classes.
        shared : dict
            A shared dict between this :class:`~.Thread`, its :class:`Driver`, and its children classes.
        send_kwargs : dict, optional
            Dict of extra arguments to pass to :meth:`requests.Session.send`.
            Default is ``{}``.
        fake_send : bool, optional
            Don't actually send the :class:`~requests.Request`, just create a fake
            :class:`~requests.Response` from it. The :class:`~requests.PreparedRequest`
            is stored in :attr:`requests.Response.request`. Useful for debugging.
            Default is ``False``.
        do_eval : bool, optional
            Indicates if embedded statements in :class:`~requests.Request`\s should be evaluated.
            :class:`~requests.Request`\s are evaluated before being :meth:`requests.Session.prepare`\d.
            See :attr:`_eval_attrs`, :attr:`_eval_pattern`, and :attr:`_eval_action` for details.
            Default is ``False``.
        save_sent_cookies : bool, optional
            Cookies manually set in :class:`~requests.Request` :attr:`headers` or :attr:`cookies`
            attributes should be extracted and saved in our :class:`~requests.Session`'s cookie jar.
            Extraction occurs after embedded statement evaluation is applied.
            Default is ``True``.

    Attributes
    ------------
    work_queue : list
        Local copy of `__init__` parameter.
    race_args : dict
        Local copy of `__init__` parameter.
    thread_num : int
        Unpacked value from :attr:`race_args`.
    shared : dict
        Unpacked value from :attr:`race_args`.
    send_kwargs : dict
        Unpacked value from :attr:`race_args`.
    fake_send : bool
        Unpacked value from :attr:`race_args`.
    do_eval : bool
        Unpacked value from :attr:`race_args`.
    save_sent_cookies : bool
        Unpacked value from :attr:`race_args`.
    logger : logging.Logger
        A :class:`~logging.Logger` instance for this :class:`~.Thread`.
    _eval_attrs : list
        The list of :class:`~requests.Request` attributes to apply :attr:`_eval_pattern` to.
    _eval_pattern : str
        The regex pattern that searches for evaluable code in the request attributes.
        Default pattern matches ``<<<statement_goes_here>>>``.
    _eval_flags : int
        The regex flags to apply to :attr:`_eval_pattern`.
        Default is ``re.VERBOSE | re.DOTALL``.
    _eval_action : callable
        The action to take with the :class:`re.MatchObject` found by :attr:`_eval_pattern`.
        Default is to :func:`eval` :class:`re.MatchObject` group 1, then convert the result to a string.
    session : requests.Session
        The session-level storage associated with the most recent :meth:`~.run` call.
        The :class:`~requests.Session` is created when :meth:`.run` begins and is closed when it ends.
    adapter : pyrace.HTTPAdapter
        The adapter responsible for handling our :class:`~requests.Session`'s HTTP requests.
        See :ref:`transport adapter <requests:transport-adapters>` documentation for details.
    response : requests.Response
        The :class:`~requests.Response` object from the most recently executed :class:`~requests.Request`.
        If sending raised an HTTP-related exception, the exception object is stored here instead.
    all_responses : list
        All :class:`~requests.Response` objects from the most recent :meth:`~.run` call, in order.

    Raises
    --------
    InvalidWorkItem
        A work item in :attr:`work_queue` wasn't a :class:`~requests.Request` or callable.
    """

    # Request attributes that are scanned for embedded statements when `do_eval` is set.
    _eval_attrs = [
        'url',
        'headers',
        'cookies',
        'params',
        'data',
        'json',
    ]
    # With re.VERBOSE the literal spaces are ignored, so this matches `<<<...>>>`.
    _eval_pattern = r'<<< (.*?) >>>'
    _eval_flags = re.VERBOSE | re.DOTALL
    # SECURITY: this runs `eval` on text taken from the Request being sent.
    # Only enable `do_eval` for Requests whose content is fully trusted.
    _eval_action = lambda self, match: str(eval(match.group(1)))

    def __init__(self, work_queue, race_args):
        r"""Creates a :class:`~.Thread` with arguments for itself and its :class:`~.BaseConnection`\s."""
        self.work_queue = work_queue
        self.race_args = race_args

        # Shared parameters
        self.thread_num = self.race_args['thread_num']
        self.shared = self.race_args['shared']

        # Thread exclusive parameters.
        # NOTE: `pop` removes these keys from the dict object passed in by the caller,
        # so they are no longer present when `race_args` is handed to the adapter.
        self.send_kwargs = self.race_args.pop('send_kwargs', {})
        self.do_eval = self.race_args.pop('do_eval', False)
        self.fake_send = self.race_args.pop('fake_send', False)
        self.save_sent_cookies = self.race_args.pop('save_sent_cookies', True)

        # Convert a non-list work_queue into a single-element list.
        if not isinstance(self.work_queue, list):
            self.work_queue = [self.work_queue]

        # Fail early if an invalid work item is detected.
        # This prevents a multitude of runtime exceptions when the threads run.
        self._validate_work_queue(self.work_queue)

        self.logger = _LOG.getChild("T{:03d}".format(self.thread_num))
        self.session = None
        self.adapter = None
        self.response = None
        self.all_responses = []

        super(Thread, self).__init__()
        self.daemon = True

    def _validate_work_queue(self, work_queue):
        r"""
        Ensures that all :attr:`work_queue` items are :class:`~requests.Request`\s or callables.

        Raises
        --------
        InvalidWorkItem
            The first item that is neither callable nor a :class:`~requests.Request` was found.
        """
        for i, work_item in enumerate(work_queue):
            if not (callable(work_item) or isinstance(work_item, requests.Request)):
                raise InvalidWorkItem("Work item {} isn't callable or Request: {}".format(i, work_item))

    def _create_session(self):
        """Creates a :class:`~requests.Session` and overrides default headers."""
        session = requests.Session()
        # Prevent the fall-back "python-requests/version" User-Agent header.
        # We still allow the following default headers:
        # - Accept-Encoding: gzip, deflate
        # - Accept: */*
        # - Connection: keep-alive
        session.headers['User-Agent'] = None
        return session

    def _get_adapter(self, race_args):
        """Creates a new instance of our :class:`~.HTTPAdapter` with :attr:`race_args`."""
        return HTTPAdapter(race_args=race_args)

    def _mount_adapter(self, session, adapter):
        """Mount an :class:`~.HTTPAdapter` to the given :class:`~requests.Session`."""
        session.mount("http://", adapter)
        session.mount("https://", adapter)

    def _prepare_request(self, session, request):
        """Create a :class:`~requests.PreparedRequest` under the current :class:`~requests.Session`."""
        return session.prepare_request(request)

    def _eval_request_attrs(self, req, attrs):
        """
        Evaluates statements in the attributes of a :class:`~requests.Request`, returning a new one.

        Arguments
        -----------
        req : requests.Request
            The request whose attributes are scanned. It is not modified.
        attrs : list
            Names of the attributes to evaluate. Missing attributes are logged and skipped.
        """
        # Shallow copy suffices.
        # `_eval_recursive` creates new objects while it recurses,
        # so there's no risk of damaging the original copies.
        rtn = copy(req)
        for attr in attrs:
            if not hasattr(req, attr):
                self.logger.warning("%s has no %s attribute to evaluate", req, attr)
                continue
            original_attr = getattr(req, attr)
            evaluated_attr = self._eval_recursive(original_attr)
            # Only replace attributes that evaluation actually changed.
            if evaluated_attr != original_attr:
                setattr(rtn, attr, evaluated_attr)
        return rtn

    def _eval_recursive(self, thing):
        """
        Recursively evaluates statements within an object, returning a new object.

        Dicts, lists and tuples are rebuilt as plain ``dict``, ``list`` and ``tuple``
        with every contained value evaluated. Strings have each :attr:`_eval_pattern`
        match replaced by the result of :attr:`_eval_action`. Anything else is
        returned unchanged.
        """
        if isinstance(thing, dict):
            return {key: self._eval_recursive(value) for (key, value) in thing.items()}
        elif isinstance(thing, list):
            return [self._eval_recursive(value) for value in thing]
        elif isinstance(thing, tuple):
            # Must be materialised: a bare generator expression is not a tuple.
            return tuple(self._eval_recursive(value) for value in thing)
        elif isinstance(thing, str):
            # The bottom of the recursion stack, actually applies `_eval_action` to the string.
            # The matched pattern will be replaced with its evaluated result.
            return re.sub(
                self._eval_pattern,
                self._eval_action,
                thing,
                flags=self._eval_flags
            )
        else:
            # Unknown type, return it as-is (objects, numerics, None, etc)
            return thing

    def _extract_cookies(self, req, jar=None):
        """
        Extracts cookies from a :class:`~requests.Request` and returns them
        in a :class:`~requests.RequestsCookieJar`.

        Arguments
        -----------
        req : requests.Request
            The :class:`~requests.Request` that the cookies should be extracted from.
            If a Cookie header is present, it will be used.
            If not, the `cookies` attribute will be used instead.
        jar : requests.RequestsCookieJar, optional
            The :class:`~requests.RequestsCookieJar` to add the cookies to.
            A copy is made; the jar passed in is not modified.
            Only cookies with differing values will be added to prevent domain/path issues.
            Default is ``None``, create a new :class:`~requests.RequestsCookieJar`.
        """
        # Compare against None explicitly: an empty jar is falsy but must still be copied.
        if jar is None:
            cookie_jar = requests.cookies.RequestsCookieJar()
        else:
            cookie_jar = jar.copy()

        # Get the Request's host so we can tie the cookies to it.
        # This may be more restrictive than required (i.e. subdomain instead of host)
        # but much less likely to send cookies to places they shouldn't go.
        # NOTE(review): `netloc` keeps any explicit port (and userinfo) from the URL;
        # confirm that is the intended cookie domain for such URLs.
        domain = getattr(urlparse.urlparse(req.url), 'netloc', None)

        cookie_tuples = []
        # Cookie header takes precedence over cookie dict.
        # This is how the requests library handles it as well.
        cookie_header = req.headers.get("Cookie", None)
        if cookie_header:
            for cookie in cookie_header.split(";"):
                if not cookie.strip():
                    # Blank segment (e.g. from a trailing ';'), nothing to extract.
                    continue
                if "=" not in cookie:
                    # Cookies must have a name and a value as per RFC 6265:
                    # https://stackoverflow.com/a/23393248/477563
                    self.logger.warning("Cookie header entry '%s' missing a '='", cookie)
                    continue
                name, value = [part.strip() for part in cookie.split("=", 1)]
                cookie_tuples.append((name, value))
        elif req.cookies:
            cookie_tuples.extend(req.cookies.items())

        # If a cookie with this name and value exists, don't update it.
        # The existing cookie may have more accurate domain/path info than we do.
        for name, value in cookie_tuples:
            if cookie_jar.get(name, domain=domain) != value:
                cookie_jar.set(name, value, domain=domain)
        return cookie_jar

    def _send_request(self, session, prepared, **send_kwargs):
        """
        Sends (or pretends to send) a :class:`~requests.PreparedRequest` under
        our :class:`~requests.Session`, returning a :class:`~requests.Response`.

        When :attr:`fake_send` is set nothing is sent; an empty
        :class:`~requests.Response` carrying `prepared` as its request is returned.
        """
        if self.fake_send:
            response = requests.Response()
            response.request = prepared
        else:
            response = session.send(prepared, **send_kwargs)
        return response

    def run(self):
        """
        The work body of a :class:`threading.Thread`.
        Processes work items from :attr:`work_queue` in sequential order.

        The :class:`~requests.Session` is always closed when this method exits,
        whether the queue was drained or an exception ended the run early.
        HTTP-related exceptions raised while sending are stored in :attr:`response`,
        appended to :attr:`all_responses`, and then re-raised.
        """
        self.session = self._create_session()
        self.all_responses = []
        try:
            self.adapter = self._get_adapter(self.race_args)
            self._mount_adapter(self.session, self.adapter)

            while self.work_queue:
                self.logger.debug("Work queue size: %d", len(self.work_queue))
                work_item = self.work_queue.pop(0)

                if callable(work_item):
                    # A callable work item that may do result processing and/or modify `work_queue`.
                    work_item(self)
                    # The callable may have queued new work, so re-check the queue.
                    self._validate_work_queue(self.work_queue)
                    continue

                # The Request must be prepared under our current Session to use its state.
                # This includes using the custom persistent cookies, headers, etc.
                # This also gives us an opportunity to modify the data before sending...
                request = work_item

                # ... like evaluating embedded Python statements in the URL, headers, and data.
                if self.do_eval:
                    request = self._eval_request_attrs(request, self._eval_attrs)

                # Cookie header to CookieJar extraction must happen after eval as headers may have changed.
                # The requests library doesn't save sent Cookie header or dict entries into the CookieJar.
                # If we didn't extract them manually, the first request would have the correct Cookie header,
                # but subsequent requests/redirects will only have the response's Set-Cookie values.
                # As a note, if a Cookie header and CookieJar are both present, the header is used verbatim.
                if self.save_sent_cookies:
                    self.session.cookies = self._extract_cookies(request, jar=self.session.cookies)

                # Tying our Session cookies to the Request.
                # This also maps data structures (e.g. files, cookies, data) into real strings.
                prepared = self._prepare_request(self.session, request)

                try:
                    # Get a Response for our PreparedRequest
                    self.response = self._send_request(self.session, prepared, **self.send_kwargs)
                except (RequestException, HTTPError) as ex:
                    # If something HTTP related went wrong, save the exception as the response.
                    # This will give the Driver something to inspect when the Thread dies.
                    self.response = ex
                    # Bare `raise` keeps the original traceback intact.
                    raise
                finally:
                    # Responses contain the PreparedRequest that was sent, no need to store it separately.
                    self.all_responses.append(self.response)

            self.logger.debug("Work queue empty, shutting down")
        finally:
            # Closes the Session and all of its adapters and their children,
            # even when a work item raised part-way through the queue.
            self.session.close()