aiohttp框架及其常用模块

[TOC]

简介

aiohttp是一个为Python提供异步HTTP 客户端/服务端编程,基于asyncio的异步库。他的核心功能如下:

  • 同时支持客户端使用(可以理解为request的异步执行方式)和服务端使用。
  • 同时支持服务端WebSockets组件和客户端WebSockets组件,开箱即用呦。
  • web服务器具有中间件,信号组件和可插拔路由的功能。

这个可插拔的路由意思就是说,你可以在代码运行的过程中增加某个接口,或者减少某个接口。

安装方式:

1
pip install aiohttp

客户端例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import aiohttp
import asyncio
import async_timeout

async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url) as response:
return await response.text()

async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'http://python.org')
print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

服务端例子:

1
2
3
4
5
6
7
8
9
10
11
12
from aiohttp import web

async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)

app = web.Application()
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)

web.run_app(app)

常见扩展模块

模块名称 描述
aiohttp_session 处理用户会话
aiohttp-session-mongo
aiomysql
aiopg
aioredis
aiohttp_cors 解决跨域问题
aiojobs
aiohttp_jinja2
aiobotocore aws文件存储服务器的异步模块
pytest-aiohttp
aiohttp-swagger3
aioelasticsearch
aiologstash
aiokafka

更多可以去aio-libs github官方仓库查看 https://github.com/aio-libs

服务端使用

aiohttp的服务端程序都是 aiohttp.web.Application实例对象。用于创建信号,连接路由等。

使用下列代码可以创建一个应用程序:

1
2
3
4
5
6
7
8
9
10
11
12
from aiohttp import web

# 视图
async def index(request):
return web.Response(text='Hello Aiohttp!')

app = web.Application()

# 路由
app.router.add_get('/', index)

web.run_app(app, host='127.0.0.1', port=8080)

在浏览器中打开 http://127.0.0.1:8080 即可访问。

创建视图

不罗嗦直接上代码

1
2
3
4
from aiohttp import web

async def index(request):
return web.Response(text='Hello Aiohttp!')

index就是我们创建的展示页,然后我们创建个路由连接到这个展示页上。加上路由就可以用啦。

aiohttp.web提供django风格的基础试图类。

你可以从 View 类中继承,并自定义http请求的处理方法:

1
2
3
4
5
6
7
class MyView(web.View):
async def get(self):
return await get_resp(self.request)

async def post(self):
return await post_resp(self.request)

处理器应是一个协程方法,且只接受self参数,并且返回标准web处理器的响应对象。请求对象可使用View.request中取出。

当然还需要在路由中注册:

1
2
app.router.add_route('*', '/path/to', MyView)

这样/path/to既可使用GET也可使用POST进行请求,不过其他未指定的HTTP方法会抛出405错误。

处理器

处理器对象可以被任意调用,它只接受Request实例对象,并且返回StreamResponse的派生对象实例(如Response):

1
2
def handler(request):
return web.Response()

它还可以是协程方法,这样aiohttp.web会等待处理:

1
2
async def handler(request):
return web.Response()

处理器必须被注册在路由上才能使用:

1
2
app.router.add_get('/', handler)
app.router.add_post('/post', post_handler)

add_route()同样支持使用通配符方法:

1
app.router.add_route('*', '/path', all_handler)

之后可以使用Request.method来获知请求所使用的HTTP方法。

默认情况下,add_get()也会接受HEAD请求,并像使用GET请求一样返回响应头。你也可以禁用它:

1
app.router.add_get('/', handler, allow_head=False)

如果处理器不能被调用,服务器将会返回405。

模板

我们来添加些更有用的页面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@aiohttp_jinja2.template('detail.html')
async def poll(request):
async with request['db'].acquire() as conn:
question_id = request.match_info['question_id']
try:
question, choices = await db.get_question(conn,
question_id)
except db.RecordNotFound as e:
raise web.HTTPNotFound(text=str(e))
return {
'question': question,
'choices': choices
}

编写页面时使用模板是很方便的。我们返回带有页面内容的字典,aiohttp_jinja2.template装饰器会用jinja2模板加载它。

配置文件

  1. 绝大部分服务器都有配置文件。
  2. 一般程序都不会将配置文件作为源码的一部分。
  3. 使用配置文件是公认的好方法,在部署产品时可以预防一些小错误。

配置文件建议:

  1. 将配置信息写在一个文件中。(yaml, json或ini都可以)
  2. 在一个预先设定好的目录中加载,就是有一个默认加载
  3. 拥有能通过命令行来设置配置文件的功能。如: ./run_app –config=/opt/config/app_cfg.json
  4. 对要加载的字典执行严格检测,以确保其数据类型符合预期。可以使用: trafaret, colander 或 JSON schema等库。

构建数据库

准备工作

PostgreSQL数据库为例

数据库架构

使用SQLAlchemy来写数据库架构。我们只要创建两个简单的模块——questionchoice:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import sqlalchemy as sa 

meta = sa.MetaData()

question - sq.Table(
'question', meta,
sa.Column('id', sa.Integer, nullable=False),
sa.Column('question_text', sa.String(200), nullable=False),
sa.Column('pub_date', sa.Date, nullable=False),
# Indexes #
sa.PrimaryKeyConstraint('id', name='question_id_pkey')
)

choice = sa.Table(
'choice', meta,
sa.Column('id', sa.Integer, nullable=False),
sa.Column('question_id', sa.Integer, nullable=False),
sa.Column('choice_text', sa.String(200), nullable=False),
sa.Column('votes', server_default="0", nullable=False),
# Indexes #
sa.PrimayKeyConstraint('id', name='choice_id_pkey'),
sa.ForeignKeyContraint(['question_id'], [question.c.id],
name='choice_question_id_fkey',
ondelete='CASCADE'),
)

你会看到如下数据库结构:

第一张表 question: |question| |id| |question_text| |pub_date|

第二张表 choice: |choice| |id| |choice_text| |votes| |question_id|

创建连接引擎

为了从数据库中查询数据我们需要一个引擎实例对象。假设conf变量是一个带有连接信息的配置字典,Postgres会使用异步的方式完成该操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
async def init_pg(app):
conf = app['config']
engine = await aiopg.sa.create_engine(
database=conf['database'],
user=conf['user'],
password=conf['password'],
host=conf['host'],
port=conf['host'],
minsize=conf['minsize'],
maxsize=conf['maxsize'])

app['db'] = engine

静态文件

每个web站点都有一些静态文件: 图片啦,JavaScript,CSS文件啦等等。 在生产环境中处理这些静态文件最好的方法是使用NGINX或CDN服务做反向代理。 但在开发环境中使用aiohttp服务器处理静态文件是很方便的。

只需要简单的调用一个信号即可:

1
2
3
4
app.router.add_static('/static/',
path=str(project_root/ 'static'),
name='static')

project_root表示项目根目录。

中间件

中间件是每个web处理器必不可少的组件。它的作用是在处理器处理请求前预处理请求以及在得到响应后发送出去。

aiohttp.web提供一个强有力的中间件组件来编写自定义请求处理器。

中间件是一个协程程序帮助修正请求和响应。下面这个例子是在响应中添加’wink’字符串:

1
2
3
4
5
6
7
8
from aiohttp.web import middleware

@middleware
async def middleware(request, handler):
resp = await handler(request)
resp.text = resp.text + ' wink'
return resp

WebSockets

aiohttp.web直接提供WebSockets支持。

在处理器中创建一个WebSocketResponse对象即可设置WebSocket,之后即可进行通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async def websocket_handler(request):

ws = web.WebSocketResponse()
await ws.prepare(request)

async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
else:
await ws.send_str(msg.data + '/answer')
elif msg.type == aiohttp.WSMsgType.ERROR:
print('ws connection closed with exception %s' %
ws.exception())

print('websocket connection closed')

return ws

websockets处理器需要用HTTP GET方法注册:

1
2
app.router.add_get('/ws', websocket_handler)

WebSocket中读取数据(await ws.receive())必须在请求处理器内部完成,不过写数据(ws.send_str(...)),关闭(await ws.close())和取消操作可以在其他任务中完成。

aiohttp.web 隐式地使用asyncio.Task处理每个请求。

异常

aiohttp.web定义了所有HTTP状态码的异常。

每个异常都是HTTPException的子类和某个HTTP状态码。 同样还都是Response的子类,所以就允许你在请求处理器中返回或抛出它们。 请看下面这些代码:

1
2
3
async def handler(request):
return aiohttp.web.HTTPFound('/redirect')

1
2
3
async def handler(request):
raise aiohttp.web.HTTPFound('/redirect')

每个异常的状态码是根据RFC 2068规定来确定的: 100-300不是由错误引起的; 400之后是客户端错误,500之后是服务器端错误。

异常等级图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
HTTPException
HTTPSuccessful
* 200 - HTTPOk
* 201 - HTTPCreated
* 202 - HTTPAccepted
* 203 - HTTPNonAuthoritativeInformation
* 204 - HTTPNoContent
* 205 - HTTPResetContent
* 206 - HTTPPartialContent
HTTPRedirection
* 300 - HTTPMultipleChoices
* 301 - HTTPMovedPermanently
* 302 - HTTPFound
* 303 - HTTPSeeOther
* 304 - HTTPNotModified
* 305 - HTTPUseProxy
* 307 - HTTPTemporaryRedirect
* 308 - HTTPPermanentRedirect
HTTPError
HTTPClientError
* 400 - HTTPBadRequest
* 401 - HTTPUnauthorized
* 402 - HTTPPaymentRequired
* 403 - HTTPForbidden
* 404 - HTTPNotFound
* 405 - HTTPMethodNotAllowed
* 406 - HTTPNotAcceptable
* 407 - HTTPProxyAuthenticationRequired
* 408 - HTTPRequestTimeout
* 409 - HTTPConflict
* 410 - HTTPGone
* 411 - HTTPLengthRequired
* 412 - HTTPPreconditionFailed
* 413 - HTTPRequestEntityTooLarge
* 414 - HTTPRequestURITooLong
* 415 - HTTPUnsupportedMediaType
* 416 - HTTPRequestRangeNotSatisfiable
* 417 - HTTPExpectationFailed
* 421 - HTTPMisdirectedRequest
* 422 - HTTPUnprocessableEntity
* 424 - HTTPFailedDependency
* 426 - HTTPUpgradeRequired
* 428 - HTTPPreconditionRequired
* 429 - HTTPTooManyRequests
* 431 - HTTPRequestHeaderFieldsTooLarge
* 451 - HTTPUnavailableForLegalReasons
HTTPServerError
* 500 - HTTPInternalServerError
* 501 - HTTPNotImplemented
* 502 - HTTPBadGateway
* 503 - HTTPServiceUnavailable
* 504 - HTTPGatewayTimeout
* 505 - HTTPVersionNotSupported
* 506 - HTTPVariantAlsoNegotiates
* 507 - HTTPInsufficientStorage
* 510 - HTTPNotExtended
* 511 - HTTPNetworkAuthenticationRequired

所有的异常都拥有相同的结构:

1
2
3
HTTPNotFound(*, headers=None, reason=None,
body=None, text=None, content_type=None)

如果没有指定headers,默认是响应中的headers。 其中HTTPMultipleChoices, HTTPMovedPermanently, HTTPFound, HTTPSeeOther, HTTPUseProxy, HTTPTemporaryRedirect的结构是下面这样的:

1
2
3
HTTPFound(location, *, headers=None, reason=None,
body=None, text=None, content_type=None)

location参数的值会写入到HTTP头部的Location中。

HTTPMethodNotAllowed的结构是这样的:

1
2
3
4
HTTPMethodNotAllowed(method, allowed_methods, *,
headers=None, reason=None,
body=None, text=None, content_type=None)

method是不支持的那个方法,allowed_methods是所支持的方法。

数据共享

将类全局变量存储到Application实例对象中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def create_app(branch):
"""Create and return aiohttp webserver app."""
app = web.Application()

CONFIG_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "conf", f"ailawyer-{branch}.json")
with open(CONFIG_PATH, "r") as f:
CONFIG_DICT = json.loads(f.read())
print(CONFIG_DICT)

app['db'] = init_mongo(CONFIG_DICT)
app['redis_conn'] = await init_redis(CONFIG_DICT, app['db'])
app['alipay'] = init_alipay(CONFIG_DICT)
app['domain'] = CONFIG_DICT["alipay"]["domain"]
app['msg_sender'] = init_tencent_msg()
app['email_sender'] = init_tencent_email()
app['minio'] = connection.MinioAPI(CONFIG_DICT["minio"])

return app

之后就可以在web处理器中获取出来:

1
2
3
async def handler(request):
redis_conn = request.app['redis_conn']
...

如果变量的生命周期是一次请求,可以在请求中存储。

1
2
3
async def handler(request):
request['my_private_key'] = "data"
...

关闭退出

可能会有一些websockets或流,在服务器关闭时这些连接还是打开状态。 aiohttp没有内置如何进行关闭,但可以使用Applicaiton.on_shutdown信号来完善这一功能。 下面的代码是关闭websocket处理器的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
app = web.Application()
app['websockets'] = []

async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)

request.app['websockets'].append(ws)
try:
async for msg in ws:
...
finally:
request.app['websockets'].remove(ws)

return ws

日志

aiohttp使用标准库logging追踪库活动。

aiohttp中有以下日志记录器(以名字排序):

  • ‘aiohttp.access’
  • ‘aiohttp.client’
  • ‘aiohttp.internal’
  • ‘aiohttp.server’
  • ‘aiohttp.web’
  • ‘aiohttp.websocket’

你可以追踪这些记录器来查看日志信息。

默认情况下访问日志是开启的使用的是’aiohttp.access’记录器。
可以调用aiohttp.web.Applicaiton.make_handler()来控制日志。
将logging.Logger实例通过access_log参数传递即可覆盖默认记录器。

测试

aiohttp有一个pytest插件(pytest-aiohttp)可以轻松构建web服务器测试程序,同时该插件还有一个用于测试其他框架(单元测试等)的测试框架包。

服务器部署

常见的可以有两种

  • nginx + supervisord
  • nginx + gunicorn

将aiohttp服务器组运行在nginx之后有好多好处:

  • nginx是个很好的前端服务器。它可以预防很多攻击如格式错误的http协议的攻击
  • 部署nginx后可以同时运行多个aiohttp实例,这样可以有效利用CPU
  • nginx提供的静态文件服务器要比aiohttp内置的静态文件支持快很多

Nginx + supervisord

配置Nginx

下面是一份简短的配置Nginx参考,并没涉及到所有的Nginx选项。

你可以阅读Nginx指南官方文档来找到所有的参考。

好啦,首先我们要配置HTTP服务器本身:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
http {
upstream aiohttp {
server 127.0.0.1:8081 fail_timeout=0;
server 127.0.0.1:8082 fail_timeout=0;
server 127.0.0.1:8083 fail_timeout=0;
server 127.0.0.1:8084 fail_timeout=0;
}

server {
listen 80;
client_max_body_size 4G;

server_name example.com;

location / {
proxy_set_header Host $http_host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_redirect off;
proxy_buffering off;
proxy_pass http://aiohttp;
}

location /static {
# path for static files
root /path/to/app/static;
}
}
}

这样配置之后会监听80端口,服务器名为example.com,所有的请求都会被重定向到aiohttp后端处理组。默认情况下,Nginx使用轮询调度算法(round-robin)来选择后端服务器。

Supervisord

配置完Nginx,我们要开始配置aiohttp后端服务器了。使用些工具可以在系统重启或后端出现错误时更快地自动启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[program:aiohttp]
numprocs = 4
numprocs_start = 1
process_name = example_%(process_num)s

; Unix socket paths are specified by command line.
command=/path/to/aiohttp_example.py --path=/tmp/example_%(process_num)s.sock

; We can just as easily pass TCP port numbers:
; command=/path/to/aiohttp_example.py --port=808%(process_num)s

user=nobody
autostart=true
autorestart=true

aiohtto服务器

最后我们要让aiohttp服务器在supervisord上工作。
假设我们已经正确配置aiohttp.web.Application,端口也被正确指定,这些工作挺烦的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# aiohttp_example.py
import argparse
from aiohttp import web

parser = argparse.ArgumentParser(description="aiohttp server example")
parser.add_argument('--path')
parser.add_argument('--port')


if __name__ == '__main__':
app = web.Application()
# configure app

args = parser.parse_args()
web.run_app(app, path=args.path, port=args.port)

当然在真实环境中我们还要做些其他事情,比如配置日志等等。

Nginx + Gunicorn

我们还可以使用Gunicorn来部署aiohttp,Gunicorn基于pre-fork worker模式。Gunicorn将你的app当做worker进程来处理即将到来的请求。
与部署Ngnix相反,使用Gunicorn不需要我们手动启动aiohttp进程,也不需要使用如supervisord之类的工具进行监控。

应用程序

我们写一个简单的应用程序,将其命名为 my_app_module.py:

1
2
3
4
5
6
7
8
9
from aiohttp import web

def index(request):
return web.Response(text="Welcome home!")


my_web_app = web.Application()
my_web_app.router.add_get('/', index)

启动Gunicorn

启动Gunicorn时我们要将模块名字(如my_app_module)和应用程序的名字(如my_web_app)传入,可以一起在配置Gunicorn其他选项时写入也可以写在配置文件中。
本章例子所使用到的选项:

  • -bind 用于设置服务器套接字地址。
  • -worker-class 表示使用我们自定义的worker代替Gunicorn的默认worker。
1
2
3
4
>> gunicorn my_app_module:my_web_app --bind localhost:8080 --worker-class aiohttp.GunicornWebWorker
... Starting gunicorn 19.3.0
... Listening at: http://127.0.0.1:8080

现在,Gunicorn已成功运行,随时可以将请求交由应用程序的worker处理。

参考: