-
-
Notifications
You must be signed in to change notification settings - Fork 238
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Cedric PAILLE
committed
Oct 3, 2017
1 parent
3ba56d9
commit de07c13
Showing
2 changed files
with
82 additions
and
117 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,6 @@ | |
# | ||
# Initial patch to use ctypes by Giovanni Bajo <[email protected]> | ||
|
||
# pylint: disable=invalid-name,too-few-public-methods | ||
import ctypes | ||
import time | ||
from serial import win32 | ||
|
@@ -25,10 +24,11 @@ class Serial(SerialBase): | |
9600, 19200, 38400, 57600, 115200) | ||
|
||
def __init__(self, *args, **kwargs): | ||
super(SerialBase, self).__init__() | ||
self._port_handle = None | ||
self._overlapped_read = None | ||
self._overlapped_write = None | ||
super(Serial, self).__init__(*args, **kwargs) | ||
SerialBase.__init__(self, *args, **kwargs) | ||
|
||
def open(self): | ||
"""\ | ||
|
@@ -50,16 +50,16 @@ def open(self): | |
# for like COMnotanumber | ||
pass | ||
self._port_handle = win32.CreateFile( | ||
port, | ||
win32.GENERIC_READ | win32.GENERIC_WRITE, | ||
0, # exclusive access | ||
None, # no security | ||
win32.OPEN_EXISTING, | ||
win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED, | ||
0) | ||
port, | ||
win32.GENERIC_READ | win32.GENERIC_WRITE, | ||
0, # exclusive access | ||
None, # no security | ||
win32.OPEN_EXISTING, | ||
win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED, | ||
0) | ||
if self._port_handle == win32.INVALID_HANDLE_VALUE: | ||
self._port_handle = None # 'cause __del__ is called anyway | ||
raise SerialException("could not open port {!r}: {!r}".format(self.portstr, ctypes.WinError())) | ||
raise SerialException("could not open port %r: %r" % (self.portstr, ctypes.WinError())) | ||
|
||
try: | ||
self._overlapped_read = win32.OVERLAPPED() | ||
|
@@ -80,9 +80,9 @@ def open(self): | |
# Clear buffers: | ||
# Remove anything that was there | ||
win32.PurgeComm( | ||
self._port_handle, | ||
win32.PURGE_TXCLEAR | win32.PURGE_TXABORT | | ||
win32.PURGE_RXCLEAR | win32.PURGE_RXABORT) | ||
self._port_handle, | ||
win32.PURGE_TXCLEAR | win32.PURGE_TXABORT | | ||
win32.PURGE_RXCLEAR | win32.PURGE_RXABORT) | ||
except: | ||
try: | ||
self._close() | ||
|
@@ -140,7 +140,7 @@ def _reconfigure_port(self): | |
elif self._bytesize == serial.EIGHTBITS: | ||
comDCB.ByteSize = 8 | ||
else: | ||
raise ValueError("Unsupported number of data bits: {!r}".format(self._bytesize)) | ||
raise ValueError("Unsupported number of data bits: %r" % self._bytesize) | ||
|
||
if self._parity == serial.PARITY_NONE: | ||
comDCB.Parity = win32.NOPARITY | ||
|
@@ -158,7 +158,7 @@ def _reconfigure_port(self): | |
comDCB.Parity = win32.SPACEPARITY | ||
comDCB.fParity = 1 # Enable Parity Check | ||
else: | ||
raise ValueError("Unsupported parity mode: {!r}".format(self._parity)) | ||
raise ValueError("Unsupported parity mode: %r" % self._parity) | ||
|
||
if self._stopbits == serial.STOPBITS_ONE: | ||
comDCB.StopBits = win32.ONESTOPBIT | ||
|
@@ -167,7 +167,7 @@ def _reconfigure_port(self): | |
elif self._stopbits == serial.STOPBITS_TWO: | ||
comDCB.StopBits = win32.TWOSTOPBITS | ||
else: | ||
raise ValueError("Unsupported number of stop bits: {!r}".format(self._stopbits)) | ||
raise ValueError("Unsupported number of stop bits: %r" % self._stopbits) | ||
|
||
comDCB.fBinary = 1 # Enable Binary Transmission | ||
# Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE) | ||
|
@@ -182,24 +182,24 @@ def _reconfigure_port(self): | |
# XXX verify if platform really does not have a setting for those | ||
if not self._rs485_mode.rts_level_for_tx: | ||
raise ValueError( | ||
'Unsupported value for RS485Settings.rts_level_for_tx: {!r}'.format( | ||
self._rs485_mode.rts_level_for_tx,)) | ||
'Unsupported value for RS485Settings.rts_level_for_tx: %r' % ( | ||
self._rs485_mode.rts_level_for_tx,)) | ||
if self._rs485_mode.rts_level_for_rx: | ||
raise ValueError( | ||
'Unsupported value for RS485Settings.rts_level_for_rx: {!r}'.format( | ||
self._rs485_mode.rts_level_for_rx,)) | ||
'Unsupported value for RS485Settings.rts_level_for_rx: %r' % ( | ||
self._rs485_mode.rts_level_for_rx,)) | ||
if self._rs485_mode.delay_before_tx is not None: | ||
raise ValueError( | ||
'Unsupported value for RS485Settings.delay_before_tx: {!r}'.format( | ||
self._rs485_mode.delay_before_tx,)) | ||
'Unsupported value for RS485Settings.delay_before_tx: %r' % ( | ||
self._rs485_mode.delay_before_tx,)) | ||
if self._rs485_mode.delay_before_rx is not None: | ||
raise ValueError( | ||
'Unsupported value for RS485Settings.delay_before_rx: {!r}'.format( | ||
self._rs485_mode.delay_before_rx,)) | ||
'Unsupported value for RS485Settings.delay_before_rx: %r' % ( | ||
self._rs485_mode.delay_before_rx,)) | ||
if self._rs485_mode.loopback: | ||
raise ValueError( | ||
'Unsupported value for RS485Settings.loopback: {!r}'.format( | ||
self._rs485_mode.loopback,)) | ||
'Unsupported value for RS485Settings.loopback: %r' % ( | ||
self._rs485_mode.loopback,)) | ||
comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE | ||
comDCB.fOutxCtsFlow = 0 | ||
|
||
|
@@ -217,27 +217,24 @@ def _reconfigure_port(self): | |
comDCB.XoffChar = serial.XOFF | ||
|
||
if not win32.SetCommState(self._port_handle, ctypes.byref(comDCB)): | ||
raise SerialException( | ||
'Cannot configure port, something went wrong. ' | ||
'Original message: {!r}'.format(ctypes.WinError())) | ||
raise SerialException("Cannot configure port, something went wrong. Original message: %r" % ctypes.WinError()) | ||
|
||
#~ def __del__(self): | ||
#~ self.close() | ||
|
||
def _close(self): | ||
"""internal close port helper""" | ||
if self._port_handle is not None: | ||
if self._port_handle: | ||
# Restore original timeout values: | ||
win32.SetCommTimeouts(self._port_handle, self._orgTimeouts) | ||
# Close COM-Port: | ||
win32.CloseHandle(self._port_handle) | ||
if self._overlapped_read is not None: | ||
self.cancel_read() | ||
win32.CloseHandle(self._overlapped_read.hEvent) | ||
self._overlapped_read = None | ||
if self._overlapped_write is not None: | ||
self.cancel_write() | ||
win32.CloseHandle(self._overlapped_write.hEvent) | ||
self._overlapped_write = None | ||
win32.CloseHandle(self._port_handle) | ||
self._port_handle = None | ||
|
||
def close(self): | ||
|
@@ -261,46 +258,42 @@ def read(self, size=1): | |
"""\ | ||
Read size bytes from the serial port. If a timeout is set it may | ||
return less characters as requested. With no timeout it will block | ||
until the requested number of bytes is read. | ||
""" | ||
if not self.is_open: | ||
until the requested number of bytes is read.""" | ||
if not self._port_handle: | ||
raise portNotOpenError | ||
if size > 0: | ||
win32.ResetEvent(self._overlapped_read.hEvent) | ||
flags = win32.DWORD() | ||
comstat = win32.COMSTAT() | ||
if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)): | ||
raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError())) | ||
n = min(comstat.cbInQue, size) if self.timeout == 0 else size | ||
if n > 0: | ||
buf = ctypes.create_string_buffer(n) | ||
raise SerialException('call to ClearCommError failed') | ||
if self.timeout == 0: | ||
n = min(comstat.cbInQue, size) | ||
if n > 0: | ||
buf = ctypes.create_string_buffer(n) | ||
rc = win32.DWORD() | ||
read_ok = win32.ReadFile(self._port_handle, buf, n, ctypes.byref(rc), ctypes.byref(self._overlapped_read)) | ||
if not read_ok and win32.GetLastError() not in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING): | ||
raise SerialException("ReadFile failed (%r)" % ctypes.WinError()) | ||
win32.GetOverlappedResult(self._port_handle, ctypes.byref(self._overlapped_read), ctypes.byref(rc), True) | ||
read = buf.raw[:rc.value] | ||
else: | ||
read = bytes() | ||
else: | ||
buf = ctypes.create_string_buffer(size) | ||
rc = win32.DWORD() | ||
read_ok = win32.ReadFile( | ||
self._port_handle, | ||
buf, | ||
n, | ||
ctypes.byref(rc), | ||
ctypes.byref(self._overlapped_read)) | ||
read_ok = win32.ReadFile(self._port_handle, buf, size, ctypes.byref(rc), ctypes.byref(self._overlapped_read)) | ||
if not read_ok and win32.GetLastError() not in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING): | ||
raise SerialException("ReadFile failed ({!r})".format(ctypes.WinError())) | ||
result_ok = win32.GetOverlappedResult( | ||
self._port_handle, | ||
ctypes.byref(self._overlapped_read), | ||
ctypes.byref(rc), | ||
True) | ||
if not result_ok: | ||
if win32.GetLastError() != win32.ERROR_OPERATION_ABORTED: | ||
raise SerialException("GetOverlappedResult failed ({!r})".format(ctypes.WinError())) | ||
raise SerialException("ReadFile failed (%r)" % ctypes.WinError()) | ||
win32.GetOverlappedResult(self._port_handle, ctypes.byref(self._overlapped_read), ctypes.byref(rc), True) | ||
read = buf.raw[:rc.value] | ||
else: | ||
read = bytes() | ||
else: | ||
read = bytes() | ||
return bytes(read) | ||
|
||
def write(self, data): | ||
"""Output the given byte string over the serial port.""" | ||
if not self.is_open: | ||
if not self._port_handle: | ||
raise portNotOpenError | ||
#~ if not isinstance(data, (bytes, bytearray)): | ||
#~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) | ||
|
@@ -309,29 +302,16 @@ def write(self, data): | |
if data: | ||
#~ win32event.ResetEvent(self._overlapped_write.hEvent) | ||
n = win32.DWORD() | ||
success = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write) | ||
err = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write) | ||
if not err and win32.GetLastError() != win32.ERROR_IO_PENDING: | ||
raise SerialException("WriteFile failed (%r)" % ctypes.WinError()) | ||
if self._write_timeout != 0: # if blocking (None) or w/ write timeout (>0) | ||
if not success and win32.GetLastError() != win32.ERROR_IO_PENDING: | ||
raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError())) | ||
|
||
# Wait for the write to complete. | ||
#~ win32.WaitForSingleObject(self._overlapped_write.hEvent, win32.INFINITE) | ||
win32.GetOverlappedResult(self._port_handle, self._overlapped_write, ctypes.byref(n), True) | ||
if win32.GetLastError() == win32.ERROR_OPERATION_ABORTED: | ||
return n.value # canceled IO is no error | ||
err = win32.GetOverlappedResult(self._port_handle, self._overlapped_write, ctypes.byref(n), True) | ||
if n.value != len(data): | ||
raise writeTimeoutError | ||
return n.value | ||
else: | ||
errorcode = win32.ERROR_SUCCESS if success else win32.GetLastError() | ||
if errorcode in (win32.ERROR_INVALID_USER_BUFFER, win32.ERROR_NOT_ENOUGH_MEMORY, | ||
win32.ERROR_OPERATION_ABORTED): | ||
return 0 | ||
elif errorcode in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING): | ||
# no info on true length provided by OS function in async mode | ||
return len(data) | ||
else: | ||
raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError())) | ||
return n.value | ||
else: | ||
return 0 | ||
|
||
|
@@ -343,12 +323,12 @@ def flush(self): | |
while self.out_waiting: | ||
time.sleep(0.05) | ||
# XXX could also use WaitCommEvent with mask EV_TXEMPTY, but it would | ||
# require overlapped IO and it's also only possible to set a single mask | ||
# require overlapped IO and its also only possible to set a single mask | ||
# on the port--- | ||
|
||
def reset_input_buffer(self): | ||
"""Clear input buffer, discarding all that is in the buffer.""" | ||
if not self.is_open: | ||
if not self._port_handle: | ||
raise portNotOpenError | ||
win32.PurgeComm(self._port_handle, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT) | ||
|
||
|
@@ -357,13 +337,13 @@ def reset_output_buffer(self): | |
Clear output buffer, aborting the current output and discarding all | ||
that is in the buffer. | ||
""" | ||
if not self.is_open: | ||
if not self._port_handle: | ||
raise portNotOpenError | ||
win32.PurgeComm(self._port_handle, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT) | ||
|
||
def _update_break_state(self): | ||
"""Set break: Controls TXD. When active, to transmitting is possible.""" | ||
if not self.is_open: | ||
if not self._port_handle: | ||
raise portNotOpenError | ||
if self._break_state: | ||
win32.SetCommBreak(self._port_handle) | ||
|
@@ -385,7 +365,7 @@ def _update_dtr_state(self): | |
win32.EscapeCommFunction(self._port_handle, win32.CLRDTR) | ||
|
||
def _GetCommModemStatus(self): | ||
if not self.is_open: | ||
if not self._port_handle: | ||
raise portNotOpenError | ||
stat = win32.DWORD() | ||
win32.GetCommModemStatus(self._port_handle, ctypes.byref(stat)) | ||
|
@@ -416,7 +396,7 @@ def cd(self): | |
def set_buffer_size(self, rx_size=4096, tx_size=None): | ||
"""\ | ||
Recommend a buffer size to the driver (device driver can ignore this | ||
value). Must be called before the port is opened. | ||
value). Must be called before the port is opended. | ||
""" | ||
if tx_size is None: | ||
tx_size = rx_size | ||
|
@@ -429,7 +409,7 @@ def set_output_flow_control(self, enable=True): | |
from the other device and control the transmission accordingly. | ||
WARNING: this function is not portable to different platforms! | ||
""" | ||
if not self.is_open: | ||
if not self._port_handle: | ||
raise portNotOpenError | ||
if enable: | ||
win32.EscapeCommFunction(self._port_handle, win32.SETXON) | ||
|
@@ -445,23 +425,19 @@ def out_waiting(self): | |
raise SerialException('call to ClearCommError failed') | ||
return comstat.cbOutQue | ||
|
||
def _cancel_overlapped_io(self, overlapped): | ||
"""Cancel a blocking read operation, may be called from other thread""" | ||
# check if read operation is pending | ||
rc = win32.DWORD() | ||
err = win32.GetOverlappedResult( | ||
self._port_handle, | ||
ctypes.byref(overlapped), | ||
ctypes.byref(rc), | ||
False) | ||
if not err and win32.GetLastError() in (win32.ERROR_IO_PENDING, win32.ERROR_IO_INCOMPLETE): | ||
# cancel, ignoring any errors (e.g. it may just have finished on its own) | ||
win32.CancelIoEx(self._port_handle, overlapped) | ||
|
||
def cancel_read(self): | ||
"""Cancel a blocking read operation, may be called from other thread""" | ||
self._cancel_overlapped_io(self._overlapped_read) | ||
|
||
def cancel_write(self): | ||
"""Cancel a blocking write operation, may be called from other thread""" | ||
self._cancel_overlapped_io(self._overlapped_write) | ||
|
||
# Nur Testfunktion!! | ||
if __name__ == '__main__': | ||
import sys | ||
s = Serial(0) | ||
sys.stdout.write("%s\n" % s) | ||
|
||
s = Serial() | ||
sys.stdout.write("%s\n" % s) | ||
|
||
s.baudrate = 19200 | ||
s.databits = 7 | ||
s.close() | ||
s.port = 0 | ||
s.open() | ||
sys.stdout.write("%s\n" % s) |
Oops, something went wrong.