Skip to content

Commit

Permalink
fix: Improve request queue behavior #6 (#860)
Browse files Browse the repository at this point in the history
* Same send behavior for sendWhen=active and fastpoll

* remove sendWhen from endpoint.ts

* also remove defaultSendRequestWhen

* remove tests that are no longer needed, fix coverage, rename tests

* lint

* add defaultSendRequestWhen compatibility setter

* Revert "add defaultSendRequestWhen compatibility setter"

This reverts commit 8200aea.
  • Loading branch information
slugzero authored Jan 9, 2024
1 parent 0d636fc commit 746fafb
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 259 deletions.
6 changes: 2 additions & 4 deletions src/controller/helpers/request.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {SendRequestWhen, SendPolicy} from '../tstype';
import {SendPolicy} from '../tstype';
import * as Zcl from '../../zcl';

/* eslint-disable-next-line @typescript-eslint/no-explicit-any*/
Expand Down Expand Up @@ -34,16 +34,14 @@ class Request<Type = any> {
frame: Zcl.ZclFrame;
expires: number;
sendPolicy: SendPolicy;
sendWhen: SendRequestWhen;
private resolveQueue: Array<(value: Type) => void>;
private rejectQueue: Array <(error: Error) => void>;
private lastError: Error;
constructor (func: (frame: Zcl.ZclFrame) => Promise<Type>, frame: Zcl.ZclFrame, timeout: number,
sendWhen?: SendRequestWhen, sendPolicy?: SendPolicy, lastError?: Error,
sendPolicy?: SendPolicy, lastError?: Error,
resolve?:(value: Type) => void, reject?: (error: Error) => void) {
this.func = func;
this.frame = frame;
this.sendWhen = sendWhen ?? 'active',
this.expires = timeout + Date.now();
this.sendPolicy = sendPolicy ?? (typeof frame.getCommand !== 'function' ?
undefined : Request.defaultSendPolicy[frame.getCommand().ID]);
Expand Down
2 changes: 1 addition & 1 deletion src/controller/helpers/requestQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class RequestQueue extends Set<Request> {
`${this.size}, ${fastPolling})`);

for (const request of this) {
if (fastPolling || (request.sendWhen !== 'fastpoll' && request.sendPolicy !== 'bulk')) {
if (fastPolling || request.sendPolicy !== 'bulk') {
try {
const result = await request.send();
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send success`);
Expand Down
64 changes: 20 additions & 44 deletions src/controller/model/device.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {KeyValue, DatabaseEntry, DeviceType, SendRequestWhen} from '../tstype';
import {KeyValue, DatabaseEntry, DeviceType} from '../tstype';
import {Events as AdapterEvents} from '../../adapter';
import ZclTransactionSequenceNumber from '../helpers/zclTransactionSequenceNumber';
import Endpoint from './endpoint';
Expand Down Expand Up @@ -53,7 +53,6 @@ class Device extends Entity {
private _skipDefaultResponse: boolean;
private _skipTimeResponse: boolean;
private _deleted: boolean;
private _defaultSendRequestWhen?: SendRequestWhen;
private _lastDefaultResponseSequenceNumber: number;
private _checkinInterval: number;
private _pendingRequestTimeout: number;
Expand Down Expand Up @@ -102,10 +101,6 @@ class Device extends Entity {
set skipDefaultResponse(skipDefaultResponse: boolean) {this._skipDefaultResponse = skipDefaultResponse;}
get skipTimeResponse(): boolean {return this._skipTimeResponse;}
set skipTimeResponse(skipTimeResponse: boolean) {this._skipTimeResponse = skipTimeResponse;}
get defaultSendRequestWhen(): SendRequestWhen {return this._defaultSendRequestWhen;}
set defaultSendRequestWhen(defaultSendRequestWhen: SendRequestWhen) {
this._defaultSendRequestWhen = defaultSendRequestWhen;
}
get checkinInterval(): number {return this._checkinInterval;}
get pendingRequestTimeout(): number {return this._pendingRequestTimeout;}
set pendingRequestTimeout(pendingRequestTimeout: number) {this._pendingRequestTimeout = pendingRequestTimeout;}
Expand Down Expand Up @@ -137,7 +132,7 @@ class Device extends Entity {
manufacturerID: number, endpoints: Endpoint[], manufacturerName: string,
powerSource: string, modelID: string, applicationVersion: number, stackVersion: number, zclVersion: number,
hardwareVersion: number, dateCode: string, softwareBuildID: string, interviewCompleted: boolean, meta: KeyValue,
lastSeen: number, defaultSendRequestWhen: SendRequestWhen, checkinInterval: number,
lastSeen: number, checkinInterval: number,
pendingRequestTimeout: number
) {
super();
Expand All @@ -162,7 +157,6 @@ class Device extends Entity {
this._skipTimeResponse = false;
this.meta = meta;
this._lastSeen = lastSeen;
this._defaultSendRequestWhen = defaultSendRequestWhen;
this._checkinInterval = checkinInterval;
this._pendingRequestTimeout = pendingRequestTimeout;
}
Expand Down Expand Up @@ -270,12 +264,12 @@ class Device extends Entity {
fastPollTimeout: 0,
};
debug.log(`check-in from ${this.ieeeAddr}: accepting fast-poll`);
await endpoint.command(frame.Cluster.ID, 'checkinRsp', payload, {sendWhen: 'immediate'});
await endpoint.command(frame.Cluster.ID, 'checkinRsp', payload, {sendPolicy: 'immediate'});

// This is a good time to read the checkin interval if we haven't stored it previously
if (this._checkinInterval === undefined) {
const pollPeriod =
await endpoint.read('genPollCtrl', ['checkinInterval'], {sendWhen: 'immediate'});
await endpoint.read('genPollCtrl', ['checkinInterval'], {sendPolicy: 'immediate'});
this._checkinInterval = pollPeriod.checkinInterval / 4; // convert to seconds
this.pendingRequestTimeout = this._checkinInterval * 1000; // milliseconds
debug.log(`Request Queue (${
Expand All @@ -285,14 +279,14 @@ class Device extends Entity {
// We *must* end fast-poll when we're done sending things. Otherwise
// we cause undue power-drain.
debug.log(`check-in from ${this.ieeeAddr}: stopping fast-poll`);
await endpoint.command(frame.Cluster.ID, 'fastPollStop', {}, {sendWhen: 'immediate'});
await endpoint.command(frame.Cluster.ID, 'fastPollStop', {}, {sendPolicy: 'immediate'});
} else {
const payload = {
startFastPolling: false,
fastPollTimeout: 0,
};
debug.log(`check-in from ${this.ieeeAddr}: declining fast-poll`);
await endpoint.command(frame.Cluster.ID, 'checkinRsp', payload, {sendWhen: 'immediate'});
await endpoint.command(frame.Cluster.ID, 'checkinRsp', payload, {sendPolicy: 'immediate'});
}
} catch (error) {
/* istanbul ignore next */
Expand Down Expand Up @@ -344,19 +338,6 @@ class Device extends Entity {
throw new Error('Cannot load device from group');
}

let defaultSendRequestWhen: SendRequestWhen = entry.defaultSendRequestWhen;
/* istanbul ignore next */
if (defaultSendRequestWhen == null) {
// Guess defaultSendRequestWhen based on old useImplicitCheckin/defaultSendWhenActive
if (entry.hasOwnProperty('useImplicitCheckin') && !entry.useImplicitCheckin) {
defaultSendRequestWhen = 'fastpoll';
} else if (entry.hasOwnProperty('defaultSendWhenActive') && entry.defaultSendWhenActive) {
defaultSendRequestWhen = 'active';
} else {
defaultSendRequestWhen = 'immediate';
}
}

// default: no timeout (messages expire immediately after first send attempt)
let pendingRequestTimeout = 0;
if((endpoints.filter((e): boolean => e.supportsInputCluster('genPollCtrl'))).length > 0) {
Expand All @@ -374,7 +355,7 @@ class Device extends Entity {
entry.id, entry.type, ieeeAddr, networkAddress, entry.manufId, endpoints,
entry.manufName, entry.powerSource, entry.modelId, entry.appVersion,
entry.stackVersion, entry.zclVersion, entry.hwVersion, entry.dateCode, entry.swBuildId,
entry.interviewCompleted, meta, entry.lastSeen || null, defaultSendRequestWhen, entry.checkinInterval,
entry.interviewCompleted, meta, entry.lastSeen || null, entry.checkinInterval,
pendingRequestTimeout
);
}
Expand All @@ -392,8 +373,7 @@ class Device extends Entity {
modelId: this.modelID, epList, endpoints, appVersion: this.applicationVersion,
stackVersion: this.stackVersion, hwVersion: this.hardwareVersion, dateCode: this.dateCode,
swBuildId: this.softwareBuildID, zclVersion: this.zclVersion, interviewCompleted: this.interviewCompleted,
meta: this.meta, lastSeen: this.lastSeen, defaultSendRequestWhen: this.defaultSendRequestWhen,
checkinInterval: this.checkinInterval
meta: this.meta, lastSeen: this.lastSeen, checkinInterval: this.checkinInterval
};
}

Expand Down Expand Up @@ -461,7 +441,7 @@ class Device extends Entity {
const device = new Device(
ID, type, ieeeAddr, networkAddress, manufacturerID, endpointsMapped, manufacturerName,
powerSource, modelID, undefined, undefined, undefined, undefined, undefined, undefined,
interviewCompleted, {}, null, 'immediate', undefined, 0
interviewCompleted, {}, null, undefined, 0
);

Entity.database.insert(device.toDatabaseEntry());
Expand Down Expand Up @@ -612,7 +592,7 @@ class Device extends Entity {
try {
const endpoint = Endpoint.create(1, undefined, undefined, [], [], this.networkAddress, this.ieeeAddr);
const result = await endpoint.read('genBasic', ['modelId', 'manufacturerName'],
{sendWhen: 'immediate'});
{sendPolicy: 'immediate'});
Object.entries(result)
.forEach((entry) => Device.ReportablePropertiesMapping[entry[0]].set(entry[1], this));
} catch (error) {
Expand Down Expand Up @@ -663,7 +643,7 @@ class Device extends Entity {
try {
let result: KeyValue;
try {
result = await endpoint.read('genBasic', [key], {sendWhen: 'immediate'});
result = await endpoint.read('genBasic', [key], {sendPolicy: 'immediate'});
} catch (error) {
// Reading attributes can fail for many reason, e.g. it could be that device rejoins
// while joining like in:
Expand All @@ -673,7 +653,7 @@ class Device extends Entity {
debug.log(`Interview - first ${item.key} retrieval attempt failed, ` +
`retrying after 10 seconds...`);
await Wait(10000);
result = await endpoint.read('genBasic', [key], {sendWhen: 'immediate'});
result = await endpoint.read('genBasic', [key], {sendPolicy: 'immediate'});
} else {
throw error;
}
Expand All @@ -698,14 +678,15 @@ class Device extends Entity {
for (const endpoint of this.endpoints.filter((e): boolean => e.supportsInputCluster('ssIasZone'))) {
debug.log(`Interview - IAS - enrolling '${this.ieeeAddr}' endpoint '${endpoint.ID}'`);

const stateBefore = await endpoint.read('ssIasZone', ['iasCieAddr', 'zoneState'], {sendWhen: 'immediate'});
const stateBefore = await endpoint.read(
'ssIasZone', ['iasCieAddr', 'zoneState'], {sendPolicy: 'immediate'});
debug.log(`Interview - IAS - before enrolling state: '${JSON.stringify(stateBefore)}'`);

// Do not enroll when device has already been enrolled
if (stateBefore.zoneState !== 1 || stateBefore.iasCieAddr !== coordinator.ieeeAddr) {
debug.log(`Interview - IAS - not enrolled, enrolling`);

await endpoint.write('ssIasZone', {'iasCieAddr': coordinator.ieeeAddr}, {sendWhen: 'immediate'});
await endpoint.write('ssIasZone', {'iasCieAddr': coordinator.ieeeAddr}, {sendPolicy: 'immediate'});
debug.log(`Interview - IAS - wrote iasCieAddr`);

// There are 2 enrollment procedures:
Expand All @@ -717,14 +698,14 @@ class Device extends Entity {
await Wait(500);
debug.log(`IAS - '${this.ieeeAddr}' sending enroll response (auto enroll)`);
const payload = {enrollrspcode: 0, zoneid: 23};
await endpoint.command('ssIasZone', 'enrollRsp', payload,
{disableDefaultResponse: true, sendWhen: 'immediate'});
await endpoint.command('ssIasZone', 'enrollRsp', payload,
{disableDefaultResponse: true, sendPolicy: 'immediate'});

let enrolled = false;
for (let attempt = 0; attempt < 20; attempt++) {
await Wait(500);
const stateAfter = await endpoint.read('ssIasZone', ['iasCieAddr', 'zoneState'],
{sendWhen: 'immediate'});
const stateAfter = await endpoint.read('ssIasZone', ['iasCieAddr', 'zoneState'],
{sendPolicy: 'immediate'});
debug.log(`Interview - IAS - after enrolling state (${attempt}): '${JSON.stringify(stateAfter)}'`);
if (stateAfter.zoneState === 1) {
enrolled = true;
Expand All @@ -749,14 +730,9 @@ class Device extends Entity {
for (const endpoint of this.endpoints.filter((e): boolean => e.supportsInputCluster('genPollCtrl'))) {
debug.log(`Interview - Poll control - binding '${this.ieeeAddr}' endpoint '${endpoint.ID}'`);
await endpoint.bind('genPollCtrl', coordinator.endpoints[0]);
const pollPeriod = await endpoint.read('genPollCtrl', ['checkinInterval'], {sendWhen: 'immediate'});
const pollPeriod = await endpoint.read('genPollCtrl', ['checkinInterval'], {sendPolicy: 'immediate'});
this._checkinInterval = pollPeriod.checkinInterval / 4; // convert to seconds
this.pendingRequestTimeout = this._checkinInterval * 1000; // milliseconds
if (pollPeriod.checkinInterval <= 2400) {// 10 minutes
this.defaultSendRequestWhen = 'fastpoll';
} else {
this.defaultSendRequestWhen = 'active';
}
}
} catch (error) {
/* istanbul ignore next */
Expand Down
24 changes: 17 additions & 7 deletions src/controller/model/endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,21 +277,31 @@ class Endpoint extends Entity {
options.disableResponse, options.disableRecovery, options.srcEndpoint) as Promise<Type>;
}): Promise<Type> {
const logPrefix = `Request Queue (${this.deviceIeeeAddress}/${this.ID}): `;
const request = new Request(func, frame, this.getDevice().pendingRequestTimeout, options.sendWhen,
options.sendPolicy);

if(options.sendWhen) {
if ((options.sendWhen === 'immediate') && (this.getDevice().pendingRequestTimeout > 0)) {
debug.info (logPrefix
+ "sendWhen is deprecated. Interpreting sendwhen='immediate' as sendPolicy='immediate'");
options.sendPolicy = 'immediate';
} else {
debug.info (logPrefix + "sendWhen is deprecated and will be ignored.");
}
}

const request = new Request(func, frame, this.getDevice().pendingRequestTimeout, options.sendPolicy);

if (request.sendPolicy !== 'bulk') {
// Check if such a request is already in the queue and remove the old one(s) if necessary
this.pendingRequests.filter(request);
}

// send without queueing if sendWhen or sendPolicy is 'immediate' or if the device has no timeout set
if (request.sendWhen === 'immediate' || request.sendPolicy === 'immediate'
// send without queueing if sendPolicy is 'immediate' or if the device has no timeout set
if (request.sendPolicy === 'immediate'
|| !this.getDevice().pendingRequestTimeout) {
if (this.getDevice().defaultSendRequestWhen !=='immediate')
if (this.getDevice().pendingRequestTimeout > 0)
{
debug.info(logPrefix + `send ${frame.getCommand().name} request immediately ` +
`(sendWhen=${options.sendWhen})`);
`(sendPolicy=${options.sendPolicy})`);
}
return request.send();
}
Expand Down Expand Up @@ -820,7 +830,7 @@ class Endpoint extends Entity {
): Options {
const providedOptions = options || {};
return {
sendWhen: this.getDevice().defaultSendRequestWhen,
sendWhen: undefined,
timeout: 10000,
disableResponse: false,
disableRecovery: false,
Expand Down
Loading

0 comments on commit 746fafb

Please sign in to comment.