Skip to content

Commit

Permalink
Update to Python 3
Browse files Browse the repository at this point in the history
  • Loading branch information
oldnapalm authored Jul 13, 2020
1 parent a297ab9 commit 33fa62e
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 112 deletions.
8 changes: 4 additions & 4 deletions AbstractPowerCalculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def update(self, revs_per_sec):
# We just keep track of energy and time for now
self.energy += delta_energy
self.last_time = current_time
if self._DEBUG: print "cumulative_time(): " + repr(self.cumulative_time())
if self._DEBUG: print("cumulative_time(): " + repr(self.cumulative_time()))

# We only update the observer with a power reading up to twice a second, which is roughly
# as often as a crank-based power meter
Expand All @@ -45,7 +45,7 @@ def cumulative_time(self):
return self.last_time - self.init_time

def send_power(self):
if self._DEBUG: print "send_power"
if self._DEBUG: print("send_power")
timeGap = self.cumulative_time()
if timeGap == 0.0:
return
Expand All @@ -59,6 +59,6 @@ def send_power(self):
# Tell whoever is listening
if self.observer:
self.observer.update(avePower)
if self._DEBUG: print "Power: ", repr(avePower)
if self._DEBUG: print("Power: ", repr(avePower))
else:
print "Power: ", repr(avePower)
print("Power: ", repr(avePower))
24 changes: 12 additions & 12 deletions BtAtsPowerCalculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@ def __init__(self):
self.dynamic_air_density = None

def check_for_bme280_sensor(self):
print "Check for temperature/pressure/humidity sensor"
print("Check for temperature/pressure/humidity sensor")
try:
import os, sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# import ../bme280.py
import bme280
bme280.readBME280All() # The first reading after boot-up can be off, so throw it away
temperature, pressure, humidity = bme280.readBME280All()
print "Temp (C): " + repr(temperature)
print "Pressure: " + repr(pressure)
print "Humidity: " + repr(humidity)
print "Air density: " + repr(self.calc_air_density(temperature, pressure, humidity))
print("Temp (C): " + repr(temperature))
print("Pressure: " + repr(pressure))
print("Humidity: " + repr(humidity))
print("Air density: " + repr(self.calc_air_density(temperature, pressure, humidity)))
self.dynamic_air_density = True
except (ImportError, IOError) as e:
self.dynamic_air_density = False
print "Not found"
print("Not found")

A = 0.290390167
B = -0.0461311774
Expand All @@ -44,7 +44,7 @@ def check_for_bme280_sensor(self):
# Power = A * v ^ 3 + B * v ^ 2 + C * v + d
# where v is speed in revs / sec and constants A, B, C & D are as defined above.
def power_from_speed(self, revs_per_sec):
if self._DEBUG: print "power_from_speed"
if self._DEBUG: print("power_from_speed")

if self.dynamic_air_density is None:
self.check_for_bme280_sensor()
Expand All @@ -54,12 +54,12 @@ def power_from_speed(self, revs_per_sec):
import bme280
temperature, pressure, humidity = bme280.readBME280All()
if self._DEBUG:
print "Temp (C): " + repr(temperature)
print "Pressure: " + repr(pressure)
print "Humidity: " + repr(humidity)
print("Temp (C): " + repr(temperature))
print("Pressure: " + repr(pressure))
print("Humidity: " + repr(humidity))
self.update_air_density(temperature, pressure, humidity)

if self._DEBUG: print "air_density_correction: " + repr(self.air_density_correction)
if self._DEBUG: print("air_density_correction: " + repr(self.air_density_correction))
rs = revs_per_sec
power = self.correction_factor * (self.A * rs * rs * rs * self.air_density_correction +
self.B * rs * rs +
Expand All @@ -73,7 +73,7 @@ def update_air_density_correction(self):
@staticmethod
def calc_air_density(t, p, h):
_DEBUG = BtAtsPowerCalculator._DEBUG
if _DEBUG: print "set_air_density(temp=" + repr(t) + ", press=" + repr(p) + ", humi=" + repr(h) + ")"
if _DEBUG: print("set_air_density(temp=" + repr(t) + ", press=" + repr(p) + ", humi=" + repr(h) + ")")
Rd = 287.05 # Specific gas constant for dry air J / (KgK)
Rv = 461.495 # Specific gas constant for water vapour J / (KgK)
water_vapour_pressure = BtAtsPowerCalculator.saturation_pressure(t) * h / 100.0
Expand Down
2 changes: 1 addition & 1 deletion KurtKineticPowerCalculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self):
# Power = A * v ^ 3 + B * v ^ 2 + C * v + d
# where v is speed in miles/hour and constants A, B, C & D are as defined above.
def power_from_speed(self, revs_per_sec):
if self._DEBUG: print "power_from_speed"
if self._DEBUG: print("power_from_speed")

miles_per_rev = self.wheel_circumference / 1609.34
mph = revs_per_sec * 3600 * miles_per_rev
Expand Down
41 changes: 21 additions & 20 deletions PowerMeterTx.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import sys
from ant.core import message
from ant.core import message, node
from ant.core.constants import *
from ant.core.exceptions import ChannelError

from constants import *
from config import VPOWER_DEBUG
from config import NETKEY, VPOWER_DEBUG

CHANNEL_PERIOD = 8182

Expand All @@ -25,12 +25,13 @@ def __init__(self, antnode, sensor_id):
self.channel = antnode.getFreeChannel()
try:
self.channel.name = 'C:POWER'
self.channel.assign('N:ANT+', CHANNEL_TYPE_TWOWAY_TRANSMIT)
network = node.Network(NETKEY, 'N:ANT+')
self.channel.assign(network, CHANNEL_TYPE_TWOWAY_TRANSMIT)
self.channel.setID(POWER_DEVICE_TYPE, sensor_id, 0)
self.channel.setPeriod(8182)
self.channel.setFrequency(57)
self.channel.period = CHANNEL_PERIOD
self.channel.frequency = 57
except ChannelError as e:
print "Channel config error: "+e.message
print("Channel config error: " + repr(e))
self.powerData = PowerMeterTx.PowerData()

def open(self):
Expand All @@ -44,25 +45,25 @@ def unassign(self):

# Power was updated, so send out an ANT+ message
def update(self, power):
if VPOWER_DEBUG: print 'PowerMeterTx: update called with power ', power
if VPOWER_DEBUG: print('PowerMeterTx: update called with power ', power)
self.powerData.eventCount = (self.powerData.eventCount + 1) & 0xff
if VPOWER_DEBUG: print 'eventCount ', self.powerData.eventCount
if VPOWER_DEBUG: print('eventCount ', self.powerData.eventCount)
self.powerData.cumulativePower = (self.powerData.cumulativePower + int(power)) & 0xffff
if VPOWER_DEBUG: print 'cumulativePower ', self.powerData.cumulativePower
if VPOWER_DEBUG: print('cumulativePower ', self.powerData.cumulativePower)
self.powerData.instantaneousPower = int(power)
if VPOWER_DEBUG: print 'instantaneousPower ', self.powerData.instantaneousPower
if VPOWER_DEBUG: print('instantaneousPower ', self.powerData.instantaneousPower)

payload = chr(0x10) # standard power-only message
payload += chr(self.powerData.eventCount)
payload += chr(0xFF) # Pedal power not used
payload += chr(0xFF) # Cadence not used
payload += chr(self.powerData.cumulativePower & 0xff)
payload += chr(self.powerData.cumulativePower >> 8)
payload += chr(self.powerData.instantaneousPower & 0xff)
payload += chr(self.powerData.instantaneousPower >> 8)
payload = bytearray(b'\x10') # standard power-only message
payload.append(self.powerData.eventCount)
payload.append(0xFF) # Pedal power not used
payload.append(0xFF) # Cadence not used
payload.append(self.powerData.cumulativePower & 0xff)
payload.append(self.powerData.cumulativePower >> 8)
payload.append(self.powerData.instantaneousPower & 0xff)
payload.append(self.powerData.instantaneousPower >> 8)

ant_msg = message.ChannelBroadcastDataMessage(self.channel.number, data=payload)
sys.stdout.write('+')
sys.stdout.flush()
if VPOWER_DEBUG: print 'Write message to ANT stick on channel ' + repr(self.channel.number)
self.antnode.driver.write(ant_msg.encode())
if VPOWER_DEBUG: print('Write message to ANT stick on channel ' + repr(self.channel.number))
self.antnode.send(ant_msg)
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ Supported devices:

### Running from source code

* Install [Python 2](https://www.python.org/downloads/) if not already installed
* Install [Python 3](https://www.python.org/downloads/) if not already installed
* Clone or download [python-ant](https://github.com/oldnapalm/python-ant) repo
* Open Command Prompt, CD to the python-ant repo directory and run ``C:\Python27\python.exe setup.py install``
* Open Command Prompt, CD to the python-ant repo directory and run ``python setup.py install``
* Run ``pip install configparser pywin32``
* Clone or download this repo
* CD to the repo directory and run ``C:\Python27\python.exe vpower.py``
* CD to the repo directory and run ``python vpower.py``

## Troubleshooting

Expand Down
32 changes: 16 additions & 16 deletions SpeedCadenceSensorRx.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from ant.core import event
from ant.core import message
from ant.core import event, message, node
from ant.core.constants import *

from constants import *
from config import VPOWER_DEBUG
from config import NETKEY, VPOWER_DEBUG


# Receiver for Speed and/or Cadence ANT+ sensor
Expand All @@ -19,17 +18,18 @@ def __init__(self, antnode, sensor_type, sensor_id):
# Get the channel
self.channel = antnode.getFreeChannel()
self.channel.name = 'C:SPEED'
self.channel.assign('N:ANT+', CHANNEL_TYPE_TWOWAY_RECEIVE)
network = node.Network(NETKEY, 'N:ANT+')
self.channel.assign(network, CHANNEL_TYPE_TWOWAY_RECEIVE)
self.channel.setID(sensor_type, sensor_id, 0)
self.channel.setSearchTimeout(TIMEOUT_NEVER)
self.channel.searchTimeout = TIMEOUT_NEVER
if sensor_type == SPEED_DEVICE_TYPE:
period = 8118
elif sensor_type == CADENCE_DEVICE_TYPE:
period = 8102
elif sensor_type == SPEED_CADENCE_DEVICE_TYPE:
period = 8086
self.channel.setPeriod(period)
self.channel.setFrequency(57)
self.channel.period = period
self.channel.frequency = 57

def set_revs_per_sec(self, rps):
self.revsPerSec = rps
Expand All @@ -56,7 +56,7 @@ def stopped(self):
# TODO
return False

def process(self, msg):
def process(self, msg, channel):
if isinstance(msg, message.ChannelBroadcastDataMessage):
dp = None
# Get the datapage according to the configured device type
Expand All @@ -71,7 +71,7 @@ def process(self, msg):

# Parse the incoming message into a SpeedCadenceData object
message_data = SpeedCadenceData()
dp.parse(msg.getPayload(), message_data)
dp.parse(msg.data, message_data)

if VPOWER_DEBUG: message_data.print_speed()

Expand All @@ -96,7 +96,7 @@ def process(self, msg):
self.set_revs_per_sec(revs_diff / time_diff)

elif isinstance(msg, message.ChannelStatusMessage):
if msg.getStatus() == EVENT_CHANNEL_CLOSED:
if msg.status == EVENT_CHANNEL_CLOSED:
# Channel closed, re-open
open()

Expand All @@ -109,22 +109,22 @@ def __init__(self):
self.cadenceEventTime = None

def print_speed(self):
print 'speedRevCount: ', self.speedRevCount
print 'speedEventTime: ', self.speedEventTime
print('speedRevCount: ', self.speedRevCount)
print('speedEventTime: ', self.speedEventTime)

def print_cadence(self):
print 'cadenceRevCount: ', self.cadenceRevCount
print 'cadenceEventTime: ', self.cadenceEventTime
print('cadenceRevCount: ', self.cadenceRevCount)
print('cadenceEventTime: ', self.cadenceEventTime)


class DataPage(object):
@staticmethod
def parse_event_time(payload, offset):
return (ord(payload[offset+1]) | (ord(payload[offset + 2]) << 8)) / 1024.0
return (payload[offset] | (payload[offset + 1] << 8)) / 1024.0

@staticmethod
def parse_rev_count(payload, offset):
return ord(payload[offset+1]) | (ord(payload[offset + 2]) << 8)
return payload[offset] | (payload[offset + 1] << 8)


class SpeedDataPage(DataPage):
Expand Down
33 changes: 20 additions & 13 deletions bot.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#!/usr/bin/env python
import sys
import time
import win32api
import Tkinter as tk
import tkinter as tk

from ant.core import driver
from ant.core import node

from usb.core import find

from PowerMeterTx import PowerMeterTx
from config import DEBUG, LOG, NETKEY, POWER_SENSOR_ID

Expand All @@ -14,11 +17,11 @@

def on_exit(sig, func=None):
if power_meter:
print "Closing power meter"
print("Closing power meter")
power_meter.close()
power_meter.unassign()
if antnode:
print "Stopping ANT node"
print("Stopping ANT node")
antnode.stop()

win32api.SetConsoleCtrlHandler(on_exit, True)
Expand All @@ -27,20 +30,25 @@ def disable_event():
pass

try:
stick = driver.USB2Driver(None, log=LOG, debug=DEBUG)
devs = find(find_all=True)
for dev in devs:
if dev.idVendor == 0x0fcf and dev.idProduct in [0x1008, 0x1009]:
break

stick = driver.USB2Driver(log=LOG, debug=DEBUG, idProduct=dev.idProduct)
antnode = node.Node(stick)
print "Starting ANT node"
print("Starting ANT node")
antnode.start()
key = node.NetworkKey('N:ANT+', NETKEY)
key = node.Network(NETKEY, 'N:ANT+')
antnode.setNetworkKey(0, key)

print "Starting power meter with ANT+ ID " + repr(POWER_SENSOR_ID)
print("Starting power meter with ANT+ ID " + repr(POWER_SENSOR_ID))
try:
# Create the power meter object and open it
power_meter = PowerMeterTx(antnode, POWER_SENSOR_ID)
power_meter.open()
except Exception as e:
print "power_meter error: " + e.message
print("power_meter error: " + repr(e))
power_meter = None

master = tk.Tk()
Expand All @@ -52,11 +60,9 @@ def disable_event():
w = tk.Scale(master, from_=0, to=1000, length=200, orient=tk.HORIZONTAL)
w.pack()

t = 0
last = 0
power = 0

print "Main wait loop"
print("Main wait loop")
while True:
try:
power = w.get()
Expand All @@ -70,5 +76,6 @@ def disable_event():
break

except Exception as e:
print "Exception: "+repr(e)
raw_input()
print("Exception: " + repr(e))
if getattr(sys, 'frozen', False):
input()
Loading

0 comments on commit 33fa62e

Please sign in to comment.