Skip to content

Module neuroio.groups.v1

None

None

View Source
from typing import List, Union

from httpx import Response

from neuroio.base import APIBase, APIBaseAsync, APIBaseBase

from neuroio.constants import sentinel

from neuroio.utils import request_query_processing

class GroupsBase(APIBaseBase):

    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/groups/persons/{key}/"

        else:

            return self.base_url + "/v1/groups/persons/"

class Impl(APIBase, GroupsBase):

    def create(self, name: str) -> Response:

        data = {"name": name}

        with self.get_client() as client:

            return client.post(url=self.get_url(), json=data)

    def list(

        self,

        q: Union[str, object] = sentinel,

        pids_include: Union[List[str], object] = sentinel,

        pids_exclude: Union[List[str], object] = sentinel,

        groups_ids: Union[List[int], object] = sentinel,

        spaces_ids: Union[List[int], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self"])

        with self.get_client() as client:

            return client.get(url=self.get_url(), params=data)

    def get(self, id: int) -> Response:

        with self.get_client() as client:

            return client.get(url=self.get_url(f"{id}"))

    def update(self, id: int, name: str) -> Response:

        data = {"name": name}

        with self.get_client() as client:

            return client.patch(url=self.get_url(f"{id}"), json=data)

    def delete(self, id: int) -> Response:

        with self.get_client() as client:

            return client.delete(url=self.get_url(f"{id}"))

    def persons(

        self,

        id: int,

        pids: Union[List[str], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self", "id"])

        with self.get_client() as client:

            return client.get(url=self.get_url(f"{id}/pids"), params=data)

    def add(self, pids: List[str], groups_ids: List[int]) -> Response:

        data = {"pids": pids, "groups_ids": groups_ids}

        with self.get_client() as client:

            return client.post(url=self.get_url("pids"), json=data)

    def remove(self, pids: List[str], groups_ids: List[int]) -> Response:

        data = {"pids": pids, "groups_ids": groups_ids}

        with self.get_client() as client:

            return client.request(

                "DELETE", url=self.get_url("pids"), json=data

            )

class ImplAsync(APIBaseAsync, GroupsBase):

    async def create(self, name: str) -> Response:

        data = {"name": name}

        async with self.get_client() as client:

            return await client.post(url=self.get_url(), json=data)

    async def list(

        self,

        q: Union[str, object] = sentinel,

        pids_include: Union[List[str], object] = sentinel,

        pids_exclude: Union[List[str], object] = sentinel,

        groups_ids: Union[List[int], object] = sentinel,

        spaces_ids: Union[List[int], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self"])

        async with self.get_client() as client:

            return await client.get(url=self.get_url(), params=data)

    async def get(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.get(url=self.get_url(f"{id}"))

    async def update(self, id: int, name: str) -> Response:

        data = {"name": name}

        async with self.get_client() as client:

            return await client.patch(url=self.get_url(f"{id}"), json=data)

    async def delete(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.delete(url=self.get_url(f"{id}"))

    async def persons(

        self,

        id: int,

        pids: Union[List[str], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self", "id"])

        async with self.get_client() as client:

            return await client.get(

                url=self.get_url(f"{id}/pids"), params=data

            )

    async def add(self, pids: List[str], groups_ids: List[int]) -> Response:

        data = {"pids": pids, "groups_ids": groups_ids}

        async with self.get_client() as client:

            return await client.post(url=self.get_url("pids"), json=data)

    async def remove(self, pids: List[str], groups_ids: List[int]) -> Response:

        data = {"pids": pids, "groups_ids": groups_ids}

        async with self.get_client() as client:

            return await client.request(

                "DELETE", url=self.get_url("pids"), json=data

            )

Variables

sentinel

Classes

GroupsBase

class GroupsBase(
    /,
    *args,
    **kwargs
)
View Source
class GroupsBase(APIBaseBase):

    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/groups/persons/{key}/"

        else:

            return self.base_url + "/v1/groups/persons/"

Ancestors (in MRO)

  • neuroio.base.APIBaseBase

Descendants

  • neuroio.groups.v1.Impl
  • neuroio.groups.v1.ImplAsync

Class variables

base_url

Methods

get_url

def get_url(
    self,
    key: str = None
) -> str
View Source
    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/groups/persons/{key}/"

        else:

            return self.base_url + "/v1/groups/persons/"

Impl

class Impl(
    settings: dict
)
View Source
class Impl(APIBase, GroupsBase):

    def create(self, name: str) -> Response:

        data = {"name": name}

        with self.get_client() as client:

            return client.post(url=self.get_url(), json=data)

    def list(

        self,

        q: Union[str, object] = sentinel,

        pids_include: Union[List[str], object] = sentinel,

        pids_exclude: Union[List[str], object] = sentinel,

        groups_ids: Union[List[int], object] = sentinel,

        spaces_ids: Union[List[int], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self"])

        with self.get_client() as client:

            return client.get(url=self.get_url(), params=data)

    def get(self, id: int) -> Response:

        with self.get_client() as client:

            return client.get(url=self.get_url(f"{id}"))

    def update(self, id: int, name: str) -> Response:

        data = {"name": name}

        with self.get_client() as client:

            return client.patch(url=self.get_url(f"{id}"), json=data)

    def delete(self, id: int) -> Response:

        with self.get_client() as client:

            return client.delete(url=self.get_url(f"{id}"))

    def persons(

        self,

        id: int,

        pids: Union[List[str], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self", "id"])

        with self.get_client() as client:

            return client.get(url=self.get_url(f"{id}/pids"), params=data)

    def add(self, pids: List[str], groups_ids: List[int]) -> Response:

        data = {"pids": pids, "groups_ids": groups_ids}

        with self.get_client() as client:

            return client.post(url=self.get_url("pids"), json=data)

    def remove(self, pids: List[str], groups_ids: List[int]) -> Response:

        data = {"pids": pids, "groups_ids": groups_ids}

        with self.get_client() as client:

            return client.request(

                "DELETE", url=self.get_url("pids"), json=data

            )

Ancestors (in MRO)

  • neuroio.base.APIBase
  • abc.ABC
  • neuroio.base.Base
  • neuroio.groups.v1.GroupsBase
  • neuroio.base.APIBaseBase

Class variables

base_url

Methods

add

def add(
    self,
    pids: List[str],
    groups_ids: List[int]
) -> httpx.Response
View Source
    def add(self, pids: List[str], groups_ids: List[int]) -> Response:

        data = {"pids": pids, "groups_ids": groups_ids}

        with self.get_client() as client:

            return client.post(url=self.get_url("pids"), json=data)

create

def create(
    self,
    name: str
) -> httpx.Response
View Source
    def create(self, name: str) -> Response:

        data = {"name": name}

        with self.get_client() as client:

            return client.post(url=self.get_url(), json=data)

delete

def delete(
    self,
    id: int
) -> httpx.Response
View Source
    def delete(self, id: int) -> Response:

        with self.get_client() as client:

            return client.delete(url=self.get_url(f"{id}"))

get

def get(
    self,
    id: int
) -> httpx.Response
View Source
    def get(self, id: int) -> Response:

        with self.get_client() as client:

            return client.get(url=self.get_url(f"{id}"))

get_client

def get_client(
    self
) -> httpx.Client
View Source
    def get_client(self) -> Client:

        return Client(**self.settings)

get_url

def get_url(
    self,
    key: str = None
) -> str
View Source
    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/groups/persons/{key}/"

        else:

            return self.base_url + "/v1/groups/persons/"

list

def list(
    self,
    q: Union[str, object] = <object object at 0x10c207fb0>,
    pids_include: Union[List[str], object] = <object object at 0x10c207fb0>,
    pids_exclude: Union[List[str], object] = <object object at 0x10c207fb0>,
    groups_ids: Union[List[int], object] = <object object at 0x10c207fb0>,
    spaces_ids: Union[List[int], object] = <object object at 0x10c207fb0>,
    limit: int = 20,
    offset: int = 0
) -> httpx.Response
View Source
    def list(

        self,

        q: Union[str, object] = sentinel,

        pids_include: Union[List[str], object] = sentinel,

        pids_exclude: Union[List[str], object] = sentinel,

        groups_ids: Union[List[int], object] = sentinel,

        spaces_ids: Union[List[int], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self"])

        with self.get_client() as client:

            return client.get(url=self.get_url(), params=data)

persons

def persons(
    self,
    id: int,
    pids: Union[List[str], object] = <object object at 0x10c207fb0>,
    limit: int = 20,
    offset: int = 0
) -> httpx.Response
View Source
    def persons(

        self,

        id: int,

        pids: Union[List[str], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self", "id"])

        with self.get_client() as client:

            return client.get(url=self.get_url(f"{id}/pids"), params=data)

remove

def remove(
    self,
    pids: List[str],
    groups_ids: List[int]
) -> httpx.Response
View Source
    def remove(self, pids: List[str], groups_ids: List[int]) -> Response:

        data = {"pids": pids, "groups_ids": groups_ids}

        with self.get_client() as client:

            return client.request(

                "DELETE", url=self.get_url("pids"), json=data

            )

update

def update(
    self,
    id: int,
    name: str
) -> httpx.Response
View Source
    def update(self, id: int, name: str) -> Response:

        data = {"name": name}

        with self.get_client() as client:

            return client.patch(url=self.get_url(f"{id}"), json=data)

ImplAsync

class ImplAsync(
    settings: dict
)
View Source
class ImplAsync(APIBaseAsync, GroupsBase):

    async def create(self, name: str) -> Response:

        data = {"name": name}

        async with self.get_client() as client:

            return await client.post(url=self.get_url(), json=data)

    async def list(

        self,

        q: Union[str, object] = sentinel,

        pids_include: Union[List[str], object] = sentinel,

        pids_exclude: Union[List[str], object] = sentinel,

        groups_ids: Union[List[int], object] = sentinel,

        spaces_ids: Union[List[int], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self"])

        async with self.get_client() as client:

            return await client.get(url=self.get_url(), params=data)

    async def get(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.get(url=self.get_url(f"{id}"))

    async def update(self, id: int, name: str) -> Response:

        data = {"name": name}

        async with self.get_client() as client:

            return await client.patch(url=self.get_url(f"{id}"), json=data)

    async def delete(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.delete(url=self.get_url(f"{id}"))

    async def persons(

        self,

        id: int,

        pids: Union[List[str], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self", "id"])

        async with self.get_client() as client:

            return await client.get(

                url=self.get_url(f"{id}/pids"), params=data

            )

    async def add(self, pids: List[str], groups_ids: List[int]) -> Response:

        data = {"pids": pids, "groups_ids": groups_ids}

        async with self.get_client() as client:

            return await client.post(url=self.get_url("pids"), json=data)

    async def remove(self, pids: List[str], groups_ids: List[int]) -> Response:

        data = {"pids": pids, "groups_ids": groups_ids}

        async with self.get_client() as client:

            return await client.request(

                "DELETE", url=self.get_url("pids"), json=data

            )

Ancestors (in MRO)

  • neuroio.base.APIBaseAsync
  • abc.ABC
  • neuroio.base.Base
  • neuroio.groups.v1.GroupsBase
  • neuroio.base.APIBaseBase

Class variables

base_url

Methods

add

def add(
    self,
    pids: List[str],
    groups_ids: List[int]
) -> httpx.Response
View Source
    async def add(self, pids: List[str], groups_ids: List[int]) -> Response:

        data = {"pids": pids, "groups_ids": groups_ids}

        async with self.get_client() as client:

            return await client.post(url=self.get_url("pids"), json=data)

create

def create(
    self,
    name: str
) -> httpx.Response
View Source
    async def create(self, name: str) -> Response:

        data = {"name": name}

        async with self.get_client() as client:

            return await client.post(url=self.get_url(), json=data)

delete

def delete(
    self,
    id: int
) -> httpx.Response
View Source
    async def delete(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.delete(url=self.get_url(f"{id}"))

get

def get(
    self,
    id: int
) -> httpx.Response
View Source
    async def get(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.get(url=self.get_url(f"{id}"))

get_client

def get_client(
    self
) -> httpx.AsyncClient
View Source
    def get_client(self) -> AsyncClient:

        return AsyncClient(**self.settings)

get_url

def get_url(
    self,
    key: str = None
) -> str
View Source
    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/groups/persons/{key}/"

        else:

            return self.base_url + "/v1/groups/persons/"

list

def list(
    self,
    q: Union[str, object] = <object object at 0x10c207fb0>,
    pids_include: Union[List[str], object] = <object object at 0x10c207fb0>,
    pids_exclude: Union[List[str], object] = <object object at 0x10c207fb0>,
    groups_ids: Union[List[int], object] = <object object at 0x10c207fb0>,
    spaces_ids: Union[List[int], object] = <object object at 0x10c207fb0>,
    limit: int = 20,
    offset: int = 0
) -> httpx.Response
View Source
    async def list(

        self,

        q: Union[str, object] = sentinel,

        pids_include: Union[List[str], object] = sentinel,

        pids_exclude: Union[List[str], object] = sentinel,

        groups_ids: Union[List[int], object] = sentinel,

        spaces_ids: Union[List[int], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self"])

        async with self.get_client() as client:

            return await client.get(url=self.get_url(), params=data)

persons

def persons(
    self,
    id: int,
    pids: Union[List[str], object] = <object object at 0x10c207fb0>,
    limit: int = 20,
    offset: int = 0
) -> httpx.Response
View Source
    async def persons(

        self,

        id: int,

        pids: Union[List[str], object] = sentinel,

        limit: int = 20,

        offset: int = 0,

    ) -> Response:

        data = request_query_processing(locals(), ["self", "id"])

        async with self.get_client() as client:

            return await client.get(

                url=self.get_url(f"{id}/pids"), params=data

            )

remove

def remove(
    self,
    pids: List[str],
    groups_ids: List[int]
) -> httpx.Response
View Source
    async def remove(self, pids: List[str], groups_ids: List[int]) -> Response:

        data = {"pids": pids, "groups_ids": groups_ids}

        async with self.get_client() as client:

            return await client.request(

                "DELETE", url=self.get_url("pids"), json=data

            )

update

def update(
    self,
    id: int,
    name: str
) -> httpx.Response
View Source
    async def update(self, id: int, name: str) -> Response:

        data = {"name": name}

        async with self.get_client() as client:

            return await client.patch(url=self.get_url(f"{id}"), json=data)