接口测试
httpx 模块:支持 HTTP1.1/2.0 同步异步,双卡双待,是直接发起请求的模块中最全能的模块。
locust 模块:大规模的并发测试,可以实时查看吞吐量、响应时间和错误和/或导出以供以后分析。
httpx
HTTPX 是 Python 3 的全功能 HTTP 客户端,它提供同步和异步 API,并支持 HTTP / 1.1 和 HTTP / 2。配置代理和请求头也很容易。httpx 还有 web 测试模式、精细化配置代理 IP、Event Hooks、SSL 证书等语法,但是因为 VUE 时代的到来,这些实现都有了新的方案,所以不再过多介绍。
httpxH2 检测
import httpx
import asyncio
'''
检查 HTTP 版本
在客户端上启用 HTTP/2 支持并不一定意味着您的 请求和响应将通过 HTTP/2 传输,因为客户端和服务器都需要支持 HTTP/2。如果连接到仅 支持 HTTP/1.1 客户端将使用标准的 HTTP/1.1 连接。
您可以通过检查来确定使用了哪个版本的 HTTP 协议 响应上的属性。
'''
async def main():
client = httpx.AsyncClient(http2=True)
response = await client.get('https://www.python-httpx.org/')
print(response.http_version) # "HTTP/1.0", "HTTP/1.1", or "HTTP/2".
asyncio.run(main())
httpx 发起请求
import httpx
## HTTP / 1.1 同步请求模式A
r = httpx.post('https://httpbin.org/post', data={'key': 'value'})
r = httpx.put('https://httpbin.org/put', data={'key': 'value'})
r = httpx.delete('https://httpbin.org/delete')
r = httpx.head('https://httpbin.org/get')
r = httpx.options('https://httpbin.org/get')
r = httpx.patch('https://httpbin.org/get')
r = httpx.get('https://www.example.org/')
r # <Response [200 OK]>
r.status_code # 200
r.headers['content-type'] # r.headers['content-type']
r.text #'<!doctype html>...</html>'
## HTTP / 1.1 同步请求模式B
client = httpx.Client()
r = client.post('https://httpbin.org/post', data={'key': 'value'})
r = client.put('https://httpbin.org/put', data={'key': 'value'})
r = client.delete('https://httpbin.org/delete')
r = client.head('https://httpbin.org/get')
r = client.options('https://httpbin.org/get')
r = client.patch('https://httpbin.org/get')
r = client.get('https://www.example.org/')
client.close()
## 异步请求模式A
async def maina():
async with httpx.AsyncClient() as client:
r = await client.post('https://httpbin.org/post', data={'key': 'value'})
r = await client.put('https://httpbin.org/put', data={'key': 'value'})
r = await client.delete('https://httpbin.org/delete')
r = await client.head('https://httpbin.org/get')
r = await client.options('https://httpbin.org/get')
r = await client.patch('https://httpbin.org/get')
r = await client.get('https://www.example.com/')
print(r) #<Response [200 OK]>
asyncio.run(maina())
## 异步请求模式B
async def mainb():
client = httpx.AsyncClient()
r = await client.post('https://httpbin.org/post', data={'key': 'value'})
r = await client.put('https://httpbin.org/put', data={'key': 'value'})
r = await client.delete('https://httpbin.org/delete')
r = await client.head('https://httpbin.org/get')
r = await client.options('https://httpbin.org/get')
r = await client.patch('https://httpbin.org/get')
r = await client.get('https://www.example.com/')
await client.aclose()
print(r) #<Response [200 OK]>
asyncio.run(mainb())
## H2模式下同步与异步请求
client = httpx.Client(http2=True)
client = httpx.AsyncClient(http2=True)
## 配置合并
headers = {'X-Auth': 'from-client'}
params = {'client_id': 'client1'}
with httpx.Client(headers=headers, params=params,http2=True,proxies="http://localhost:8030") as client:
headers = {'X-Custom': 'from-request'}
params = {'request_id': 'request1'}
r = client.get('https://example.com', headers=headers, params=params)
r.request.url # URL('https://example.com?client_id=client1&request_id=request1')
r.request.headers['X-Auth'] # 'from-client'
r.request.headers['X-Custom'] # 'from-request'
httpx 高级用法
如果需要监视大型响应的下载进度,可以使用响应流式处理并检查属性。response.num_bytes_downloaded
import tempfile
import httpx
import rich.progress
with tempfile.NamedTemporaryFile() as download_file:
url = "https://speed.hetzner.de/100MB.bin"
with httpx.stream("GET", url) as response:
total = int(response.headers["Content-Length"])
with rich.progress.Progress(
"[progress.percentage]{task.percentage:>3.0f}%",
rich.progress.BarColumn(bar_width=None),
rich.progress.DownloadColumn(),
rich.progress.TransferSpeedColumn(),
) as progress:
download_task = progress.add_task("Download", total=total)
for chunk in response.iter_bytes():
download_file.write(chunk)
progress.update(download_task, completed=response.num_bytes_downloaded)
locust
locust 需要创建一个locustfile.py的文件,他只是一个普通的 Python 模块,它可以从其他文件或包中导入代码。
import time
from locust import HttpUser, task, between
# 要使文件成为有效的 locustfile,它必须至少包含一个继承自 User 的类。
class QuickstartUser(HttpUser):
# 模拟用户在执行每个任务(见下文)后等待 1 到 5 秒。
wait_time = between(1, 5)
# 用 @task 修饰的方法是 locust 文件的核心。对于每个正在运行的 User,Locust 都会创建一个 greenlet(协程或“微线程”),它将调用这些方法。任务中的代码是按顺序执行的(它只是常规的 Python 代码),因此在收到来自 /hello 的响应之前,不会调用 /world。
@task
def hello_world(self):
self.client.get("/hello")
self.client.get("/world")
# 我们通过使用 @task 装饰两个 方法来声明两个任务,其中一个方法被赋予了更高的权重 (3)。当我们的 QuickstartUser 运行时,它将选择声明的任务之一 - 在本例中为 hello_world 或 view_items - 并执行它。任务是随机选择的,但您可以为其赋予不同的权重。上述配置将使 Locust 采摘 view_items 的可能性是 hello_world 的三倍。当任务完成执行后,用户将休眠其指定的等待时间(在本例中为 1 到 5 秒)。然后它将选择一个新任务。
@task(3)
def view_items(self):
for item_id in range(10):
self.client.get(f"/item?id={item_id}", name="/item")
time.sleep(1)
# 只有用 @task 修饰的方法才会被选中,因此你可以用任何你喜欢的方式定义自己的内部帮助程序方法。
def on_start(self):
self.client.post("/login", json={"username":"foo", "password":"bar"})
启动压力测试可以在通过 locust 命令,这将会启动一个可视化操作界面。输入测试时间、并发用户、目标网址等信息即可查看压测结果图表。
如果在服务器上执行,可以使用命令行locust --headless --users 10 --spawn-rate 1 -H http://your-server.com
更多示例可以查看官方文档