Skip to content

Module neuroio

neuroio-python

A Python package for interacting with the NeuroIO API

View Source
"""**neuroio-python**

A Python package for interacting with the NeuroIO API

"""

from .clients import AsyncClient, Client

from .listeners import EventListener

__version__: str = "0.1.0"

__all__ = ["__version__", "Client", "AsyncClient", "EventListener"]

Sub-modules

Variables

__version__

Classes

AsyncClient

class AsyncClient(
    api_token: Optional[str] = None,
    api_version: int = 1,
    timeout: float = 4.0
)
View Source
class AsyncClient(Client):

    pass

Ancestors (in MRO)

  • neuroio.Client

Instance variables

common_headers
is_async

Methods

client_settings

def client_settings(
    self,
    timeout: float,
    token: str = None
) -> Dict[Any, Any]
View Source
    def client_settings(

        self, timeout: float, token: str = None

    ) -> Dict[Any, Any]:

        settings = {

            "timeout": timeout,

            "headers": self.common_headers,

        }

        if token:

            settings["auth"] = AuthorizationTokenAuth(api_token=token)

        return settings

init

def init(
    self
) -> None
View Source
    def init(self) -> None:

        self.api_atrr_names = [

            "sources",

            "entries",

            "streams",

            "utility",

            "settings",

            "groups",

            "persons",

            "notifications",

        ]

        self.iam_atrr_names = [

            "auth",

            "spaces",

            "lists",

            "licenses",

            "whoami",

            "tokens",

            "billing",

        ]

Client

class Client(
    api_token: Optional[str] = None,
    api_version: int = 1,
    timeout: float = 4.0
)
View Source
class Client:

    def __init__(

        self,

        api_token: Optional[str] = None,

        api_version: int = 1,

        timeout: float = constants.HTTP_CLIENT_TIMEOUT,

    ):

        """

        Creates and manages singleton of HTTP client, that is used to make

        request to API.

        """

        self.api_version = api_version

        self.csettings = self.client_settings(timeout=timeout, token=api_token)

        self.init()

    def init(self) -> None:

        self.api_atrr_names = [

            "sources",

            "entries",

            "streams",

            "utility",

            "settings",

            "groups",

            "persons",

            "notifications",

        ]

        self.iam_atrr_names = [

            "auth",

            "spaces",

            "lists",

            "licenses",

            "whoami",

            "tokens",

            "billing",

        ]

    @cached_property

    def is_async(self) -> bool:

        return self.__class__.__name__ == "AsyncClient"

    @property

    def common_headers(self) -> dict:

        root = "neuroio-python"

        if self.is_async:

            root = "neuroio-async-python"

        return {"User-Agent": f"{root}/{get_package_version()}"}

    def client_settings(

        self, timeout: float, token: str = None

    ) -> Dict[Any, Any]:

        settings = {

            "timeout": timeout,

            "headers": self.common_headers,

        }

        if token:

            settings["auth"] = AuthorizationTokenAuth(api_token=token)

        return settings

    def __getattr__(self, item: str) -> Any:

        if item in self.api_atrr_names + self.iam_atrr_names:

            return Catcher(

                prev=item,

                is_async=self.is_async,

                settings=self.csettings,

                api_version=self.api_version,

            )

        else:

            # Default behaviour

            raise AttributeError

Descendants

  • neuroio.AsyncClient

Instance variables

common_headers
is_async

Methods

client_settings

def client_settings(
    self,
    timeout: float,
    token: str = None
) -> Dict[Any, Any]
View Source
    def client_settings(

        self, timeout: float, token: str = None

    ) -> Dict[Any, Any]:

        settings = {

            "timeout": timeout,

            "headers": self.common_headers,

        }

        if token:

            settings["auth"] = AuthorizationTokenAuth(api_token=token)

        return settings

init

def init(
    self
) -> None
View Source
    def init(self) -> None:

        self.api_atrr_names = [

            "sources",

            "entries",

            "streams",

            "utility",

            "settings",

            "groups",

            "persons",

            "notifications",

        ]

        self.iam_atrr_names = [

            "auth",

            "spaces",

            "lists",

            "licenses",

            "whoami",

            "tokens",

            "billing",

        ]

EventListener

class EventListener(
    api_token: str,
    event_handler_func: Callable[[str], Awaitable[NoneType]],
    timeout: Optional[float] = 4.0
)
View Source
class EventListener:

    def __init__(

        self,

        api_token: str,

        event_handler_func: Callable[[str], Awaitable[None]],

        timeout: Optional[float] = constants.HTTP_CLIENT_TIMEOUT,

    ) -> None:

        """

        Creates and manages single WebSocket Client object, that is used to

        send & receive messages in Events service.

        """

        self.api_token = api_token

        self.event_handler_func = event_handler_func

        self.timeout = timeout

        self.websocket: Optional[WebSocketClientProtocol] = None

    async def listen(self) -> None:

        async for websocket in connect(

            constants.EVENTS_BASE_URL, open_timeout=self.timeout

        ):

            try:

                self.websocket = websocket

                await self.authorize()

                asyncio.create_task(repeat(5, self.ping))

                async for message in websocket:

                    if not isinstance(message, bytes):

                        await self.event_handler_func(message)

            except ConnectionClosed:

                self.websocket = None

                continue

    async def send_json(self, data: Union[List, Dict]) -> None:

        if self.websocket is not None:

            await self.websocket.send(json.dumps(data))

    async def authorize(self) -> None:

        await self.send_json(

            {"action": "AUTH", "data": {"token": self.api_token}}

        )

    async def ping(self) -> None:

        await self.send_json({"action": "PING"})

Methods

authorize

def authorize(
    self
) -> None
View Source
    async def authorize(self) -> None:

        await self.send_json(

            {"action": "AUTH", "data": {"token": self.api_token}}

        )

listen

def listen(
    self
) -> None
View Source
    async def listen(self) -> None:

        async for websocket in connect(

            constants.EVENTS_BASE_URL, open_timeout=self.timeout

        ):

            try:

                self.websocket = websocket

                await self.authorize()

                asyncio.create_task(repeat(5, self.ping))

                async for message in websocket:

                    if not isinstance(message, bytes):

                        await self.event_handler_func(message)

            except ConnectionClosed:

                self.websocket = None

                continue

ping

def ping(
    self
) -> None
View Source
    async def ping(self) -> None:

        await self.send_json({"action": "PING"})

send_json

def send_json(
    self,
    data: Union[List, Dict]
) -> None
View Source
    async def send_json(self, data: Union[List, Dict]) -> None:

        if self.websocket is not None:

            await self.websocket.send(json.dumps(data))