Skip to content

Module neuroio.listeners

None

None

View Source
import asyncio

import json

from typing import Awaitable, Callable, Dict, List, Optional, Union

from websockets.exceptions import ConnectionClosed

from websockets.legacy.client import WebSocketClientProtocol, connect

from neuroio import constants

from neuroio.utils import repeat

class EventListener:
    """WebSocket client for the Events service.

    At most one connection is tracked at a time in ``self.websocket``; all
    outgoing messages go through :meth:`send_json`.
    """

    def __init__(
        self,
        api_token: str,
        event_handler_func: Callable[[str], Awaitable[None]],
        timeout: Optional[float] = constants.HTTP_CLIENT_TIMEOUT,
    ) -> None:
        """
        Creates and manages single WebSocket Client object, that is used to
        send & receive messages in Events service.

        :param api_token: token sent in the ``AUTH`` message after connecting.
        :param event_handler_func: coroutine function awaited with every
            non-bytes message received from the socket.
        :param timeout: forwarded to ``connect`` as ``open_timeout``.
        """
        self.api_token = api_token
        self.event_handler_func = event_handler_func
        self.timeout = timeout
        # Currently open connection, or None while disconnected.
        self.websocket: Optional[WebSocketClientProtocol] = None

    async def listen(self) -> None:
        """Connect, authorize and dispatch incoming messages.

        For every connection produced by iterating ``connect(...)`` the
        listener sends the ``AUTH`` message, starts a background ping task
        and awaits ``event_handler_func`` for each non-bytes message. When
        the connection raises ``ConnectionClosed`` the loop moves on to the
        next connection. Any other exception propagates to the caller.
        """
        async for websocket in connect(
            constants.EVENTS_BASE_URL, open_timeout=self.timeout
        ):
            self.websocket = websocket
            # Keep a handle on the ping task: the event loop only holds a
            # weak reference to tasks, and without a handle the task could
            # never be stopped when this connection goes away.
            ping_task: Optional[asyncio.Task] = None
            try:
                await self.authorize()
                # NOTE(review): `repeat` is defined in neuroio.utils;
                # presumably it invokes self.ping every 5 seconds - confirm.
                ping_task = asyncio.create_task(repeat(5, self.ping))
                async for message in websocket:
                    if not isinstance(message, bytes):
                        await self.event_handler_func(message)
            except ConnectionClosed:
                continue
            finally:
                # Stop this connection's ping task so that reconnects do not
                # accumulate one extra ping task per dropped connection.
                if ping_task is not None:
                    ping_task.cancel()
                self.websocket = None

    async def send_json(self, data: Union[List, Dict]) -> None:
        """Serialize ``data`` to JSON and send it over the open connection.

        Does nothing when there is no connection (``self.websocket`` is
        ``None``).
        """
        if self.websocket is not None:
            await self.websocket.send(json.dumps(data))

    async def authorize(self) -> None:
        """Send the ``AUTH`` action message carrying ``self.api_token``."""
        await self.send_json(
            {"action": "AUTH", "data": {"token": self.api_token}}
        )

    async def ping(self) -> None:
        """Send the ``PING`` action message."""
        await self.send_json({"action": "PING"})

Classes

EventListener

class EventListener(
    api_token: str,
    event_handler_func: Callable[[str], Awaitable[NoneType]],
    timeout: Optional[float] = 4.0
)
View Source
class EventListener:
    """WebSocket client for the Events service.

    At most one connection is tracked at a time in ``self.websocket``; all
    outgoing messages go through :meth:`send_json`.
    """

    def __init__(
        self,
        api_token: str,
        event_handler_func: Callable[[str], Awaitable[None]],
        timeout: Optional[float] = constants.HTTP_CLIENT_TIMEOUT,
    ) -> None:
        """
        Creates and manages single WebSocket Client object, that is used to
        send & receive messages in Events service.

        :param api_token: token sent in the ``AUTH`` message after connecting.
        :param event_handler_func: coroutine function awaited with every
            non-bytes message received from the socket.
        :param timeout: forwarded to ``connect`` as ``open_timeout``.
        """
        self.api_token = api_token
        self.event_handler_func = event_handler_func
        self.timeout = timeout
        # Currently open connection, or None while disconnected.
        self.websocket: Optional[WebSocketClientProtocol] = None

    async def listen(self) -> None:
        """Connect, authorize and dispatch incoming messages.

        For every connection produced by iterating ``connect(...)`` the
        listener sends the ``AUTH`` message, starts a background ping task
        and awaits ``event_handler_func`` for each non-bytes message. When
        the connection raises ``ConnectionClosed`` the loop moves on to the
        next connection. Any other exception propagates to the caller.
        """
        async for websocket in connect(
            constants.EVENTS_BASE_URL, open_timeout=self.timeout
        ):
            self.websocket = websocket
            # Keep a handle on the ping task: the event loop only holds a
            # weak reference to tasks, and without a handle the task could
            # never be stopped when this connection goes away.
            ping_task: Optional[asyncio.Task] = None
            try:
                await self.authorize()
                # NOTE(review): `repeat` is defined in neuroio.utils;
                # presumably it invokes self.ping every 5 seconds - confirm.
                ping_task = asyncio.create_task(repeat(5, self.ping))
                async for message in websocket:
                    if not isinstance(message, bytes):
                        await self.event_handler_func(message)
            except ConnectionClosed:
                continue
            finally:
                # Stop this connection's ping task so that reconnects do not
                # accumulate one extra ping task per dropped connection.
                if ping_task is not None:
                    ping_task.cancel()
                self.websocket = None

    async def send_json(self, data: Union[List, Dict]) -> None:
        """Serialize ``data`` to JSON and send it over the open connection.

        Does nothing when there is no connection (``self.websocket`` is
        ``None``).
        """
        if self.websocket is not None:
            await self.websocket.send(json.dumps(data))

    async def authorize(self) -> None:
        """Send the ``AUTH`` action message carrying ``self.api_token``."""
        await self.send_json(
            {"action": "AUTH", "data": {"token": self.api_token}}
        )

    async def ping(self) -> None:
        """Send the ``PING`` action message."""
        await self.send_json({"action": "PING"})

Methods

authorize

async def authorize(
    self
) -> None
View Source
    async def authorize(self) -> None:
        """Send the ``AUTH`` action message carrying this listener's token."""
        credentials = {"token": self.api_token}
        auth_message = {"action": "AUTH", "data": credentials}
        await self.send_json(auth_message)

listen

async def listen(
    self
) -> None
View Source
    async def listen(self) -> None:
        """Connect, authorize and dispatch incoming messages.

        For every connection produced by iterating ``connect(...)`` the
        listener sends the ``AUTH`` message, starts a background ping task
        and awaits ``event_handler_func`` for each non-bytes message. When
        the connection raises ``ConnectionClosed`` the loop moves on to the
        next connection. Any other exception propagates to the caller.
        """
        async for websocket in connect(
            constants.EVENTS_BASE_URL, open_timeout=self.timeout
        ):
            self.websocket = websocket
            # Keep a handle on the ping task: the event loop only holds a
            # weak reference to tasks, and without a handle the task could
            # never be stopped when this connection goes away.
            ping_task: Optional[asyncio.Task] = None
            try:
                await self.authorize()
                # NOTE(review): `repeat` is defined in neuroio.utils;
                # presumably it invokes self.ping every 5 seconds - confirm.
                ping_task = asyncio.create_task(repeat(5, self.ping))
                async for message in websocket:
                    if not isinstance(message, bytes):
                        await self.event_handler_func(message)
            except ConnectionClosed:
                continue
            finally:
                # Stop this connection's ping task so that reconnects do not
                # accumulate one extra ping task per dropped connection.
                if ping_task is not None:
                    ping_task.cancel()
                self.websocket = None

ping

async def ping(
    self
) -> None
View Source
    async def ping(self) -> None:
        """Send the ``PING`` action message via ``send_json``."""
        ping_message = {"action": "PING"}
        await self.send_json(ping_message)

send_json

async def send_json(
    self,
    data: Union[List, Dict]
) -> None
View Source
    async def send_json(self, data: Union[List, Dict]) -> None:
        """Serialize ``data`` to JSON text and send it on the open socket.

        Returns without doing anything when ``self.websocket`` is ``None``.
        """
        socket = self.websocket
        if socket is None:
            return
        payload = json.dumps(data)
        await socket.send(payload)