##########################################################################################################
# #
# Object Detection (camera) From TF2 Saved Model #
# file: object_detection_camera.py #
# #
# Author: Javier Goya Pérez #
# Date: January 2021 #
# #
##########################################################################################################
# This code is based on the TensorFlow2 Object Detection API tutorial
# (https://tensorflow-object-detection-api-tutorial.readthedocs.io/en/latest/auto_examples/plot_object_detection_saved_model.html#putting-everything-together)
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Suppress TensorFlow logging (1)
import tensorflow as tf
tf.get_logger().setLevel('ERROR') # Suppress TensorFlow logging (2)
import cv2
import numpy as np
import matplotlib
import warnings
warnings.filterwarnings('ignore') # Suppress Matplotlib warnings
from collections import defaultdict
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util
# utils library from https://www.pyimagesearch.com (author: Adrian Rosebrock)
from imutils.video import VideoStream
from imutils.video import FPS
import argparse
import json
import pathlib
import time
import datetime
# utils functions for tesseract ocr plate recognition
import ocr_plate_recognition
# utils functions for db
from db import db_utils
# utils functions for gps
from gps import gps_utils
'''
Arguments
'''
# json
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-c", "--conf", required=True, help="path to the JSON configuration file")
args = vars(ap.parse_args())
# load the configuration
conf = json.load(open(args["conf"]))
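# Illustrative sketch of the expected JSON structure. The values below are
# placeholders; only the key names are taken from how `conf` is used in this script:
# {
#     "model": "path/to/saved_model",
#     "label": "path/to/label_map.pbtxt",
#     "use_picamera": true,
#     "resolution": [640, 480],
#     "fps": 30,
#     "camera_warmup_time": 2.5,
#     "confidence": 0.5,
#     "threshold": 0.5,
#     "use_gps": false,
#     "db": "path/to/detections.db",
#     "video_camera_output": "path/to/output_dir",
#     "video_codec": "MJPG"
# }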
'''
GPS
'''
if (conf["use_gps"]):
gps_socket, data_stream = gps_utils.init_gps()
'''
Load model and labels
'''
print("[TF] loading model ...")
start_time = time.time()
# Load saved model and build the detection function
detect_fn = tf.saved_model.load(conf["model"])
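# the loaded SavedModel object is directly callable on a batched image tensor
# (models exported with the TF2 Object Detection API expose their serving
# signature this way)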
end_time = time.time()
elapsed_time = end_time - start_time
print("[TF] model loaded ... took {} seconds".format(elapsed_time))
# Load labelmap
category_index = label_map_util.create_category_index_from_labelmap(conf["label"],
                                                                    use_display_name=True)
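# category_index maps each class id to a dict with 'id' and 'name' keys,
# e.g. {1: {'id': 1, 'name': 'car'}, ...} (the class names depend on the labelmap file)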
'''
Input video
'''
# initialize the video stream and allow the camera
# sensor to warmup
vs = VideoStream(usePiCamera=conf["use_picamera"],
                 resolution=tuple(conf["resolution"]),
                 framerate=conf["fps"]).start()
print("[TF] warming up camera...")
time.sleep(conf["camera_warmup_time"])
fps = FPS().start()
# prepare variable for writer that we will use to write processed frames
writer = None
# prepare variables for spatial dimensions of the frames
h, w = None, None
print("[TF] starting video from camera ...")
'''
Prepare DB
'''
# Create (if not exists) DB connection
conn = db_utils.create_connection(conf["db"])
if conn is not None:
    # Create (if not exists) RECORDINGS table
    db_utils.create_recordings_table(conn)
    # Create (if not exists) DETECTIONS table
    db_utils.create_detections_table(conn)
    print("[TF] DB configured")
else:
    print("[TF] error while configuring DB")
# Generate recording entry name
recording_name = datetime.datetime.now().strftime("%d%m%Y-%H%M%S")
# Insert recording into RECORDINGS table
recording_id = db_utils.insert_recording(conn, recording_name)
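# recording_id links every detection row inserted below to this recording session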
'''
Output video
'''
# Name for generated videofile
recording_path = conf["video_camera_output"] + "/" + recording_name + ".avi"
# variable for counting frames
f_count = 0
# variable for counting time
start_time = time.time()
# loop over frames from the camera stream
while True:
    # read the next frame from the camera
    frame = vs.read()
    # get spatial dimensions of the frame (only 1st time)
    if w is None or h is None:
        h, w = frame.shape[:2]
    frame_np = np.array(frame)
    '''
    Run inference
    '''
    # The input needs to be a tensor, convert it using `tf.convert_to_tensor`.
    input_tensor = tf.convert_to_tensor(frame_np)
    # The model expects a batch of frames, so add an axis with `tf.newaxis`.
    input_tensor = input_tensor[tf.newaxis, ...]
    # input_tensor = np.expand_dims(frame_np, 0)
    detections = detect_fn(input_tensor)
    # All outputs are batched tensors.
    # Convert to numpy arrays, and take index [0] to remove the batch dimension.
    # We're only interested in the first num_detections.
    num_detections = int(detections.pop('num_detections'))
    detections = {key: value[0, :num_detections].numpy()
                  for key, value in detections.items()}
    detections['num_detections'] = num_detections
    # detection_classes should be ints.
    detections['detection_classes'] = detections['detection_classes'].astype(np.int64)
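    # After this block, `detections` holds plain numpy arrays:
    #   'detection_boxes'   -> [N, 4] normalized [ymin, xmin, ymax, xmax]
    #   'detection_scores'  -> [N] confidences, sorted in descending order
    #   'detection_classes' -> [N] integer class ids matching the labelmap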
    scores = detections['detection_scores']    # Confidence of detected objects
    boxes = detections['detection_boxes']      # Bounding box coordinates of detected objects
    classes = detections['detection_classes']  # Class index of detected objects
    # Apply Non Max Suppression
    # keep at most as many boxes as there are detections above the confidence threshold
    length = len([s for s in scores if s > conf["confidence"]])
    nms_indices = tf.image.non_max_suppression(boxes, scores, length, conf["threshold"])
    nms_boxes = tf.gather(boxes, nms_indices)
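    # non_max_suppression returns the indices of the boxes kept after suppressing
    # overlaps (IoU above conf["threshold"]), ordered by descending score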
    # dictionary for db: maps recognized plate number -> vehicle class name
    db_detections = {}
    for i in nms_indices.numpy():
        if (scores[i] > conf["threshold"]) and (scores[i] <= 1.0):
            # Get bounding box coordinates and draw box
            # The model can return coordinates that are outside of frame dimensions,
            # so force them to be within the frame using max() and min()
            ymin = int(max(1, (boxes[i][0] * h)))
            xmin = int(max(1, (boxes[i][1] * w)))
            ymax = int(min(h, (boxes[i][2] * h)))
            xmax = int(min(w, (boxes[i][3] * w)))
            cv2.rectangle(frame_np, (xmin, ymin), (xmax, ymax), (10, 255, 0), 2)
            # Draw label
            # Look up object name from the labelmap using the class index
            object_name = category_index[int(classes[i])]['name']
            label = '%s: %d%%' % (object_name, int(scores[i] * 100))
            #label = "%s" % (object_name)
            labelSize, baseLine = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)  # Get font size
            label_ymin = max(ymin, labelSize[1] + 10)  # Make sure not to draw label too close to top of window
            # Draw filled box to put label text in
            cv2.rectangle(frame_np, (xmin, label_ymin - labelSize[1] - 10), (xmin + labelSize[0], label_ymin + baseLine - 10), (10, 255, 0), cv2.FILLED)
            # Draw label text
            cv2.putText(frame_np, label, (xmin, label_ymin - 7), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2)
            plate_num = ""
            # Extract the detected number plate
            if object_name == "licence":
                licence_img = frame_np[ymin:ymax, xmin:xmax]
                image_h, image_w = licence_img.shape[:2]
                if image_w != 0 and image_h != 0:
                    plate_num = ocr_plate_recognition.recognize_plate(licence_img)
                    cv2.putText(frame_np, plate_num, (xmin, ymax + 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (10, 255, 0), 2)
            if plate_num != "":
                print("[TF] licence recognition = {}".format(plate_num))
                # associate the plate with the class of the neighbouring detection
                # (assumed to be the vehicle the plate belongs to)
                if (i - 1) >= 0:
                    db_detections[plate_num] = category_index[int(classes[i - 1])]['name']
                else:
                    db_detections[plate_num] = category_index[int(classes[i + 1])]['name']
    cv2.namedWindow("Camera Detections", cv2.WINDOW_NORMAL)
    cv2.imshow("Camera Detections", frame_np)
    fps.update()
    # update counters for total time and frames
    elapsed_time = round(time.time() - start_time, 2)
    f_count += 1
    '''
    Write processed frame into file
    '''
    if writer is None:
        # initialize video writer
        fourcc = cv2.VideoWriter_fourcc(*conf["video_codec"])
        writer = cv2.VideoWriter(recording_path, fourcc, 16, (frame.shape[1], frame.shape[0]), True)
    # write the processed (annotated) current frame to the file
    writer.write(frame_np)
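    # note: the writer fps is hard-coded to 16, so the recording plays back at the
    # right speed only if the pipeline actually produces ~16 processed frames per second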
    '''
    Insert into DB
    '''
    # get gps position
    gps_lat = gps_lon = 0
    if (conf["use_gps"]):
        gps_lat, gps_lon = gps_utils.get_position(gps_socket, data_stream)
    # get detection time
    detection_datetime = datetime.datetime.now().strftime("%d%m%Y-%H%M%S")
    # add one row per recognized plate:
    # (recording id, vehicle class, plate number, latitude, longitude,
    #  elapsed seconds, frame number, timestamp)
    for plate, vehicle_class in db_detections.items():
        detection = (recording_id, vehicle_class, plate, gps_lat, gps_lon, elapsed_time, f_count, detection_datetime)
        db_utils.insert_detection(conn, detection)
    '''
    Break from loop
    '''
    key = cv2.waitKey(1) & 0xFF
    # if the "q" or "Esc" key was pressed, break from the loop
    if key == ord("q") or key == 27:
        break
'''
Finish
'''
end_time = time.time()
# stop the timer and display FPS information
fps.stop()
print("[TF] elasped time: {:.2f}".format(fps.elapsed()))
print("[TF] approx. FPS: {:.2f}".format(fps.fps()))
# do a bit of cleanup
print("[TF] cleaning up...")
# release video reader and writer
cv2.destroyAllWindows()
vs.stop()
writer.release()
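# close the DB connection on exit (assuming db_utils.create_connection returns a
# sqlite3-style connection object with a close() method)
if conn is not None:
    conn.close()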