Skip to content

Commit

Permalink
Queue and ResourceQueue issues (fix #202136) (#202190)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpasero authored Jan 11, 2024
1 parent c6f71ba commit e34dc5f
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 50 deletions.
17 changes: 11 additions & 6 deletions src/vs/base/common/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -715,10 +715,9 @@ export class Limiter<T> implements ILimiter<T> {
}

dispose(): void {
// SEE https://github.com/microsoft/vscode/issues/202136
// this._isDisposed = true;
// this.outstandingPromises.length = 0; // stop further processing
// this._size = 0;
this._isDisposed = true;
this.outstandingPromises.length = 0; // stop further processing
this._size = 0;
this._onDrained.dispose();
}
}
Expand Down Expand Up @@ -792,7 +791,13 @@ export class ResourceQueue implements IDisposable {
return true;
}

queueFor(resource: URI, extUri: IExtUri = defaultExtUri): ILimiter<void> {
queueSize(resource: URI, extUri: IExtUri = defaultExtUri): number {
const key = extUri.getComparisonKey(resource);

return this.queues.get(key)?.size ?? 0;
}

queueFor(resource: URI, factory: ITask<Promise<void>>, extUri: IExtUri = defaultExtUri): Promise<void> {
const key = extUri.getComparisonKey(resource);

let queue = this.queues.get(key);
Expand Down Expand Up @@ -820,7 +825,7 @@ export class ResourceQueue implements IDisposable {
this.queues.set(key, queue);
}

return queue;
return queue.queue(factory);
}

private onDidQueueDrain(): void {
Expand Down
4 changes: 2 additions & 2 deletions src/vs/base/node/pfs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -380,11 +380,11 @@ function writeFile(path: string, data: Buffer, options?: IWriteFileOptions): Pro
function writeFile(path: string, data: Uint8Array, options?: IWriteFileOptions): Promise<void>;
function writeFile(path: string, data: string | Buffer | Uint8Array, options?: IWriteFileOptions): Promise<void>;
function writeFile(path: string, data: string | Buffer | Uint8Array, options?: IWriteFileOptions): Promise<void> {
return writeQueues.queueFor(URI.file(path), extUriBiasedIgnorePathCase).queue(() => {
return writeQueues.queueFor(URI.file(path), () => {
const ensuredOptions = ensureWriteOptions(options);

return new Promise((resolve, reject) => doWriteFileAndFlush(path, data, ensuredOptions, error => error ? reject(error) : resolve()));
});
}, extUriBiasedIgnorePathCase);
}

interface IWriteFileOptions {
Expand Down
30 changes: 12 additions & 18 deletions src/vs/base/test/common/async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,7 @@ suite('Async', () => {
});
});

// skipped because of https://github.com/microsoft/vscode/issues/202136
test.skip('stop processing on dispose', async function () {
test('stop processing on dispose', async function () {
const queue = new async.Queue();

let workCounter = 0;
Expand Down Expand Up @@ -762,21 +761,19 @@ suite('Async', () => {

await queue.whenDrained(); // returns immediately since empty

const r1Queue = queue.queueFor(URI.file('/some/path'));
let done1 = false;
queue.queueFor(URI.file('/some/path'), async () => { done1 = true; });
await queue.whenDrained(); // returns immediately since no work scheduled
assert.strictEqual(done1, true);

await queue.whenDrained(); // returns immediately since empty

const r2Queue = queue.queueFor(URI.file('/some/other/path'));

await queue.whenDrained(); // returns immediately since empty

assert.ok(r1Queue);
assert.ok(r2Queue);
assert.strictEqual(r1Queue, queue.queueFor(URI.file('/some/path'))); // same queue returned
let done2 = false;
queue.queueFor(URI.file('/some/other/path'), async () => { done2 = true; });
await queue.whenDrained(); // returns immediately since no work scheduled
assert.strictEqual(done2, true);

// schedule some work
const w1 = new async.DeferredPromise<void>();
r1Queue.queue(() => w1.p);
queue.queueFor(URI.file('/some/path'), () => w1.p);

let drained = false;
queue.whenDrained().then(() => drained = true);
Expand All @@ -785,14 +782,11 @@ suite('Async', () => {
await async.timeout(0);
assert.strictEqual(drained, true);

const r1Queue2 = queue.queueFor(URI.file('/some/path'));
assert.notStrictEqual(r1Queue, r1Queue2); // previous one got disposed after finishing

// schedule some work
const w2 = new async.DeferredPromise<void>();
const w3 = new async.DeferredPromise<void>();
r1Queue.queue(() => w2.p);
r2Queue.queue(() => w3.p);
queue.queueFor(URI.file('/some/path'), () => w2.p);
queue.queueFor(URI.file('/some/other/path'), () => w3.p);

drained = false;
queue.whenDrained().then(() => drained = true);
Expand Down
24 changes: 11 additions & 13 deletions src/vs/platform/files/common/fileService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,14 +505,14 @@ export class FileService extends Disposable implements IFileService {

private async doReadFileAtomic(provider: IFileSystemProviderWithFileReadWriteCapability | IFileSystemProviderWithOpenReadWriteCloseCapability | IFileSystemProviderWithFileReadStreamCapability, resource: URI, options?: IReadFileOptions, token?: CancellationToken): Promise<IFileContent> {
return new Promise<IFileContent>((resolve, reject) => {
this.writeQueue.queueFor(resource, this.getExtUri(provider).providerExtUri).queue(async () => {
this.writeQueue.queueFor(resource, async () => {
try {
const content = await this.doReadFile(provider, resource, options, token);
resolve(content);
} catch (error) {
reject(error);
}
});
}, this.getExtUri(provider).providerExtUri);
});
}

Expand Down Expand Up @@ -1087,17 +1087,15 @@ export class FileService extends Disposable implements IFileService {
// create parent folders
await this.mkdirp(targetProvider, this.getExtUri(targetProvider).providerExtUri.dirname(target));

// queue on the source to ensure atomic read
const sourceWriteQueue = this.writeQueue.queueFor(source, this.getExtUri(sourceProvider).providerExtUri);

// leverage `copy` method if provided and providers are identical
// queue on the source to ensure atomic read
if (sourceProvider === targetProvider && hasFileFolderCopyCapability(sourceProvider)) {
return sourceWriteQueue.queue(() => sourceProvider.copy(source, target, { overwrite: true }));
return this.writeQueue.queueFor(source, () => sourceProvider.copy(source, target, { overwrite: true }), this.getExtUri(sourceProvider).providerExtUri);
}

// otherwise copy via buffer/unbuffered and use a write queue
// on the source to ensure atomic operation as much as possible
return sourceWriteQueue.queue(() => this.doCopyFile(sourceProvider, source, targetProvider, target));
return this.writeQueue.queueFor(source, () => this.doCopyFile(sourceProvider, source, targetProvider, target), this.getExtUri(sourceProvider).providerExtUri);
}

//#endregion
Expand Down Expand Up @@ -1223,7 +1221,7 @@ export class FileService extends Disposable implements IFileService {
private readonly writeQueue = this._register(new ResourceQueue());

private async doWriteBuffered(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, options: IWriteFileOptions | undefined, readableOrStreamOrBufferedStream: VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream): Promise<void> {
return this.writeQueue.queueFor(resource, this.getExtUri(provider).providerExtUri).queue(async () => {
return this.writeQueue.queueFor(resource, async () => {

// open handle
const handle = await provider.open(resource, { create: true, unlock: options?.unlock ?? false });
Expand All @@ -1242,7 +1240,7 @@ export class FileService extends Disposable implements IFileService {
// close handle always
await provider.close(handle);
}
});
}, this.getExtUri(provider).providerExtUri);
}

private async doWriteStreamBufferedQueued(provider: IFileSystemProviderWithOpenReadWriteCloseCapability, handle: number, streamOrBufferedStream: VSBufferReadableStream | VSBufferReadableBufferedStream): Promise<void> {
Expand Down Expand Up @@ -1321,7 +1319,7 @@ export class FileService extends Disposable implements IFileService {
}

private async doWriteUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, options: IWriteFileOptions | undefined, bufferOrReadableOrStreamOrBufferedStream: VSBuffer | VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream): Promise<void> {
return this.writeQueue.queueFor(resource, this.getExtUri(provider).providerExtUri).queue(() => this.doWriteUnbufferedQueued(provider, resource, options, bufferOrReadableOrStreamOrBufferedStream));
return this.writeQueue.queueFor(resource, () => this.doWriteUnbufferedQueued(provider, resource, options, bufferOrReadableOrStreamOrBufferedStream), this.getExtUri(provider).providerExtUri);
}

private async doWriteUnbufferedQueued(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, options: IWriteFileOptions | undefined, bufferOrReadableOrStreamOrBufferedStream: VSBuffer | VSBufferReadable | VSBufferReadableStream | VSBufferReadableBufferedStream): Promise<void> {
Expand All @@ -1341,7 +1339,7 @@ export class FileService extends Disposable implements IFileService {
}

private async doPipeBuffered(sourceProvider: IFileSystemProviderWithOpenReadWriteCloseCapability, source: URI, targetProvider: IFileSystemProviderWithOpenReadWriteCloseCapability, target: URI): Promise<void> {
return this.writeQueue.queueFor(target, this.getExtUri(targetProvider).providerExtUri).queue(() => this.doPipeBufferedQueued(sourceProvider, source, targetProvider, target));
return this.writeQueue.queueFor(target, () => this.doPipeBufferedQueued(sourceProvider, source, targetProvider, target), this.getExtUri(targetProvider).providerExtUri);
}

private async doPipeBufferedQueued(sourceProvider: IFileSystemProviderWithOpenReadWriteCloseCapability, source: URI, targetProvider: IFileSystemProviderWithOpenReadWriteCloseCapability, target: URI): Promise<void> {
Expand Down Expand Up @@ -1387,15 +1385,15 @@ export class FileService extends Disposable implements IFileService {
}

private async doPipeUnbuffered(sourceProvider: IFileSystemProviderWithFileReadWriteCapability, source: URI, targetProvider: IFileSystemProviderWithFileReadWriteCapability, target: URI): Promise<void> {
return this.writeQueue.queueFor(target, this.getExtUri(targetProvider).providerExtUri).queue(() => this.doPipeUnbufferedQueued(sourceProvider, source, targetProvider, target));
return this.writeQueue.queueFor(target, () => this.doPipeUnbufferedQueued(sourceProvider, source, targetProvider, target), this.getExtUri(targetProvider).providerExtUri);
}

private async doPipeUnbufferedQueued(sourceProvider: IFileSystemProviderWithFileReadWriteCapability, source: URI, targetProvider: IFileSystemProviderWithFileReadWriteCapability, target: URI): Promise<void> {
return targetProvider.writeFile(target, await sourceProvider.readFile(source), { create: true, overwrite: true, unlock: false, atomic: false });
}

private async doPipeUnbufferedToBuffered(sourceProvider: IFileSystemProviderWithFileReadWriteCapability, source: URI, targetProvider: IFileSystemProviderWithOpenReadWriteCloseCapability, target: URI): Promise<void> {
return this.writeQueue.queueFor(target, this.getExtUri(targetProvider).providerExtUri).queue(() => this.doPipeUnbufferedToBufferedQueued(sourceProvider, source, targetProvider, target));
return this.writeQueue.queueFor(target, () => this.doPipeUnbufferedToBufferedQueued(sourceProvider, source, targetProvider, target), this.getExtUri(targetProvider).providerExtUri);
}

private async doPipeUnbufferedToBufferedQueued(sourceProvider: IFileSystemProviderWithFileReadWriteCapability, source: URI, targetProvider: IFileSystemProviderWithOpenReadWriteCloseCapability, target: URI): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion src/vs/workbench/api/common/extHostFileSystemConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export class ExtHostConsumerFileSystem {
// use shortcut
await that._proxy.$ensureActivation(uri.scheme);
await that.mkdirp(provider.impl, provider.extUri, provider.extUri.dirname(uri));
return await that._writeQueue.queueFor(uri).queue(() => Promise.resolve(provider.impl.writeFile(uri, content, { create: true, overwrite: true })));
return await that._writeQueue.queueFor(uri, () => Promise.resolve(provider.impl.writeFile(uri, content, { create: true, overwrite: true })));
} else {
return await that._proxy.$writeFile(uri, VSBuffer.wrap(content));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ export class TextFileEditorModelManager extends Disposable implements ITextFileE
// Resolve model to update (use a queue to prevent accumulation of resolves
// when the resolve actually takes long. At most we only want the queue
// to have a size of 2 (1 running resolve and 1 queued resolve).
const queue = this.modelResolveQueue.queueFor(model.resource);
if (queue.size <= 1) {
queue.queue(async () => {
const queueSize = this.modelResolveQueue.queueSize(model.resource);
if (queueSize <= 1) {
this.modelResolveQueue.queueFor(model.resource, async () => {
try {
await this.reload(model);
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,9 @@ export class StoredFileWorkingCopyManager<M extends IStoredFileWorkingCopyModel>
// Resolves a working copy to update (use a queue to prevent accumulation of
// resolve when the resolving actually takes long. At most we only want the
// queue to have a size of 2 (1 running resolve and 1 queued resolve).
const queue = this.workingCopyResolveQueue.queueFor(workingCopy.resource);
if (queue.size <= 1) {
queue.queue(async () => {
const queueSize = this.workingCopyResolveQueue.queueSize(workingCopy.resource);
if (queueSize <= 1) {
this.workingCopyResolveQueue.queueFor(workingCopy.resource, async () => {
try {
await this.reload(workingCopy);
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ class WorkingCopyBackupServiceImpl extends Disposable implements IWorkingCopyBac
return;
}

return this.ioOperationQueues.queueFor(backupResource).queue(async () => {
return this.ioOperationQueues.queueFor(backupResource, async () => {
if (token?.isCancellationRequested) {
return;
}
Expand Down Expand Up @@ -344,7 +344,7 @@ class WorkingCopyBackupServiceImpl extends Disposable implements IWorkingCopyBac
return;
}

return this.ioOperationQueues.queueFor(backupResource).queue(async () => {
return this.ioOperationQueues.queueFor(backupResource, async () => {
if (token?.isCancellationRequested) {
return;
}
Expand Down Expand Up @@ -386,7 +386,7 @@ class WorkingCopyBackupServiceImpl extends Disposable implements IWorkingCopyBac
private async resolveIdentifier(backupResource: URI, model: WorkingCopyBackupsModel): Promise<IWorkingCopyIdentifier | undefined> {
let res: IWorkingCopyIdentifier | undefined = undefined;

await this.ioOperationQueues.queueFor(backupResource).queue(async () => {
await this.ioOperationQueues.queueFor(backupResource, async () => {
if (!model.has(backupResource)) {
return; // require backup to be present
}
Expand Down Expand Up @@ -450,7 +450,7 @@ class WorkingCopyBackupServiceImpl extends Disposable implements IWorkingCopyBac

let res: IResolvedWorkingCopyBackup<T> | undefined = undefined;

await this.ioOperationQueues.queueFor(backupResource).queue(async () => {
await this.ioOperationQueues.queueFor(backupResource, async () => {
if (!model.has(backupResource)) {
return; // require backup to be present
}
Expand Down

0 comments on commit e34dc5f

Please sign in to comment.