Skip to content

Commit

Permalink
feat: add upload method
Browse files Browse the repository at this point in the history
  • Loading branch information
JPHutchins committed Dec 3, 2023
1 parent 7b4bdb2 commit 71e6ed1
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ source =
smpclient/

[report]
fail_under = 61.5
fail_under = 57.0
show_missing = True
41 changes: 40 additions & 1 deletion smpclient/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""Simple Management Protocol (SMP) Client."""

from hashlib import sha256
from typing import AsyncIterator

from pydantic import ValidationError
from smp import header as smpheader
from smp import packet as smppacket

from smpclient.generics import SMPRequest, TEr0, TEr1, TErr, TRep, flatten_error
from smpclient.generics import SMPRequest, TEr0, TEr1, TErr, TRep, error, flatten_error, success
from smpclient.requests.image_management import ImageUploadWrite
from smpclient.transport import SMPTransport


Expand Down Expand Up @@ -45,3 +48,39 @@ async def request(self, request: SMPRequest[TRep, TEr0, TEr1, TErr]) -> TRep | T
if header.version == smpheader.Version.V0
else request.ErrorV1.loads(frame)
)

async def upload(
self, image: bytes, slot: int = 0, chunksize: int = 2048, upgrade: bool = False
) -> AsyncIterator[int]:
"""Iteratively upload an `image` to `slot`, yielding the offset."""

# the first write contains some extra info
r = await self.request(
ImageUploadWrite( # type: ignore
off=0,
data=image[:chunksize],
image=slot,
len=len(image),
sha=sha256(image).digest(),
upgrade=upgrade,
)
)

if error(r):
raise Exception(f"{r}")
elif success(r):
yield r.off
else:
raise Exception("Unreachable")

# send chunks until the SMP server reports that the offset is at the end of the image
while r.off != len(image):
r = await self.request(
ImageUploadWrite(off=r.off, data=image[r.off : r.off + chunksize])
)
if error(r):
raise Exception(f"{r}")
elif success(r):
yield r.off
else:
raise Exception("Unreachable")

0 comments on commit 71e6ed1

Please sign in to comment.