Module neuroio.groups.v1
None
None
View Source
from typing import List, Union
from httpx import Response
from neuroio.base import APIBase, APIBaseAsync, APIBaseBase
from neuroio.constants import sentinel
from neuroio.utils import request_query_processing
class GroupsBase(APIBaseBase):
def get_url(self, key: str = None) -> str:
if key:
return self.base_url + f"/v1/groups/persons/{key}/"
else:
return self.base_url + "/v1/groups/persons/"
class Impl(APIBase, GroupsBase):
def create(self, name: str) -> Response:
data = {"name": name}
with self.get_client() as client:
return client.post(url=self.get_url(), json=data)
def list(
self,
q: Union[str, object] = sentinel,
pids_include: Union[List[str], object] = sentinel,
pids_exclude: Union[List[str], object] = sentinel,
groups_ids: Union[List[int], object] = sentinel,
spaces_ids: Union[List[int], object] = sentinel,
limit: int = 20,
offset: int = 0,
) -> Response:
data = request_query_processing(locals(), ["self"])
with self.get_client() as client:
return client.get(url=self.get_url(), params=data)
def get(self, id: int) -> Response:
with self.get_client() as client:
return client.get(url=self.get_url(f"{id}"))
def update(self, id: int, name: str) -> Response:
data = {"name": name}
with self.get_client() as client:
return client.patch(url=self.get_url(f"{id}"), json=data)
def delete(self, id: int) -> Response:
with self.get_client() as client:
return client.delete(url=self.get_url(f"{id}"))
def persons(
self,
id: int,
pids: Union[List[str], object] = sentinel,
limit: int = 20,
offset: int = 0,
) -> Response:
data = request_query_processing(locals(), ["self", "id"])
with self.get_client() as client:
return client.get(url=self.get_url(f"{id}/pids"), params=data)
def add(self, pids: List[str], groups_ids: List[int]) -> Response:
data = {"pids": pids, "groups_ids": groups_ids}
with self.get_client() as client:
return client.post(url=self.get_url("pids"), json=data)
def remove(self, pids: List[str], groups_ids: List[int]) -> Response:
data = {"pids": pids, "groups_ids": groups_ids}
with self.get_client() as client:
return client.request(
"DELETE", url=self.get_url("pids"), json=data
)
class ImplAsync(APIBaseAsync, GroupsBase):
async def create(self, name: str) -> Response:
data = {"name": name}
async with self.get_client() as client:
return await client.post(url=self.get_url(), json=data)
async def list(
self,
q: Union[str, object] = sentinel,
pids_include: Union[List[str], object] = sentinel,
pids_exclude: Union[List[str], object] = sentinel,
groups_ids: Union[List[int], object] = sentinel,
spaces_ids: Union[List[int], object] = sentinel,
limit: int = 20,
offset: int = 0,
) -> Response:
data = request_query_processing(locals(), ["self"])
async with self.get_client() as client:
return await client.get(url=self.get_url(), params=data)
async def get(self, id: int) -> Response:
async with self.get_client() as client:
return await client.get(url=self.get_url(f"{id}"))
async def update(self, id: int, name: str) -> Response:
data = {"name": name}
async with self.get_client() as client:
return await client.patch(url=self.get_url(f"{id}"), json=data)
async def delete(self, id: int) -> Response:
async with self.get_client() as client:
return await client.delete(url=self.get_url(f"{id}"))
async def persons(
self,
id: int,
pids: Union[List[str], object] = sentinel,
limit: int = 20,
offset: int = 0,
) -> Response:
data = request_query_processing(locals(), ["self", "id"])
async with self.get_client() as client:
return await client.get(
url=self.get_url(f"{id}/pids"), params=data
)
async def add(self, pids: List[str], groups_ids: List[int]) -> Response:
data = {"pids": pids, "groups_ids": groups_ids}
async with self.get_client() as client:
return await client.post(url=self.get_url("pids"), json=data)
async def remove(self, pids: List[str], groups_ids: List[int]) -> Response:
data = {"pids": pids, "groups_ids": groups_ids}
async with self.get_client() as client:
return await client.request(
"DELETE", url=self.get_url("pids"), json=data
)
Variables
sentinel
Classes
GroupsBase
class GroupsBase(
/,
*args,
**kwargs
)
View Source
class GroupsBase(APIBaseBase):
def get_url(self, key: str = None) -> str:
if key:
return self.base_url + f"/v1/groups/persons/{key}/"
else:
return self.base_url + "/v1/groups/persons/"
Ancestors (in MRO)
- neuroio.base.APIBaseBase
Descendants
- neuroio.groups.v1.Impl
- neuroio.groups.v1.ImplAsync
Class variables
base_url
Methods
get_url
def get_url(
self,
key: str = None
) -> str
View Source
def get_url(self, key: str = None) -> str:
if key:
return self.base_url + f"/v1/groups/persons/{key}/"
else:
return self.base_url + "/v1/groups/persons/"
Impl
class Impl(
settings: dict
)
View Source
class Impl(APIBase, GroupsBase):
def create(self, name: str) -> Response:
data = {"name": name}
with self.get_client() as client:
return client.post(url=self.get_url(), json=data)
def list(
self,
q: Union[str, object] = sentinel,
pids_include: Union[List[str], object] = sentinel,
pids_exclude: Union[List[str], object] = sentinel,
groups_ids: Union[List[int], object] = sentinel,
spaces_ids: Union[List[int], object] = sentinel,
limit: int = 20,
offset: int = 0,
) -> Response:
data = request_query_processing(locals(), ["self"])
with self.get_client() as client:
return client.get(url=self.get_url(), params=data)
def get(self, id: int) -> Response:
with self.get_client() as client:
return client.get(url=self.get_url(f"{id}"))
def update(self, id: int, name: str) -> Response:
data = {"name": name}
with self.get_client() as client:
return client.patch(url=self.get_url(f"{id}"), json=data)
def delete(self, id: int) -> Response:
with self.get_client() as client:
return client.delete(url=self.get_url(f"{id}"))
def persons(
self,
id: int,
pids: Union[List[str], object] = sentinel,
limit: int = 20,
offset: int = 0,
) -> Response:
data = request_query_processing(locals(), ["self", "id"])
with self.get_client() as client:
return client.get(url=self.get_url(f"{id}/pids"), params=data)
def add(self, pids: List[str], groups_ids: List[int]) -> Response:
data = {"pids": pids, "groups_ids": groups_ids}
with self.get_client() as client:
return client.post(url=self.get_url("pids"), json=data)
def remove(self, pids: List[str], groups_ids: List[int]) -> Response:
data = {"pids": pids, "groups_ids": groups_ids}
with self.get_client() as client:
return client.request(
"DELETE", url=self.get_url("pids"), json=data
)
Ancestors (in MRO)
- neuroio.base.APIBase
- abc.ABC
- neuroio.base.Base
- neuroio.groups.v1.GroupsBase
- neuroio.base.APIBaseBase
Class variables
base_url
Methods
add
def add(
self,
pids: List[str],
groups_ids: List[int]
) -> httpx.Response
View Source
def add(self, pids: List[str], groups_ids: List[int]) -> Response:
data = {"pids": pids, "groups_ids": groups_ids}
with self.get_client() as client:
return client.post(url=self.get_url("pids"), json=data)
create
def create(
self,
name: str
) -> httpx.Response
View Source
def create(self, name: str) -> Response:
data = {"name": name}
with self.get_client() as client:
return client.post(url=self.get_url(), json=data)
delete
def delete(
self,
id: int
) -> httpx.Response
View Source
def delete(self, id: int) -> Response:
with self.get_client() as client:
return client.delete(url=self.get_url(f"{id}"))
get
def get(
self,
id: int
) -> httpx.Response
View Source
def get(self, id: int) -> Response:
with self.get_client() as client:
return client.get(url=self.get_url(f"{id}"))
get_client
def get_client(
self
) -> httpx.Client
View Source
def get_client(self) -> Client:
return Client(**self.settings)
get_url
def get_url(
self,
key: str = None
) -> str
View Source
def get_url(self, key: str = None) -> str:
if key:
return self.base_url + f"/v1/groups/persons/{key}/"
else:
return self.base_url + "/v1/groups/persons/"
list
def list(
self,
q: Union[str, object] = <object object at 0x10c207fb0>,
pids_include: Union[List[str], object] = <object object at 0x10c207fb0>,
pids_exclude: Union[List[str], object] = <object object at 0x10c207fb0>,
groups_ids: Union[List[int], object] = <object object at 0x10c207fb0>,
spaces_ids: Union[List[int], object] = <object object at 0x10c207fb0>,
limit: int = 20,
offset: int = 0
) -> httpx.Response
View Source
def list(
self,
q: Union[str, object] = sentinel,
pids_include: Union[List[str], object] = sentinel,
pids_exclude: Union[List[str], object] = sentinel,
groups_ids: Union[List[int], object] = sentinel,
spaces_ids: Union[List[int], object] = sentinel,
limit: int = 20,
offset: int = 0,
) -> Response:
data = request_query_processing(locals(), ["self"])
with self.get_client() as client:
return client.get(url=self.get_url(), params=data)
persons
def persons(
self,
id: int,
pids: Union[List[str], object] = <object object at 0x10c207fb0>,
limit: int = 20,
offset: int = 0
) -> httpx.Response
View Source
def persons(
self,
id: int,
pids: Union[List[str], object] = sentinel,
limit: int = 20,
offset: int = 0,
) -> Response:
data = request_query_processing(locals(), ["self", "id"])
with self.get_client() as client:
return client.get(url=self.get_url(f"{id}/pids"), params=data)
remove
def remove(
self,
pids: List[str],
groups_ids: List[int]
) -> httpx.Response
View Source
def remove(self, pids: List[str], groups_ids: List[int]) -> Response:
data = {"pids": pids, "groups_ids": groups_ids}
with self.get_client() as client:
return client.request(
"DELETE", url=self.get_url("pids"), json=data
)
update
def update(
self,
id: int,
name: str
) -> httpx.Response
View Source
def update(self, id: int, name: str) -> Response:
data = {"name": name}
with self.get_client() as client:
return client.patch(url=self.get_url(f"{id}"), json=data)
ImplAsync
class ImplAsync(
settings: dict
)
View Source
class ImplAsync(APIBaseAsync, GroupsBase):
async def create(self, name: str) -> Response:
data = {"name": name}
async with self.get_client() as client:
return await client.post(url=self.get_url(), json=data)
async def list(
self,
q: Union[str, object] = sentinel,
pids_include: Union[List[str], object] = sentinel,
pids_exclude: Union[List[str], object] = sentinel,
groups_ids: Union[List[int], object] = sentinel,
spaces_ids: Union[List[int], object] = sentinel,
limit: int = 20,
offset: int = 0,
) -> Response:
data = request_query_processing(locals(), ["self"])
async with self.get_client() as client:
return await client.get(url=self.get_url(), params=data)
async def get(self, id: int) -> Response:
async with self.get_client() as client:
return await client.get(url=self.get_url(f"{id}"))
async def update(self, id: int, name: str) -> Response:
data = {"name": name}
async with self.get_client() as client:
return await client.patch(url=self.get_url(f"{id}"), json=data)
async def delete(self, id: int) -> Response:
async with self.get_client() as client:
return await client.delete(url=self.get_url(f"{id}"))
async def persons(
self,
id: int,
pids: Union[List[str], object] = sentinel,
limit: int = 20,
offset: int = 0,
) -> Response:
data = request_query_processing(locals(), ["self", "id"])
async with self.get_client() as client:
return await client.get(
url=self.get_url(f"{id}/pids"), params=data
)
async def add(self, pids: List[str], groups_ids: List[int]) -> Response:
data = {"pids": pids, "groups_ids": groups_ids}
async with self.get_client() as client:
return await client.post(url=self.get_url("pids"), json=data)
async def remove(self, pids: List[str], groups_ids: List[int]) -> Response:
data = {"pids": pids, "groups_ids": groups_ids}
async with self.get_client() as client:
return await client.request(
"DELETE", url=self.get_url("pids"), json=data
)
Ancestors (in MRO)
- neuroio.base.APIBaseAsync
- abc.ABC
- neuroio.base.Base
- neuroio.groups.v1.GroupsBase
- neuroio.base.APIBaseBase
Class variables
base_url
Methods
add
def add(
self,
pids: List[str],
groups_ids: List[int]
) -> httpx.Response
View Source
async def add(self, pids: List[str], groups_ids: List[int]) -> Response:
data = {"pids": pids, "groups_ids": groups_ids}
async with self.get_client() as client:
return await client.post(url=self.get_url("pids"), json=data)
create
def create(
self,
name: str
) -> httpx.Response
View Source
async def create(self, name: str) -> Response:
data = {"name": name}
async with self.get_client() as client:
return await client.post(url=self.get_url(), json=data)
delete
def delete(
self,
id: int
) -> httpx.Response
View Source
async def delete(self, id: int) -> Response:
async with self.get_client() as client:
return await client.delete(url=self.get_url(f"{id}"))
get
def get(
self,
id: int
) -> httpx.Response
View Source
async def get(self, id: int) -> Response:
async with self.get_client() as client:
return await client.get(url=self.get_url(f"{id}"))
get_client
def get_client(
self
) -> httpx.AsyncClient
View Source
def get_client(self) -> AsyncClient:
return AsyncClient(**self.settings)
get_url
def get_url(
self,
key: str = None
) -> str
View Source
def get_url(self, key: str = None) -> str:
if key:
return self.base_url + f"/v1/groups/persons/{key}/"
else:
return self.base_url + "/v1/groups/persons/"
list
def list(
self,
q: Union[str, object] = <object object at 0x10c207fb0>,
pids_include: Union[List[str], object] = <object object at 0x10c207fb0>,
pids_exclude: Union[List[str], object] = <object object at 0x10c207fb0>,
groups_ids: Union[List[int], object] = <object object at 0x10c207fb0>,
spaces_ids: Union[List[int], object] = <object object at 0x10c207fb0>,
limit: int = 20,
offset: int = 0
) -> httpx.Response
View Source
async def list(
self,
q: Union[str, object] = sentinel,
pids_include: Union[List[str], object] = sentinel,
pids_exclude: Union[List[str], object] = sentinel,
groups_ids: Union[List[int], object] = sentinel,
spaces_ids: Union[List[int], object] = sentinel,
limit: int = 20,
offset: int = 0,
) -> Response:
data = request_query_processing(locals(), ["self"])
async with self.get_client() as client:
return await client.get(url=self.get_url(), params=data)
persons
def persons(
self,
id: int,
pids: Union[List[str], object] = <object object at 0x10c207fb0>,
limit: int = 20,
offset: int = 0
) -> httpx.Response
View Source
async def persons(
self,
id: int,
pids: Union[List[str], object] = sentinel,
limit: int = 20,
offset: int = 0,
) -> Response:
data = request_query_processing(locals(), ["self", "id"])
async with self.get_client() as client:
return await client.get(
url=self.get_url(f"{id}/pids"), params=data
)
remove
def remove(
self,
pids: List[str],
groups_ids: List[int]
) -> httpx.Response
View Source
async def remove(self, pids: List[str], groups_ids: List[int]) -> Response:
data = {"pids": pids, "groups_ids": groups_ids}
async with self.get_client() as client:
return await client.request(
"DELETE", url=self.get_url("pids"), json=data
)
update
def update(
self,
id: int,
name: str
) -> httpx.Response
View Source
async def update(self, id: int, name: str) -> Response:
data = {"name": name}
async with self.get_client() as client:
return await client.patch(url=self.get_url(f"{id}"), json=data)