Skip to main content

接口测试

接口测试

本章列举了一些直接发起请求的第三方模块,这些模块在 HTTP1.0 时期,非 Js 渲染时代是我们获取网页数据的利器。

现在是HTTP2.0(简称 h2)以及以 VUE 为代表的 Js 渲染页面时代。因此这些模块现在主要用于获取直接的数据,譬如:图片、视频、纯静态的 HTML 网页或 XHR(XMLHttpRequest)类型的请求。

请求的方法与 HTTP 协议对资源的操作对应

(1) GET:请求获取 URL 位置的资源 (2) HEAD:请求获取 URL 位置资源的响应消息报告,即获得该资源的头部信息 (3) POST:请求向 URL 位置的资源后附加新的数据 (4) PUT:请求向 URL 位置存储一个资源,覆盖原 URL 位置的资源 (5) PATCH:请求局部更新 URL 位置的资源,即更改该处资源的部分内容 (6) DELETE:请求删除 URL 位置存储的资源 (特殊)options :向服务器获取一些服务器和客户端能够打交道的参数,但与获取资源并不直接相关

request 模块:仅支持 HTTP1.0 仅同步 是基于 urllib3 的再次封装,因此语法更简单

aiohttp 模块:支持 HTTP2.0 仅异步

httpx 模块:支持 HTTP1.1/2.0 同步异步,双卡双待,是直接发起请求的模块中最全能的模块。

locust 模块:大规模的并发测试,可以实时查看吞吐量、响应时间和错误和/或导出以供以后分析。

requests

requests 是基于 urllib3 开发的一个优雅而简单的 PythonHTTP 库,为人类构建。

requests 模块官方文档

requests 发送请求

import requests
r = requests.get('https://api.github.com/events')
r = requests.post('https://httpbin.org/post', data={'key': 'value'})
r = requests.put('https://httpbin.org/put', data={'key': 'value'})
r = requests.delete('https://httpbin.org/delete')
r = requests.head('https://httpbin.org/get')
r = requests.patch('http://httpbin.org/patch',data ={'key': 'value'})
r = requests.options('https://httpbin.org/get')

requests 异常处理

Requests 库通过 raise_for_status()引发支持六种常用的连接异常

(1) requests.ConnectionError:网络连接错误异常,如 DNS 查询失败、拒绝连接等 (2) requests.HTTPError:HTTP 错误异常 (3) requests.URLRequired:URL 缺失异常 (4) requests.TooManyRedirects:超过最大重定向次数,产生重定向异常 (5) requests.ConnectTimeout:连接远程服务器超时异常 (6) requests.Timeout:请求 URL 超时,产生超时异常(发出请求到获得内容的整个过程的超时异常)

import requests
try:
r=requests.get("https://www.baidu.com/error",timeout=30) #请求一个url链接
r.raise_for_status() #如果状态不是200,引发HTTPError异常
except requests.HTTPError as e:
print(e)# 404 Client Error: Not Found for url: https://www.baidu.com/error

aiohttp

aiohttp 模块官方文档

aiohttp 发起请求

import aiohttp
import asyncio

async def main():
async with aiohttp.ClientSession('http://httpbin.org') as session:
async with session.post('/post', data=b'data'):
pass
async with session.put('/put', data=b'data'):
pass
async with session.get('/get') as resp:
print(resp.status)
print(await resp.text())

asyncio.run(main())

httpx

HTTPX 是 Python 3 的全功能 HTTP 客户端,它提供同步和异步 API,并支持 HTTP / 1.1 和 HTTP / 2。配置代理和请求头也很容易。httpx 还有 web 测试模式、精细化配置代理 IP、Event Hooks、SSL 证书等语法,但是因为 VUE 时代的到来,这些实现都有了新的方案,所以不再过多介绍。

httpx 代码文档

httpxH2 检测

import httpx
import asyncio

'''
检查 HTTP 版本
在客户端上启用 HTTP/2 支持并不一定意味着您的 请求和响应将通过 HTTP/2 传输,因为客户端和服务器都需要支持 HTTP/2。如果连接到仅 支持 HTTP/1.1 客户端将使用标准的 HTTP/1.1 连接。

您可以通过检查来确定使用了哪个版本的 HTTP 协议 响应上的属性。
'''
async def main():
client = httpx.AsyncClient(http2=True)
response = await client.get('https://www.python-httpx.org/')
print(response.http_version) # "HTTP/1.0", "HTTP/1.1", or "HTTP/2".
asyncio.run(main())

httpx 发起请求

import httpx

## HTTP / 1.1 同步请求模式A
r = httpx.post('https://httpbin.org/post', data={'key': 'value'})
r = httpx.put('https://httpbin.org/put', data={'key': 'value'})
r = httpx.delete('https://httpbin.org/delete')
r = httpx.head('https://httpbin.org/get')
r = httpx.options('https://httpbin.org/get')
r = httpx.patch('https://httpbin.org/get')
r = httpx.get('https://www.example.org/')

r # <Response [200 OK]>
r.status_code # 200
r.headers['content-type'] # r.headers['content-type']
r.text #'<!doctype html>...</html>'

## HTTP / 1.1 同步请求模式B
client = httpx.Client()
r = client.post('https://httpbin.org/post', data={'key': 'value'})
r = client.put('https://httpbin.org/put', data={'key': 'value'})
r = client.delete('https://httpbin.org/delete')
r = client.head('https://httpbin.org/get')
r = client.options('https://httpbin.org/get')
r = client.patch('https://httpbin.org/get')
r = client.get('https://www.example.org/')
client.close()


## 异步请求模式A
async def maina():
async with httpx.AsyncClient() as client:
r = await client.post('https://httpbin.org/post', data={'key': 'value'})
r = await client.put('https://httpbin.org/put', data={'key': 'value'})
r = await client.delete('https://httpbin.org/delete')
r = await client.head('https://httpbin.org/get')
r = await client.options('https://httpbin.org/get')
r = await client.patch('https://httpbin.org/get')
r = await client.get('https://www.example.com/')
print(r) #<Response [200 OK]>
asyncio.run(maina())

## 异步请求模式B
async def mainb():
client = httpx.AsyncClient()
r = await client.post('https://httpbin.org/post', data={'key': 'value'})
r = await client.put('https://httpbin.org/put', data={'key': 'value'})
r = await client.delete('https://httpbin.org/delete')
r = await client.head('https://httpbin.org/get')
r = await client.options('https://httpbin.org/get')
r = await client.patch('https://httpbin.org/get')
r = await client.get('https://www.example.com/')
await client.aclose()
print(r) #<Response [200 OK]>
asyncio.run(mainb())

## H2模式下同步与异步请求
client = httpx.Client(http2=True)
client = httpx.AsyncClient(http2=True)

## 配置合并
headers = {'X-Auth': 'from-client'}
params = {'client_id': 'client1'}
with httpx.Client(headers=headers, params=params,http2=True,proxies="http://localhost:8030") as client:
headers = {'X-Custom': 'from-request'}
params = {'request_id': 'request1'}
r = client.get('https://example.com', headers=headers, params=params)

r.request.url # URL('https://example.com?client_id=client1&request_id=request1')
r.request.headers['X-Auth'] # 'from-client'
r.request.headers['X-Custom'] # 'from-request'

httpx 高级用法

如果需要监视大型响应的下载进度,可以使用响应流式处理并检查属性。response.num_bytes_downloaded

import tempfile
import httpx
import rich.progress

with tempfile.NamedTemporaryFile() as download_file:
url = "https://speed.hetzner.de/100MB.bin"
with httpx.stream("GET", url) as response:
total = int(response.headers["Content-Length"])

with rich.progress.Progress(
"[progress.percentage]{task.percentage:>3.0f}%",
rich.progress.BarColumn(bar_width=None),
rich.progress.DownloadColumn(),
rich.progress.TransferSpeedColumn(),
) as progress:
download_task = progress.add_task("Download", total=total)
for chunk in response.iter_bytes():
download_file.write(chunk)
progress.update(download_task, completed=response.num_bytes_downloaded)

locust

locust 需要创建一个locustfile.py的文件,他只是一个普通的 Python 模块,它可以从其他文件或包中导入代码。

import time
from locust import HttpUser, task, between

# 要使文件成为有效的 locustfile,它必须至少包含一个继承自 User 的类。
class QuickstartUser(HttpUser):
# 模拟用户在执行每个任务(见下文)后等待 1 到 5 秒。
wait_time = between(1, 5)

# 用 @task 修饰的方法是 locust 文件的核心。对于每个正在运行的 User,Locust 都会创建一个 greenlet(协程或“微线程”),它将调用这些方法。任务中的代码是按顺序执行的(它只是常规的 Python 代码),因此在收到来自 /hello 的响应之前,不会调用 /world。
@task
def hello_world(self):
self.client.get("/hello")
self.client.get("/world")

# 我们通过使用 @task 装饰两个方法来声明两个任务,其中一个方法被赋予了更高的权重 (3)。当我们的 QuickstartUser 运行时,它将选择声明的任务之一 - 在本例中为 hello_world 或 view_items - 并执行它。任务是随机选择的,但您可以为其赋予不同的权重。上述配置将使 Locust 采摘 view_items 的可能性是 hello_world 的三倍。当任务完成执行后,用户将休眠其指定的等待时间(在本例中为 1 到 5 秒)。然后它将选择一个新任务。
@task(3)
def view_items(self):
for item_id in range(10):
self.client.get(f"/item?id={item_id}", name="/item")
time.sleep(1)
# 只有用 @task 修饰的方法才会被选中,因此你可以用任何你喜欢的方式定义自己的内部帮助程序方法。
def on_start(self):
self.client.post("/login", json={"username":"foo", "password":"bar"})

启动压力测试可以在通过 locust 命令,这将会启动一个可视化操作界面。输入测试时间、并发用户、目标网址等信息即可查看压测结果图表。

如果在服务器上执行,可以使用命令行locust --headless --users 10 --spawn-rate 1 -H http://your-server.com

更多示例可以查看官方文档