Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: improve shutdown handling #205

Merged
merged 7 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,7 @@ RustBuffer uniffi_iroh_ffi_fn_method_node_my_rpc_addr(void*_Nonnull ptr, RustCal
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_IROH_FFI_FN_METHOD_NODE_SHUTDOWN
#define UNIFFI_FFIDEF_UNIFFI_IROH_FFI_FN_METHOD_NODE_SHUTDOWN
uint64_t uniffi_iroh_ffi_fn_method_node_shutdown(void*_Nonnull ptr, int8_t force
uint64_t uniffi_iroh_ffi_fn_method_node_shutdown(void*_Nonnull ptr
);
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_IROH_FFI_FN_METHOD_NODE_STATS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,7 @@ RustBuffer uniffi_iroh_ffi_fn_method_node_my_rpc_addr(void*_Nonnull ptr, RustCal
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_IROH_FFI_FN_METHOD_NODE_SHUTDOWN
#define UNIFFI_FFIDEF_UNIFFI_IROH_FFI_FN_METHOD_NODE_SHUTDOWN
uint64_t uniffi_iroh_ffi_fn_method_node_shutdown(void*_Nonnull ptr, int8_t force
uint64_t uniffi_iroh_ffi_fn_method_node_shutdown(void*_Nonnull ptr
);
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_IROH_FFI_FN_METHOD_NODE_STATS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,7 @@ RustBuffer uniffi_iroh_ffi_fn_method_node_my_rpc_addr(void*_Nonnull ptr, RustCal
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_IROH_FFI_FN_METHOD_NODE_SHUTDOWN
#define UNIFFI_FFIDEF_UNIFFI_IROH_FFI_FN_METHOD_NODE_SHUTDOWN
uint64_t uniffi_iroh_ffi_fn_method_node_shutdown(void*_Nonnull ptr, int8_t force
uint64_t uniffi_iroh_ffi_fn_method_node_shutdown(void*_Nonnull ptr
);
#endif
#ifndef UNIFFI_FFIDEF_UNIFFI_IROH_FFI_FN_METHOD_NODE_STATS
Expand Down
9 changes: 4 additions & 5 deletions IrohLib/Sources/IrohLib/IrohLib.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8317,7 +8317,7 @@ public protocol NodeProtocol: AnyObject {
/**
* Shutdown this iroh node.
*/
func shutdown(force: Bool) async throws
func shutdown() async throws

/**
* Get statistics of the running node.
Expand Down Expand Up @@ -8400,13 +8400,12 @@ open class Node:
/**
* Shutdown this iroh node.
*/
open func shutdown(force: Bool) async throws {
open func shutdown() async throws {
return
try await uniffiRustCallAsync(
rustFutureFunc: {
uniffi_iroh_ffi_fn_method_node_shutdown(
self.uniffiClonePointer(),
FfiConverterBool.lower(force)
self.uniffiClonePointer()
)
},
pollFunc: ffi_iroh_ffi_rust_future_poll_void,
Expand Down Expand Up @@ -18145,7 +18144,7 @@ private var initializationResult: InitializationResult = {
if uniffi_iroh_ffi_checksum_method_node_my_rpc_addr() != 34751 {
return InitializationResult.apiChecksumMismatch
}
if uniffi_iroh_ffi_checksum_method_node_shutdown() != 21075 {
if uniffi_iroh_ffi_checksum_method_node_shutdown() != 49624 {
return InitializationResult.apiChecksumMismatch
}
if uniffi_iroh_ffi_checksum_method_node_stats() != 13439 {
Expand Down
10 changes: 5 additions & 5 deletions iroh-js/__test__/blob.spec.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ test('add blob from path', async (t) => {
t.truthy(allDone.tag)
t.truthy(allDone.hash)

await node.node.shutdown(false)
await node.node.shutdown()
})

test('hash basics', (t) => {
Expand Down Expand Up @@ -104,7 +104,7 @@ test('collections', async (t) => {
t.is(collectionList[0].hash, res.hash)
t.is(collectionList[0].totalBlobsCount, BigInt(numFiles + 1))

await node.node.shutdown(false)
await node.node.shutdown()
})

test('share', async (t) => {
Expand All @@ -119,7 +119,7 @@ test('share', async (t) => {
t.is(ticket.hash, res.hash)
t.deepEqual(ticket.nodeAddr, nodeAddr)

await node.node.shutdown(false)
await node.node.shutdown()
})

test('provide events', async (t) => {
Expand Down Expand Up @@ -174,6 +174,6 @@ test('provide events', async (t) => {

t.is(events.length, 4)

await node1.node.shutdown(false)
await node2.node.shutdown(false)
await node1.node.shutdown()
await node2.node.shutdown()
})
4 changes: 2 additions & 2 deletions iroh-js/__test__/gossip.spec.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ test('gossip basic', async (t) => {
await sink0.close()
await sink1.close()

await n0.node.shutdown(false)
await n1.node.shutdown(false)
await n0.node.shutdown()
await n1.node.shutdown()

t.pass()
})
35 changes: 20 additions & 15 deletions iroh-js/__test__/node.spec.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ import test from 'ava'

import { Iroh } from '../index.js'


test('create memory node', async (t) => {
const node = await Iroh.memory()
await node.node.shutdown()
t.pass()
})

test('create memory node, with options', async (t) => {
const node = await Iroh.memory({
gcIntervalMillis: 10000
})
await node.node.shutdown()
t.pass()
})

Expand All @@ -20,6 +21,7 @@ test('node status', async (t) => {
const status = await iroh.node.status()

t.is(status.version, '0.28.1')
await iroh.node.shutdown()
})

test('rpc client memory node', async (t) => {
Expand All @@ -33,43 +35,45 @@ test('rpc client memory node', async (t) => {
const clientId = await client.net.nodeId()

t.is(nodeId, clientId)

await node.node.shutdown()
})


test('custom protocol', async (t) => {
t.plan(4)
const alpn = Buffer.from('iroh-example/hello/0')

const protocols = {
[alpn]: (err, ep, client) => ({
accept: async (err, connecting) => {
console.log('accept')
// console.log('accept')
t.falsy(err)
const nodeId = await client.net.nodeId()
console.log(`accepting on node ${nodeId}`)
// console.log(`accepting on node ${nodeId}`)
const alpn = await connecting.alpn()
console.log(`incoming on ${alpn.toString()}`)
// console.log(`incoming on ${alpn.toString()}`)

const conn = await connecting.connect()
const remote = await conn.getRemoteNodeId()
console.log(`connected id ${remote.toString()}`)
// console.log(`connected id ${remote.toString()}`)

const bi = await conn.acceptBi()

const bytes = await bi.recv.readToEnd(64)
console.log(`got: ${bytes.toString()}`)
// console.log(`got: ${bytes.toString()}`)
t.is(bytes.toString(), 'yo')
await bi.send.writeAll(Buffer.from('hello'))
await bi.send.finish()
await conn.closed()
},
shutdown: (err) => {
if (err != null) {
if (!err.message.contains("closed by peer")) {
console.log('shutdown error', err)
if (!err.message.contains('closed')) {
throw err
}
}
console.log('shutting down')
// console.log('shutting down')
}
})
}
Expand All @@ -82,11 +86,11 @@ test('custom protocol', async (t) => {
const node2 = await Iroh.memory({ protocols })

const endpoint = node2.node.endpoint()
console.log(`connecting to ${nodeAddr.nodeId}`)
// console.log(`connecting to ${nodeAddr.nodeId}`)

const conn = await endpoint.connect(nodeAddr, alpn)
const remote = await conn.getRemoteNodeId()
console.log(`connected to ${remote.toString()}`)
// console.log(`connected to ${remote.toString()}`)

const bi = await conn.openBi()

Expand All @@ -96,12 +100,13 @@ test('custom protocol', async (t) => {
let out = Buffer.alloc(5)
await bi.recv.readExact(out)

console.log(`read: ${out.toString()}`)
// console.log(`read: ${out.toString()}`)
t.is(out.toString(), 'hello')

await node2.node.shutdown(false)
await node1.node.shutdown(false)
await node2.node.shutdown()
await node1.node.shutdown()

// console.log('end')

console.log('end')
t.pass()
})
2 changes: 1 addition & 1 deletion iroh-js/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ export declare class Node {
/** Get status information about a node */
status(): Promise<NodeStatus>
/** Shutdown this iroh node. */
shutdown(force: boolean): Promise<void>
shutdown(): Promise<void>
/** Returns `Some(addr)` if an RPC endpoint is running, `None` otherwise. */
myRpcAddr(): string | null
endpoint(): Endpoint | null
Expand Down
8 changes: 6 additions & 2 deletions iroh-js/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,12 @@ impl Node {

/// Shutdown this iroh node.
#[napi]
pub async fn shutdown(&self, force: bool) -> Result<()> {
self.node().shutdown(force).await?;
pub async fn shutdown(&self) -> Result<()> {
match self.node.0 {
InnerIroh::Fs(ref fs) => fs.clone().shutdown().await?,
InnerIroh::Memory(ref mem) => mem.clone().shutdown().await?,
InnerIroh::Client(ref client) => client.shutdown(false).await?,
}
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions kotlin/author_test.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ kotlinx.coroutines.runBlocking {

// check there is 1 author on the node
assert(node.authors().list().size == 2)

node.node().shutdown()
}
8 changes: 8 additions & 0 deletions kotlin/blob_test.kts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ runBlocking {
val gotBytes = node.blobs().readToBytes(hash)
assert(gotBytes.size == blobSize)
assert(gotBytes contentEquals bytes)

node.node().shutdown()
}

// test functionality between reading bytes from a path and writing bytes to a path
Expand Down Expand Up @@ -177,6 +179,8 @@ runBlocking {
val gotBytesFile = java.io.File(outPath).readBytes()
assert(gotBytesFile.size == blobSize)
assert(gotBytesFile contentEquals bytes)

node.node().shutdown()
}

// Collections
Expand Down Expand Up @@ -250,6 +254,8 @@ runBlocking {
// collections also create a metadata hash that is not accounted for
// in the list of hashes
assert(collectionHashes.size + 1 == gotHashes.size)

node.node().shutdown()
}

// List and delete
Expand Down Expand Up @@ -297,4 +303,6 @@ runBlocking {
throw Exception(String.format("blob $removeHash should have been removed"))
}
}

node.node().shutdown()
}
9 changes: 8 additions & 1 deletion kotlin/doc_test.kts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ runBlocking {
val entry = doc.getOne(query)!!
assert(hash.equal(entry.contentHash()))
assert(v.size.toULong() == entry.contentLen())
val gotVal = entry.contentBytes(doc)
val gotVal: ByteArray = try {
entry.contentBytes(doc)
} catch (e: IrohException) {
println("failed content bytes ${e.message}")
throw e
}

assert(v contentEquals gotVal)
node.node().shutdown()
}
6 changes: 3 additions & 3 deletions kotlin/gossip_test.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class Callback : GossipMessageCallback {
}

runBlocking {
setLogLevel(LogLevel.DEBUG)
// setLogLevel(LogLevel.DEBUG)

val n0 = Iroh.memory()
val n1 = Iroh.memory()
Expand Down Expand Up @@ -75,6 +75,6 @@ runBlocking {
sink0.cancel()
sink1.cancel()

n0.node().shutdown(false)
n1.node().shutdown(false)
n0.node().shutdown()
n1.node().shutdown()
}
12 changes: 4 additions & 8 deletions kotlin/iroh/iroh_ffi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2386,10 +2386,7 @@ internal interface UniffiLib : Library {
uniffi_out_err: UniffiRustCallStatus,
): RustBuffer.ByValue

fun uniffi_iroh_ffi_fn_method_node_shutdown(
`ptr`: Pointer,
`force`: Byte,
): Long
fun uniffi_iroh_ffi_fn_method_node_shutdown(`ptr`: Pointer): Long

fun uniffi_iroh_ffi_fn_method_node_stats(`ptr`: Pointer): Long

Expand Down Expand Up @@ -4118,7 +4115,7 @@ private fun uniffiCheckApiChecksums(lib: UniffiLib) {
if (lib.uniffi_iroh_ffi_checksum_method_node_my_rpc_addr() != 34751.toShort()) {
throw RuntimeException("UniFFI API checksum mismatch: try cleaning and rebuilding your project")
}
if (lib.uniffi_iroh_ffi_checksum_method_node_shutdown() != 21075.toShort()) {
if (lib.uniffi_iroh_ffi_checksum_method_node_shutdown() != 49624.toShort()) {
throw RuntimeException("UniFFI API checksum mismatch: try cleaning and rebuilding your project")
}
if (lib.uniffi_iroh_ffi_checksum_method_node_stats() != 13439.toShort()) {
Expand Down Expand Up @@ -18105,7 +18102,7 @@ public interface NodeInterface {
/**
* Shutdown this iroh node.
*/
suspend fun `shutdown`(`force`: kotlin.Boolean)
suspend fun `shutdown`()

/**
* Get statistics of the running node.
Expand Down Expand Up @@ -18239,12 +18236,11 @@ open class Node :
*/
@Throws(IrohException::class)
@Suppress("ASSIGNED_BUT_NEVER_ACCESSED_VARIABLE")
override suspend fun `shutdown`(`force`: kotlin.Boolean) =
override suspend fun `shutdown`() =
uniffiRustCallAsync(
callWithPointer { thisPtr ->
UniffiLib.INSTANCE.uniffi_iroh_ffi_fn_method_node_shutdown(
thisPtr,
FfiConverterBoolean.lower(`force`),
)
},
{ future, callback, continuation -> UniffiLib.INSTANCE.ffi_iroh_ffi_rust_future_poll_void(future, callback, continuation) },
Expand Down
7 changes: 5 additions & 2 deletions kotlin/node_test.kts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ runBlocking {
break
}
}

node1.node().shutdown()
node0.node().shutdown()
}

class MyProtocol : ProtocolHandler {
Expand Down Expand Up @@ -135,6 +138,6 @@ runBlocking {
println(o.toString(Charsets.UTF_8))
assert("hello".toByteArray(Charsets.UTF_8) contentEquals o)

node2.node().shutdown(false)
node1.node().shutdown(false)
node2.node().shutdown()
node1.node().shutdown()
}
4 changes: 2 additions & 2 deletions python/gossip_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,5 @@ async def test_gossip_basic():
await sink0.cancel()
await sink1.cancel()

await n0.node().shutdown(False)
await n1.node().shutdown(False)
await n0.node().shutdown()
await n1.node().shutdown()
4 changes: 2 additions & 2 deletions python/node_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,5 @@ def create(self, endpoint, client):
print("", out)
assert b"hello", out

await node_2.node().shutdown(True)
await node_1.node().shutdown(True)
await node_2.node().shutdown()
await node_1.node().shutdown()
Loading
Loading