Skip to content

Commit

Permalink
Initial SPI implementation -- seems to be working! More testing is ne…
Browse files Browse the repository at this point in the history
…eded
  • Loading branch information
multiplemonomials committed Jan 21, 2024
1 parent 124b783 commit 589ea1a
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 8 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,7 @@ tests/reports
.install.stamp

# Ruff
.ruff_cache
.ruff_cache

# Poetry local config
/poetry.toml
2 changes: 1 addition & 1 deletion Development Setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This library uses Poetry to handle setting up for development and uploading the

### Setting Up for Local Dev
```shell
poetry install --with=linters
poetry install --with=linters --with=tests
poetry shell # This activates a virtual environment containing the dependencies
```

Expand Down
File renamed without changes.
Binary file not shown.
Binary file added doc/EVK_SPI_EEPROM_en.CD00290531.pdf
Binary file not shown.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ ignore = [
'TRY003', # Inline exception messages - Redundant with EM101
'COM812', # Missing trailing commas - incompatible with formatter
'ISC001', # Single line implicit string concatenation - incompatible with formatter
'TRY301', # Don't call raise within a try block -- can I help it if other libraries don't use exceptions??
'TRY300', # Returns within try blocks
]
ignore-init-module-imports = true
line-length = 120
Expand Down
147 changes: 145 additions & 2 deletions src/cy_serial_bridge/driver.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import contextlib
import sys
import time
from dataclasses import dataclass
Expand Down Expand Up @@ -869,7 +870,7 @@ def _spi_is_write_done(self) -> bool:
request=CyVendorCmds.CY_SPI_GET_STATUS_CMD,
value=self.scb_index << CY_SCB_INDEX_POS,
index=0,
data=b"",
length=CySpi.GET_STATUS_LEN,
timeout=self.timeout,
)
== b"\x00\x00\x00\x00"
Expand Down Expand Up @@ -978,7 +979,7 @@ def spi_write(self, tx_data: ByteSequence, io_timeout: int | None = None):
self.dev.controlWrite(
request_type=CY_VENDOR_REQUEST_HOST_TO_DEVICE,
request=CyVendorCmds.CY_SPI_READ_WRITE_CMD,
value=CySpi.WRITE_BIT,
value=(self.scb_index << CY_SCB_INDEX_POS) | CySpi.WRITE_BIT,
index=len(tx_data),
data=b"",
timeout=io_timeout,
Expand Down Expand Up @@ -1009,3 +1010,145 @@ def spi_write(self, tx_data: ByteSequence, io_timeout: int | None = None):
except usb1.USBErrorTimeout:
self._spi_reset()
raise

def spi_read(self, read_len: int, io_timeout: int | None = None) -> ByteSequence:
"""
Perform an SPI read-only operation from the peripheral device.
Note: When you do a read-only operation, the data sent out of the MOSI line to the peripheral
seems to be undefined -- it could literally be any garbage bytes that the serial bridge had laying around
in memory. So, unless your MOSI line is not hooked up, you probably want to use spi_transfer() instead.
:param read_len: Length to read, in words
:param io_timeout: Timeout for the transfer in ms. Leave empty to compute a reasonable timeout automatically.
Set to 0 to wait forever.
:return: Bytes read from the device
"""
if self._curr_frequency is None:
message = "Must call set_spi_configuration() before reading or writing data!"
raise CySerialBridgeError(message)

if io_timeout is None:
io_timeout = self._compute_timeout(read_len)

# Set up transfer
self.dev.controlWrite(
request_type=CY_VENDOR_REQUEST_HOST_TO_DEVICE,
request=CyVendorCmds.CY_SPI_READ_WRITE_CMD,
value=(self.scb_index << CY_SCB_INDEX_POS) | CySpi.READ_BIT,
index=read_len,
data=b"",
timeout=io_timeout,
)

# Get data.
# It seems like the hardware can send multiple packets.
try:
# Note: the Cypress driver had special logic that would, on Mac, split the bulk transfer into
# 64 byte read chunks. The comments said it was to work around a libusb bug. No idea
# if this is still an issue, but for now I decided to KISS by not doing that.

result = self.dev.bulkRead(self.ep_in, read_len, timeout=io_timeout)

if len(result) != read_len:
message = f"Expected {read_len} bytes but only received {len(result)} bytes from bulk read!"
raise CySerialBridgeError(message)

return result

except Exception:
# If anything went wrong, try and reset the SPI module so that the next transaction works
self._spi_reset()
raise

def spi_transfer(self, tx_data: ByteSequence, io_timeout: int | None = None) -> ByteSequence:
"""
Perform an SPI read-and-write operation to the peripheral device.
The bytes in tx_data will be sent, and the response by the peripheral to each
byte will be recorded and returned.
Note: This operation will always read and write the same length of data. So, you may need to add
additional padding to your tx_data to account for additional bytes that you want to read.
:param tx_data: Data to write
:param io_timeout: Timeout for the transfer in ms. Leave empty to compute a reasonable timeout automatically.
Set to 0 to wait forever.
"""
if self._curr_frequency is None:
message = "Must call set_spi_configuration() before reading or writing data!"
raise CySerialBridgeError(message)

if io_timeout is None:
io_timeout = self._compute_timeout(len(tx_data))

# Set up transfer
self.dev.controlWrite(
request_type=CY_VENDOR_REQUEST_HOST_TO_DEVICE,
request=CyVendorCmds.CY_SPI_READ_WRITE_CMD,
value=(self.scb_index << CY_SCB_INDEX_POS) | CySpi.WRITE_BIT | CySpi.READ_BIT,
index=len(tx_data),
data=b"",
timeout=io_timeout,
)

try:
# Send and receive data at the same time using async API
tx_transfer = self.dev.getTransfer()
rx_transfer = self.dev.getTransfer()

tx_transfer.setBulk(self.ep_out, tx_data, timeout=io_timeout)
rx_transfer.setBulk(self.ep_in, len(tx_data), timeout=io_timeout)

tx_transfer.submit()
rx_transfer.submit()

start_time = time.time()

# Wait for both transfers to finish, polling libusb until they are.
while tx_transfer.isSubmitted() or rx_transfer.isSubmitted():
with contextlib.suppress(usb1.USBErrorInterrupted): # Suppressing this exception is recommended by the python-libusb1 docs
# Note: the best way to do this is to use libusb_handle_events_completed(),
# which allows handling events until a specific transfer is completed.
# That would allow us to cleanly block until the transfers are done.
# However, python-libusb1 currently doesn't provide an abstraction for that
# function. Sadness. So, we have to just keep polling instead.
# TODO: Is it possible for this function to cause an infinite hang if there are no events to poll?
# Seems like maybe it could but it's used this way in the python-libusb1 example so idk...
usb_context.handleEvents()

if (time.time() - start_time) > io_timeout:
raise usb1.USBErrorTimeout

if tx_transfer.getStatus() == usb1.TRANSFER_STALL:
# Attempt to handle pipe errors similarly to how the original driver did.
self.dev.clearHalt(self.ep_out)

if tx_transfer.getStatus() != usb1.TRANSFER_COMPLETED:
message = "Tx transfer failed with error " + repr(tx_transfer.getStatus())
raise CySerialBridgeError(message)

if rx_transfer.getStatus() != usb1.TRANSFER_COMPLETED:
message = "Rx transfer failed with error " + repr(rx_transfer.getStatus())
raise CySerialBridgeError(message)

if rx_transfer.getActualLength() != len(tx_data):
message = f"Expected {len(tx_data)} bytes but only received {rx_transfer.getActualLength()} bytes from bulk read!"
raise CySerialBridgeError(message)

# Poll for write completion. Oddly, unlike I2C, there is no interrupt functionality to tell when
# the transfer is complete.
while not self._spi_is_write_done():
time.sleep(0.001)

if time.time() > start_time:
message = "Timeout waiting for SPI write completion!"
raise CySerialBridgeError(message)

return rx_transfer.getBuffer()

except Exception:
# If anything went wrong, try and reset the SPI module so that the next transaction works
self._spi_reset()
raise
31 changes: 27 additions & 4 deletions tests/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ def test_spi_config_read_write(serial_bridge: usb1.USBDevice):
Test that we can read and write SPI configs from the device
"""
print("Please connect jumpers on the eval kit:")
print("J17 = 2-5")
print("J19 = 2-3")
print("J21 = 2-3")
print("J20 = 2-5")
print("J17 = 2-5 [MISO]")
print("J19 = 2-3 [CS]")
print("J21 = 2-3 [SCLK]")
print("J20 = 2-5 [MOSI]")
input("Press [ENTER] when done...")

with cy_serial_bridge.CySPIControllerBridge(serial_bridge) as dev:
Expand Down Expand Up @@ -250,3 +250,26 @@ def test_spi_config_read_write(serial_bridge: usb1.USBDevice):
read_config_2 = dev.read_spi_configuration()
print("Got back SPI configuration: " + repr(read_config_2))
assert read_config_2 == config_2


def test_spi_transactions(serial_bridge: usb1.USBDevice):
"""
Test doing an SPI transaction with the CY7C652xx to the EEPROM on th edev board
"""

with cy_serial_bridge.CySPIControllerBridge(serial_bridge) as dev:
eeprom_spi_config = cy_serial_bridge.CySPIConfig(
frequency=2000000, # EEPROM max frequency 5MHz, so we play it a bit safe with 2MHz
word_size=8,
mode=cy_serial_bridge.CySpiMode.MOTOROLA_MODE_0, # EEPROM can use either SPI mode 0 or SPI mode 3
msbit_first=True,
continuous_ssel=True
)
dev.set_spi_configuration(eeprom_spi_config)

dev.spi_write(bytes([5, 0, 0]))

print(dev.spi_transfer(bytes([5, 0, 0])))


# TODO do an SPI test showing how to use word sizes > 8

0 comments on commit 589ea1a

Please sign in to comment.