Github - grequests
requests 是 Python 中非常好用的第三方 HTTP 请求库,但 requests 发送请求是串行(阻塞)的:必须等一条请求完成后,才能发送下一条请求。
为了提升测试效率,通常需要并发发送请求。可以使用多线程或协程(如 gevent、aiohttp)来实现,但使用起来都相对麻烦。
grequests 是基于 gevent + requests 封装的并发请求库,使用起来非常简单。
安装:
pip install grequests
1. grequests 使用
示例一:
import grequests

# Sites to fetch. The sample output at the bottom shows None in the slot
# that corresponds to 'http://fakedomain/'.
urls = [
    'http://www.heroku.com',
    'http://python-tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://fakedomain/',
    'http://kennethreitz.com'
]

# Build the list of unsent request objects.
pending = []
for url in urls:
    pending.append(grequests.get(url))

# Dispatch all of them and collect the list of responses.
responses = grequests.map(pending)
print(responses)
# Sample output:
# [<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, None, <Response [200]>]
grequests支持get、post、put、delete等requests支持的HTTP请求方法,使用参数和requests一致.
示例二:
import grequests

# Unsent requests, one per HTTP method being demonstrated.
req_list = [
    grequests.get('http://httpbin.org/get?a=1&b=2'),
    grequests.post('http://httpbin.org/post', data={'a': 1, 'b': 2}),
    # PUT must target httpbin's /put endpoint; /post only accepts POST.
    grequests.put('http://httpbin.org/put', json={'a': 1, 'b': 2}),
]

# Send everything concurrently; map() returns after the last request finishes.
res_list = grequests.map(req_list)

# Print the response text of every request.
for i, rep in enumerate(res_list):
    if rep is None:
        # Without an exception_handler, map() puts None in the slot of a
        # request that produced no response.
        print('{} - request failed'.format(i))
    else:
        print('{} - {}'.format(i, rep.text))
2. grequests 异常处理
在批量发送请求时难免遇到某个请求url无法访问或超时等异常,grequests.map() 方法支持自定义异常处理函数.
示例如下:
import grequests


def exception_handler(request, exception):
    """Report a request for which ``grequests.map()`` obtained no response.

    :param request: the request object that failed.
    :param exception: the exception recorded for it (may be None).
    :returns: None implicitly; ``map()`` stores the handler's return value
              in that request's slot of the result list.
    """
    print("Request failed")


reqs = [
    grequests.get('http://httpbin.org/delay/1', timeout=0.001),  # times out
    grequests.get('http://fakedomain/'),  # domain does not exist
    grequests.get('http://httpbin.org/status/500'),  # completes normally with HTTP 500
]

# The result list holds responses (or the handler's return value), not requests.
res_list = grequests.map(reqs, exception_handler=exception_handler)
print(res_list)
'''
Request failed
Request failed
[None, None, <Response [500]>]
'''
3. grequests和requests性能对比
示例如下:
# grequests monkey-patches the standard library via gevent when imported, so
# it is imported BEFORE requests (the order recommended by the grequests README).
import grequests
import requests
import time

# Baseline: 100 GETs issued one after another with plain requests.
# perf_counter() is a monotonic clock intended for measuring intervals.
start = time.perf_counter()
res_list = [requests.get('https://github.com') for _ in range(100)]
print("[INFO]timecost of requests: ", time.perf_counter() - start)

# Same 100 GETs built as unsent requests and dispatched together by map().
start = time.perf_counter()
req_list = [grequests.get('https://github.com') for _ in range(100)]
res_list = grequests.map(req_list)
print("[INFO]timecost of grequests: ", time.perf_counter() - start)
4. grequests 源码
Github - grequests.py
# -*- coding: utf-8 -*-
"""
This module contains an asynchronous replica of ``requests.api``, powered
by gevent. All API methods return a ``Request`` instance (as opposed to
``Response``). A list of requests can be sent with ``map()``.
"""
from functools import partial
import traceback

# gevent is a hard requirement: fail with a clear message if it is missing.
try:
    import gevent
    from gevent import monkey as curious_george
    from gevent.pool import Pool
except ImportError:
    raise RuntimeError('Gevent is required for grequests.')

# Monkey-patch.
curious_george.patch_all(thread=False, select=False)

# NOTE(review): requests is imported only after patch_all() has run —
# presumably so that it picks up the patched modules; keep this order.
from requests import Session

# Public API of this module.
__all__ = (
    'map', 'imap',
    'get', 'options', 'head', 'post', 'put', 'patch', 'delete', 'request'
)
class AsyncRequest(object):
    """Asynchronous request.

    Holds everything needed to perform one HTTP request later; nothing is
    sent until :meth:`send` is called.

    Accept same parameters as ``Session.request`` and some additional:

    :param session: Session which will do request. If omitted, a new
                    ``Session`` is created and closed after :meth:`send`.
    :param callback: Callback called on response.
                     Same as passing ``hooks={'response': callback}``
                     (it replaces any ``hooks`` given in the keyword arguments).
    """

    def __init__(self, method, url, **kwargs):
        #: Request method
        self.method = method
        #: URL to request
        self.url = url
        #: Associated ``Session``
        self.session = kwargs.pop('session', None)
        if self.session is None:
            # We own this session, so send() is responsible for closing it.
            self.session = Session()
            self._close = True
        else:
            # don't close adapters after each request if the user provided the session
            self._close = False

        callback = kwargs.pop('callback', None)
        if callback:
            kwargs['hooks'] = {'response': callback}

        #: The rest arguments for ``Session.request``
        self.kwargs = kwargs
        #: Resulting ``Response``
        self.response = None

    def send(self, **kwargs):
        """
        Prepares request based on parameter passed to constructor and optional ``kwargs``.
        Then sends request and saves response to :attr:`response`.

        Exceptions raised by the session are not propagated: the exception is
        stored in :attr:`exception` and the formatted traceback in
        :attr:`traceback`. Both attributes exist only after a failed send.

        :param kwargs: extra arguments for ``Session.request``; they override
                       the ones given to the constructor.
        :returns: this ``AsyncRequest`` instance (inspect :attr:`response`).
        """
        merged_kwargs = {}
        merged_kwargs.update(self.kwargs)
        merged_kwargs.update(kwargs)
        try:
            self.response = self.session.request(self.method,
                                                 self.url, **merged_kwargs)
        except Exception as e:
            # Deliberately captured rather than raised so that one failing
            # request does not abort the caller; map()/imap() inspect these.
            self.exception = e
            self.traceback = traceback.format_exc()
        finally:
            if self._close:
                # if we created the session object ourselves, make sure we're cleaning up
                # because there's no sense in keeping it open at this point if it wont be reused
                self.session.close()
        return self
def send(r, pool=None, stream=False):
    """Schedule ``r.send(stream=stream)`` on a greenlet and return that greenlet.

    The greenlet is spawned on ``pool`` when one is given, otherwise via
    ``gevent.spawn``. Pools are useful because you can specify size and can
    hence limit concurrency.

    :param r: request object exposing a ``send(**kwargs)`` method.
    :param pool: optional pool whose ``spawn`` is used to start the job.
    :param stream: forwarded to ``r.send`` as the ``stream`` keyword.
    :returns: whatever ``spawn`` returns (the job to join on).
    """
    if pool is not None:
        return pool.spawn(r.send, stream=stream)

    return gevent.spawn(r.send, stream=stream)
# Shortcuts for creating AsyncRequest with appropriate HTTP method
# Each name is a ``functools.partial`` that pre-binds the HTTP verb as the
# first positional argument, so ``get(url, **kwargs)`` is equivalent to
# ``AsyncRequest('GET', url, **kwargs)``.
get = partial(AsyncRequest, 'GET')
options = partial(AsyncRequest, 'OPTIONS')
head = partial(AsyncRequest, 'HEAD')
post = partial(AsyncRequest, 'POST')
put = partial(AsyncRequest, 'PUT')
patch = partial(AsyncRequest, 'PATCH')
delete = partial(AsyncRequest, 'DELETE')
# synonym
def request(method, url, **kwargs):
    """Create an ``AsyncRequest`` for an arbitrary HTTP ``method``.

    :param method: HTTP method name, passed through unchanged.
    :param url: URL to request.
    :param kwargs: forwarded to the ``AsyncRequest`` constructor.
    :returns: the new ``AsyncRequest`` instance.
    """
    return AsyncRequest(method, url, **kwargs)
def map(requests, stream=False, size=None, exception_handler=None, gtimeout=None):
    """Concurrently converts a list of Requests to Responses.

    :param requests: a collection of Request objects.
    :param stream: If True, the content will not be downloaded immediately.
    :param size: Specifies the number of requests to make at a time. If None, no throttling occurs.
    :param exception_handler: Callback function, called when exception occurred. Params: Request, Exception
    :param gtimeout: Gevent joinall timeout in seconds. (Note: unrelated to requests timeout)
    :returns: a list with one entry per request, in input order: the
              response when there is one; otherwise the return value of
              ``exception_handler`` if a handler was given; otherwise None.
    """
    # Materialize first: the collection is iterated twice (spawn, then collect).
    requests = list(requests)

    pool = Pool(size) if size else None
    jobs = [send(r, pool, stream=stream) for r in requests]
    gevent.joinall(jobs, timeout=gtimeout)

    ret = []
    for request in requests:
        if request.response is not None:
            ret.append(request.response)
        elif exception_handler:
            # A request may have no response and no recorded exception; the
            # handler then receives None as the exception.
            ret.append(exception_handler(request, getattr(request, 'exception', None)))
        else:
            ret.append(None)

    return ret
def imap(requests, stream=False, size=2, exception_handler=None):
    """Concurrently converts a generator object of Requests to
    a generator of Responses.

    Results are yielded as ``pool.imap_unordered`` produces them, so not
    necessarily in input order. A request without a response yields the
    return value of ``exception_handler`` when a handler is given and that
    value is not None; otherwise it yields nothing.

    :param requests: a generator of Request objects.
    :param stream: If True, the content will not be downloaded immediately.
    :param size: Specifies the number of requests to make at a time. default is 2
    :param exception_handler: Callback function, called when exception occurred. Params: Request, Exception
    """
    pool = Pool(size)

    # Named _send so it does not shadow the module-level ``send`` helper.
    def _send(r):
        return r.send(stream=stream)

    for request in pool.imap_unordered(_send, requests):
        if request.response is not None:
            yield request.response
        elif exception_handler:
            # Guarded lookup, matching map(): the handler receives None when
            # no exception was recorded on the request.
            ex_result = exception_handler(request, getattr(request, 'exception', None))
            if ex_result is not None:
                yield ex_result

    pool.join()
grequests 测试代码:Github - grequests/tests.py