Skip to content

Module neuroio.entries.v1

None

None

View Source
from datetime import datetime

from typing import List, Union

from httpx import Response

from neuroio.base import APIBase, APIBaseAsync, APIBaseBase

from neuroio.constants import EntryLiveness, EntryMood, EntryResult, sentinel

from neuroio.utils import request_query_processing

class EntriesBase(APIBaseBase):
    """URL construction shared by the entries API clients."""

    def get_url(self, key: str = None) -> str:
        """Return the entries endpoint URL, optionally extended by *key*.

        Args:
            key: Path fragment placed after ``/v1/entries/``. Any falsy
                value (``None``, ``""``) selects the collection URL.

        Returns:
            ``self.base_url`` followed by the entries path; the result
            always ends with a slash.
        """
        # Guard clause: no key means the bare collection endpoint.
        if not key:
            return self.base_url + "/v1/entries/"
        return self.base_url + f"/v1/entries/{key}/"

class Impl(APIBase, EntriesBase):
    """Blocking client for the ``/v1/entries/`` endpoints.

    Each method obtains a client from ``get_client()``, performs a single
    request inside a ``with`` block and returns the raw ``Response``.
    """

    def list(
        self,
        pid: Union[List[str], object] = sentinel,
        result: Union[List[EntryResult], object] = sentinel,
        age_from: Union[int, object] = sentinel,
        age_to: Union[int, object] = sentinel,
        sex: Union[int, object] = sentinel,
        mood: Union[List[EntryMood], object] = sentinel,
        liveness: Union[List[EntryLiveness], object] = sentinel,
        sources_ids: Union[List[int], object] = sentinel,
        spaces_ids: Union[List[int], object] = sentinel,
        date_from: Union[datetime, object] = sentinel,
        date_to: Union[datetime, object] = sentinel,
        limit: int = 20,
        offset: int = 0,
    ) -> Response:
        """Send ``GET /v1/entries/`` with the arguments as query parameters.

        Every argument except ``self`` is passed to
        ``request_query_processing`` and the result becomes the request's
        ``params``. Filters default to ``sentinel``; presumably the helper
        drops those -- confirm against ``neuroio.utils``. ``limit`` and
        ``offset`` default to 20 and 0.

        Returns:
            The ``Response`` produced by the client's ``get`` call.
        """
        # locals() has to be read before any other local name is bound,
        # so that it contains exactly the call arguments.
        query = request_query_processing(locals(), ["self"])
        with self.get_client() as http:
            endpoint = self.get_url()
            return http.get(url=endpoint, params=query)

    def get(self, pid: str) -> Response:
        """Send ``GET /v1/entries/stats/pid/{pid}/`` and return the response."""
        with self.get_client() as http:
            endpoint = self.get_url(f"stats/pid/{pid}")
            return http.get(url=endpoint)

    def delete(self, id: int) -> Response:
        """Send ``DELETE /v1/entries/{id}/`` and return the response."""
        with self.get_client() as http:
            endpoint = self.get_url(f"{id}")
            return http.delete(url=endpoint)

class ImplAsync(APIBaseAsync, EntriesBase):
    """Asynchronous client for the ``/v1/entries/`` endpoints.

    Each coroutine obtains a client from ``get_client()``, awaits a single
    request inside an ``async with`` block and returns the raw ``Response``.
    """

    async def list(
        self,
        pid: Union[List[str], object] = sentinel,
        result: Union[List[EntryResult], object] = sentinel,
        age_from: Union[int, object] = sentinel,
        age_to: Union[int, object] = sentinel,
        sex: Union[int, object] = sentinel,
        mood: Union[List[EntryMood], object] = sentinel,
        liveness: Union[List[EntryLiveness], object] = sentinel,
        sources_ids: Union[List[int], object] = sentinel,
        spaces_ids: Union[List[int], object] = sentinel,
        date_from: Union[datetime, object] = sentinel,
        date_to: Union[datetime, object] = sentinel,
        limit: int = 20,
        offset: int = 0,
    ) -> Response:
        """Await ``GET /v1/entries/`` with the arguments as query parameters.

        Every argument except ``self`` is passed to
        ``request_query_processing`` and the result becomes the request's
        ``params``. Filters default to ``sentinel``; presumably the helper
        drops those -- confirm against ``neuroio.utils``. ``limit`` and
        ``offset`` default to 20 and 0.

        Returns:
            The ``Response`` produced by the client's ``get`` call.
        """
        # locals() has to be read before any other local name is bound,
        # so that it contains exactly the call arguments.
        query = request_query_processing(locals(), ["self"])
        async with self.get_client() as http:
            endpoint = self.get_url()
            return await http.get(url=endpoint, params=query)

    async def get(self, pid: str) -> Response:
        """Await ``GET /v1/entries/stats/pid/{pid}/`` and return the response."""
        async with self.get_client() as http:
            endpoint = self.get_url(f"stats/pid/{pid}")
            return await http.get(url=endpoint)

    async def delete(self, id: int) -> Response:
        """Await ``DELETE /v1/entries/{id}/`` and return the response."""
        async with self.get_client() as http:
            endpoint = self.get_url(f"{id}")
            return await http.delete(url=endpoint)

Variables

sentinel

Classes

EntriesBase

class EntriesBase(
    /,
    *args,
    **kwargs
)
View Source
class EntriesBase(APIBaseBase):
    """URL construction shared by the entries API clients."""

    def get_url(self, key: str = None) -> str:
        """Return the entries endpoint URL, optionally extended by *key*.

        Args:
            key: Path fragment placed after ``/v1/entries/``. Any falsy
                value (``None``, ``""``) selects the collection URL.

        Returns:
            ``self.base_url`` followed by the entries path; the result
            always ends with a slash.
        """
        # Guard clause: no key means the bare collection endpoint.
        if not key:
            return self.base_url + "/v1/entries/"
        return self.base_url + f"/v1/entries/{key}/"

Ancestors (in MRO)

  • neuroio.base.APIBaseBase

Descendants

  • neuroio.entries.v1.Impl
  • neuroio.entries.v1.ImplAsync

Class variables

base_url

Methods

get_url

def get_url(
    self,
    key: str = None
) -> str
View Source
    def get_url(self, key: str = None) -> str:
        """Return the entries endpoint URL, optionally extended by *key*.

        Args:
            key: Path fragment placed after ``/v1/entries/``. Any falsy
                value (``None``, ``""``) selects the collection URL.

        Returns:
            ``self.base_url`` followed by the entries path; the result
            always ends with a slash.
        """
        # Guard clause: no key means the bare collection endpoint.
        if not key:
            return self.base_url + "/v1/entries/"
        return self.base_url + f"/v1/entries/{key}/"

Impl

class Impl(
    settings: dict
)
View Source
class Impl(APIBase, EntriesBase):
    """Blocking client for the ``/v1/entries/`` endpoints.

    Each method obtains a client from ``get_client()``, performs a single
    request inside a ``with`` block and returns the raw ``Response``.
    """

    def list(
        self,
        pid: Union[List[str], object] = sentinel,
        result: Union[List[EntryResult], object] = sentinel,
        age_from: Union[int, object] = sentinel,
        age_to: Union[int, object] = sentinel,
        sex: Union[int, object] = sentinel,
        mood: Union[List[EntryMood], object] = sentinel,
        liveness: Union[List[EntryLiveness], object] = sentinel,
        sources_ids: Union[List[int], object] = sentinel,
        spaces_ids: Union[List[int], object] = sentinel,
        date_from: Union[datetime, object] = sentinel,
        date_to: Union[datetime, object] = sentinel,
        limit: int = 20,
        offset: int = 0,
    ) -> Response:
        """Send ``GET /v1/entries/`` with the arguments as query parameters.

        Every argument except ``self`` is passed to
        ``request_query_processing`` and the result becomes the request's
        ``params``. Filters default to ``sentinel``; presumably the helper
        drops those -- confirm against ``neuroio.utils``. ``limit`` and
        ``offset`` default to 20 and 0.

        Returns:
            The ``Response`` produced by the client's ``get`` call.
        """
        # locals() has to be read before any other local name is bound,
        # so that it contains exactly the call arguments.
        query = request_query_processing(locals(), ["self"])
        with self.get_client() as http:
            endpoint = self.get_url()
            return http.get(url=endpoint, params=query)

    def get(self, pid: str) -> Response:
        """Send ``GET /v1/entries/stats/pid/{pid}/`` and return the response."""
        with self.get_client() as http:
            endpoint = self.get_url(f"stats/pid/{pid}")
            return http.get(url=endpoint)

    def delete(self, id: int) -> Response:
        """Send ``DELETE /v1/entries/{id}/`` and return the response."""
        with self.get_client() as http:
            endpoint = self.get_url(f"{id}")
            return http.delete(url=endpoint)

Ancestors (in MRO)

  • neuroio.base.APIBase
  • abc.ABC
  • neuroio.base.Base
  • neuroio.entries.v1.EntriesBase
  • neuroio.base.APIBaseBase

Class variables

base_url

Methods

delete

def delete(
    self,
    id: int
) -> httpx.Response
View Source
    def delete(self, id: int) -> Response:
        """Send ``DELETE /v1/entries/{id}/`` and return the response."""
        with self.get_client() as http:
            endpoint = self.get_url(f"{id}")
            return http.delete(url=endpoint)

get

def get(
    self,
    pid: str
) -> httpx.Response
View Source
    def get(self, pid: str) -> Response:
        """Send ``GET /v1/entries/stats/pid/{pid}/`` and return the response."""
        with self.get_client() as http:
            endpoint = self.get_url(f"stats/pid/{pid}")
            return http.get(url=endpoint)

get_client

def get_client(
    self
) -> httpx.Client
View Source
    def get_client(self) -> Client:
        """Build a new ``Client`` from the keyword options in ``self.settings``."""
        options = self.settings
        return Client(**options)

get_url

def get_url(
    self,
    key: str = None
) -> str
View Source
    def get_url(self, key: str = None) -> str:
        """Return the entries endpoint URL, optionally extended by *key*.

        Args:
            key: Path fragment placed after ``/v1/entries/``. Any falsy
                value (``None``, ``""``) selects the collection URL.

        Returns:
            ``self.base_url`` followed by the entries path; the result
            always ends with a slash.
        """
        # Guard clause: no key means the bare collection endpoint.
        if not key:
            return self.base_url + "/v1/entries/"
        return self.base_url + f"/v1/entries/{key}/"

list

def list(
    self,
    pid: Union[List[str], object] = sentinel,
    result: Union[List[neuroio.constants.EntryResult], object] = sentinel,
    age_from: Union[int, object] = sentinel,
    age_to: Union[int, object] = sentinel,
    sex: Union[int, object] = sentinel,
    mood: Union[List[neuroio.constants.EntryMood], object] = sentinel,
    liveness: Union[List[neuroio.constants.EntryLiveness], object] = sentinel,
    sources_ids: Union[List[int], object] = sentinel,
    spaces_ids: Union[List[int], object] = sentinel,
    date_from: Union[datetime.datetime, object] = sentinel,
    date_to: Union[datetime.datetime, object] = sentinel,
    limit: int = 20,
    offset: int = 0
) -> httpx.Response
View Source
    def list(
        self,
        pid: Union[List[str], object] = sentinel,
        result: Union[List[EntryResult], object] = sentinel,
        age_from: Union[int, object] = sentinel,
        age_to: Union[int, object] = sentinel,
        sex: Union[int, object] = sentinel,
        mood: Union[List[EntryMood], object] = sentinel,
        liveness: Union[List[EntryLiveness], object] = sentinel,
        sources_ids: Union[List[int], object] = sentinel,
        spaces_ids: Union[List[int], object] = sentinel,
        date_from: Union[datetime, object] = sentinel,
        date_to: Union[datetime, object] = sentinel,
        limit: int = 20,
        offset: int = 0,
    ) -> Response:
        """Send ``GET /v1/entries/`` with the arguments as query parameters.

        Every argument except ``self`` is passed to
        ``request_query_processing`` and the result becomes the request's
        ``params``. Filters default to ``sentinel``; presumably the helper
        drops those -- confirm against ``neuroio.utils``. ``limit`` and
        ``offset`` default to 20 and 0.

        Returns:
            The ``Response`` produced by the client's ``get`` call.
        """
        # locals() has to be read before any other local name is bound,
        # so that it contains exactly the call arguments.
        query = request_query_processing(locals(), ["self"])
        with self.get_client() as http:
            endpoint = self.get_url()
            return http.get(url=endpoint, params=query)

ImplAsync

class ImplAsync(
    settings: dict
)
View Source
class ImplAsync(APIBaseAsync, EntriesBase):
    """Asynchronous client for the ``/v1/entries/`` endpoints.

    Each coroutine obtains a client from ``get_client()``, awaits a single
    request inside an ``async with`` block and returns the raw ``Response``.
    """

    async def list(
        self,
        pid: Union[List[str], object] = sentinel,
        result: Union[List[EntryResult], object] = sentinel,
        age_from: Union[int, object] = sentinel,
        age_to: Union[int, object] = sentinel,
        sex: Union[int, object] = sentinel,
        mood: Union[List[EntryMood], object] = sentinel,
        liveness: Union[List[EntryLiveness], object] = sentinel,
        sources_ids: Union[List[int], object] = sentinel,
        spaces_ids: Union[List[int], object] = sentinel,
        date_from: Union[datetime, object] = sentinel,
        date_to: Union[datetime, object] = sentinel,
        limit: int = 20,
        offset: int = 0,
    ) -> Response:
        """Await ``GET /v1/entries/`` with the arguments as query parameters.

        Every argument except ``self`` is passed to
        ``request_query_processing`` and the result becomes the request's
        ``params``. Filters default to ``sentinel``; presumably the helper
        drops those -- confirm against ``neuroio.utils``. ``limit`` and
        ``offset`` default to 20 and 0.

        Returns:
            The ``Response`` produced by the client's ``get`` call.
        """
        # locals() has to be read before any other local name is bound,
        # so that it contains exactly the call arguments.
        query = request_query_processing(locals(), ["self"])
        async with self.get_client() as http:
            endpoint = self.get_url()
            return await http.get(url=endpoint, params=query)

    async def get(self, pid: str) -> Response:
        """Await ``GET /v1/entries/stats/pid/{pid}/`` and return the response."""
        async with self.get_client() as http:
            endpoint = self.get_url(f"stats/pid/{pid}")
            return await http.get(url=endpoint)

    async def delete(self, id: int) -> Response:
        """Await ``DELETE /v1/entries/{id}/`` and return the response."""
        async with self.get_client() as http:
            endpoint = self.get_url(f"{id}")
            return await http.delete(url=endpoint)

Ancestors (in MRO)

  • neuroio.base.APIBaseAsync
  • abc.ABC
  • neuroio.base.Base
  • neuroio.entries.v1.EntriesBase
  • neuroio.base.APIBaseBase

Class variables

base_url

Methods

delete

async def delete(
    self,
    id: int
) -> httpx.Response
View Source
    async def delete(self, id: int) -> Response:
        """Await ``DELETE /v1/entries/{id}/`` and return the response."""
        async with self.get_client() as http:
            endpoint = self.get_url(f"{id}")
            return await http.delete(url=endpoint)

get

async def get(
    self,
    pid: str
) -> httpx.Response
View Source
    async def get(self, pid: str) -> Response:
        """Await ``GET /v1/entries/stats/pid/{pid}/`` and return the response."""
        async with self.get_client() as http:
            endpoint = self.get_url(f"stats/pid/{pid}")
            return await http.get(url=endpoint)

get_client

def get_client(
    self
) -> httpx.AsyncClient
View Source
    def get_client(self) -> AsyncClient:
        """Build a new ``AsyncClient`` from the keyword options in ``self.settings``."""
        options = self.settings
        return AsyncClient(**options)

get_url

def get_url(
    self,
    key: str = None
) -> str
View Source
    def get_url(self, key: str = None) -> str:
        """Return the entries endpoint URL, optionally extended by *key*.

        Args:
            key: Path fragment placed after ``/v1/entries/``. Any falsy
                value (``None``, ``""``) selects the collection URL.

        Returns:
            ``self.base_url`` followed by the entries path; the result
            always ends with a slash.
        """
        # Guard clause: no key means the bare collection endpoint.
        if not key:
            return self.base_url + "/v1/entries/"
        return self.base_url + f"/v1/entries/{key}/"

list

async def list(
    self,
    pid: Union[List[str], object] = sentinel,
    result: Union[List[neuroio.constants.EntryResult], object] = sentinel,
    age_from: Union[int, object] = sentinel,
    age_to: Union[int, object] = sentinel,
    sex: Union[int, object] = sentinel,
    mood: Union[List[neuroio.constants.EntryMood], object] = sentinel,
    liveness: Union[List[neuroio.constants.EntryLiveness], object] = sentinel,
    sources_ids: Union[List[int], object] = sentinel,
    spaces_ids: Union[List[int], object] = sentinel,
    date_from: Union[datetime.datetime, object] = sentinel,
    date_to: Union[datetime.datetime, object] = sentinel,
    limit: int = 20,
    offset: int = 0
) -> httpx.Response
View Source
    async def list(
        self,
        pid: Union[List[str], object] = sentinel,
        result: Union[List[EntryResult], object] = sentinel,
        age_from: Union[int, object] = sentinel,
        age_to: Union[int, object] = sentinel,
        sex: Union[int, object] = sentinel,
        mood: Union[List[EntryMood], object] = sentinel,
        liveness: Union[List[EntryLiveness], object] = sentinel,
        sources_ids: Union[List[int], object] = sentinel,
        spaces_ids: Union[List[int], object] = sentinel,
        date_from: Union[datetime, object] = sentinel,
        date_to: Union[datetime, object] = sentinel,
        limit: int = 20,
        offset: int = 0,
    ) -> Response:
        """Await ``GET /v1/entries/`` with the arguments as query parameters.

        Every argument except ``self`` is passed to
        ``request_query_processing`` and the result becomes the request's
        ``params``. Filters default to ``sentinel``; presumably the helper
        drops those -- confirm against ``neuroio.utils``. ``limit`` and
        ``offset`` default to 20 and 0.

        Returns:
            The ``Response`` produced by the client's ``get`` call.
        """
        # locals() has to be read before any other local name is bound,
        # so that it contains exactly the call arguments.
        query = request_query_processing(locals(), ["self"])
        async with self.get_client() as http:
            endpoint = self.get_url()
            return await http.get(url=endpoint, params=query)