Skip to content

Module neuroio.notifications.v1

None

None

View Source
from typing import List, Union

from httpx import Response

from neuroio.base import APIBase, APIBaseAsync, APIBaseBase

from neuroio.constants import (

    EntryLiveness,

    EntryMood,

    EntryResult,

    HttpMethod,

    Sex,

    sentinel,

)

from neuroio.utils import request_dict_processing, request_query_processing

class NotificationsBase(APIBaseBase):

    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/notifications/{key}/"

        else:

            return self.base_url + "/v1/notifications/"

class Impl(APIBase, NotificationsBase):

    def create(

        self,

        name: str,

        http_method: HttpMethod,

        destination_url: str,

        is_active: bool = True,

        moods: Union[List[EntryMood], object] = sentinel,

        results: Union[List[EntryResult], object] = sentinel,

        liveness: Union[List[EntryLiveness], object] = sentinel,

        age_from: Union[int, object] = sentinel,

        age_to: Union[int, object] = sentinel,

        sex: Union[List[Sex], object] = sentinel,

        sources: Union[List[int], object] = sentinel,

        persons_groups: Union[List[int], object] = sentinel,

    ) -> Response:

        data = request_dict_processing(locals(), ["self"])

        with self.get_client() as client:

            return client.post(url=self.get_url(), json=data)

    def list(

        self,

        q: Union[str, object] = sentinel,

        spaces_ids: Union[List[int], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self"])

        with self.get_client() as client:

            return client.get(url=self.get_url(), params=data)

    def get(self, id: int) -> Response:

        with self.get_client() as client:

            return client.get(url=self.get_url(f"{id}"))

    def update(

        self,

        id: int,

        name: str,

        http_method: HttpMethod,

        destination_url: str,

        is_active: bool = True,

        moods: Union[List[EntryMood], object] = sentinel,

        results: Union[List[EntryResult], object] = sentinel,

        liveness: Union[List[EntryLiveness], object] = sentinel,

        age_from: Union[int, object] = sentinel,

        age_to: Union[int, object] = sentinel,

        sex: Union[List[Sex], object] = sentinel,

        sources: Union[List[int], object] = sentinel,

        persons_groups: Union[List[int], object] = sentinel,

    ) -> Response:

        data = request_dict_processing(locals(), ["self", "id"])

        with self.get_client() as client:

            return client.patch(url=self.get_url(f"{id}"), json=data)

    def delete(self, id: int) -> Response:

        with self.get_client() as client:

            return client.delete(url=self.get_url(f"{id}"))

class ImplAsync(APIBaseAsync, NotificationsBase):

    async def create(

        self,

        name: str,

        http_method: HttpMethod,

        destination_url: str,

        is_active: bool = True,

        moods: Union[List[EntryMood], object] = sentinel,

        results: Union[List[EntryResult], object] = sentinel,

        liveness: Union[List[EntryLiveness], object] = sentinel,

        age_from: Union[int, object] = sentinel,

        age_to: Union[int, object] = sentinel,

        sex: Union[List[Sex], object] = sentinel,

        sources: Union[List[int], object] = sentinel,

        persons_groups: Union[List[int], object] = sentinel,

    ) -> Response:

        data = request_dict_processing(locals(), ["self"])

        async with self.get_client() as client:

            return await client.post(url=self.get_url(), json=data)

    async def list(

        self,

        q: Union[str, object] = sentinel,

        spaces_ids: Union[List[int], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self"])

        async with self.get_client() as client:

            return await client.get(url=self.get_url(), params=data)

    async def get(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.get(url=self.get_url(f"{id}"))

    async def update(

        self,

        id: int,

        name: str,

        http_method: HttpMethod,

        destination_url: str,

        is_active: bool = True,

        moods: Union[List[EntryMood], object] = sentinel,

        results: Union[List[EntryResult], object] = sentinel,

        liveness: Union[List[EntryLiveness], object] = sentinel,

        age_from: Union[int, object] = sentinel,

        age_to: Union[int, object] = sentinel,

        sex: Union[List[Sex], object] = sentinel,

        sources: Union[List[int], object] = sentinel,

        persons_groups: Union[List[int], object] = sentinel,

    ) -> Response:

        data = request_dict_processing(locals(), ["self", "id"])

        async with self.get_client() as client:

            return await client.patch(url=self.get_url(f"{id}"), json=data)

    async def delete(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.delete(url=self.get_url(f"{id}"))

Variables

sentinel

Classes

Impl

class Impl(
    settings: dict
)
View Source
class Impl(APIBase, NotificationsBase):

    def create(

        self,

        name: str,

        http_method: HttpMethod,

        destination_url: str,

        is_active: bool = True,

        moods: Union[List[EntryMood], object] = sentinel,

        results: Union[List[EntryResult], object] = sentinel,

        liveness: Union[List[EntryLiveness], object] = sentinel,

        age_from: Union[int, object] = sentinel,

        age_to: Union[int, object] = sentinel,

        sex: Union[List[Sex], object] = sentinel,

        sources: Union[List[int], object] = sentinel,

        persons_groups: Union[List[int], object] = sentinel,

    ) -> Response:

        data = request_dict_processing(locals(), ["self"])

        with self.get_client() as client:

            return client.post(url=self.get_url(), json=data)

    def list(

        self,

        q: Union[str, object] = sentinel,

        spaces_ids: Union[List[int], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self"])

        with self.get_client() as client:

            return client.get(url=self.get_url(), params=data)

    def get(self, id: int) -> Response:

        with self.get_client() as client:

            return client.get(url=self.get_url(f"{id}"))

    def update(

        self,

        id: int,

        name: str,

        http_method: HttpMethod,

        destination_url: str,

        is_active: bool = True,

        moods: Union[List[EntryMood], object] = sentinel,

        results: Union[List[EntryResult], object] = sentinel,

        liveness: Union[List[EntryLiveness], object] = sentinel,

        age_from: Union[int, object] = sentinel,

        age_to: Union[int, object] = sentinel,

        sex: Union[List[Sex], object] = sentinel,

        sources: Union[List[int], object] = sentinel,

        persons_groups: Union[List[int], object] = sentinel,

    ) -> Response:

        data = request_dict_processing(locals(), ["self", "id"])

        with self.get_client() as client:

            return client.patch(url=self.get_url(f"{id}"), json=data)

    def delete(self, id: int) -> Response:

        with self.get_client() as client:

            return client.delete(url=self.get_url(f"{id}"))

Ancestors (in MRO)

  • neuroio.base.APIBase
  • abc.ABC
  • neuroio.base.Base
  • neuroio.notifications.v1.NotificationsBase
  • neuroio.base.APIBaseBase

Class variables

base_url

Methods

create

def create(
    self,
    name: str,
    http_method: neuroio.constants.HttpMethod,
    destination_url: str,
    is_active: bool = True,
    moods: Union[List[neuroio.constants.EntryMood], object] = <object object at 0x10c207fb0>,
    results: Union[List[neuroio.constants.EntryResult], object] = <object object at 0x10c207fb0>,
    liveness: Union[List[neuroio.constants.EntryLiveness], object] = <object object at 0x10c207fb0>,
    age_from: Union[int, object] = <object object at 0x10c207fb0>,
    age_to: Union[int, object] = <object object at 0x10c207fb0>,
    sex: Union[List[neuroio.constants.Sex], object] = <object object at 0x10c207fb0>,
    sources: Union[List[int], object] = <object object at 0x10c207fb0>,
    persons_groups: Union[List[int], object] = <object object at 0x10c207fb0>
) -> httpx.Response
View Source
    def create(

        self,

        name: str,

        http_method: HttpMethod,

        destination_url: str,

        is_active: bool = True,

        moods: Union[List[EntryMood], object] = sentinel,

        results: Union[List[EntryResult], object] = sentinel,

        liveness: Union[List[EntryLiveness], object] = sentinel,

        age_from: Union[int, object] = sentinel,

        age_to: Union[int, object] = sentinel,

        sex: Union[List[Sex], object] = sentinel,

        sources: Union[List[int], object] = sentinel,

        persons_groups: Union[List[int], object] = sentinel,

    ) -> Response:

        data = request_dict_processing(locals(), ["self"])

        with self.get_client() as client:

            return client.post(url=self.get_url(), json=data)

delete

def delete(
    self,
    id: int
) -> httpx.Response
View Source
    def delete(self, id: int) -> Response:

        with self.get_client() as client:

            return client.delete(url=self.get_url(f"{id}"))

get

def get(
    self,
    id: int
) -> httpx.Response
View Source
    def get(self, id: int) -> Response:

        with self.get_client() as client:

            return client.get(url=self.get_url(f"{id}"))

get_client

def get_client(
    self
) -> httpx.Client
View Source
    def get_client(self) -> Client:

        return Client(**self.settings)

get_url

def get_url(
    self,
    key: str = None
) -> str
View Source
    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/notifications/{key}/"

        else:

            return self.base_url + "/v1/notifications/"

list

def list(
    self,
    q: Union[str, object] = <object object at 0x10c207fb0>,
    spaces_ids: Union[List[int], object] = <object object at 0x10c207fb0>,
    limit: int = 20,
    offset: int = 0
) -> httpx.Response
View Source
    def list(

        self,

        q: Union[str, object] = sentinel,

        spaces_ids: Union[List[int], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self"])

        with self.get_client() as client:

            return client.get(url=self.get_url(), params=data)

update

def update(
    self,
    id: int,
    name: str,
    http_method: neuroio.constants.HttpMethod,
    destination_url: str,
    is_active: bool = True,
    moods: Union[List[neuroio.constants.EntryMood], object] = <object object at 0x10c207fb0>,
    results: Union[List[neuroio.constants.EntryResult], object] = <object object at 0x10c207fb0>,
    liveness: Union[List[neuroio.constants.EntryLiveness], object] = <object object at 0x10c207fb0>,
    age_from: Union[int, object] = <object object at 0x10c207fb0>,
    age_to: Union[int, object] = <object object at 0x10c207fb0>,
    sex: Union[List[neuroio.constants.Sex], object] = <object object at 0x10c207fb0>,
    sources: Union[List[int], object] = <object object at 0x10c207fb0>,
    persons_groups: Union[List[int], object] = <object object at 0x10c207fb0>
) -> httpx.Response
View Source
    def update(

        self,

        id: int,

        name: str,

        http_method: HttpMethod,

        destination_url: str,

        is_active: bool = True,

        moods: Union[List[EntryMood], object] = sentinel,

        results: Union[List[EntryResult], object] = sentinel,

        liveness: Union[List[EntryLiveness], object] = sentinel,

        age_from: Union[int, object] = sentinel,

        age_to: Union[int, object] = sentinel,

        sex: Union[List[Sex], object] = sentinel,

        sources: Union[List[int], object] = sentinel,

        persons_groups: Union[List[int], object] = sentinel,

    ) -> Response:

        data = request_dict_processing(locals(), ["self", "id"])

        with self.get_client() as client:

            return client.patch(url=self.get_url(f"{id}"), json=data)

ImplAsync

class ImplAsync(
    settings: dict
)
View Source
class ImplAsync(APIBaseAsync, NotificationsBase):

    async def create(

        self,

        name: str,

        http_method: HttpMethod,

        destination_url: str,

        is_active: bool = True,

        moods: Union[List[EntryMood], object] = sentinel,

        results: Union[List[EntryResult], object] = sentinel,

        liveness: Union[List[EntryLiveness], object] = sentinel,

        age_from: Union[int, object] = sentinel,

        age_to: Union[int, object] = sentinel,

        sex: Union[List[Sex], object] = sentinel,

        sources: Union[List[int], object] = sentinel,

        persons_groups: Union[List[int], object] = sentinel,

    ) -> Response:

        data = request_dict_processing(locals(), ["self"])

        async with self.get_client() as client:

            return await client.post(url=self.get_url(), json=data)

    async def list(

        self,

        q: Union[str, object] = sentinel,

        spaces_ids: Union[List[int], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self"])

        async with self.get_client() as client:

            return await client.get(url=self.get_url(), params=data)

    async def get(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.get(url=self.get_url(f"{id}"))

    async def update(

        self,

        id: int,

        name: str,

        http_method: HttpMethod,

        destination_url: str,

        is_active: bool = True,

        moods: Union[List[EntryMood], object] = sentinel,

        results: Union[List[EntryResult], object] = sentinel,

        liveness: Union[List[EntryLiveness], object] = sentinel,

        age_from: Union[int, object] = sentinel,

        age_to: Union[int, object] = sentinel,

        sex: Union[List[Sex], object] = sentinel,

        sources: Union[List[int], object] = sentinel,

        persons_groups: Union[List[int], object] = sentinel,

    ) -> Response:

        data = request_dict_processing(locals(), ["self", "id"])

        async with self.get_client() as client:

            return await client.patch(url=self.get_url(f"{id}"), json=data)

    async def delete(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.delete(url=self.get_url(f"{id}"))

Ancestors (in MRO)

  • neuroio.base.APIBaseAsync
  • abc.ABC
  • neuroio.base.Base
  • neuroio.notifications.v1.NotificationsBase
  • neuroio.base.APIBaseBase

Class variables

base_url

Methods

create

def create(
    self,
    name: str,
    http_method: neuroio.constants.HttpMethod,
    destination_url: str,
    is_active: bool = True,
    moods: Union[List[neuroio.constants.EntryMood], object] = <object object at 0x10c207fb0>,
    results: Union[List[neuroio.constants.EntryResult], object] = <object object at 0x10c207fb0>,
    liveness: Union[List[neuroio.constants.EntryLiveness], object] = <object object at 0x10c207fb0>,
    age_from: Union[int, object] = <object object at 0x10c207fb0>,
    age_to: Union[int, object] = <object object at 0x10c207fb0>,
    sex: Union[List[neuroio.constants.Sex], object] = <object object at 0x10c207fb0>,
    sources: Union[List[int], object] = <object object at 0x10c207fb0>,
    persons_groups: Union[List[int], object] = <object object at 0x10c207fb0>
) -> httpx.Response
View Source
    async def create(

        self,

        name: str,

        http_method: HttpMethod,

        destination_url: str,

        is_active: bool = True,

        moods: Union[List[EntryMood], object] = sentinel,

        results: Union[List[EntryResult], object] = sentinel,

        liveness: Union[List[EntryLiveness], object] = sentinel,

        age_from: Union[int, object] = sentinel,

        age_to: Union[int, object] = sentinel,

        sex: Union[List[Sex], object] = sentinel,

        sources: Union[List[int], object] = sentinel,

        persons_groups: Union[List[int], object] = sentinel,

    ) -> Response:

        data = request_dict_processing(locals(), ["self"])

        async with self.get_client() as client:

            return await client.post(url=self.get_url(), json=data)

delete

def delete(
    self,
    id: int
) -> httpx.Response
View Source
    async def delete(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.delete(url=self.get_url(f"{id}"))

get

def get(
    self,
    id: int
) -> httpx.Response
View Source
    async def get(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.get(url=self.get_url(f"{id}"))

get_client

def get_client(
    self
) -> httpx.AsyncClient
View Source
    def get_client(self) -> AsyncClient:

        return AsyncClient(**self.settings)

get_url

def get_url(
    self,
    key: str = None
) -> str
View Source
    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/notifications/{key}/"

        else:

            return self.base_url + "/v1/notifications/"

list

def list(
    self,
    q: Union[str, object] = <object object at 0x10c207fb0>,
    spaces_ids: Union[List[int], object] = <object object at 0x10c207fb0>,
    limit: int = 20,
    offset: int = 0
) -> httpx.Response
View Source
    async def list(

        self,

        q: Union[str, object] = sentinel,

        spaces_ids: Union[List[int], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self"])

        async with self.get_client() as client:

            return await client.get(url=self.get_url(), params=data)

update

def update(
    self,
    id: int,
    name: str,
    http_method: neuroio.constants.HttpMethod,
    destination_url: str,
    is_active: bool = True,
    moods: Union[List[neuroio.constants.EntryMood], object] = <object object at 0x10c207fb0>,
    results: Union[List[neuroio.constants.EntryResult], object] = <object object at 0x10c207fb0>,
    liveness: Union[List[neuroio.constants.EntryLiveness], object] = <object object at 0x10c207fb0>,
    age_from: Union[int, object] = <object object at 0x10c207fb0>,
    age_to: Union[int, object] = <object object at 0x10c207fb0>,
    sex: Union[List[neuroio.constants.Sex], object] = <object object at 0x10c207fb0>,
    sources: Union[List[int], object] = <object object at 0x10c207fb0>,
    persons_groups: Union[List[int], object] = <object object at 0x10c207fb0>
) -> httpx.Response
View Source
    async def update(

        self,

        id: int,

        name: str,

        http_method: HttpMethod,

        destination_url: str,

        is_active: bool = True,

        moods: Union[List[EntryMood], object] = sentinel,

        results: Union[List[EntryResult], object] = sentinel,

        liveness: Union[List[EntryLiveness], object] = sentinel,

        age_from: Union[int, object] = sentinel,

        age_to: Union[int, object] = sentinel,

        sex: Union[List[Sex], object] = sentinel,

        sources: Union[List[int], object] = sentinel,

        persons_groups: Union[List[int], object] = sentinel,

    ) -> Response:

        data = request_dict_processing(locals(), ["self", "id"])

        async with self.get_client() as client:

            return await client.patch(url=self.get_url(f"{id}"), json=data)

NotificationsBase

class NotificationsBase(
    /,
    *args,
    **kwargs
)
View Source
class NotificationsBase(APIBaseBase):

    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/notifications/{key}/"

        else:

            return self.base_url + "/v1/notifications/"

Ancestors (in MRO)

  • neuroio.base.APIBaseBase

Descendants

  • neuroio.notifications.v1.Impl
  • neuroio.notifications.v1.ImplAsync

Class variables

base_url

Methods

get_url

def get_url(
    self,
    key: str = None
) -> str
View Source
    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/notifications/{key}/"

        else:

            return self.base_url + "/v1/notifications/"