chore(deps): update machine-learning (#6302)
* chore(deps): update machine-learning * fix typing, use new lifespan syntax * wrap in try / finally * move log --------- Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> Co-authored-by: mertalev <101130780+mertalev@users.noreply.github.com>
This commit is contained in:
@@ -3,10 +3,10 @@ from __future__ import annotations
|
||||
from pathlib import Path
|
||||
from typing import Any, NamedTuple
|
||||
|
||||
from numpy import ascontiguousarray
|
||||
import numpy as np
|
||||
from numpy.typing import NDArray
|
||||
|
||||
from ann.ann import Ann
|
||||
from app.schemas import ndarray_f32, ndarray_i32
|
||||
|
||||
from ..config import log, settings
|
||||
|
||||
@@ -56,10 +56,10 @@ class AnnSession:
|
||||
def run(
|
||||
self,
|
||||
output_names: list[str] | None,
|
||||
input_feed: dict[str, ndarray_f32] | dict[str, ndarray_i32],
|
||||
input_feed: dict[str, NDArray[np.float32]] | dict[str, NDArray[np.int32]],
|
||||
run_options: Any = None,
|
||||
) -> list[ndarray_f32]:
|
||||
inputs: list[ndarray_f32] = [ascontiguousarray(v) for v in input_feed.values()]
|
||||
) -> list[NDArray[np.float32]]:
|
||||
inputs: list[NDArray[np.float32]] = [np.ascontiguousarray(v) for v in input_feed.values()]
|
||||
return self.ann.execute(self.model, inputs)
|
||||
|
||||
|
||||
|
||||
@@ -6,12 +6,13 @@ from pathlib import Path
|
||||
from typing import Any, Literal
|
||||
|
||||
import numpy as np
|
||||
from numpy.typing import NDArray
|
||||
from PIL import Image
|
||||
from tokenizers import Encoding, Tokenizer
|
||||
|
||||
from app.config import clean_name, log
|
||||
from app.models.transforms import crop, get_pil_resampling, normalize, resize, to_numpy
|
||||
from app.schemas import ModelType, ndarray_f32, ndarray_i32
|
||||
from app.schemas import ModelType
|
||||
|
||||
from .base import InferenceModel
|
||||
|
||||
@@ -40,7 +41,7 @@ class BaseCLIPEncoder(InferenceModel):
|
||||
self.vision_model = self._make_session(self.visual_path)
|
||||
log.debug(f"Loaded clip vision model '{self.model_name}'")
|
||||
|
||||
def _predict(self, image_or_text: Image.Image | str) -> ndarray_f32:
|
||||
def _predict(self, image_or_text: Image.Image | str) -> NDArray[np.float32]:
|
||||
if isinstance(image_or_text, bytes):
|
||||
image_or_text = Image.open(BytesIO(image_or_text))
|
||||
|
||||
@@ -48,7 +49,7 @@ class BaseCLIPEncoder(InferenceModel):
|
||||
case Image.Image():
|
||||
if self.mode == "text":
|
||||
raise TypeError("Cannot encode image as text-only model")
|
||||
outputs: ndarray_f32 = self.vision_model.run(None, self.transform(image_or_text))[0][0]
|
||||
outputs: NDArray[np.float32] = self.vision_model.run(None, self.transform(image_or_text))[0][0]
|
||||
case str():
|
||||
if self.mode == "vision":
|
||||
raise TypeError("Cannot encode text as vision-only model")
|
||||
@@ -59,11 +60,11 @@ class BaseCLIPEncoder(InferenceModel):
|
||||
return outputs
|
||||
|
||||
@abstractmethod
|
||||
def tokenize(self, text: str) -> dict[str, ndarray_i32]:
|
||||
def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def transform(self, image: Image.Image) -> dict[str, ndarray_f32]:
|
||||
def transform(self, image: Image.Image) -> dict[str, NDArray[np.float32]]:
|
||||
pass
|
||||
|
||||
@property
|
||||
@@ -161,11 +162,11 @@ class OpenCLIPEncoder(BaseCLIPEncoder):
|
||||
self.tokenizer.enable_truncation(max_length=context_length)
|
||||
log.debug(f"Loaded tokenizer for CLIP model '{self.model_name}'")
|
||||
|
||||
def tokenize(self, text: str) -> dict[str, ndarray_i32]:
|
||||
def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
|
||||
tokens: Encoding = self.tokenizer.encode(text)
|
||||
return {"text": np.array([tokens.ids], dtype=np.int32)}
|
||||
|
||||
def transform(self, image: Image.Image) -> dict[str, ndarray_f32]:
|
||||
def transform(self, image: Image.Image) -> dict[str, NDArray[np.float32]]:
|
||||
image = resize(image, self.size)
|
||||
image = crop(image, self.size)
|
||||
image_np = to_numpy(image)
|
||||
@@ -174,7 +175,7 @@ class OpenCLIPEncoder(BaseCLIPEncoder):
|
||||
|
||||
|
||||
class MCLIPEncoder(OpenCLIPEncoder):
|
||||
def tokenize(self, text: str) -> dict[str, ndarray_i32]:
|
||||
def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
|
||||
tokens: Encoding = self.tokenizer.encode(text)
|
||||
return {
|
||||
"input_ids": np.array([tokens.ids], dtype=np.int32),
|
||||
|
||||
@@ -5,9 +5,10 @@ import cv2
|
||||
import numpy as np
|
||||
from insightface.model_zoo import ArcFaceONNX, RetinaFace
|
||||
from insightface.utils.face_align import norm_crop
|
||||
from numpy.typing import NDArray
|
||||
|
||||
from app.config import clean_name
|
||||
from app.schemas import BoundingBox, Face, ModelType, ndarray_f32
|
||||
from app.schemas import Face, ModelType, is_ndarray
|
||||
|
||||
from .base import InferenceModel
|
||||
|
||||
@@ -36,22 +37,25 @@ class FaceRecognizer(InferenceModel):
|
||||
)
|
||||
self.rec_model.prepare(ctx_id=0)
|
||||
|
||||
def _predict(self, image: ndarray_f32 | bytes) -> list[Face]:
|
||||
def _predict(self, image: NDArray[np.uint8] | bytes) -> list[Face]:
|
||||
if isinstance(image, bytes):
|
||||
image = cv2.imdecode(np.frombuffer(image, np.uint8), cv2.IMREAD_COLOR)
|
||||
bboxes, kpss = self.det_model.detect(image)
|
||||
decoded_image = cv2.imdecode(np.frombuffer(image, np.uint8), cv2.IMREAD_COLOR)
|
||||
else:
|
||||
decoded_image = image
|
||||
assert is_ndarray(decoded_image, np.uint8)
|
||||
bboxes, kpss = self.det_model.detect(decoded_image)
|
||||
if bboxes.size == 0:
|
||||
return []
|
||||
assert isinstance(image, np.ndarray) and isinstance(kpss, np.ndarray)
|
||||
assert is_ndarray(kpss, np.float32)
|
||||
|
||||
scores = bboxes[:, 4].tolist()
|
||||
bboxes = bboxes[:, :4].round().tolist()
|
||||
|
||||
results = []
|
||||
height, width, _ = image.shape
|
||||
height, width, _ = decoded_image.shape
|
||||
for (x1, y1, x2, y2), score, kps in zip(bboxes, scores, kpss):
|
||||
cropped_img = norm_crop(image, kps)
|
||||
embedding: ndarray_f32 = self.rec_model.get_feat(cropped_img)[0]
|
||||
cropped_img = norm_crop(decoded_image, kps)
|
||||
embedding: NDArray[np.float32] = self.rec_model.get_feat(cropped_img)[0]
|
||||
face: Face = {
|
||||
"imageWidth": width,
|
||||
"imageHeight": height,
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import numpy as np
|
||||
from numpy.typing import NDArray
|
||||
from PIL import Image
|
||||
|
||||
from app.schemas import ndarray_f32
|
||||
|
||||
_PIL_RESAMPLING_METHODS = {resampling.name.lower(): resampling for resampling in Image.Resampling}
|
||||
|
||||
|
||||
@@ -23,11 +22,13 @@ def crop(img: Image.Image, size: int) -> Image.Image:
|
||||
return img.crop((left, upper, right, lower))
|
||||
|
||||
|
||||
def to_numpy(img: Image.Image) -> ndarray_f32:
|
||||
def to_numpy(img: Image.Image) -> NDArray[np.float32]:
|
||||
return np.asarray(img.convert("RGB")).astype(np.float32) / 255.0
|
||||
|
||||
|
||||
def normalize(img: ndarray_f32, mean: float | ndarray_f32, std: float | ndarray_f32) -> ndarray_f32:
|
||||
def normalize(
|
||||
img: NDArray[np.float32], mean: float | NDArray[np.float32], std: float | NDArray[np.float32]
|
||||
) -> NDArray[np.float32]:
|
||||
return (img - mean) / std
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user