From db7c1fd1dca48744d876dd7c6179e6d96d2e482e Mon Sep 17 00:00:00 2001 From: Tim Miller Date: Sat, 18 Jan 2025 23:49:31 +0900 Subject: [PATCH 1/2] Address WebSocket Byte Array handling --- src/FishyFlip/ATWebSocketProtocol.cs | 64 +++++++++++++++++----------- src/FishyFlip/Models/FrameHeader.cs | 5 +++ 2 files changed, 43 insertions(+), 26 deletions(-) diff --git a/src/FishyFlip/ATWebSocketProtocol.cs b/src/FishyFlip/ATWebSocketProtocol.cs index 247ef636..853667ba 100644 --- a/src/FishyFlip/ATWebSocketProtocol.cs +++ b/src/FishyFlip/ATWebSocketProtocol.cs @@ -169,30 +169,46 @@ private void Dispose(bool disposing) private void HandleMessage(byte[] byteArray) { + if (byteArray.Length == 0) + { + this.logger?.LogDebug("WSS: ATError reading message. Empty byte array."); + return; + } + using var stream = new MemoryStream(byteArray); - CBORObject[]? objects = null; + CBORObject? frameHeaderCbor = null; + CBORObject? frameBodyCbor = null; try { - objects = CBORObject.ReadSequence(stream, new CBOREncodeOptions("useIndefLengthStrings=true;float64=true;allowduplicatekeys=true;allowEmpty=true")); + // Read the first 15 bytes to get the frame header. + frameHeaderCbor = CBORObject.Read(stream, new CBOREncodeOptions("useIndefLengthStrings=true;float64=true;allowduplicatekeys=true;allowEmpty=true")); + + if (stream.Position == stream.Length) + { + return; + } + + // Read the rest of the bytes to get the frame body. + frameBodyCbor = CBORObject.Read(stream, new CBOREncodeOptions("useIndefLengthStrings=true;float64=true;allowduplicatekeys=true;allowEmpty=true")); } catch (Exception e) { this.logger?.LogError(e, "WSS: ATError reading message."); - } - if (objects is null) - { - return; + if (frameHeaderCbor is not null) + { + this.logger?.LogDebug($"FrameHeader: {frameHeaderCbor.ToJSONString()}"); + } } - if (objects.Length != 2) + if (frameHeaderCbor is null || frameBodyCbor is null) { return; } var message = new SubscribeRepoMessage(); - var frameHeader = new FrameHeader(objects[0]); + var frameHeader = new FrameHeader(frameHeaderCbor); // this.logger?.LogDebug($"FrameHeader: {objects[0].ToJSONString()}"); message.Header = frameHeader; @@ -206,7 +222,7 @@ private void HandleMessage(byte[] byteArray) switch (frameType) { case "#commit": - var frameCommit = new FrameCommit(objects[1], this.logger); + var frameCommit = new FrameCommit(frameBodyCbor, this.logger); // this.logger?.LogDebug($"FrameBody: {objects[1].ToJSONString()}"); message.Commit = frameCommit; @@ -238,35 +254,35 @@ void HandleProgressStatus(CarProgressStatusEvent e) break; case "#handle": - var frameHandle = new FrameHandle(objects[1]); + var frameHandle = new FrameHandle(frameBodyCbor); message.Handle = frameHandle; break; case "#repoOp": - message.RepoOp = new FrameRepoOp(objects[1]); + message.RepoOp = new FrameRepoOp(frameBodyCbor); break; case "#info": - message.Info = new FrameInfo(objects[1]); + message.Info = new FrameInfo(frameBodyCbor); break; case "#tombstone": - message.Tombstone = new FrameTombstone(objects[1]); + message.Tombstone = new FrameTombstone(frameBodyCbor); break; case "#migrate": - message.Migrate = new FrameMigrate(objects[1]); + message.Migrate = new FrameMigrate(frameBodyCbor); break; case "#account": - message.Account = new FrameAccount(objects[1]); + message.Account = new FrameAccount(frameBodyCbor); break; case "#identity": - message.Identity = new FrameIdentity(objects[1]); + message.Identity = new FrameIdentity(frameBodyCbor); break; default: - this.logger?.LogDebug($"Unknown Frame: {objects[1].ToJSONString()}"); + this.logger?.LogDebug($"Unknown Frame: {frameBodyCbor.ToJSONString()}"); break; } break; case FrameHeaderOperation.Error: - var frameError = new FrameError(objects[1]); + var frameError = new FrameError(frameBodyCbor); message.Error = frameError; this.logger?.LogError($"WSS: ATError: {frameError.Message}"); this.CloseAsync(WebSocketCloseStatus.InternalServerError, frameError.Message ?? string.Empty).FireAndForgetSafeAsync(this.logger); @@ -286,25 +302,21 @@ private async Task ReceiveMessages(ClientWebSocket webSocket, CancellationToken try { #if NETSTANDARD - var result = - await webSocket.ReceiveAsync(new ArraySegment(receiveBuffer), token); + var result = await webSocket.ReceiveAsync(new ArraySegment(receiveBuffer), token); if (result is not { MessageType: WebSocketMessageType.Binary, EndOfMessage: true }) { continue; } - byte[] newArray = new byte[result.Count]; - Array.Copy(receiveBuffer, 0, newArray, 0, result.Count); + var newArray = receiveBuffer.AsSpan(0, result.Count).ToArray(); #else - var result = - await webSocket.ReceiveAsync(new Memory(receiveBuffer), token); + var result = await webSocket.ReceiveAsync(receiveBuffer, token); if (result is not { MessageType: WebSocketMessageType.Binary, EndOfMessage: true }) { continue; } - byte[] newArray = new byte[result.Count]; - Array.Copy(receiveBuffer, 0, newArray, 0, result.Count); + var newArray = receiveBuffer.AsSpan(0, result.Count).ToArray(); #endif Task.Run(() => this.HandleMessage(newArray)).FireAndForgetSafeAsync(this.logger); diff --git a/src/FishyFlip/Models/FrameHeader.cs b/src/FishyFlip/Models/FrameHeader.cs index 4496e1a1..96f57a92 100644 --- a/src/FishyFlip/Models/FrameHeader.cs +++ b/src/FishyFlip/Models/FrameHeader.cs @@ -15,6 +15,11 @@ public class FrameHeader /// The CBOR Object. public FrameHeader(CBORObject obj) { + if (obj.Count <= 0) + { + return; + } + this.Operation = (FrameHeaderOperation)(obj["op"]?.AsInt32() ?? 0); this.Type = obj["t"]?.AsString(); } From ac33bcfd9565bbd4b1696c16252d86bba2762881 Mon Sep 17 00:00:00 2001 From: Tim Miller Date: Sat, 18 Jan 2025 23:54:33 +0900 Subject: [PATCH 2/2] Original Sequence code was fine --- src/FishyFlip/ATWebSocketProtocol.cs | 46 +++++++++++----------------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/src/FishyFlip/ATWebSocketProtocol.cs b/src/FishyFlip/ATWebSocketProtocol.cs index 853667ba..ab72e31c 100644 --- a/src/FishyFlip/ATWebSocketProtocol.cs +++ b/src/FishyFlip/ATWebSocketProtocol.cs @@ -176,39 +176,29 @@ private void HandleMessage(byte[] byteArray) } using var stream = new MemoryStream(byteArray); - CBORObject? frameHeaderCbor = null; - CBORObject? frameBodyCbor = null; + CBORObject[]? objects = null; try { - // Read the first 15 bytes to get the frame header. - frameHeaderCbor = CBORObject.Read(stream, new CBOREncodeOptions("useIndefLengthStrings=true;float64=true;allowduplicatekeys=true;allowEmpty=true")); - - if (stream.Position == stream.Length) - { - return; - } - - // Read the rest of the bytes to get the frame body. - frameBodyCbor = CBORObject.Read(stream, new CBOREncodeOptions("useIndefLengthStrings=true;float64=true;allowduplicatekeys=true;allowEmpty=true")); + objects = CBORObject.ReadSequence(stream, new CBOREncodeOptions("useIndefLengthStrings=true;float64=true;allowduplicatekeys=true;allowEmpty=true")); } catch (Exception e) { this.logger?.LogError(e, "WSS: ATError reading message."); + } - if (frameHeaderCbor is not null) - { - this.logger?.LogDebug($"FrameHeader: {frameHeaderCbor.ToJSONString()}"); - } + if (objects is null) + { + return; } - if (frameHeaderCbor is null || frameBodyCbor is null) + if (objects.Length != 2) { return; } var message = new SubscribeRepoMessage(); - var frameHeader = new FrameHeader(frameHeaderCbor); + var frameHeader = new FrameHeader(objects[0]); // this.logger?.LogDebug($"FrameHeader: {objects[0].ToJSONString()}"); message.Header = frameHeader; @@ -222,7 +212,7 @@ private void HandleMessage(byte[] byteArray) switch (frameType) { case "#commit": - var frameCommit = new FrameCommit(frameBodyCbor, this.logger); + var frameCommit = new FrameCommit(objects[1], this.logger); // this.logger?.LogDebug($"FrameBody: {objects[1].ToJSONString()}"); message.Commit = frameCommit; @@ -254,35 +244,35 @@ void HandleProgressStatus(CarProgressStatusEvent e) break; case "#handle": - var frameHandle = new FrameHandle(frameBodyCbor); + var frameHandle = new FrameHandle(objects[1]); message.Handle = frameHandle; break; case "#repoOp": - message.RepoOp = new FrameRepoOp(frameBodyCbor); + message.RepoOp = new FrameRepoOp(objects[1]); break; case "#info": - message.Info = new FrameInfo(frameBodyCbor); + message.Info = new FrameInfo(objects[1]); break; case "#tombstone": - message.Tombstone = new FrameTombstone(frameBodyCbor); + message.Tombstone = new FrameTombstone(objects[1]); break; case "#migrate": - message.Migrate = new FrameMigrate(frameBodyCbor); + message.Migrate = new FrameMigrate(objects[1]); break; case "#account": - message.Account = new FrameAccount(frameBodyCbor); + message.Account = new FrameAccount(objects[1]); break; case "#identity": - message.Identity = new FrameIdentity(frameBodyCbor); + message.Identity = new FrameIdentity(objects[1]); break; default: - this.logger?.LogDebug($"Unknown Frame: {frameBodyCbor.ToJSONString()}"); + this.logger?.LogDebug($"Unknown Frame: {objects[1].ToJSONString()}"); break; } break; case FrameHeaderOperation.Error: - var frameError = new FrameError(frameBodyCbor); + var frameError = new FrameError(objects[1]); message.Error = frameError; this.logger?.LogError($"WSS: ATError: {frameError.Message}"); this.CloseAsync(WebSocketCloseStatus.InternalServerError, frameError.Message ?? string.Empty).FireAndForgetSafeAsync(this.logger);