# coding=utf-8
# Copyright 2024 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processor class for Moondream3.
"""

from typing import Optional, Union

from transformers.feature_extraction_utils import BatchFeature
from transformers.image_utils import ImageInput, is_valid_image
from transformers.processing_utils import (
    ProcessingKwargs,
    ProcessorMixin,
    Unpack,
)
from transformers.tokenization_utils_base import PreTokenizedInput, TextInput
from transformers.utils import logging


logger = logging.get_logger(__name__)


class Moondream3ProcessorKwargs(ProcessingKwargs, total=False):
    _defaults = {
        "text_kwargs": {
            "padding": False,
            "return_token_type_ids": False,
        },
        "common_kwargs": {
            "return_tensors": "pt",
        },
    }


def _rotate_right_array(x, k: int):
    """
    Rotate a 1D or 2D structure `k` steps to the right along the last axis.

    Supports `list`, `numpy.ndarray`, and `torch.Tensor`, and works even if numpy or
    torch is not installed. Raises `TypeError` for unsupported input types.
    """
    # Optional imports: numpy and torch may be missing at runtime.
    try:
        import numpy as np
    except ImportError:
        np = None
    try:
        import torch
    except ImportError:
        torch = None

    # torch.Tensor
    if torch is not None and isinstance(x, torch.Tensor):
        if x.size(-1) == 0:
            return x
        return torch.roll(x, shifts=k % x.size(-1), dims=-1)

    # numpy.ndarray
    if np is not None and isinstance(x, np.ndarray):
        if x.shape[-1] == 0:
            return x
        return np.roll(x, k % x.shape[-1], axis=-1)

    # Python list (1D or 2D)
    if isinstance(x, list):
        if not x:  # empty list
            return x
        # 2D (batch, seq): rotate each row independently
        if isinstance(x[0], list):
            out = []
            for row in x:
                if not row:
                    out.append(row)
                    continue
                shift = k % len(row)
                out.append(row[-shift:] + row[:-shift] if shift else row[:])
            return out
        # 1D
        shift = k % len(x)
        return x[-shift:] + x[:-shift] if shift else x[:]

    # Unsupported type
    raise TypeError(
        f"Unsupported type {type(x).__name__} for rotation. "
        f"Expected list, numpy.ndarray, or torch.Tensor. "
        f"(numpy and torch are optional dependencies.)"
    )
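
# A quick sanity check of `_rotate_right_array` (illustrative only, not part of the
# public API): rotating right by `k` moves the last `k` elements to the front.
#
#     >>> _rotate_right_array([1, 2, 3, 4, 5], 2)
#     [4, 5, 1, 2, 3]
#     >>> _rotate_right_array([[1, 2, 3], [4, 5, 6]], 1)
#     [[3, 1, 2], [6, 4, 5]]
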
# Copied from transformers.models.idefics2.processing_idefics2.is_url
def is_url(val) -> bool:
    return isinstance(val, str) and val.startswith("http")


# Copied from transformers.models.idefics2.processing_idefics2.is_image_or_image_url
def is_image_or_image_url(elem):
    return is_url(elem) or is_valid_image(elem)


class Moondream3Processor(ProcessorMixin):
    r"""
    Constructs a Moondream3 processor which wraps a Moondream3 image processor and a tokenizer into a single
    processor.

    [`Moondream3Processor`] offers all the functionalities of the underlying image processor and tokenizer. See the
    [`~Moondream3Processor.__call__`] and [`~Moondream3Processor.decode`] for more information.

    Args:
        image_processor ([`Moondream3ImageProcessor`], *optional*):
            The image processor is a required input.
        tokenizer ([`PreTrainedTokenizerFast`], *optional*):
            The tokenizer is a required input.
        chat_template (`str`, *optional*):
            A Jinja template which will be used to convert lists of messages in a chat into a tokenizable string.
        image_token_id (`int`, *optional*, defaults to 0):
            The id of the special token used to denote image location in the input sequence.
    """

    attributes = ["image_processor", "tokenizer"]
    image_processor_class = "AutoImageProcessor"
    tokenizer_class = "AutoTokenizer"

    def __init__(
        self,
        image_processor=None,
        tokenizer=None,
        chat_template=None,
        image_token_id=0,
        **kwargs,
    ):
        self.image_token_id = image_token_id
        super().__init__(image_processor, tokenizer, chat_template=chat_template)
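
    # Illustrative usage sketch (the checkpoint name below is a placeholder, not a
    # confirmed hub id):
    #
    #     from PIL import Image
    #     from transformers import AutoProcessor
    #
    #     processor = AutoProcessor.from_pretrained("<moondream3-checkpoint>")
    #     inputs = processor(images=Image.open("example.png"), text="Describe this image.")
    #     # -> BatchFeature with `input_ids`, `attention_mask` and `pixel_values`
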
    def __call__(
        self,
        images: Optional[ImageInput] = None,
        text: Union[TextInput, PreTokenizedInput, list[TextInput], list[PreTokenizedInput]] = None,
        **kwargs: Unpack[Moondream3ProcessorKwargs],
    ) -> BatchFeature:
        """
        Main method to prepare one or several sequence(s) and image(s) for the model. This method forwards the `text`
        and `kwargs` arguments to the tokenizer's [`~PreTrainedTokenizer.__call__`] if `text` is not `None` to encode
        the text. To prepare the image(s), this method forwards the `images` and `kwargs` arguments to the image
        processor's [`~BaseImageProcessor.__call__`] if `images` is not `None`. Please refer to the docstrings of the
        above two methods for more information.

        Args:
            images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `list[PIL.Image.Image]`, `list[np.ndarray]`, `list[torch.Tensor]`):
                The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
                tensor. Both channels-first and channels-last formats are supported.
            text (`str`, `list[str]`, `list[list[str]]`):
                The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
                (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
                `is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
            return_tensors (`str` or [`~utils.TensorType`], *optional*):
                If set, will return tensors of a particular framework. Acceptable values are:

                - `'pt'`: Return PyTorch `torch.Tensor` objects.
                - `'np'`: Return NumPy `np.ndarray` objects.

        Returns:
            [`BatchFeature`]: A [`BatchFeature`] with the following fields:

            - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`.
            - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
              `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is
              not `None`).
            - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
        """
        output_kwargs = self._merge_kwargs(
            Moondream3ProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )

        if images is not None:
            image_inputs = self.image_processor(images, **output_kwargs["images_kwargs"])
        else:
            image_inputs = {}

        if isinstance(text, str):
            text = [text]
        elif not isinstance(text, list) or not isinstance(text[0], str):
            raise TypeError("Invalid input text. Please provide a string, or a list of strings.")

        # Try to expand inputs in processing if we have the necessary parts.
        prompt_strings = text

        return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None)
        text_inputs = self.tokenizer(prompt_strings, **output_kwargs["text_kwargs"], return_tensors=None)

        # NOTE: the commented-out block below would prepend one BOS token and 729 image
        # placeholder tokens (730 total) to every sequence. It is kept for reference and
        # matches the 730-step rotation in the commented-out `apply_chat_template` below.
        # if "input_ids" in text_inputs:
        #     # Prepend 1 bos_token_id and 729 image_token_id to the text_inputs.
        #     for i in range(len(text_inputs["input_ids"])):
        #         prepended_tokens = [self.tokenizer.bos_token_id] + [self.image_token_id] * 729
        #         text_inputs["input_ids"][i] = prepended_tokens + text_inputs["input_ids"][i]
        # if "attention_mask" in text_inputs:
        #     # Attend to the 730 prepended tokens.
        #     for i in range(len(text_inputs["attention_mask"])):
        #         prepended_mask = [1] * 730
        #         text_inputs["attention_mask"][i] = prepended_mask + text_inputs["attention_mask"][i]

        return BatchFeature(data={**text_inputs, **image_inputs}, tensor_type=return_tensors)

    # def apply_chat_template(
    #     self,
    #     conversation: Union[list[dict[str, str]], list[list[dict[str, str]]]],
    #     chat_template: Optional[str] = None,
    #     **kwargs,
    # ) -> str:
    #     # Call the original behavior first.
    #     out = super().apply_chat_template(
    #         conversation=conversation,
    #         chat_template=chat_template,
    #         **kwargs,
    #     )
    #     # Only post-process when:
    #     # - the user requested the assistant mask
    #     # - the output is a dict (tokenized + return_dict=True path)
    #     if isinstance(out, BatchFeature) and kwargs.get("return_assistant_tokens_mask", False):
    #         if "assistant_masks" in out and out["assistant_masks"] is not None:
    #             # 730 = 1 BOS token + 729 image tokens prepended in `__call__`.
    #             out["assistant_masks"] = _rotate_right_array(out["assistant_masks"], 730)
    #     return out

    @property
    def model_input_names(self):
        tokenizer_input_names = self.tokenizer.model_input_names
        image_processor_input_names = self.image_processor.model_input_names
        return tokenizer_input_names + image_processor_input_names + ["image_sizes"]


__all__ = ["Moondream3Processor"]