-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmoondream.py
154 lines (127 loc) · 5.21 KB
/
moondream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
from typing import ClassVar, Mapping, Sequence, Any, Dict, Optional, Tuple, Final, List, cast
from typing_extensions import Self
from typing import Any, Final, List, Mapping, Optional, Union
from PIL import Image
from viam.proto.common import PointCloudObject
from viam.proto.service.vision import Classification, Detection
from viam.resource.types import RESOURCE_NAMESPACE_RDK, RESOURCE_TYPE_SERVICE, Subtype
from viam.utils import ValueTypes
from viam.module.types import Reconfigurable
from viam.proto.app.robot import ComponentConfig
from viam.proto.common import ResourceName
from viam.resource.base import ResourceBase
from viam.resource.types import Model, ModelFamily
from viam.services.vision import Vision, CaptureAllResult
from viam.proto.service.vision import GetPropertiesResponse
from viam.components.camera import Camera, ViamImage
from viam.media.utils.pil import viam_to_pil_image
from viam.logging import getLogger
from transformers import AutoModelForCausalLM, AutoTokenizer
from PIL import Image
# Module-level logger, named after this module.
LOGGER = getLogger(__name__)
class moondream(Vision, Reconfigurable):
    """Vision service that answers a question about an image with moondream2.

    Only the classification API is implemented. Each classification call
    returns a single ``Classification`` whose ``class_name`` is the model's
    text answer. Detections and object point clouds are not supported and
    yield empty results, which matches what ``get_properties`` reports.
    """

    MODEL: ClassVar[Model] = Model(ModelFamily("viam-labs", "vision"), "moondream")

    # Hugging Face repository the model and tokenizer are loaded from.
    _MODEL_ID: ClassVar[str] = "vikhyatk/moondream2"
    # Revision used when the config has no non-empty "revision" attribute.
    _DEFAULT_REVISION: ClassVar[str] = "2024-08-26"
    # Question asked when the caller does not pass extra["question"].
    _DEFAULT_QUESTION: ClassVar[str] = "describe this image"

    # Both are assigned by reconfigure().
    model: AutoModelForCausalLM
    tokenizer: AutoTokenizer

    # Constructor
    @classmethod
    def new(cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]) -> Self:
        """Create the service named ``config.name`` and configure it."""
        service = cls(config.name)
        service.reconfigure(config, dependencies)
        return service

    # Validates JSON Configuration
    @classmethod
    def validate(cls, config: ComponentConfig):
        """Accept any configuration; no attribute is checked."""
        return

    # Handles attribute reconfiguration
    def reconfigure(self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]):
        """Store the dependencies and (re)load the model and tokenizer.

        Args:
            config: Service configuration. The optional string attribute
                ``revision`` selects the model revision to load.
            dependencies: Resources this service may use; cameras are looked
                up here by ``get_cam_image``.
        """
        self.DEPS = dependencies

        # Test membership before indexing: indexing a protobuf map with a
        # missing key inserts a default entry, which would mutate the config.
        fields = config.attributes.fields
        revision = fields["revision"].string_value if "revision" in fields else ""
        revision = revision or self._DEFAULT_REVISION

        self.model = AutoModelForCausalLM.from_pretrained(
            self._MODEL_ID, trust_remote_code=True, revision=revision
        )
        self.tokenizer = AutoTokenizer.from_pretrained(self._MODEL_ID, revision=revision)
        return

    async def get_cam_image(
        self,
        camera_name: str
    ) -> ViamImage:
        """Fetch one JPEG image from the dependency camera ``camera_name``.

        Raises:
            KeyError: If the camera is not among the stored dependencies.
        """
        camera = cast(Camera, self.DEPS[Camera.get_resource_name(camera_name)])
        return await camera.get_image(mime_type="image/jpeg")

    # not implemented, use classification methods
    async def get_detections_from_camera(
        self, camera_name: str, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None
    ) -> List[Detection]:
        """Detections are not supported; always returns an empty list."""
        return []

    # not implemented, use classification methods
    async def get_detections(
        self,
        image: ViamImage,
        *,
        extra: Optional[Mapping[str, Any]] = None,
        timeout: Optional[float] = None,
    ) -> List[Detection]:
        """Detections are not supported; always returns an empty list."""
        return []

    async def get_classifications_from_camera(
        self,
        camera_name: str,
        count: int,
        *,
        extra: Optional[Mapping[str, Any]] = None,
        timeout: Optional[float] = None,
    ) -> List[Classification]:
        """Grab an image from ``camera_name`` and classify it.

        Arguments are forwarded to ``get_classifications``.
        """
        image = await self.get_cam_image(camera_name)
        return await self.get_classifications(image, count, extra=extra, timeout=timeout)

    async def get_classifications(
        self,
        image: ViamImage,
        count: int,
        *,
        extra: Optional[Mapping[str, Any]] = None,
        timeout: Optional[float] = None,
    ) -> List[Classification]:
        """Ask the model a question about ``image``.

        Args:
            image: Image to describe.
            count: Unused; exactly one classification is produced.
            extra: Optional mapping; a non-None ``"question"`` entry replaces
                the default question.
            timeout: Unused.

        Returns:
            A one-element list whose ``class_name`` is the model's answer and
            whose ``confidence`` is 1.
        """
        question = self._DEFAULT_QUESTION
        if extra is not None and extra.get("question") is not None:
            question = extra["question"]

        encoded = self.model.encode_image(viam_to_pil_image(image))
        answer = self.model.answer_question(encoded, question, self.tokenizer)
        # Return proto messages rather than plain dicts, as the signature says.
        return [Classification(class_name=answer, confidence=1.0)]

    async def get_object_point_clouds(
        self, camera_name: str, *, extra: Optional[Mapping[str, Any]] = None, timeout: Optional[float] = None
    ) -> List[PointCloudObject]:
        """Object point clouds are not supported; always returns an empty list."""
        return []

    async def do_command(self, command: Mapping[str, ValueTypes], *, timeout: Optional[float] = None) -> Mapping[str, ValueTypes]:
        """No custom commands are implemented; always returns an empty mapping."""
        return {}

    async def capture_all_from_camera(
        self,
        camera_name: str,
        return_image: bool = False,
        return_classifications: bool = False,
        return_detections: bool = False,
        return_object_point_clouds: bool = False,
        *,
        extra: Optional[Mapping[str, Any]] = None,
        timeout: Optional[float] = None,
    ) -> CaptureAllResult:
        """Capture an image and, on request, classify it in one call.

        Only ``return_image`` and ``return_classifications`` have an effect;
        detections and object point clouds are never populated.

        Args:
            camera_name: Dependency camera to read from.
            return_image: Include the captured image in the result.
            return_classifications: Include a classification of the image.
            return_detections: Ignored.
            return_object_point_clouds: Ignored.
            extra: Forwarded to ``get_classifications``.
            timeout: Forwarded to ``get_classifications``.
        """
        result = CaptureAllResult()
        # Nothing requested that this service can provide: skip the camera
        # read and the model inference.
        if not (return_image or return_classifications):
            return result

        # Classification needs the image even when the caller does not want it back.
        image = await self.get_cam_image(camera_name)
        if return_image:
            result.image = image
        if return_classifications:
            result.classifications = await self.get_classifications(
                image, 1, extra=extra, timeout=timeout
            )
        return result

    async def get_properties(
        self,
        *,
        extra: Optional[Mapping[str, Any]] = None,
        timeout: Optional[float] = None,
    ) -> GetPropertiesResponse:
        """Report that only classifications are supported."""
        return GetPropertiesResponse(
            classifications_supported=True,
            detections_supported=False,
            object_point_clouds_supported=False
        )