Skip to content
This repository has been archived by the owner on Aug 1, 2024. It is now read-only.

Commit

Permalink
fix ingress/egress reporting on outgoing data.
Browse files Browse the repository at this point in the history
  • Loading branch information
kasobol-msft committed May 1, 2020
1 parent 217d521 commit 77d2f2e
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 12 deletions.
44 changes: 35 additions & 9 deletions Lib/ClassLibraryCommon/Core/ByteCountingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,19 @@ internal class ByteCountingStream : Stream
{
private readonly Stream wrappedStream;
private readonly RequestResult requestObject;
private readonly bool reverseCapture;

/// <summary>
/// Initializes a new instance of the ByteCountingStream class with an expandable capacity initialized to zero.
/// </summary>
public ByteCountingStream(Stream wrappedStream, RequestResult requestObject)
public ByteCountingStream(Stream wrappedStream, RequestResult requestObject, bool reverseCapture = false)
: base()
{
CommonUtility.AssertNotNull("WrappedStream", wrappedStream);
CommonUtility.AssertNotNull("RequestObject", requestObject);
this.wrappedStream = wrappedStream;
this.requestObject = requestObject;
this.reverseCapture = reverseCapture;
}

public override bool CanRead
Expand Down Expand Up @@ -105,14 +107,14 @@ public override long Seek(long offset, SeekOrigin origin)
public override int Read(byte[] buffer, int offset, int count)
{
int read = this.wrappedStream.Read(buffer, offset, count);
this.requestObject.IngressBytes += read;
this.CaptureRead(read);
return read;
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
int read = await this.wrappedStream.ReadAsync(buffer, offset, count, cancellationToken);
this.requestObject.IngressBytes += read;
this.CaptureRead(read);
return read;
}

Expand All @@ -122,7 +124,7 @@ public override int ReadByte()

if (val != -1)
{
++this.requestObject.IngressBytes;
this.CaptureRead(1);
}

return val;
Expand Down Expand Up @@ -155,7 +157,7 @@ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, Asy
public override int EndRead(IAsyncResult asyncResult)
{
int read = this.wrappedStream.EndRead(asyncResult);
this.requestObject.IngressBytes += read;
this.CaptureRead(read);
return read;
}

Expand All @@ -171,7 +173,7 @@ public override int EndRead(IAsyncResult asyncResult)
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
IAsyncResult res = this.wrappedStream.BeginWrite(buffer, offset, count, callback, state);
this.requestObject.EgressBytes += count;
this.CaptureWrite(count);
return res;
}

Expand All @@ -187,19 +189,43 @@ public override void EndWrite(IAsyncResult asyncResult)
public override void Write(byte[] buffer, int offset, int count)
{
this.wrappedStream.Write(buffer, offset, count);
this.requestObject.EgressBytes += count;
this.CaptureWrite(count);
}

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await this.wrappedStream.WriteAsync(buffer, offset, count, cancellationToken);
this.requestObject.EgressBytes += count;
this.CaptureWrite(count);
}

public override void WriteByte(byte value)
{
this.wrappedStream.WriteByte(value);
++this.requestObject.EgressBytes;
this.CaptureWrite(1);
}

private void CaptureWrite(int count)
{
if (reverseCapture)
{
this.requestObject.IngressBytes += count;
}
else
{
this.requestObject.EgressBytes += count;
}
}

private void CaptureRead(int count)
{
if (reverseCapture)
{
this.requestObject.EgressBytes += count;
}
else
{
this.requestObject.IngressBytes += count;
}
}

protected override void Dispose(bool disposing)
Expand Down
5 changes: 3 additions & 2 deletions Lib/Common/Core/Util/StreamExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,12 @@ private static int MinBytesToRead(long? val1, int val2)
/// </summary>
/// <param name="stream">A reference to the original stream</param>
/// <param name="result">An object that represents the result of a physical request.</param>
/// <param name="reverseCapture">A flag indicating that ingress/egress bytes should be capture in reverse.</param>
/// <returns></returns>
[DebuggerNonUserCode]
internal static Stream WrapWithByteCountingStream(this Stream stream, RequestResult result)
internal static Stream WrapWithByteCountingStream(this Stream stream, RequestResult result, bool reverseCapture=false)
{
return new ByteCountingStream(stream, result);
return new ByteCountingStream(stream, result, reverseCapture);
}
#endif

Expand Down
6 changes: 5 additions & 1 deletion Lib/Common/Shared/Protocol/HttpContentFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Microsoft.Azure.Storage.Shared.Protocol
{
using Microsoft.Azure.Storage.Core;
using Microsoft.Azure.Storage.Core.Executor;
using Microsoft.Azure.Storage.Core.Util;
using System;
using System.IO;
using System.Net.Http;
Expand All @@ -28,7 +29,10 @@ internal static class HttpContentFactory
public static HttpContent BuildContentFromStream<T>(Stream stream, long offset, long? length, Checksum checksum, RESTCommand<T> cmd, OperationContext operationContext)
{
stream.Seek(offset, SeekOrigin.Begin);


#if !(WINDOWS_RT || NETCORE)
stream = stream.WrapWithByteCountingStream(cmd.CurrentResult, true);
#endif
HttpContent retContent = new StreamContent(new NonCloseableStream(stream));
retContent.Headers.ContentLength = length;
if (checksum?.MD5 != null)
Expand Down
48 changes: 48 additions & 0 deletions Test/ClassLibraryCommon/Blob/BlobReadStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,54 @@ public void BlockBlobReadStreamBasicTest()
}
}

[TestMethod]
[Description("Download a blob using CloudBlobStream With Ingress/Egress bytes tracking")]
[TestCategory(ComponentCategory.Blob)]
[TestCategory(TestTypeCategory.UnitTest)]
[TestCategory(SmokeTestCategory.NonSmoke)]
[TestCategory(TenantTypeCategory.DevStore), TestCategory(TenantTypeCategory.DevFabric), TestCategory(TenantTypeCategory.Cloud)]
public void BlockBlobReadStreamBasicWithIngressEgressBytesTest()
{
int bufferSize = 5 * 1024 * 1024;
byte[] buffer = GetRandomBuffer(bufferSize);
CloudBlobContainer container = GetRandomContainerReference();
try
{
container.Create();

CloudBlockBlob blob = container.GetBlockBlobReference("blob1");
using (MemoryStream wholeBlob = new MemoryStream(buffer))
{
OperationContext operationContext = new OperationContext();
blob.UploadFromStream(wholeBlob, null, null, operationContext);
Assert.AreEqual(bufferSize, operationContext.LastResult.EgressBytes);
Assert.AreEqual(0, operationContext.LastResult.IngressBytes);
}

using (MemoryStream wholeBlob = new MemoryStream(buffer))
{
OperationContext operationContext = new OperationContext();
using (Stream blobStream = blob.OpenRead(operationContext: operationContext))
{
TestHelper.AssertStreamsAreEqual(wholeBlob, blobStream);
}
long totalIngress = 0;
long totalEggress = 0;
foreach (var result in operationContext.RequestResults)
{
totalIngress += result.IngressBytes;
totalEggress += result.EgressBytes;
}
Assert.AreEqual(bufferSize, totalIngress);
Assert.AreEqual(0, totalEggress);
}
}
finally
{
container.DeleteIfExists();
}
}

[TestMethod]
[Description("Download a blob using CloudBlobStream")]
[TestCategory(ComponentCategory.Blob)]
Expand Down

0 comments on commit 77d2f2e

Please sign in to comment.