-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathphoto_booth.py
executable file
·321 lines (248 loc) · 9.87 KB
/
photo_booth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
#!/usr/bin/python
import RPIO as GPIO
import time
import threading
import os
import subprocess
import sys
import logging
import picamera
import tweepy
import traceback
import datetime
import photo_booth_config as cfg
############################################################################
# Twitter support
############################################################################
class Twitter(object):
"""Class to connect to and send DMs/update status on twitter"""
def __init__(self):
self.twitter_api = None
self.logger = logging.getLogger(__name__)
def connect(self):
"""Initialize Twitter API Object.
Args:
None
"""
# User may not have configured twitter - don't initialize it until it's
# first used
if self.twitter_api == None:
self.logger.info("Initializing Twitter")
if cfg.TWITTER_CONSUMER_KEY == '' or cfg.TWITTER_CONSUMER_SECRET == '':
self.logger.error("Twitter customer key/secret not specified - unable to Tweet!")
elif cfg.TWITTER_ACCESS_KEY == '' or cfg.TWITTER_ACCESS_SECRET == '':
self.logger.error("Twitter access key/secret not specified - unable to Tweet!")
else:
auth = tweepy.OAuthHandler(cfg.TWITTER_CONSUMER_KEY, cfg.TWITTER_CONSUMER_SECRET)
auth.set_access_token(cfg.TWITTER_ACCESS_KEY, cfg.TWITTER_ACCESS_SECRET)
self.twitter_api = tweepy.API(auth)
def update_status(self, msg, photo_path):
# tweet a picture to my account
self.connect()
if self.twitter_api != None:
# Twitter doesn't like the same message sent multiple times, so add a time stamp
msg = time.strftime("%Y-%m-%d %H:%M:%S: ") + msg
self.logger.info("Updating Twitter status to: %s", msg)
try:
self.twitter_api.update_with_media(photo_path, status=msg)
except tweepy.error.TweepError as ex:
self.logger.error("Unable to update Twitter status: %s", ex)
############################################################################
# Base class for a button connected via GPIO
############################################################################
class Button(object):
""" Class to manage a momentary button on the Pi GPIO """
def onPressed(self, port, value):
if self.__button_port == port and value == 1:
self.logger.debug("Button %d pressed %d", port, value)
self.__button_cb(port)
def __init__(self, port, callback):
self.__button_port = port
self.__button_cb = callback
self.logger = logging.getLogger(__name__)
self.logger.debug("Initializing Button %d", port)
GPIO.setup(port, GPIO.IN)
GPIO.add_interrupt_callback(port, self.onPressed)
self.logger.debug("Button %d initialized", port)
############################################################################
# Base class for a LED connected via GPIO
############################################################################
class LED(object):
""" Class to manage a LED on the Pi GPIO """
def Set(self, state):
self.state = state
GPIO.output(self.port, state)
def On(self):
self.Set(True)
def Off(self):
self.Set(False)
def Toggle(self):
self.Set(not self.state)
def Blink(self, delay):
self.Off()
time.sleep(delay)
self.On()
time.sleep(delay)
def __init__(self, port, state=True):
self.port = port
self.logger = logging.getLogger(__name__)
self.logger.debug("Initializing LED %d", port)
GPIO.setup(port, GPIO.OUT)
self.Set(state)
self.logger.debug("LED %d initialized", port)
############################################################################
# The photo button also lights up, so it is a Button and a LED
############################################################################
class PhotoButton(Button, LED):
def __init__(self, button_port, led_port, callback):
Button.__init__(self, button_port, callback)
LED.__init__(self, led_port)
############################################################################
# The Toggle button lights up and can be toggled into ON or OFF. The button
# will remain in a state until pressed.
############################################################################
class ToggleButton(Button, LED):
def TogglePressed(self, port):
if self.__ignore != True:
self.Toggle()
self.__cb(port)
def Ignore(self):
self.__ignore = True
def Reset(self):
self.__ignore = False
self.On()
def __init__(self, button_port, led_port, callback):
self.__ignore = False
self.__cb = callback
Button.__init__(self, button_port, self.TogglePressed)
LED.__init__(self, led_port, True)
############################################################################
# StoppableThread is encapsulates a thread and an event and provides an
# readable interface to stop the thread and to tell if the thread has been
# stopped.
#
# For the Photobooth the StopableThread is used to blink a LED on / off
# separately from the main thread.
############################################################################
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self, target):
super(StoppableThread, self).__init__(None, target)
self._stop = threading.Event()
def stop(self):
self._stop.set()
def stopped(self):
return self._stop.isSet()
############################################################################
# Main functionality
############################################################################
class PhotoBooth(object):
""" Class with main functionality for the HER photo booth """
def __init__(self):
self.logger = logging.getLogger(__name__)
def setup_camera(self):
self.camera = picamera.PiCamera()
# ToDo: move these settings into a config file
self.camera.resolution = (500, 375) # use a smaller resolution for speed
self.camera.vflip = False
self.camera.hflip = True
self.camera.saturation = 50
self.camera.brightness = 60
def take_a_picture(self):
self.Pose.On()
time.sleep(1.5)
# slow blink delay for pose
for i in range(5):
self.Pose.Blink(0.4)
# fast blink delay for pose
for i in range(5):
self.Pose.Blink(0.1)
self.Pose.Off()
self.logger.info("Taking picture %d" % (self.pictureId))
self.camera.capture('/home/pi/photobooth_images/photobooth%d.jpg' % (self.pictureId))
def take_all_pictures(self):
self.pictureId = 1
while self.pictureId < 5:
self.take_a_picture()
self.pictureId += 1
def upload_picture(self):
self.logger.info("Uploading photo")
self.pictureId = 1
self.Upload.On()
# this calls an external process, montage, to take 4 pictures and turn them into one picture laid out in a 2 by 2 grid
subprocess.call("montage /home/pi/photobooth_images/*.jpg -tile 2x2 -geometry +10+10 /home/pi/temp_montage2.jpg", shell=True)
# upload to Twitter
self.twitter.update_status("Hollow Earth Photo booth", "/home/pi/temp_montage2.jpg")
# clean up temporary files
subprocess.call("rm /home/pi/photobooth_images/*.jpg", shell=True)
subprocess.call("rm /home/pi/temp_montage*", shell=True)
# get ready for the next picture
self.Upload.Off()
self.Photo.On()
def blinkPhotoLed(self):
while self.BlinkyThread.stopped() == False:
self.Photo.Blink(.5)
self.Photo.Off()
def onPhotoPressed(self, port):
self.logger.info("Photo button pressed")
if self.BlinkyThread is not None and self.BlinkyThread.isAlive() == True:
self.BlinkyThread.stop()
self.Photo.Off()
# the Toggle button is used to switch between single shot and all-at-once
# picture taking modes.
if self.Toggle.state == True:
self.take_all_pictures()
else:
self.Toggle.Ignore()
self.take_a_picture()
self.pictureId += 1
if self.pictureId < 5:
self.BlinkyThread = StoppableThread(target=self.blinkPhotoLed)
self.BlinkyThread.start()
if self.pictureId == 5:
self.upload_picture()
self.BlinkyThread.stop()
self.Toggle.Reset()
def onTogglePressed(self, port):
self.logger.info("Toggle button pressed")
def main(self):
""" Main functionality
"""
try:
# Set up logging
log_fmt = '%(asctime)-15s %(levelname)-8s %(message)s'
log_level = logging.INFO
if sys.stdout.isatty():
# Connected to a real terminal - log to stdout
logging.basicConfig(format=log_fmt, level=log_level)
else:
# Background mode - log to a file
logging.basicConfig(format=log_fmt, level=log_level, filename=cfg.LOG_FILENAME)
# Banner
self.logger.info("=========================================================")
self.logger.info("HER Photo Booth starting")
# Use Raspberry Pi GPIO board in Broadcom mode
self.logger.info("Configuring global settings")
GPIO.setmode(GPIO.BCM)
# initialize camera and connection to Twitter
self.pictureId = 1
self.setup_camera()
self.twitter = Twitter()
# create a thread for blinking LEDs
self.BlinkyThread = StoppableThread(target=self.blinkPhotoLed)
# set the buttons and LEDs into their initial states
self.Photo = PhotoButton(cfg.PHOTO, cfg.PHOTO_LED, self.onPhotoPressed)
self.Toggle = ToggleButton(cfg.TOGGLE, cfg.TOGGLE_LED, self.onTogglePressed)
self.Pose = LED(cfg.POSE_LED, False)
self.Upload = LED(cfg.UPLOAD_LED, False)
# wait for someone to press a button
GPIO.wait_for_interrupts()
except KeyboardInterrupt:
logging.critical("Terminating due to keyboard interrupt")
except:
logging.critical("Terminating due to unexpected error: %s", sys.exc_info()[0])
logging.critical("%s", traceback.format_exc())
GPIO.cleanup()
if __name__ == "__main__":
PhotoBooth().main()