Module neuroio
neuroio-python
A Python package for interacting with the NeuroIO API
View Source
"""**neuroio-python**
A Python package for interacting with the NeuroIO API
"""
from .clients import AsyncClient, Client
from .listeners import EventListener
# Version string of the package.
__version__: str = "0.1.0"

# Public names of the package (what ``from neuroio import *`` exposes).
__all__ = [
    "__version__",
    "Client",
    "AsyncClient",
    "EventListener",
]
Sub-modules
- neuroio.auth
- neuroio.auth_token
- neuroio.base
- neuroio.billing
- neuroio.clients
- neuroio.constants
- neuroio.entries
- neuroio.groups
- neuroio.licenses
- neuroio.listeners
- neuroio.lists
- neuroio.notifications
- neuroio.persons
- neuroio.settings
- neuroio.sources
- neuroio.spaces
- neuroio.streams
- neuroio.tokens
- neuroio.utility
- neuroio.utils
- neuroio.whoami
Variables
__version__
Classes
AsyncClient
class AsyncClient(
api_token: Optional[str] = None,
api_version: int = 1,
timeout: float = 4.0
)
View Source
class AsyncClient(Client):
    """Asynchronous flavour of ``Client``.

    The class adds no behaviour of its own. ``Client.is_async`` compares the
    class name against ``"AsyncClient"``, so merely instantiating this
    subclass switches the client into async mode.
    """
Ancestors (in MRO)
- neuroio.Client
Instance variables
common_headers
is_async
Methods
client_settings
def client_settings(
self,
timeout: float,
token: str = None
) -> Dict[Any, Any]
View Source
def client_settings(
    self, timeout: float, token: Optional[str] = None
) -> Dict[Any, Any]:
    """Build the keyword settings used for the HTTP client.

    Args:
        timeout: Value stored under the ``"timeout"`` key.
        token: API token. When falsy (``None`` or empty) no ``"auth"``
            entry is added to the settings.

    Returns:
        A dict with ``"timeout"`` and ``"headers"`` keys, plus ``"auth"``
        when a token was supplied.
    """
    settings: Dict[Any, Any] = {
        "timeout": timeout,
        "headers": self.common_headers,
    }
    if token:
        # Only authenticated clients carry an auth handler.
        settings["auth"] = AuthorizationTokenAuth(api_token=token)
    return settings
init
def init(
self
) -> None
View Source
def init(self) -> None:
    """Register the attribute names that ``__getattr__`` resolves lazily.

    Two lists are stored on the instance: ``api_atrr_names`` and
    ``iam_atrr_names``.
    """
    api_names = (
        "sources",
        "entries",
        "streams",
        "utility",
        "settings",
        "groups",
        "persons",
        "notifications",
    )
    iam_names = (
        "auth",
        "spaces",
        "lists",
        "licenses",
        "whoami",
        "tokens",
        "billing",
    )
    # Stored as lists because they are concatenated with ``+`` on lookup.
    self.api_atrr_names = list(api_names)
    self.iam_atrr_names = list(iam_names)
Client
class Client(
api_token: Optional[str] = None,
api_version: int = 1,
timeout: float = 4.0
)
View Source
class Client:
    """Entry point for talking to the NeuroIO API.

    Resource names registered by ``init`` are not real attributes; they are
    resolved on access by ``__getattr__`` into ``Catcher`` objects that
    carry this client's settings and API version.
    """

    def __init__(
        self,
        api_token: Optional[str] = None,
        api_version: int = 1,
        timeout: float = constants.HTTP_CLIENT_TIMEOUT,
    ):
        """Store the API version and prepare the HTTP client settings.

        Args:
            api_token: Token used to authorize requests; when falsy the
                settings carry no ``"auth"`` entry.
            api_version: API version handed to every ``Catcher``.
            timeout: Timeout value placed into the client settings.
        """
        self.api_version = api_version
        self.csettings = self.client_settings(timeout=timeout, token=api_token)
        self.init()

    def init(self) -> None:
        """Register the attribute names that ``__getattr__`` resolves."""
        self.api_atrr_names = [
            "sources",
            "entries",
            "streams",
            "utility",
            "settings",
            "groups",
            "persons",
            "notifications",
        ]
        self.iam_atrr_names = [
            "auth",
            "spaces",
            "lists",
            "licenses",
            "whoami",
            "tokens",
            "billing",
        ]

    @cached_property
    def is_async(self) -> bool:
        """Whether the concrete class is named exactly ``AsyncClient``."""
        return self.__class__.__name__ == "AsyncClient"

    @property
    def common_headers(self) -> dict:
        """Headers sent with every request (currently only ``User-Agent``)."""
        root = "neuroio-python"
        if self.is_async:
            root = "neuroio-async-python"
        return {"User-Agent": f"{root}/{get_package_version()}"}

    def client_settings(
        self, timeout: float, token: Optional[str] = None
    ) -> Dict[Any, Any]:
        """Build the keyword settings used for the HTTP client.

        Args:
            timeout: Value stored under the ``"timeout"`` key.
            token: API token. When falsy no ``"auth"`` entry is added.

        Returns:
            A dict with ``"timeout"`` and ``"headers"`` keys, plus ``"auth"``
            when a token was supplied.
        """
        settings: Dict[Any, Any] = {
            "timeout": timeout,
            "headers": self.common_headers,
        }
        if token:
            settings["auth"] = AuthorizationTokenAuth(api_token=token)
        return settings

    def __getattr__(self, item: str) -> Any:
        """Resolve registered resource names into ``Catcher`` objects.

        Only called when normal attribute lookup fails.

        Raises:
            AttributeError: If ``item`` is not a registered resource name.
        """
        missing = AttributeError(
            f"{type(self).__name__!r} object has no attribute {item!r}"
        )
        # The name lists are looked up on ``self`` below. If they are not set
        # yet (``init`` has not run, e.g. while copy/pickle probes a bare
        # instance) that lookup lands here again; fail fast instead of
        # recursing until RecursionError.
        if item in ("api_atrr_names", "iam_atrr_names"):
            raise missing
        if item in self.api_atrr_names + self.iam_atrr_names:
            return Catcher(
                prev=item,
                is_async=self.is_async,
                settings=self.csettings,
                api_version=self.api_version,
            )
        # Default behaviour, with a message naming the missing attribute.
        raise missing
Descendants
- neuroio.AsyncClient
Instance variables
common_headers
is_async
Methods
client_settings
def client_settings(
self,
timeout: float,
token: str = None
) -> Dict[Any, Any]
View Source
def client_settings(
    self, timeout: float, token: Optional[str] = None
) -> Dict[Any, Any]:
    """Build the keyword settings used for the HTTP client.

    Args:
        timeout: Value stored under the ``"timeout"`` key.
        token: API token. When falsy (``None`` or empty) no ``"auth"``
            entry is added to the settings.

    Returns:
        A dict with ``"timeout"`` and ``"headers"`` keys, plus ``"auth"``
        when a token was supplied.
    """
    settings: Dict[Any, Any] = {
        "timeout": timeout,
        "headers": self.common_headers,
    }
    if token:
        # Only authenticated clients carry an auth handler.
        settings["auth"] = AuthorizationTokenAuth(api_token=token)
    return settings
init
def init(
self
) -> None
View Source
def init(self) -> None:
    """Register the attribute names that ``__getattr__`` resolves lazily.

    Two lists are stored on the instance: ``api_atrr_names`` and
    ``iam_atrr_names``.
    """
    api_names = (
        "sources",
        "entries",
        "streams",
        "utility",
        "settings",
        "groups",
        "persons",
        "notifications",
    )
    iam_names = (
        "auth",
        "spaces",
        "lists",
        "licenses",
        "whoami",
        "tokens",
        "billing",
    )
    # Stored as lists because they are concatenated with ``+`` on lookup.
    self.api_atrr_names = list(api_names)
    self.iam_atrr_names = list(iam_names)
EventListener
class EventListener(
api_token: str,
event_handler_func: Callable[[str], Awaitable[NoneType]],
timeout: Optional[float] = 4.0
)
View Source
class EventListener:
    """WebSocket listener for the Events service.

    Connects to ``constants.EVENTS_BASE_URL``, authorizes with the API token
    and hands every non-bytes message to ``event_handler_func``.
    """

    def __init__(
        self,
        api_token: str,
        event_handler_func: Callable[[str], Awaitable[None]],
        timeout: Optional[float] = constants.HTTP_CLIENT_TIMEOUT,
    ) -> None:
        """Store the connection parameters; no connection is opened here.

        Args:
            api_token: Token sent in the ``AUTH`` message.
            event_handler_func: Coroutine function awaited with each
                received non-bytes message.
            timeout: Passed to ``connect`` as ``open_timeout``.
        """
        self.api_token = api_token
        self.event_handler_func = event_handler_func
        self.timeout = timeout
        # Active connection, or None while disconnected.
        self.websocket: Optional[WebSocketClientProtocol] = None

    async def listen(self) -> None:
        """Receive messages and dispatch them to the handler.

        For every connection yielded by ``connect``: authorize, start the
        background ping task and forward each non-bytes message to
        ``event_handler_func``. On ``ConnectionClosed`` the loop moves on to
        the next connection.
        """
        async for websocket in connect(
            constants.EVENTS_BASE_URL, open_timeout=self.timeout
        ):
            self.websocket = websocket
            ping_task = None
            try:
                await self.authorize()
                # Keep a reference so the task can be cancelled when this
                # connection ends; otherwise one extra ping task would be
                # left running after every reconnect.
                # NOTE(review): presumably ``repeat`` calls ``ping`` every
                # 5 seconds - confirm against its definition.
                ping_task = asyncio.create_task(repeat(5, self.ping))
                async for message in websocket:
                    if not isinstance(message, bytes):
                        await self.event_handler_func(message)
            except ConnectionClosed:
                continue
            finally:
                if ping_task is not None:
                    ping_task.cancel()
                # Nothing may be sent on a connection that is gone.
                self.websocket = None

    async def send_json(self, data: Union[List, Dict]) -> None:
        """Send ``data`` as JSON; does nothing while disconnected."""
        if self.websocket is not None:
            await self.websocket.send(json.dumps(data))

    async def authorize(self) -> None:
        """Send the ``AUTH`` action carrying the API token."""
        await self.send_json(
            {"action": "AUTH", "data": {"token": self.api_token}}
        )

    async def ping(self) -> None:
        """Send the ``PING`` action."""
        await self.send_json({"action": "PING"})
Methods
authorize
async def authorize(
self
) -> None
View Source
async def authorize(self) -> None:
    """Send the ``AUTH`` action carrying the API token."""
    credentials = {"token": self.api_token}
    payload = {"action": "AUTH", "data": credentials}
    await self.send_json(payload)
listen
async def listen(
self
) -> None
View Source
async def listen(self) -> None:
    """Receive messages and dispatch them to the handler.

    For every connection yielded by ``connect``: authorize, start the
    background ping task and forward each non-bytes message to
    ``event_handler_func``. On ``ConnectionClosed`` the loop moves on to
    the next connection.
    """
    async for websocket in connect(
        constants.EVENTS_BASE_URL, open_timeout=self.timeout
    ):
        self.websocket = websocket
        ping_task = None
        try:
            await self.authorize()
            # Keep a reference so the task can be cancelled when this
            # connection ends; otherwise one extra ping task would be left
            # running after every reconnect.
            # NOTE(review): presumably ``repeat`` calls ``ping`` every
            # 5 seconds - confirm against its definition.
            ping_task = asyncio.create_task(repeat(5, self.ping))
            async for message in websocket:
                if not isinstance(message, bytes):
                    await self.event_handler_func(message)
        except ConnectionClosed:
            continue
        finally:
            if ping_task is not None:
                ping_task.cancel()
            # Nothing may be sent on a connection that is gone.
            self.websocket = None
ping
async def ping(
self
) -> None
View Source
async def ping(self) -> None:
    """Send the ``PING`` action."""
    payload = {"action": "PING"}
    await self.send_json(payload)
send_json
async def send_json(
self,
data: Union[List, Dict]
) -> None
View Source
async def send_json(self, data: Union[List, Dict]) -> None:
    """Serialize ``data`` to JSON and send it over the active connection.

    Does nothing when ``self.websocket`` is ``None``.
    """
    if self.websocket is None:
        return
    encoded = json.dumps(data)
    await self.websocket.send(encoded)