Skip to content

Module neuroio.persons.v1

None

None

View Source
from typing import Union

from httpx import Response

from neuroio.base import APIBase, APIBaseAsync, APIBaseBase

from neuroio.constants import EntryResult, sentinel

from neuroio.utils import (

    ImageType,

    prepare_image_processing,

    request_dict_processing,

    request_form_processing,

)

class PersonsBase(APIBaseBase):

    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/persons/{key}/"

        else:

            return self.base_url + "/v1/persons/"

class Impl(APIBase, PersonsBase):

    def create(

        self,

        image: ImageType,

        source: str,

        facesize: Union[int, object] = sentinel,

        create_on_ha: Union[bool, object] = sentinel,

        create_on_junk: Union[bool, object] = sentinel,

        identify_asm: Union[bool, object] = sentinel,

    ) -> Response:

        data = request_form_processing(locals(), ["self", "image"])

        files = prepare_image_processing(image)

        with self.get_client() as client:

            return client.post(url=self.get_url(), data=data, files=files)

    def create_by_entry(

        self, id: int, create_on_ha: bool, create_on_junk: bool

    ) -> Response:

        data = request_dict_processing(locals(), ["self"])

        with self.get_client() as client:

            return client.post(url=self.get_url("entry"), json=data)

    def reinit(self, id: int) -> Response:

        with self.get_client() as client:

            return client.post(url=self.get_url("reinit"), json={"id": id})

    def reinit_by_photo(

        self,

        pid: str,

        image: ImageType,

        source: str,

        facesize: Union[int, object] = sentinel,

        identify_asm: Union[bool, object] = sentinel,

        result: str = EntryResult.HA,

    ) -> Response:

        data = request_form_processing(locals())

        files = prepare_image_processing(image)

        with self.get_client() as client:

            return client.post(

                url=self.get_url(f"reinit/{pid}"), data=data, files=files

            )

    def search(

        self,

        image: ImageType,

        identify_asm: bool = False,

    ) -> Response:

        files = prepare_image_processing(image)

        data = {"identify_asm": str(identify_asm)}

        with self.get_client() as client:

            return client.post(

                url=self.get_url("search"), data=data, files=files

            )

    def delete(self, pid: str) -> Response:

        with self.get_client() as client:

            return client.delete(url=self.get_url(f"{pid}"))

class ImplAsync(APIBaseAsync, PersonsBase):

    async def create(

        self,

        image: ImageType,

        source: str,

        facesize: Union[int, object] = sentinel,

        create_on_ha: Union[bool, object] = sentinel,

        create_on_junk: Union[bool, object] = sentinel,

        identify_asm: Union[bool, object] = sentinel,

    ) -> Response:

        data = request_form_processing(locals(), ["self", "image"])

        files = prepare_image_processing(image)

        async with self.get_client() as client:

            return await client.post(

                url=self.get_url(), data=data, files=files

            )

    async def create_by_entry(

        self, id: int, create_on_ha: bool, create_on_junk: bool

    ) -> Response:

        data = request_dict_processing(locals(), ["self"])

        async with self.get_client() as client:

            return await client.post(url=self.get_url("entry"), json=data)

    async def reinit(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.post(

                url=self.get_url("reinit"), json={"id": id}

            )

    async def reinit_by_photo(

        self,

        pid: str,

        image: ImageType,

        source: str,

        facesize: Union[int, object] = sentinel,

        identify_asm: Union[bool, object] = sentinel,

        result: str = EntryResult.HA,

    ) -> Response:

        data = request_form_processing(locals(), ["self", "image", "pid"])

        files = prepare_image_processing(image)

        async with self.get_client() as client:

            return await client.post(

                url=self.get_url(f"reinit/{pid}"), data=data, files=files

            )

    async def search(

        self,

        image: ImageType,

        identify_asm: bool = False,

    ) -> Response:

        files = prepare_image_processing(image)

        data = {"identify_asm": str(identify_asm)}

        async with self.get_client() as client:

            return await client.post(

                url=self.get_url("search"), data=data, files=files

            )

    async def delete(self, pid: str) -> Response:

        async with self.get_client() as client:

            return await client.delete(url=self.get_url(f"{pid}"))

Variables

sentinel

Classes

Impl

class Impl(
    settings: dict
)
View Source
class Impl(APIBase, PersonsBase):

    def create(

        self,

        image: ImageType,

        source: str,

        facesize: Union[int, object] = sentinel,

        create_on_ha: Union[bool, object] = sentinel,

        create_on_junk: Union[bool, object] = sentinel,

        identify_asm: Union[bool, object] = sentinel,

    ) -> Response:

        data = request_form_processing(locals(), ["self", "image"])

        files = prepare_image_processing(image)

        with self.get_client() as client:

            return client.post(url=self.get_url(), data=data, files=files)

    def create_by_entry(

        self, id: int, create_on_ha: bool, create_on_junk: bool

    ) -> Response:

        data = request_dict_processing(locals(), ["self"])

        with self.get_client() as client:

            return client.post(url=self.get_url("entry"), json=data)

    def reinit(self, id: int) -> Response:

        with self.get_client() as client:

            return client.post(url=self.get_url("reinit"), json={"id": id})

    def reinit_by_photo(

        self,

        pid: str,

        image: ImageType,

        source: str,

        facesize: Union[int, object] = sentinel,

        identify_asm: Union[bool, object] = sentinel,

        result: str = EntryResult.HA,

    ) -> Response:

        data = request_form_processing(locals())

        files = prepare_image_processing(image)

        with self.get_client() as client:

            return client.post(

                url=self.get_url(f"reinit/{pid}"), data=data, files=files

            )

    def search(

        self,

        image: ImageType,

        identify_asm: bool = False,

    ) -> Response:

        files = prepare_image_processing(image)

        data = {"identify_asm": str(identify_asm)}

        with self.get_client() as client:

            return client.post(

                url=self.get_url("search"), data=data, files=files

            )

    def delete(self, pid: str) -> Response:

        with self.get_client() as client:

            return client.delete(url=self.get_url(f"{pid}"))

Ancestors (in MRO)

  • neuroio.base.APIBase
  • abc.ABC
  • neuroio.base.Base
  • neuroio.persons.v1.PersonsBase
  • neuroio.base.APIBaseBase

Class variables

base_url

Methods

create

def create(
    self,
    image: Union[BinaryIO, Tuple[str, _io.BytesIO], bytes],
    source: str,
    facesize: Union[int, object] = <object object at 0x10c207fb0>,
    create_on_ha: Union[bool, object] = <object object at 0x10c207fb0>,
    create_on_junk: Union[bool, object] = <object object at 0x10c207fb0>,
    identify_asm: Union[bool, object] = <object object at 0x10c207fb0>
) -> httpx.Response
View Source
    def create(

        self,

        image: ImageType,

        source: str,

        facesize: Union[int, object] = sentinel,

        create_on_ha: Union[bool, object] = sentinel,

        create_on_junk: Union[bool, object] = sentinel,

        identify_asm: Union[bool, object] = sentinel,

    ) -> Response:

        data = request_form_processing(locals(), ["self", "image"])

        files = prepare_image_processing(image)

        with self.get_client() as client:

            return client.post(url=self.get_url(), data=data, files=files)

create_by_entry

def create_by_entry(
    self,
    id: int,
    create_on_ha: bool,
    create_on_junk: bool
) -> httpx.Response
View Source
    def create_by_entry(

        self, id: int, create_on_ha: bool, create_on_junk: bool

    ) -> Response:

        data = request_dict_processing(locals(), ["self"])

        with self.get_client() as client:

            return client.post(url=self.get_url("entry"), json=data)

delete

def delete(
    self,
    pid: str
) -> httpx.Response
View Source
    def delete(self, pid: str) -> Response:

        with self.get_client() as client:

            return client.delete(url=self.get_url(f"{pid}"))

get_client

def get_client(
    self
) -> httpx.Client
View Source
    def get_client(self) -> Client:

        return Client(**self.settings)

get_url

def get_url(
    self,
    key: str = None
) -> str
View Source
    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/persons/{key}/"

        else:

            return self.base_url + "/v1/persons/"

reinit

def reinit(
    self,
    id: int
) -> httpx.Response
View Source
    def reinit(self, id: int) -> Response:

        with self.get_client() as client:

            return client.post(url=self.get_url("reinit"), json={"id": id})

reinit_by_photo

def reinit_by_photo(
    self,
    pid: str,
    image: Union[BinaryIO, Tuple[str, _io.BytesIO], bytes],
    source: str,
    facesize: Union[int, object] = <object object at 0x10c207fb0>,
    identify_asm: Union[bool, object] = <object object at 0x10c207fb0>,
    result: str = <EntryResult.HA: 'ha'>
) -> httpx.Response
View Source
    def reinit_by_photo(

        self,

        pid: str,

        image: ImageType,

        source: str,

        facesize: Union[int, object] = sentinel,

        identify_asm: Union[bool, object] = sentinel,

        result: str = EntryResult.HA,

    ) -> Response:

        data = request_form_processing(locals())

        files = prepare_image_processing(image)

        with self.get_client() as client:

            return client.post(

                url=self.get_url(f"reinit/{pid}"), data=data, files=files

            )
def search(
    self,
    image: Union[BinaryIO, Tuple[str, _io.BytesIO], bytes],
    identify_asm: bool = False
) -> httpx.Response
View Source
    def search(

        self,

        image: ImageType,

        identify_asm: bool = False,

    ) -> Response:

        files = prepare_image_processing(image)

        data = {"identify_asm": str(identify_asm)}

        with self.get_client() as client:

            return client.post(

                url=self.get_url("search"), data=data, files=files

            )

ImplAsync

class ImplAsync(
    settings: dict
)
View Source
class ImplAsync(APIBaseAsync, PersonsBase):

    async def create(

        self,

        image: ImageType,

        source: str,

        facesize: Union[int, object] = sentinel,

        create_on_ha: Union[bool, object] = sentinel,

        create_on_junk: Union[bool, object] = sentinel,

        identify_asm: Union[bool, object] = sentinel,

    ) -> Response:

        data = request_form_processing(locals(), ["self", "image"])

        files = prepare_image_processing(image)

        async with self.get_client() as client:

            return await client.post(

                url=self.get_url(), data=data, files=files

            )

    async def create_by_entry(

        self, id: int, create_on_ha: bool, create_on_junk: bool

    ) -> Response:

        data = request_dict_processing(locals(), ["self"])

        async with self.get_client() as client:

            return await client.post(url=self.get_url("entry"), json=data)

    async def reinit(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.post(

                url=self.get_url("reinit"), json={"id": id}

            )

    async def reinit_by_photo(

        self,

        pid: str,

        image: ImageType,

        source: str,

        facesize: Union[int, object] = sentinel,

        identify_asm: Union[bool, object] = sentinel,

        result: str = EntryResult.HA,

    ) -> Response:

        data = request_form_processing(locals(), ["self", "image", "pid"])

        files = prepare_image_processing(image)

        async with self.get_client() as client:

            return await client.post(

                url=self.get_url(f"reinit/{pid}"), data=data, files=files

            )

    async def search(

        self,

        image: ImageType,

        identify_asm: bool = False,

    ) -> Response:

        files = prepare_image_processing(image)

        data = {"identify_asm": str(identify_asm)}

        async with self.get_client() as client:

            return await client.post(

                url=self.get_url("search"), data=data, files=files

            )

    async def delete(self, pid: str) -> Response:

        async with self.get_client() as client:

            return await client.delete(url=self.get_url(f"{pid}"))

Ancestors (in MRO)

  • neuroio.base.APIBaseAsync
  • abc.ABC
  • neuroio.base.Base
  • neuroio.persons.v1.PersonsBase
  • neuroio.base.APIBaseBase

Class variables

base_url

Methods

create

def create(
    self,
    image: Union[BinaryIO, Tuple[str, _io.BytesIO], bytes],
    source: str,
    facesize: Union[int, object] = <object object at 0x10c207fb0>,
    create_on_ha: Union[bool, object] = <object object at 0x10c207fb0>,
    create_on_junk: Union[bool, object] = <object object at 0x10c207fb0>,
    identify_asm: Union[bool, object] = <object object at 0x10c207fb0>
) -> httpx.Response
View Source
    async def create(

        self,

        image: ImageType,

        source: str,

        facesize: Union[int, object] = sentinel,

        create_on_ha: Union[bool, object] = sentinel,

        create_on_junk: Union[bool, object] = sentinel,

        identify_asm: Union[bool, object] = sentinel,

    ) -> Response:

        data = request_form_processing(locals(), ["self", "image"])

        files = prepare_image_processing(image)

        async with self.get_client() as client:

            return await client.post(

                url=self.get_url(), data=data, files=files

            )

create_by_entry

def create_by_entry(
    self,
    id: int,
    create_on_ha: bool,
    create_on_junk: bool
) -> httpx.Response
View Source
    async def create_by_entry(

        self, id: int, create_on_ha: bool, create_on_junk: bool

    ) -> Response:

        data = request_dict_processing(locals(), ["self"])

        async with self.get_client() as client:

            return await client.post(url=self.get_url("entry"), json=data)

delete

def delete(
    self,
    pid: str
) -> httpx.Response
View Source
    async def delete(self, pid: str) -> Response:

        async with self.get_client() as client:

            return await client.delete(url=self.get_url(f"{pid}"))

get_client

def get_client(
    self
) -> httpx.AsyncClient
View Source
    def get_client(self) -> AsyncClient:

        return AsyncClient(**self.settings)

get_url

def get_url(
    self,
    key: str = None
) -> str
View Source
    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/persons/{key}/"

        else:

            return self.base_url + "/v1/persons/"

reinit

def reinit(
    self,
    id: int
) -> httpx.Response
View Source
    async def reinit(self, id: int) -> Response:

        async with self.get_client() as client:

            return await client.post(

                url=self.get_url("reinit"), json={"id": id}

            )

reinit_by_photo

def reinit_by_photo(
    self,
    pid: str,
    image: Union[BinaryIO, Tuple[str, _io.BytesIO], bytes],
    source: str,
    facesize: Union[int, object] = <object object at 0x10c207fb0>,
    identify_asm: Union[bool, object] = <object object at 0x10c207fb0>,
    result: str = <EntryResult.HA: 'ha'>
) -> httpx.Response
View Source
    async def reinit_by_photo(

        self,

        pid: str,

        image: ImageType,

        source: str,

        facesize: Union[int, object] = sentinel,

        identify_asm: Union[bool, object] = sentinel,

        result: str = EntryResult.HA,

    ) -> Response:

        data = request_form_processing(locals(), ["self", "image", "pid"])

        files = prepare_image_processing(image)

        async with self.get_client() as client:

            return await client.post(

                url=self.get_url(f"reinit/{pid}"), data=data, files=files

            )

search

def search(
    self,
    image: Union[BinaryIO, Tuple[str, _io.BytesIO], bytes],
    identify_asm: bool = False
) -> httpx.Response
View Source
    async def search(

        self,

        image: ImageType,

        identify_asm: bool = False,

    ) -> Response:

        files = prepare_image_processing(image)

        data = {"identify_asm": str(identify_asm)}

        async with self.get_client() as client:

            return await client.post(

                url=self.get_url("search"), data=data, files=files

            )

PersonsBase

class PersonsBase(
    /,
    *args,
    **kwargs
)
View Source
class PersonsBase(APIBaseBase):

    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/persons/{key}/"

        else:

            return self.base_url + "/v1/persons/"

Ancestors (in MRO)

  • neuroio.base.APIBaseBase

Descendants

  • neuroio.persons.v1.Impl
  • neuroio.persons.v1.ImplAsync

Class variables

base_url

Methods

get_url

def get_url(
    self,
    key: str = None
) -> str
View Source
    def get_url(self, key: str = None) -> str:

        if key:

            return self.base_url + f"/v1/persons/{key}/"

        else:

            return self.base_url + "/v1/persons/"