Fork me on GitHub

Простой чат на AioHTTP Apr 10, 2017

В повседневной работе я тесно связан с Python 3, но такие его замечательные возможности, как асинхронность asyncio и синтаксический сахар PEP 492 использовать не приходиться. Из асинхронных задач сталкиваюсь только с Celery, но это не совсем та асинхронность, скорее бэкграунд с очередью задач выполняемых воркерами синхронно. Пришло время исправить это и поближе познакомится с асинхронностью в Python 3.5+. Сделаем это на примере простого чата с комнатами.

Постановка задачи

Реализовать простой асинхронный чат с комнатами на WebSocket-ах.

Необходимый функционал:

  • Создать пользователя или авторизоваться под существующим.
  • Создать комнату или подключиться к уже существующей.
  • Иметь возможность видеть всю историю чат комнаты. В том числе подключения и отключения пользователей от комнаты.
  • Администрирование комнаты.

Технологии

Для реализации чата будем использовать Python 3.6 и фреймворк AioHTTP, данные хранить в базе PostgreSQL, текущие сессии в Redis, запросы проксировать через Nginx.

Асинхронный фреймворк

Текущая версия AioHTTP 2.0.6, однако оказалось, что с ней несовместима текущая версия AioHTTP debugtoolbar issue #115. Так что будем использовать предыдущую версию AioHTTP 1.3.5.

Простейшее приложение на AioHTTP выглядит так:

from aiohttp import web

async def handler(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

app = web.Application()
app.router.add_route('GET', '/', handler)
app.router.add_route('GET', '/{name}', handler)

web.run_app(app)

Как можно заметить, роуты конфигурируются после инициализации приложения. Url аргументы задаются подобно форматированию строк {slug}.

В момент создания приложения можно указать middlewares, передав их в конструктор приложения.

app = web.Application(loop=loop, middlewares=middlewares)

База данных и модели

В качестве базы данных используем PostgreSQL, для работы с которой воспользуемся PeeWee ORM и асинхронным менеджером, PeeWee Async.

Создадим базовую модель с неинициализированной базой. Саму базу инициализируем в момент конфигурирования приложения.

import peewee
import peewee_async


database = peewee_async.PostgresqlDatabase(None)


class BaseModel(peewee.Model):

    """ Base model with db Meta """

    class Meta:
        database = database
import peewee_async
from helpers.models import database

DATABASE = {
    'database': 'aiochat',
    'password': 'supersecret',
    'user': 'aiochat_user',
    'host': 'localhost',
}

# ...

database.init(**DATABASE)
app.database = database
app.database.set_allow_sync(False)
app.objects = peewee_async.Manager(app.database)

Сама идея Менеджера базы данных взята из Django, но в PeeWee Async работает это немного по-другому и с непривычки вызывает недоумение.

High-level API provides a single point for all async ORM calls. Meet the Manager class! The idea of Manager originally comes from Django, but it’s redesigned to meet new asyncio patterns.

# Django manager
user = User.objects.get(username__iexact='Alice')

# Pee Wee manager
user = await objects.get(User, User.username ** 'Alice')

Через менеджер вызываются асинхронные запросы, но через контекст можно провести синхронный запрос, например, создание таблицы.

with objects.allow_sync():
    User.create_table(True)

Шаблоны и пути

В качестве шаблонизатора будем использовать асинхронную jinja2. Конфигурируется через вызов метода setup, в который передается загрузчик шаблонов и список контекстных процессоров.

import jinja2
import aiohttp_jinja2

# ...

jinja_env = aiohttp_jinja2.setup(
    app, loader=jinja2.FileSystemLoader(settings.TEMPLATE_DIR),
    context_processors=[aiohttp_jinja2.request_processor], )

Рендер шаблона изящно спрятан в декоратор для вьюхи.

import aiohttp_jinja2
from aiohttp import web

class Index(web.View):

    @aiohttp_jinja2.template('template_name.html')
    async def get(self):
        # return context for render
        return {'foo': 'boo'}

Routes, как уже было сказано выше, конфигурируются после инициализации приложения. Через функциию app.router.add_route. Несмотря на то, что статику у нас будет отдавать Nginx, задаем её специальным роутом add_static, чтобы в шаблонах иметь возможность подключать статические файлы.

# urls.py
from accounts.urls import routes as accounts_routes
from chat.urls import routes as chat_routes

from views import Index

routes = (
    dict(method='GET', path='/', handler=Index, name='index'),
    * accounts_routes,
    * chat_routes,
)
# app.py
from urls import routes
for route in routes:
    app.router.add_route(**route)
app.router.add_static('/static', settings.STATIC_DIR, name='static')

В шаблоне получаем доступ к роутам, через объект app.

<a href="{{ app.router['index'].url_for() }}">Main page</a>

<link href="{{ app.router.static.url(filename='chat.css') }}" rel="stylesheet">
<script src="{{ app.router.static.url(filename='chat.js') }}"></script>

Сессии

Для работы с сессиями будем использовать aiohttp session. Данная библиотека позволяет на выбор работать с cookies или redis хранилищами. Мне больше нравится redis. Для подключения потребуется библиотека aioredis.

Подключим сессии и редис.

import aioredis

from aiohttp_session import session_middleware
from aiohttp_session.redis_storage import RedisStorage

# ...

redis_pool = await aioredis.create_pool(settings.REDIS_CON, loop=loop)
middlewares = [session_middleware(RedisStorage(redis_pool))]

Чтобы получить текущюю сессию, нужно дождаться ответа из корутины get_session. Мне нравится подход в джанго, когда в каждом объекте Запрос есть прямой доступ к сессии и пользователю. Сделаем подобное в aiohttp с помощью middlewares.

from aiohttp_session import get_session
from accounts.models import User


async def request_user_middleware(app, handler):
    async def middleware(request):
        request.session = await get_session(request)
        request.user = None
        user_id = request.session.get('user')
        if user_id is not None:
            request.user = await request.app.objects.get(User, id=user_id)
        return await handler(request)
    return middleware

Раз уж у нас есть объект пользователя в запросе, создадим декораторы для ограничения доступа авторизованным и анонимным пользователям.

from aiohttp import web
from helpres.tools import redirect

def login_required(func):
    """ Allow only auth users """
    async def wrapped(self, *args, **kwargs):
        if self.request.user is None:
            redirect(self.request, 'login')
        return await func(self, *args, **kwargs)
    return wrapped


def anonymous_required(func):
    """ Allow only anonymous users """
    async def wrapped(self, *args, **kwargs):
        if self.request.user is not None:
            redirect(self.request, 'index')
        return await func(self, *args, **kwargs)
    return wrapped

В данных декораторах используем функцию redirect, она вызывает исключение, которое пораждает редирект. В AioHTTP можно вызвать лобой из web исключений это сделано круче, чем в Django, где есть возможность вернуть только 404, 403 и 400 через исключения Http404, PermissionDenied, SuspiciousOperation соответсвенно, а редирект должен быть отдан явным ответом.

from aiohttp import web

def redirect(request, router_name, *, permanent=False, **kwargs):
    """ Redirect to given URL name """
    url = request.app.router[router_name].url(**kwargs)
    if permanent:
        raise web.HTTPMovedPermanently(url)
    raise web.HTTPFound(url)

WebSockets

В AioHTTP реализованы вебсокеты через web.WebSocketResponse(), работа с ними практически не отличается от обычных вьюх, разве что добавляется асинхронный цикл пока соединение активно.

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == web.MsgType.text:
            await ws.send_str("Hello, {}".format(msg.data))
        elif msg.type == web.MsgType.binary:
            await ws.send_bytes(msg.data)
        elif msg.type == web.MsgType.close:
            break

    return ws

Для хранения текущих соединений и рассылки бродкаст сообщений, будем использовать объект app.

ws = web.WebSocketResponse()
await ws.prepare(self.request)
request.app.wslist.append(ws)

# ...

async def broadcast(self, message):
    """ Send messages to all in this room """
    for peer in self.request.app.wslist:
        peer.send_json(message.as_dict())

По умолчанию Nginx не проксирует заголовки Upgrade и Connection, которые используются при переключении на WS запрос.

map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
}

server {
    # ...

    location / {
        proxy_pass          http://localhost:8000;
        proxy_set_header    Host $host;
        proxy_set_header    X-Real-IP $remote_addr;

        # ws support
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_read_timeout 1h;
    }
}

Тестирование

Тестировать приложение AioHTTP можно разными способами. Мне нравится подход Unittest. Единственная особенность: нужно добавить метод get_application и объявлять асинхронные тесты, декарируя их через unittest_run_loop.

from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop
from app import create_app


class AioChatTestCase(AioHTTPTestCase):

    """ Base test case for aiochat """

    async def get_application(self, loop):
        """ Return current app """
        serv_generator, handler, app = await create_app(loop)
        return app


class IndexTestCase(AioChatTestCase):

    """ Testing index app views """

    url_name = 'index'

    def setUp(self):
        super().setUp()
        self.url = self.app.router[self.url_name].url_for()

    @unittest_run_loop
    async def test_url_reversed(self):
        """ Url should be / """
        self.assertEqual(str(self.app.router[self.url_name].url_for()), '/')
        self.assertEqual(str(self.url), '/')

    @unittest_run_loop
    async def test_index(self):
        """ Should get 200 on index page """
        response = await self.client.get('/')
        self.assertEqual(response.status, 200)
        content = await response.text()
        self.assertIn('Simple asyncio chat', content)

Cоздание тестовой базы данных через PeeWee Async оказалось не очень удобно, поэтому тесты будут весьма поверхностные, только чтобы обозначить общий подход к тестированию приложения на AioHTTP.

Функциональная часть

Основная функциональность чата — передача сообщений клиентам в пределах комнаты.

class WebSocket(web.View):

    """ Process WS connections """

    async def broadcast(self, message):
        """ Send messages to all in this room """
        for peer in self.request.app.wslist[self.room.id].values():
            peer.send_json(message.as_dict())

    async def get(self):
        self.room = await get_object_or_404(
            self.request, Room, name=self.request.match_info['slug'].lower())
        user = self.request.user
        app = self.request.app

        # При подключении создаем WebSocketResponse
        ws = web.WebSocketResponse()
        await ws.prepare(self.request)
        if self.room.id not in app.wslist:
            # Создаем пустую комнату если ещё нет
            app.wslist[self.room.id] = {}

        # Сохраняем соединение в объекте app и создаем сервисное сообщение
        app.wslist[self.room.id][user.username] = ws
        message = await app.objects.create(
            Message, room=self.room, user=None, text=f'{user} join chat room')

        # Рассылка всем подключенным клиентам
        await self.broadcast(message)

        # В асинхронном цикле слушаем сообщения от текущего сокета
        async for msg in ws:
            if msg.tp == MsgType.text:
                if msg.data == 'close':
                    await ws.close()
                else:
                    # Сохраняем сообщение в базу и шлем бродкаст
                    message = await app.objects.create(
                        Message, room=self.room, user=user, text=text)
                    await self.broadcast(message)

        # Когда соединение закрывается, удаляем пользователя из сохраненных соединений
        app.wslist.pop(user.username, None)

        # Сервисное сообщение об отключении в бродкаст
        message = await app.objects.create(
            Message, room=self.room, user=None, text=f'{user} left chat room')
        await self.broadcast(message)

        # возвращаем WebSocketResponse
        return ws

В качестве администрирования комнаты, добавим возможность выполнять команды пользователей. Возьмем две простые команды: очистить историю комнаты и удалить пользователя из комнаты.

async def command(self, cmd):
    """ Run chat command """
    app = self.request.app
    if cmd.startswith('/kill'):
        # unconnect user from room
        try:
            target = cmd.split(' ')[1]
            # Найдем пользователя по имени и отключим от чата
            peer = app.wslist[self.room.id][target]
            await self.disconnect(target, peer, silent=True)
        except KeyError:
            pass
    elif cmd == '/clear':
        # Удалим все сообщения из комнаты
        count = await app.objects.execute(
            Message.delete().where(Message.room == self.room))
        # В бродкаст вышлем пользователям команды для очистки истории на клиенте
        for peer in app.wslist[self.room.id].values():
            peer.send_json({'cmd': 'empty'})
    elif cmd == '/help':
        # Вспомогательная команда для отображения справки
        return {'text': dedent('''\
            - /help - display this msg
            - /kill {username} - remove user from room
            - /clear - empty all messages in room
            ''')}
    else:
        return {'text': 'wrong cmd {cmd}'}

В асинхронном цикле будем сравнивать, если сообщение начинается с / значит обрабатывать его как команду.

async for msg in ws:
    # ...
    text = msg.data.strip()
    if text.startswith('/'):
        ans = await self.command(text)
        if ans is not None:
            ws.send_json(ans)
    else:
        message = await app.objects.create(Message, room=self.room, user=user, text=text)
        await self.broadcast(message)

Клиент

Браузерный клиент это простое WebSocket подключение с определенными командами на отправку и отображение сообщений.

var sock = new WebSocket('ws://' + window.location.host + WS_URL);

function showMessage(message) {
    /* Append message to chat area */
    console.log(message);
    var data = jQuery.parseJSON(message.data);
    $messagesContainer.append('<li class="media">' + data.text + '</li>');
    $chatArea.scrollTop($messagesContainer.height());
}

// ...

$('#send').on('submit', function (event) {
    event.preventDefault();
    var $message = $(event.target).find('input[name="text"]');
    sock.send($message.val());
    $message.val('').focus();
});

sock.onopen = function (event) {
    console.log(event);
    console.log('Connection to server started');
};

sock.onclose = function (event) {
    console.log(event);
    if(event.wasClean){
        console.log('Clean connection end');
    } else {
        console.log('Connection broken');
    }
    window.location.assign('/');
};

sock.onerror = function (error) {
    console.log(error);
};

sock.onmessage = showMessage;

Результат и критика

Как выяснилось, асинхронный код в Python 3 практически не отличается от синхронного. Работать с ним легко и весело. WebSocket вызывает отдельный восторг, позволяя отправлять сообщения клиентам со стороны сервера без явного запроса.

AioHTTP позволяет легко реализовать простой асинхронный сервер с WebSocketами, но что-то большое я бы не стал на на нем писать, иначе всё начнет превращаться в сплошную рутину-корутины.

Полный пример чата можно посмотреть в репозитории на github Samael500/aiochat, а при желании поиграться, запустив настроенный Vagrant.

chat

Данный чат создан исключительно в ознакомительных целях, поэтому имеет ряд допущений:

  • Регистрация и авторизация пользователей без паролей и каких-либо подтверждений.
  • Каждый пользователь имеет полные права администрирования комнат.
  • Нет нормальной валидации форм на создание пользователя или комнаты.
  • Команды чата реализованы без валидации формата команды.
  • История комнаты отдается без паджинации вся полностью, при хоть сколь-либо большом числе комнат/пользователей/сообщение, эти запросы будут выполнятmся долго, создавая нагрузку как на сервер, так и на клиент.
  • Подключения вебсокетов хранятся в едином объекте APP, что тоже создает лишние проблеммы при большом числе полдключений и комнат. Рационально в данном случае разпараллеливать комнаты между разными инстансами приложения.