Skip to content

Commit

Permalink
Merge pull request #113 from planetarium/release/13
Browse files Browse the repository at this point in the history
Use StateRootHash instead of BlockHash
  • Loading branch information
ipdae authored Aug 30, 2024
2 parents d072fd2 + 4e625a1 commit 70ee599
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 70 deletions.
13 changes: 10 additions & 3 deletions MarketService.Tests/RpcClientTest.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -11,6 +12,9 @@
using Libplanet.Crypto;
using Libplanet.Types.Assets;
using Libplanet.Mocks;
using Libplanet.Types.Blocks;
using Libplanet.Types.Evidence;
using Libplanet.Types.Tx;
using MagicOnion;
using MagicOnion.Server;
using MarketService.Models;
Expand Down Expand Up @@ -668,7 +672,11 @@ public TestClient(IOptions<RpcConfigOptions> options, ILogger<RpcClient> logger,
contextFactory)
{
Service = service;
Init = true;
var path = "../../../genesis";
var buffer = File.ReadAllBytes(path);
var dict = (Dictionary)new Codec().Decode(buffer);
var block = BlockMarshaler.UnmarshalBlock(dict);
receiver.Tip = block;
}
}

Expand Down Expand Up @@ -780,8 +788,7 @@ public UnaryResult<Dictionary<byte[], byte[]>> GetAgentStatesByBlockHash(

public UnaryResult<Dictionary<byte[], byte[]>> GetAgentStatesByStateRootHash(
byte[] stateRootHashBytes,
IEnumerable<byte[]> addressBytesList) =>
throw new NotImplementedException();
IEnumerable<byte[]> addressBytesList) => GetAgentStatesByBlockHash(stateRootHashBytes, addressBytesList);

public UnaryResult<Dictionary<byte[], byte[]>> GetAvatarStatesByBlockHash(
byte[] blockHashBytes,
Expand Down
Binary file added MarketService.Tests/genesis
Binary file not shown.
27 changes: 15 additions & 12 deletions MarketService/ProductWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,29 @@ public ProductWorker(ILogger<ProductWorker> logger, RpcClient client)

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_rpcClient.Init)
{
#pragma warning disable CS4014
_rpcClient.StartAsync(stoppingToken);
#pragma warning restore CS4014
}

while (true)
{
if (stoppingToken.IsCancellationRequested) stoppingToken.ThrowIfCancellationRequested();

var stopWatch = new Stopwatch();
_logger.LogInformation("[ProductWorker]Start sync product");
stopWatch.Start();

while (!_rpcClient.Init) await Task.Delay(100, stoppingToken);
var retry = 0;
while (_rpcClient.Tip?.Index == _rpcClient.PreviousTip?.Index)
{
await Task.Delay((5 - retry) * 1000, stoppingToken);
retry++;
if (retry >= 3)
{
throw new InvalidOperationException();
}
}

stopWatch.Start();

try
{
var hashBytes = await _rpcClient.GetBlockHashBytes();
var hashBytes = await _rpcClient.GetBlockStateRootHashBytes();
var crystalEquipmentGrindingSheet = await _rpcClient.GetSheet<CrystalEquipmentGrindingSheet>(hashBytes);
var crystalMonsterCollectionMultiplierSheet =
await _rpcClient.GetSheet<CrystalMonsterCollectionMultiplierSheet>(hashBytes);
Expand All @@ -50,8 +53,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

stopWatch.Stop();
var ts = stopWatch.Elapsed;
_logger.LogInformation("[ProductWorker]Complete sync product. {TotalElapsed}", ts);
await Task.Delay(1000, stoppingToken);
_logger.LogInformation("[ProductWorker]Complete sync product on {BlockIndex}. {TotalElapsed}", _rpcClient.Tip.Index, ts);
await Task.Delay(8000, stoppingToken);
}
}
}
1 change: 1 addition & 0 deletions MarketService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public void ConfigureServices(IServiceCollection services)
);
services.AddSingleton<RpcClient>();
services.AddSingleton<Receiver>();
services.AddHostedService<RpcService>();
WorkerOptions workerOptions = new();
Configuration.GetSection(WorkerOptions.WorkerConfig)
.Bind(workerOptions);
Expand Down
17 changes: 10 additions & 7 deletions MarketService/Receiver.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
using Bencodex;
using Bencodex.Types;
using Libplanet.Types.Blocks;
using Nekoyume.Shared.Hubs;

namespace MarketService;

public class Receiver : IActionEvaluationHubReceiver
{
public Block Tip;
public Block PreviousTip;
private readonly ILogger<Receiver> _logger;
private readonly Codec _codec = new Codec();

public Receiver(ILogger<Receiver> logger)
{
Expand All @@ -13,40 +19,37 @@ public Receiver(ILogger<Receiver> logger)

public void OnRender(byte[] evaluation)
{
_logger.LogDebug("Start {Method}", nameof(OnRender));
}

public void OnUnrender(byte[] evaluation)
{
_logger.LogDebug("Start {Method}", nameof(OnUnrender));
}

public void OnRenderBlock(byte[] oldTip, byte[] newTip)
{
var dict = (Dictionary)_codec.Decode(newTip);
var newTipBlock = BlockMarshaler.UnmarshalBlock(dict);
PreviousTip = Tip;
Tip = newTipBlock;
}

public void OnReorged(byte[] oldTip, byte[] newTip, byte[] branchpoint)
{
_logger.LogDebug("Start {Method}", nameof(OnReorged));
}

public void OnReorgEnd(byte[] oldTip, byte[] newTip, byte[] branchpoint)
{
_logger.LogDebug("Start {Method}", nameof(OnReorgEnd));
}

public void OnException(int code, string message)
{
_logger.LogDebug("Start {Method}", nameof(OnException));
}

public void OnPreloadStart()
{
_logger.LogDebug("Start {Method}", nameof(OnPreloadStart));
}

public void OnPreloadEnd()
{
_logger.LogDebug("Start {Method}", nameof(OnPreloadEnd));
}
}
83 changes: 51 additions & 32 deletions MarketService/RpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,19 @@ public class RpcClient
private readonly ILogger<RpcClient> _logger;
private readonly Receiver _receiver;
private bool _ready;
private bool _selfDisconnect;

private readonly ParallelOptions _parallelOptions = new()
{
MaxDegreeOfParallelism = MaxDegreeOfParallelism
};

public IBlockChainService Service = null!;
private IActionEvaluationHub _hub;

public bool Ready => _ready;
public Block Tip => _receiver.Tip;
public Block PreviousTip => _receiver.PreviousTip;


public RpcClient(IOptions<RpcConfigOptions> options, ILogger<RpcClient> logger, Receiver receiver,
Expand All @@ -80,53 +84,67 @@ public RpcClient(IOptions<RpcConfigOptions> options, ILogger<RpcClient> logger,
_contextFactory = contextFactory;
}

public bool Init { get; protected set; }

public async Task StartAsync(CancellationToken stoppingToken)
{
while (true)
{
if (stoppingToken.IsCancellationRequested) stoppingToken.ThrowIfCancellationRequested();
if (stoppingToken.IsCancellationRequested)
{
_selfDisconnect = true;
stoppingToken.ThrowIfCancellationRequested();
}

try
{
var hub = await StreamingHubClient.ConnectAsync<IActionEvaluationHub, IActionEvaluationHubReceiver>(
_channel, _receiver, cancellationToken: stoppingToken);
_logger.LogDebug("Connected to hub");
Service = MagicOnionClient.Create<IBlockChainService>(_channel).WithCancellationToken(stoppingToken);
_logger.LogDebug("Connected to service");

await hub.JoinAsync(_address.ToHex());
await Service.AddClient(_address.ToByteArray());
_logger.LogInformation("Joined to RPC headless");
Init = true;
_ready = true;

_logger.LogDebug("Waiting for disconnecting");
await hub.WaitForDisconnect();
await Join(stoppingToken);
}
catch (Exception exception)
{
_logger.LogError(exception, "Error occurred");
_ready = false;
}
finally
if (_selfDisconnect)
{
_logger.LogDebug("Retry to connect again");
_logger.LogInformation("self disconnect");
break;
}
}
}

private async Task Join(CancellationToken stoppingToken)
{
_hub = await StreamingHubClient.ConnectAsync<IActionEvaluationHub, IActionEvaluationHubReceiver>(
_channel, _receiver, cancellationToken: stoppingToken);
_logger.LogDebug("Connected to hub");
Service = MagicOnionClient.Create<IBlockChainService>(_channel)
.WithCancellationToken(stoppingToken);
_logger.LogDebug("Connected to service");

await _hub.JoinAsync(_address.ToHex());
await Service.AddClient(_address.ToByteArray());
_logger.LogInformation("Joined to RPC headless");
_ready = true;

_logger.LogDebug("Waiting for disconnecting");
await _hub.WaitForDisconnect();
}

public async Task StopAsync(CancellationToken cancellationToken)
{
_selfDisconnect = true;
await _hub.LeaveAsync();
}

public async Task<List<OrderDigest>> GetOrderDigests(ItemSubType itemSubType, byte[] hashBytes)
{
while (!Init) await Task.Delay(100);
while (Tip is null) await Task.Delay(100);

var orderDigestList = new List<OrderDigest>();
try
{
var addressList = GetShopAddress(itemSubType);
var result =
await Service.GetBulkStateByBlockHash(hashBytes, ReservedAddresses.LegacyAccount.ToByteArray(), addressList);
await Service.GetBulkStateByStateRootHash(hashBytes, ReservedAddresses.LegacyAccount.ToByteArray(), addressList);
var shopStates = GetShopStates(result);
foreach (var shopState in shopStates)
foreach (var orderDigest in shopState.OrderDigestList)
Expand Down Expand Up @@ -363,7 +381,7 @@ public async Task SyncProduct(byte[] hashBytes, CrystalEquipmentGrindingSheet cr
CrystalMonsterCollectionMultiplierSheet crystalMonsterCollectionMultiplierSheet,
CostumeStatSheet costumeStatSheet)
{
while (!Init) await Task.Delay(100);
while (Tip is null) await Task.Delay(100);

try
{
Expand Down Expand Up @@ -523,7 +541,7 @@ public async Task InsertProducts(List<Product> products, CostumeStatSheet costum
public async Task<T> GetSheet<T>(byte[] hashBytes) where T : ISheet, new()
{
var address = Addresses.GetSheetAddress<T>();
var result = await Service.GetStateByBlockHash(
var result = await Service.GetStateByStateRootHash(
hashBytes,
ReservedAddresses.LegacyAccount.ToByteArray(),
address.ToByteArray());
Expand All @@ -537,12 +555,13 @@ public async Task InsertProducts(List<Product> products, CostumeStatSheet costum
throw new Exception();
}

public async Task<byte[]> GetBlockHashBytes()
public async Task<byte[]> GetBlockStateRootHashBytes()
{
var tipBytes = await Service.GetTip();
var block =
BlockMarshaler.UnmarshalBlock((Dictionary) _codec.Decode(tipBytes));
return block.Hash.ToByteArray();
while (Tip is null)
{
await Task.Delay(1000);
}
return _receiver.Tip.StateRootHash.ToByteArray();
}

public async Task<Dictionary<Guid, IValue>> GetProductStates(IEnumerable<Address> avatarAddressList,
Expand Down Expand Up @@ -651,7 +670,7 @@ await Parallel.ForEachAsync(chunks, _parallelOptions, async (chunk, token) =>
public async Task<Dictionary<Address, IValue>> GetStates(byte[] hashBytes, byte[] accountBytes, List<byte[]> addressList)
{
var result = new ConcurrentDictionary<Address, IValue>();
var queryResult = await Service.GetBulkStateByBlockHash(hashBytes, accountBytes, addressList);
var queryResult = await Service.GetBulkStateByStateRootHash(hashBytes, accountBytes, addressList);
queryResult
.AsParallel()
.WithDegreeOfParallelism(MaxDegreeOfParallelism)
Expand All @@ -672,7 +691,7 @@ public async Task<Dictionary<Address, IValue>> GetChunkedStates(byte[] hashBytes
.ToList();
await Parallel.ForEachAsync(chunks, _parallelOptions, async (chunk, token) =>
{
var queryResult = await Service.GetBulkStateByBlockHash(hashBytes, accountBytes, chunk);
var queryResult = await Service.GetBulkStateByStateRootHash(hashBytes, accountBytes, chunk);
foreach (var kv in queryResult) result[new Address(kv.Key)] = _codec.Decode(kv.Value);
});

Expand All @@ -682,7 +701,7 @@ await Parallel.ForEachAsync(chunks, _parallelOptions, async (chunk, token) =>
public async Task<Dictionary<Address, AgentState>> GetAgentStates(byte[] hashBytes, List<byte[]> addressList)
{
var result = new ConcurrentDictionary<Address, AgentState>();
var queryResult = await Service.GetAgentStatesByBlockHash(hashBytes, addressList);
var queryResult = await Service.GetAgentStatesByStateRootHash(hashBytes, addressList);
queryResult
.AsParallel()
.WithDegreeOfParallelism(MaxDegreeOfParallelism)
Expand All @@ -704,7 +723,7 @@ public async Task<Dictionary<Address, AgentState>> GetAgentStates(byte[] hashByt

public async Task<MarketState> GetMarket(byte[] hashBytes)
{
var marketResult = await Service.GetStateByBlockHash(
var marketResult = await Service.GetStateByStateRootHash(
hashBytes,
ReservedAddresses.LegacyAccount.ToByteArray(),
Addresses.Market.ToByteArray());
Expand Down
9 changes: 1 addition & 8 deletions MarketService/RpcNodeCheckService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,11 @@ public RpcNodeCheckService(RpcNodeHealthCheck healthCheck, RpcClient rpcClient)

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_rpcClient.Init)
{
#pragma warning disable CS4014
_rpcClient.StartAsync(stoppingToken);
#pragma warning restore CS4014
}

while (true)
{
if (stoppingToken.IsCancellationRequested) stoppingToken.ThrowIfCancellationRequested();

while (!_rpcClient.Init) await Task.Delay(100, stoppingToken);
while (_rpcClient.Tip is null) await Task.Delay(100, stoppingToken);
_healthCheck.ConnectCompleted = _rpcClient.Ready;
await Task.Delay(3000, stoppingToken);
}
Expand Down
21 changes: 21 additions & 0 deletions MarketService/RpcService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace MarketService;

public class RpcService : IHostedService
{
private readonly RpcClient _rpcClient;

public RpcService(RpcClient rpcClient)
{
_rpcClient = rpcClient;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_ = _rpcClient.StartAsync(cancellationToken);
await Task.CompletedTask;
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await _rpcClient.StopAsync(cancellationToken);
}
}
Loading

0 comments on commit 70ee599

Please sign in to comment.