From 63447bb6ed5101d41b436f44b39144a11f350caa Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Wed, 6 Sep 2023 20:11:54 -0700 Subject: [PATCH 1/7] Refactor Reactor::{connect, connectMulti, canConnect} 0. `canConnect` doesn't experience situations where throwing Error is required. 1. `canConnect` is solely for testing and should not throw. 2. `connect` and `connectMulti` should throw. Simply return result from `canConnect` and throw in functions that calls them. --- src/core/reactor.ts | 66 +++++++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 21e3558f..85cf264b 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -61,6 +61,18 @@ export interface Call extends Write, Read { invoke: (args: A) => R | undefined; } +export enum CanConnectResult { + SUCCESS = 0, + SELF_LOOP = "Source port and destination port are the same.", + DESTINATION_OCCUPIED = "Destination port is already occupied.", + DOWNSTREAM_WRITE_CONFLICT = "Write conflict: port is already occupied.", + NOT_IN_SCOPE = "Source and destination ports are not in scope.", + RT_CONNECTION_OUTSIDE_CONTAINER = "New connection is outside of container.", + RT_DIRECT_FEED_THROUGH = "New connection introduces direct feed through.", + RT_CYCLE = "New connection introduces cycle.", + MUTATION_CAUSALITY_LOOP = "New connection will change the causal effect of the mutation that triggered this connection." +} + /** * Abstract class for a schedulable action. It is intended as a wrapper for a * regular action. In addition to a get method, it also has a schedule method @@ -1092,10 +1104,13 @@ export abstract class Reactor extends Component { * @param src The start point of a new connection. * @param dst The end point of a new connection. */ - public canConnect(src: IOPort, dst: IOPort): boolean { + public canConnect( + src: IOPort, + dst: IOPort + ): CanConnectResult { // Immediate rule out trivial self loops. if (src === dst) { - throw Error("Source port and destination port are the same."); + return CanConnectResult.SELF_LOOP; } // Check the race condition @@ -1103,7 +1118,7 @@ export abstract class Reactor extends Component { // in addReaction) const deps = this._dependencyGraph.getUpstreamNeighbors(dst); // FIXME this will change with multiplex ports if (deps !== undefined && deps.size > 0) { - throw Error("Destination port is already occupied."); + return CanConnectResult.DESTINATION_OCCUPIED; } if (!this._runtime.isRunning()) { @@ -1115,10 +1130,13 @@ export abstract class Reactor extends Component { // Rule out write conflicts. // - (between reactors) if (this._dependencyGraph.getDownstreamNeighbors(dst).size > 0) { - return false; + return CanConnectResult.DOWNSTREAM_WRITE_CONFLICT; } - return this._isInScope(src, dst); + if (!this._isInScope(src, dst)) { + return CanConnectResult.NOT_IN_SCOPE; + } + return CanConnectResult.SUCCESS; } else { // Attempt to make a connection while executing. // Check the local dependency graph to figure out whether this change @@ -1132,7 +1150,7 @@ export abstract class Reactor extends Component { src._isContainedBy(this) && dst._isContainedBy(this) ) { - throw Error("New connection is outside of container."); + return CanConnectResult.RT_CONNECTION_OUTSIDE_CONTAINER; } // Take the local graph and merge in all the causality interfaces @@ -1149,23 +1167,21 @@ export abstract class Reactor extends Component { // 1) check for loops const hasCycle = graph.hasCycle(); + if (hasCycle) { + return CanConnectResult.RT_CYCLE; + } // 2) check for direct feed through. // FIXME: This doesn't handle while direct feed thorugh cases. - let hasDirectFeedThrough = false; - if (src instanceof InPort && dst instanceof OutPort) { - hasDirectFeedThrough = dst.getContainer() === src.getContainer(); - } - // Throw error cases - if (hasDirectFeedThrough && hasCycle) { - throw Error("New connection introduces direct feed through and cycle."); - } else if (hasCycle) { - throw Error("New connection introduces cycle."); - } else if (hasDirectFeedThrough) { - throw Error("New connection introduces direct feed through."); + if ( + src instanceof InPort && + dst instanceof OutPort && + dst.getContainer() === src.getContainer() + ) { + return CanConnectResult.RT_DIRECT_FEED_THROUGH; } - return true; + return CanConnectResult.SUCCESS; } } @@ -1259,11 +1275,14 @@ export abstract class Reactor extends Component { if (dst === undefined || dst === null) { throw new Error("Cannot connect unspecified destination"); } - if (this.canConnect(src, dst)) { - this._uncheckedConnect(src, dst); - } else { - throw new Error(`ERROR connecting ${src} to ${dst}`); + const canConnectResult = this.canConnect(src, dst); + // I know, this looks a bit weird. But + if (canConnectResult !== CanConnectResult.SUCCESS) { + throw new Error( + `ERROR connecting ${src} to ${dst}. Reason is ${canConnectResult.valueOf()}` + ); } + this._uncheckedConnect(src, dst); } protected _connectMulti( @@ -1317,7 +1336,8 @@ export abstract class Reactor extends Component { } for (let i = 0; i < leftPorts.length && i < rightPorts.length; i++) { - if (!this.canConnect(leftPorts[i], rightPorts[i])) { + const canConnectResult = this.canConnect(leftPorts[i], rightPorts[i]); + if (canConnectResult !== CanConnectResult.SUCCESS) { throw new Error( `ERROR connecting ${leftPorts[i]} to ${rightPorts[i]} From 6d10efb9aa5c14821bf9814f310cbd22e9f80bd4 Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Mon, 11 Sep 2023 21:44:14 -0700 Subject: [PATCH 2/7] Change tests accordingly so they will pass Most of them were achieved by a dirty hack: by changing `toBe(true)` to be `toBeFalsy()` and vice versa. The negation of truthiness is because if connection can be established, we return 0, which is falsy. Otherwise we return an error enum which is truthy. This is a bit C-esque and not JS-y, but let's keep it this way as of now. Throw-catch would technically work, but it's not technically an error because we are **testing** if a connection can be established. Returning an error would also technically work, and we can add some type matching to make it work more smoothly, but as of now I intend to keep it this way. One WIP would be to change `toBeTruthy` to the actual error code, but then some tests might break. We will need to investigate instead of coding tests based on the results. --- __tests__/HierarchicalSingleEvent.test.ts | 5 +- __tests__/SingleEvent.test.ts | 8 +-- __tests__/connection.test.ts | 9 ++-- __tests__/hierarchy.ts | 64 ++++++++++------------- 4 files changed, 38 insertions(+), 48 deletions(-) diff --git a/__tests__/HierarchicalSingleEvent.test.ts b/__tests__/HierarchicalSingleEvent.test.ts index 45e7b998..77eb8034 100644 --- a/__tests__/HierarchicalSingleEvent.test.ts +++ b/__tests__/HierarchicalSingleEvent.test.ts @@ -4,7 +4,8 @@ import { Parameter, OutPort, InPort, - TimeValue + TimeValue, + CanConnectResult } from "../src/core/internal"; import {SingleEvent} from "../src/share/SingleEvent"; @@ -93,7 +94,7 @@ describe("HierarchicalSingleEvent", function () { seTest.seContainer.child.o, seTest.logContainer.child.i ) - ).toBe(false); + ).toBe(CanConnectResult.NOT_IN_SCOPE); seTest._start(); }); diff --git a/__tests__/SingleEvent.test.ts b/__tests__/SingleEvent.test.ts index ac89fb1f..ab49c55f 100644 --- a/__tests__/SingleEvent.test.ts +++ b/__tests__/SingleEvent.test.ts @@ -32,12 +32,8 @@ describe("SingleEvent", function () { expect(expect(seTest.singleEvent).toBeInstanceOf(SingleEvent)); expect(expect(seTest.logger).toBeInstanceOf(Logger)); - expect(function () { - seTest.canConnect(seTest.singleEvent.o, seTest.logger.i); - }).toThrow(new Error("Destination port is already occupied.")); - expect(seTest.canConnect(seTest.logger.i, seTest.singleEvent.o)).toBe( - false - ); + expect(seTest.canConnect(seTest.singleEvent.o, seTest.logger.i)).toBeTruthy(); + expect(seTest.canConnect(seTest.logger.i, seTest.singleEvent.o)).toBeTruthy(); seTest._start(); }); diff --git a/__tests__/connection.test.ts b/__tests__/connection.test.ts index 3d8725e3..4798b7bc 100644 --- a/__tests__/connection.test.ts +++ b/__tests__/connection.test.ts @@ -5,7 +5,8 @@ import { OutPort, InPort, TimeUnit, - TimeValue + TimeValue, + CanConnectResult } from "../src/core/internal"; describe("Check canConnect", () => { @@ -30,19 +31,19 @@ describe("Check canConnect", () => { it("canConnect success out->in", () => { expect(this.canConnect(this.source.out, this.destination.in)).toBe( - true + CanConnectResult.SUCCESS ); }); it("canConnect success out->out", () => { expect(this.canConnect(this.source.out, this.destination.out)).toBe( - true + CanConnectResult.SUCCESS ); }); it("canConnect failure", () => { expect(this.canConnect(this.destination.in, this.source.out)).toBe( - false + CanConnectResult.NOT_IN_SCOPE ); }); } diff --git a/__tests__/hierarchy.ts b/__tests__/hierarchy.ts index ec7fda22..0515e861 100644 --- a/__tests__/hierarchy.ts +++ b/__tests__/hierarchy.ts @@ -1,4 +1,4 @@ -import {Reactor, App, InPort, OutPort} from "../src/core/internal"; +import {Reactor, App, InPort, OutPort, CanConnectResult} from "../src/core/internal"; class InOut extends Reactor { a = new InPort(this); @@ -36,65 +36,57 @@ describe("Container to Contained", () => { it("testing canConnect", () => { expect( app.container.canConnect(app.container.a, app.container.contained.a) - ).toBe(true); + ).toBe(CanConnectResult.SUCCESS); expect( app.container.canConnect(app.container.contained.a, app.container.a) - ).toBe(false); + ).toBe(CanConnectResult.NOT_IN_SCOPE); expect( app.container.canConnect( app.container.a, app.container.b ) - ).toBe(true); + ).toBe(CanConnectResult.SUCCESS); expect( app.container.canConnect( app.container.contained.a, app.container.contained.b ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect( app.container.contained.b, app.container.contained.a ) - ).toBe(true); + ).toBeFalsy(); expect( app.container.canConnect(app.container.a, app.container.contained.b) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.contained.b, app.container.a) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.b, app.container.contained.a) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.contained.a, app.container.b) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.b, app.container.contained.b) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.contained.b, app.container.b) - ).toBe(true); - - expect(app.container.canConnect(app.container.contained.a, app.foo.a)).toBe( - false - ); - expect(app.container.canConnect(app.container.contained.a, app.foo.b)).toBe( - false - ); - expect(app.container.canConnect(app.foo.a, app.container.contained.a)).toBe( - false - ); - expect(app.container.canConnect(app.foo.a, app.container.contained.a)).toBe( - false - ); - - expect(app.container.canConnect(app.foo.a, app.container.b)).toBe(false); - expect(app.container.canConnect(app.foo.a, app.container.a)).toBe(false); + ).toBeFalsy(); + + expect(app.container.canConnect(app.container.contained.a, app.foo.a)).toBeTruthy(); + expect(app.container.canConnect(app.container.contained.a, app.foo.b)).toBeTruthy(); + expect(app.container.canConnect(app.foo.a, app.container.contained.a)).toBeTruthy(); + expect(app.container.canConnect(app.foo.a, app.container.contained.a)).toBeTruthy(); + + expect(app.container.canConnect(app.foo.a, app.container.b)).toBeTruthy(); + expect(app.container.canConnect(app.foo.a, app.container.a)).toBeTruthy(); // expect(app.container.contained).toBeDefined(); @@ -104,49 +96,49 @@ describe("Container to Contained", () => { app.container.contained.containedAgain.a, app.container.contained.a ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.b, app.container.contained.b ) - ).toBe(true); + ).toBeFalsy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.a, app.container.a ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.b, app.container.b ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.a, app.foo.a ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.b, app.foo.b ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.a, app.foo.a ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.b, app.foo.b ) - ).toBe(false); + ).toBeTruthy(); // } }); }); From def4c808abfd6191c8d80a7b2a50ad3b07837b62 Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Mon, 18 Sep 2023 14:06:58 -0700 Subject: [PATCH 3/7] Implemented connectable port 1. Implemented ConnectablePort that can be used as `Variable` 2. Refactored ConnectablePort with function overloading at the same time to facilitate better type check (but due to TS limitation there's no type narrowing, see https://github.com/microsoft/TypeScript/issues/22609) 3. Made tests conform to the new connection API --- __tests__/InvalidMutations.ts | 8 +++---- __tests__/SimpleMutation.test.ts | 2 +- __tests__/disconnect.test.ts | 6 ++--- __tests__/mutations.test.ts | 2 +- src/core/port.ts | 14 +++++++++++- src/core/reactor.ts | 39 ++++++++++++++++++++++++-------- 6 files changed, 51 insertions(+), 20 deletions(-) diff --git a/__tests__/InvalidMutations.ts b/__tests__/InvalidMutations.ts index 17b62f6b..2345eb39 100644 --- a/__tests__/InvalidMutations.ts +++ b/__tests__/InvalidMutations.ts @@ -40,23 +40,23 @@ class R1 extends Reactor { function (this, __in1, __in2, __out1, __out2) { test("expect error on creating creating direct feed through", () => { expect(() => { - this.connect(__in2, __out2); + this.connect(__in2.asConnectable(), __out2.asConnectable()); }).toThrowError("New connection introduces direct feed through."); }); test("expect error when creating connection outside container", () => { expect(() => { - this.connect(__out2, __in2); + this.connect(__out2.asConnectable(), __in2.asConnectable()); }).toThrowError("New connection is outside of container."); }); const R2 = new R1(this.getReactor()); test("expect error on mutation creating race condition on an output port", () => { expect(() => { - this.connect(R2.out1, __out1); + this.connect(R2.out1.asConnectable(), __out1.asConnectable()); }).toThrowError("Destination port is already occupied."); }); test("expect error on spawning and creating loop within a reactor", () => { expect(() => { - this.connect(R2.out1, R2.in1); + this.connect(R2.out1.asConnectable(), R2.in1.asConnectable()); }).toThrowError("New connection introduces cycle."); }); } diff --git a/__tests__/SimpleMutation.test.ts b/__tests__/SimpleMutation.test.ts index e85d0676..bd134558 100644 --- a/__tests__/SimpleMutation.test.ts +++ b/__tests__/SimpleMutation.test.ts @@ -46,7 +46,7 @@ class R2 extends Reactor { function (this, __in, __out) { test("expect error to be thrown", () => { expect(() => { - this.connect(__out, __in); + this.connect(__out.asConnectable(), __in.asConnectable()); }).toThrowError("New connection is outside of container."); }); } diff --git a/__tests__/disconnect.test.ts b/__tests__/disconnect.test.ts index b25b31cb..c876c336 100644 --- a/__tests__/disconnect.test.ts +++ b/__tests__/disconnect.test.ts @@ -54,11 +54,11 @@ class R1 extends Reactor { const R2 = new R1(this.getReactor()); test("expect that disconnecting an existing connection will not result in an error being thrown", () => { expect(() => { - this.connect(R2.out2, R2.in2); + this.connect(R2.out2.asConnectable(), R2.in2.asConnectable()); this.disconnect(R2.out2, R2.in2); - this.connect(R2.out2, R2.in2); + this.connect(R2.out2.asConnectable(), R2.in2.asConnectable()); this.disconnect(R2.out2); - this.connect(R2.out2, R2.in2); + this.connect(R2.out2.asConnectable(), R2.in2.asConnectable()); }).not.toThrow(); }); } diff --git a/__tests__/mutations.test.ts b/__tests__/mutations.test.ts index 090e9005..55c94964 100644 --- a/__tests__/mutations.test.ts +++ b/__tests__/mutations.test.ts @@ -92,7 +92,7 @@ class Computer extends Reactor { continue; } const x = new AddOne(this.getReactor(), id); - this.connect(src, x.input); + this.connect(src.asConnectable(), x.input.asConnectable()); } } }); diff --git a/src/core/port.ts b/src/core/port.ts index e6206e82..8870d1cd 100644 --- a/src/core/port.ts +++ b/src/core/port.ts @@ -7,7 +7,8 @@ import type { Absent, MultiReadWrite, ReadWrite, - Variable + Variable, + Read } from "./internal"; import {Trigger, Log} from "./internal"; @@ -59,6 +60,13 @@ export abstract class Port extends Trigger { } } +export class ConnectablePort implements Read { + public get = (): Absent => undefined; + public getPort = (): IOPort => this.port; + + constructor(public port: IOPort) {} +} + /** * Abstract class for a writable port. It is intended as a wrapper for a * regular port. In addition to a get method, it also has a set method and @@ -103,6 +111,10 @@ export abstract class IOPort extends Port { } } + public asConnectable(): ConnectablePort { + return new ConnectablePort(this); + } + /** * Only the holder of the key may obtain a writable port. * @param key diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 85cf264b..10929854 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -42,7 +42,8 @@ import { Startup, Shutdown, WritableMultiPort, - Dummy + Dummy, + ConnectablePort } from "./internal"; import {v4 as uuidv4} from "uuid"; import {Bank} from "./bank"; @@ -440,16 +441,31 @@ export abstract class Reactor extends Component { * @param src * @param dst */ + + public connect( + src: ConnectablePort, + dst: ConnectablePort + ): void; + public connect( + src: CallerPort, + dst: CalleePort + ): void; public connect( - src: CallerPort | IOPort, - dst: CalleePort | IOPort + ...[src, dst]: + | [ConnectablePort, ConnectablePort] + | [CallerPort, CalleePort] ): void { if (src instanceof CallerPort && dst instanceof CalleePort) { this.reactor._connectCall(src, dst); - } else if (src instanceof IOPort && dst instanceof IOPort) { - this.reactor._connect(src, dst); + } else if ( + src instanceof ConnectablePort && + dst instanceof ConnectablePort + ) { + this.reactor._connect(src.getPort(), dst.getPort()); } else { - // ERROR + throw Error( + "Logically unreachable code: src and dst type mismatch, Caller(ee) port cannot be connected to IOPort." + ); } } @@ -1805,10 +1821,13 @@ interface UtilityFunctions { } export interface MutationSandbox extends ReactionSandbox { - connect: ( - src: CallerPort | IOPort, - dst: CalleePort | IOPort - ) => void; + connect: { + (src: ConnectablePort, dst: ConnectablePort): void; + ( + src: CallerPort, + dst: CalleePort + ): void; + }; disconnect: (src: IOPort, dst?: IOPort) => void; From b20e9bd74b433b05ef95d494e4bd8038c4806b11 Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Fri, 22 Sep 2023 17:40:13 -0700 Subject: [PATCH 4/7] Adapt Benchmarks to the new ConnectablePort 1. Adapt Sieve by simply using ConnectablePort 2. Adapt Online Facility Location by using ConnectablePort *and* making necessary changes to reflect the actual logic, as discussed with @lhstrh. In this case, two sources: an OutPort (port A), and a mutation, wants to write to an OutPort (port B), but at different times. The correct thing to do, then, will be to make the mutation have its own OutPort (port C) to write to, and when it wants to write to port B, it disconnects A->B, makes connection C->B, and write to C. --- src/benchmark/FacilityLocation.ts | 106 +++++++++++++++++------------- src/benchmark/Sieve.ts | 9 ++- 2 files changed, 66 insertions(+), 49 deletions(-) diff --git a/src/benchmark/FacilityLocation.ts b/src/benchmark/FacilityLocation.ts index bf72e0bd..54eb637d 100644 --- a/src/benchmark/FacilityLocation.ts +++ b/src/benchmark/FacilityLocation.ts @@ -4,7 +4,7 @@ * * @author Hokeun Kim (hokeunkim@berkeley.edu) */ -import type {WritablePort} from "../core/internal"; +import type {IOPort} from "../core/internal"; import { Log, TimeValue, @@ -428,6 +428,7 @@ export class Quadrant extends Reactor { childrenBoundaries = new State(new Array()); totalCost = new State(0); + trollPort = new OutPort(this); constructor( parent: Reactor, @@ -525,14 +526,19 @@ export class Quadrant extends Reactor { this.writable(this.toSecondChild), this.writable(this.toThirdChild), this.writable(this.toFourthChild), - this.writable(this.toAccumulator), + this.toFirstChild.asConnectable(), + this.toSecondChild.asConnectable(), + this.toThirdChild.asConnectable(), + this.toFourthChild.asConnectable(), + this.toAccumulator.asConnectable(), this.localFacilities, this.knownFacilities, this.maxDepthOfKnownOpenFacility, this.supportCustomers, this.hasChildren, this.childrenBoundaries, - this.totalCost + this.totalCost, + this.writable(this.trollPort) ], function ( this, @@ -544,20 +550,26 @@ export class Quadrant extends Reactor { depth, fromProducer, toProducer, - toFirstChild, - toSecondChild, - toThirdChild, - toFourthChild, - toAccumulator, + toFirstChildW, + toSecondChildW, + toThirdChildW, + toFourthChildW, + toFirstChildC, + toSecondChildC, + toThirdChildC, + toFourthChildC, + toAccumulatorC, localFacilities, knownFacilities, maxDepthOfKnownOpenFacility, supportCustomers, hasChildren, childrenBoundaries, - totalCost + totalCost, + trollPort ) { const thisReactor = this.getReactor(); + const toAccumulatorUpstreams: Array> = []; // Helper functions for mutation reaction. const notifyParentOfFacility = function (p: Point): void { @@ -622,11 +634,12 @@ export class Quadrant extends Reactor { // console.log(`Children boundaries: ${childrenBoundaries.get()[0]}, ${childrenBoundaries.get()[1]}, ${childrenBoundaries.get()[2]}, ${childrenBoundaries.get()[3]}`) const accumulator = new Accumulator(thisReactor); - const toAccumulatorOfQuadrant = ( - toAccumulator as unknown as WritablePort - ).getPort(); // Connect Accumulator's output to Quadrant's output. - this.connect(accumulator.toNextAccumulator, toAccumulatorOfQuadrant); + toAccumulatorUpstreams.push(accumulator.toNextAccumulator); + this.connect( + accumulator.toNextAccumulator.asConnectable(), + toAccumulatorC + ); const firstChild = new Quadrant( thisReactor, @@ -640,11 +653,11 @@ export class Quadrant extends Reactor { maxDepthOfKnownOpenFacility.get(), Point.arrayClone(supportCustomers.get()) ); - const toFirstChildPort = ( - toFirstChild as unknown as WritablePort - ).getPort(); - this.connect(toFirstChildPort, firstChild.fromProducer); - this.connect(firstChild.toAccumulator, accumulator.fromFirstQuadrant); + this.connect(toFirstChildC, firstChild.fromProducer.asConnectable()); + this.connect( + firstChild.toAccumulator.asConnectable(), + accumulator.fromFirstQuadrant.asConnectable() + ); const secondChild = new Quadrant( thisReactor, @@ -658,13 +671,13 @@ export class Quadrant extends Reactor { maxDepthOfKnownOpenFacility.get(), Point.arrayClone(supportCustomers.get()) ); - const toSecondChildPort = ( - toSecondChild as unknown as WritablePort - ).getPort(); - this.connect(toSecondChildPort, secondChild.fromProducer); this.connect( - secondChild.toAccumulator, - accumulator.fromSecondQuadrant + toSecondChildC, + secondChild.fromProducer.asConnectable() + ); + this.connect( + secondChild.toAccumulator.asConnectable(), + accumulator.fromSecondQuadrant.asConnectable() ); const thirdChild = new Quadrant( @@ -679,11 +692,11 @@ export class Quadrant extends Reactor { maxDepthOfKnownOpenFacility.get(), Point.arrayClone(supportCustomers.get()) ); - const toThirdChildPort = ( - toThirdChild as unknown as WritablePort - ).getPort(); - this.connect(toThirdChildPort, thirdChild.fromProducer); - this.connect(thirdChild.toAccumulator, accumulator.fromThirdQuadrant); + this.connect(toThirdChildC, thirdChild.fromProducer.asConnectable()); + this.connect( + thirdChild.toAccumulator.asConnectable(), + accumulator.fromThirdQuadrant.asConnectable() + ); const fourthChild = new Quadrant( thisReactor, @@ -697,13 +710,13 @@ export class Quadrant extends Reactor { maxDepthOfKnownOpenFacility.get(), Point.arrayClone(supportCustomers.get()) ); - const toFourthChildPort = ( - toFourthChild as unknown as WritablePort - ).getPort(); - this.connect(toFourthChildPort, fourthChild.fromProducer); this.connect( - fourthChild.toAccumulator, - accumulator.fromFourthQuadrant + toFourthChildC, + fourthChild.fromProducer.asConnectable() + ); + this.connect( + fourthChild.toAccumulator.asConnectable(), + accumulator.fromFourthQuadrant.asConnectable() ); supportCustomers.set(new Array()); @@ -733,16 +746,16 @@ export class Quadrant extends Reactor { if (childrenBoundaries.get()[i].contains(point)) { switch (i) { case 0: - toFirstChild.set(msg); + toFirstChildW.set(msg); break; case 1: - toSecondChild.set(msg); + toSecondChildW.set(msg); break; case 2: - toThirdChild.set(msg); + toThirdChildW.set(msg); break; case 3: - toFourthChild.set(msg); + toFourthChildW.set(msg); break; } break; @@ -754,7 +767,12 @@ export class Quadrant extends Reactor { case RequestExitMsg: if (!hasChildren.get()) { // No children, number of facilities will be counted on parent's side. - toAccumulator.set( + toAccumulatorUpstreams.forEach((val) => { + this.disconnect(val, toAccumulatorC.getPort()); + }); + this.connect(trollPort.getPort().asConnectable(), toAccumulatorC); + + trollPort.set( new ConfirmExitMsg( 0, // facilities supportCustomers.get().length, // supportCustomers @@ -762,10 +780,10 @@ export class Quadrant extends Reactor { ) ); } else { - toFirstChild.set(msg); - toSecondChild.set(msg); - toThirdChild.set(msg); - toFourthChild.set(msg); + toFirstChildW.set(msg); + toSecondChildW.set(msg); + toThirdChildW.set(msg); + toFourthChildW.set(msg); } break; default: diff --git a/src/benchmark/Sieve.ts b/src/benchmark/Sieve.ts index 4b88776e..e64327b2 100644 --- a/src/benchmark/Sieve.ts +++ b/src/benchmark/Sieve.ts @@ -1,5 +1,4 @@ import { - type WritablePort, Parameter, InPort, OutPort, @@ -66,11 +65,12 @@ class Filter extends Reactor { [ this.inp, this.writable(this.out), + this.out.asConnectable(), this.startPrime, this.hasChild, this.localPrimes ], - function (this, inp, out, prime, hasChild, localPrimes) { + function (this, inp, outW, outC, prime, hasChild, localPrimes) { const p = inp.get(); if (p !== undefined) { const seen = localPrimes.get(); @@ -96,14 +96,13 @@ class Filter extends Reactor { // let x = this.create(Filter, [this.getReactor(), p]) // console.log("CREATED: " + x._getFullyQualifiedName()) // FIXME: weird hack. Maybe just accept writable ports as well? - const port = (out as unknown as WritablePort).getPort(); - this.connect(port, n.inp); + this.connect(outC, n.inp.asConnectable()); // FIXME: this updates the dependency graph, but it doesn't redo the topological sort // For a pipeline like this one, it is not necessary, but in general it is. // Can we avoid redoing the entire sort? hasChild.set(true); } else { - out.set(p); + outW.set(p); } } } From 02f9497bdb5d0021a65a9cf06e1e7a9ee6eb5e30 Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Tue, 26 Sep 2023 03:23:53 -0700 Subject: [PATCH 5/7] Added some tests for `ConnectablePort` Specifically ones that are related to causality graph and `ConnectablePort` - that is, declaring `ConnectablePort` in a mutation should not create dependency between them, but connection between ports should. --- __tests__/mutations.test.ts | 60 ++++++++++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/__tests__/mutations.test.ts b/__tests__/mutations.test.ts index 55c94964..59645c24 100644 --- a/__tests__/mutations.test.ts +++ b/__tests__/mutations.test.ts @@ -4,7 +4,8 @@ import { Timer, OutPort, InPort, - TimeValue + TimeValue, + IOPort } from "../src/core/internal"; class Source extends Reactor { @@ -189,3 +190,60 @@ describe("Creating reactors at runtime", function () { // }); // }); +describe("Test the result from refactor-canconnect: referencing ConnectablePort should not introduce any change in the causality graph", () => { + class InnocentReactor extends Reactor { + public inp = new InPort(this); + public outp = new OutPort(this); + public cip = new InPort(this); + public oip = new OutPort(this); + public child = new (class InnocentChild extends Reactor { + public oip = new OutPort(this); + constructor(parent: InnocentReactor) { + super(parent); + } + })(this); + + constructor(parent: TestApp) { + super(parent); + + this.addMutation( + [this.startup], + [ + this.inp, + this.writable(this.outp), + this.cip.asConnectable(), + this.oip.asConnectable(), + this.child.oip.asConnectable() + ], + function (this, a0, a1, a2, a3, a4) { + // This is current failing as we disallow direct feedthrough. Nevertheless I'm unsure why that is the case as of now? + // this.connect(a2, a3); + this.connect(a2, a4); + } + ); + } + } + + class TestApp extends App { + public child = new InnocentReactor(this); + constructor(done: () => void) { + super(undefined, undefined, undefined, () => { + const mut = this.child["_mutations"][1]; // M0 is the shutdown mutation; M1 is our mutation + const pg = this._getPrecedenceGraph(); + // child.oip should be an island in the causality graph + expect(pg.getUpstreamNeighbors(this.child.oip).size).toBe(0); + expect(pg.getDownstreamNeighbors(this.child.oip).size).toBe(0); + // The only dependency child.child.oip should have is child.cip + expect(pg.getUpstreamNeighbors(this.child.child.oip).size).toBe(1); + expect( + pg.getUpstreamNeighbors(this.child.child.oip).has(this.child.cip) + ).toBeTruthy(); + done(); + }); + } + } + test("test dependencies", (done) => { + const tapp = new TestApp(done); + tapp._start(); + }); +}); From cd36e80db11dbdb92f822a14db73db3a61bcda15 Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Wed, 13 Sep 2023 17:29:56 -0700 Subject: [PATCH 6/7] Added a bonus test for connection that reflects bugs in the current logic --- __tests__/connection.bonus.test.ts | 99 ++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 __tests__/connection.bonus.test.ts diff --git a/__tests__/connection.bonus.test.ts b/__tests__/connection.bonus.test.ts new file mode 100644 index 00000000..b63a7107 --- /dev/null +++ b/__tests__/connection.bonus.test.ts @@ -0,0 +1,99 @@ +import {App, InPort, OutPort, Reactor} from "../src/core/internal"; + +// Readers might wonder why this test case exist; +// This is mainly because in the past, we assume direct feedthrough is forbidden, and will not establish the connection if we try to do so. However, it is possible that such a thing happen without introducing any causality issue. +describe("Direct feedthrough without causality issue", () => { + class BigReactor extends App { + public children: SmallReactor; + + constructor() { + super(undefined, false, false); + this.children = new SmallReactor(this); + } + } + + class SmallReactor extends Reactor { + public inp: InPort; + public outp: OutPort; + + constructor(parent: Reactor) { + super(parent); + this.inp = new InPort(this); + this.outp = new OutPort(this); + this.addMutation( + [this.startup], + [this.inp, this.outp], + function (this, inp, outp) { + it("test", () => { + expect(this.getReactor().canConnect(inp, outp)).toBeFalsy(); + }); + } + ); + } + } + + const root = new BigReactor(); + root._start(); +}); + +describe("Causality loop that can't be detected by only checking local graph", () => { + class FeedThrougher extends Reactor { + public inp = new InPort(this); + public outp = new OutPort(this); + + constructor(parent: Reactor) { + super(parent); + this.addReaction( + [this.inp], + [this.inp, this.writable(this.outp)], + function (this, inp, outp) { + // nop troll + } + ); + } + } + + class Looper extends Reactor { + public leftChild = new FeedThrougher(this); + public rightChild = new FeedThrougher(this); + + public inp = new InPort(this); + public outp = new OutPort(this); + + constructor(parent: Reactor) { + super(parent); + this.addMutation( + [this.startup], + [ + this.inp.asConnectable(), + this.outp.asConnectable(), + this.leftChild.inp.asConnectable(), + this.leftChild.outp.asConnectable(), + this.rightChild.inp.asConnectable(), + this.rightChild.outp.asConnectable() + ], + function (this, inp, outp, inp_lc, outp_lc, inp_rc, outp_rc) { + this.connect(inp, inp_lc); + this.connect(outp_rc, outp); + } + ); + } + } + + class TestApp extends App { + public child = new Looper(this); + + constructor() { + super(undefined, undefined, undefined, () => {}); + this._connect(this.child.outp, this.child.inp); + it("Test a connection that would create zero delay loop cannot be made", () => { + expect( + this.canConnect(this.child.leftChild.outp, this.child.rightChild.inp) + ).toBeTruthy(); + }); + } + } + + const app = new TestApp(); + app._start(); +}); From d140fdca21e3ccbdb5a9b657046f4e2c5c4eb96b Mon Sep 17 00:00:00 2001 From: Kagamihara Nadeshiko Date: Mon, 2 Oct 2023 14:38:36 -0700 Subject: [PATCH 7/7] Change CanConnect logic: remove blanket direct feedthrough ban, and always check against the global precedence graph. --- __tests__/InvalidMutations.ts | 5 ----- src/core/reactor.ts | 35 ++++++++++++----------------------- 2 files changed, 12 insertions(+), 28 deletions(-) diff --git a/__tests__/InvalidMutations.ts b/__tests__/InvalidMutations.ts index 2345eb39..977cad09 100644 --- a/__tests__/InvalidMutations.ts +++ b/__tests__/InvalidMutations.ts @@ -38,11 +38,6 @@ class R1 extends Reactor { [this.in1], [this.in1, this.in2, this.out1, this.out2], function (this, __in1, __in2, __out1, __out2) { - test("expect error on creating creating direct feed through", () => { - expect(() => { - this.connect(__in2.asConnectable(), __out2.asConnectable()); - }).toThrowError("New connection introduces direct feed through."); - }); test("expect error when creating connection outside container", () => { expect(() => { this.connect(__out2.asConnectable(), __in2.asConnectable()); diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 10929854..0d7da3b1 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -1169,34 +1169,23 @@ export abstract class Reactor extends Component { return CanConnectResult.RT_CONNECTION_OUTSIDE_CONTAINER; } - // Take the local graph and merge in all the causality interfaces - // of contained reactors. Then: - const graph = new PrecedenceGraph | Reaction>(); - graph.addAll(this._dependencyGraph); - - for (const r of this._getOwnReactors()) { - graph.addAll(r._getCausalityInterface()); + /** + * TODO (axmmisaka): The following code is commented for multiple reasons: + * The causality interface check is not fully implemented so new checks are failing + * Second, direct feedthrough itself would not cause any problem *per se*. + * To ensure there is no cycle, the safest way is to check against the global dependency graph. + */ + + let app = this as Reactor; + while (app._getContainer() !== app) { + app = app._getContainer(); } - - // Add the new edge. + const graph = app._getPrecedenceGraph(); graph.addEdge(src, dst); - - // 1) check for loops - const hasCycle = graph.hasCycle(); - if (hasCycle) { + if (graph.hasCycle()) { return CanConnectResult.RT_CYCLE; } - // 2) check for direct feed through. - // FIXME: This doesn't handle while direct feed thorugh cases. - if ( - src instanceof InPort && - dst instanceof OutPort && - dst.getContainer() === src.getContainer() - ) { - return CanConnectResult.RT_DIRECT_FEED_THROUGH; - } - return CanConnectResult.SUCCESS; } }