Skip to content

Commit

Permalink
Merge pull request #125 from planetarium/14-merge-main
Browse files Browse the repository at this point in the history
release/14 merge main
  • Loading branch information
U-lis authored Sep 23, 2024
2 parents a96c32b + 4bdf551 commit f4b7bca
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 76 deletions.
32 changes: 17 additions & 15 deletions MarketService.Tests/RpcClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Bencodex.Types;
using Grpc.Core;
using Lib9c.Model.Order;
using Lib9c.Renderers;
using Libplanet.Action.State;
using Libplanet.Crypto;
using Libplanet.Types.Assets;
Expand Down Expand Up @@ -326,11 +327,16 @@ public RpcClientTest(ITestOutputHelper output)
new DbContextOptionsBuilder<MarketContext>().UseNpgsql(_connectionString)
.UseLowerCaseNamingConvention().Options, new DbContextFactorySource<MarketContext>());
#pragma warning restore EF1001
var rpcConfigOptions = new RpcConfigOptions { Host = "localhost", Port = 5000 };
var receiver = new Receiver(new Logger<Receiver>(new LoggerFactory()));
var rpcConfigOptions = new RpcConfigOptions {Host = "localhost", Port = 5000};
var workerOptions = new WorkerOptions
{
SyncProduct = false,
SyncShop = false,
};
var receiver = new Receiver(new Logger<Receiver>(new LoggerFactory()), new ActionRenderer(), new OptionsWrapper<WorkerOptions>(workerOptions));
using var logger = _output.BuildLoggerFor<RpcClient>();
_client = new TestClient(new OptionsWrapper<RpcConfigOptions>(rpcConfigOptions),
logger, receiver, _contextFactory, _testService);
logger, receiver, _contextFactory, _testService, new ActionRenderer());
}

[Theory]
Expand Down Expand Up @@ -406,9 +412,7 @@ await _client.SyncOrder(null!, _crystalEquipmentGrindingSheet,
#pragma warning disable EF1001
var nextContext = await _contextFactory.CreateDbContextAsync(ct);
#pragma warning restore EF1001
var updatedProductModel = Assert.Single(nextContext.Products);
Assert.Equal(productModel.ProductId, updatedProductModel.ProductId);
Assert.False(updatedProductModel.Exist);
Assert.Empty(nextContext.Products);
}

[Theory]
Expand Down Expand Up @@ -485,11 +489,8 @@ await _client.SyncOrder(null!, _crystalEquipmentGrindingSheet,
#pragma warning disable EF1001
var nextContext = await _contextFactory.CreateDbContextAsync(ct);
#pragma warning restore EF1001
Assert.Equal(2, nextContext.Products.Count());
var oldProduct = nextContext.Products.Single(p => p.ProductId == order.OrderId);
Assert.Equal(1, oldProduct.Price);
Assert.False(oldProduct.Exist);
var newProduct = nextContext.Products.Single(p => p.ProductId == order2.OrderId);
Assert.Empty(nextContext.Products.Where(p => p.ProductId == order.OrderId));
var newProduct = Assert.Single(nextContext.Products);
Assert.Equal(2, newProduct.Price);
Assert.True(newProduct.Exist);
}
Expand Down Expand Up @@ -578,8 +579,9 @@ await _client.SyncProduct(null!, _crystalEquipmentGrindingSheet, _crystalMonster
var nextContext = await _contextFactory.CreateDbContextAsync(ct);
#pragma warning restore EF1001
var nextProducts = nextContext.Products.AsNoTracking().ToList();
Assert.Equal(90, nextProducts.Count);
Assert.Equal(90, nextProducts.Count(p => p.Exist));
Assert.Equal(10, nextProducts.Count(p => !p.Exist));
Assert.Equal(0, nextProducts.Count(p => !p.Exist));
}

[Fact]
Expand Down Expand Up @@ -677,8 +679,8 @@ public async Task UpdateProducts(bool legacy)
private class TestClient : RpcClient
{
public TestClient(IOptions<RpcConfigOptions> options, ILogger<RpcClient> logger, Receiver receiver,
IDbContextFactory<MarketContext> contextFactory, TestService service) : base(options, logger, receiver,
contextFactory)
IDbContextFactory<MarketContext> contextFactory, TestService service, ActionRenderer renderer) : base(options, logger, receiver,
contextFactory, renderer)
{
Service = service;
var path = "../../../genesis";
Expand Down Expand Up @@ -890,4 +892,4 @@ private void SetShopStates(ItemSubType itemSubType, OrderDigest orderDigest)
}
}
}
}
}
1 change: 1 addition & 0 deletions MarketService/MarketService.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\lib9c\Lib9c.MessagePack\Lib9c.MessagePack.csproj" />
<ProjectReference Include="..\lib9c\Lib9c\Lib9c.csproj" />
<ProjectReference Include="..\MarketService.Response\MarketService.Response.csproj" />
<ProjectReference Include="..\NineChronicles.RPC.Shared\NineChronicles.RPC.Shared\NineChronicles.RPC.Shared.csproj" />
Expand Down
2 changes: 1 addition & 1 deletion MarketService/ProductWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
stopWatch.Stop();
var ts = stopWatch.Elapsed;
_logger.LogInformation("[ProductWorker]Complete sync product on {BlockIndex}. {TotalElapsed}", _rpcClient.Tip.Index, ts);
await Task.Delay(8000, stoppingToken);
await Task.Delay(60000 * 5, stoppingToken);
}
}
}
54 changes: 35 additions & 19 deletions MarketService/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
using System.Text.Json.Serialization;
using Lib9c.Formatters;
using Lib9c.Renderers;
using MessagePack;
using MessagePack.Resolvers;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;

Expand Down Expand Up @@ -64,31 +68,43 @@ public void ConfigureServices(IServiceCollection services)
.UseLowerCaseNamingConvention()
.ConfigureWarnings(w => w.Throw(RelationalEventId.MultipleCollectionIncludeWarning))
);
services.AddSingleton<RpcClient>();
services.AddSingleton<Receiver>();
services.AddHostedService<RpcService>();
services.AddMvc()
.AddJsonOptions(
options => { options.JsonSerializerOptions.ReferenceHandler = ReferenceHandler.IgnoreCycles; }
);
var healthChecksBuilder = services.AddHealthChecks()
.AddDbContextCheck<MarketContext>();

WorkerOptions workerOptions = new();
Configuration.GetSection(WorkerOptions.WorkerConfig)
.Bind(workerOptions);
if (workerOptions.SyncShop)
var writeMode = workerOptions.SyncProduct || workerOptions.SyncShop;
if (writeMode)
{
services.AddHostedService<ShopWorker>();
}
services
.AddHostedService<RpcNodeCheckService>()
.AddSingleton<RpcNodeHealthCheck>();
services.AddSingleton<RpcClient>();
services.AddSingleton<Receiver>();
services.AddHostedService<RpcService>();
services.AddSingleton<ActionRenderer>();
var resolver = MessagePack.Resolvers.CompositeResolver.Create(
NineChroniclesResolver.Instance,
StandardResolver.Instance
);
var options = MessagePackSerializerOptions.Standard.WithResolver(resolver);
MessagePackSerializer.DefaultOptions = options;
if (workerOptions.SyncShop)
{
services.AddHostedService<ShopWorker>();
}

if (workerOptions.SyncProduct)
{
services.AddHostedService<ProductWorker>();
if (workerOptions.SyncProduct)
{
services.AddHostedService<ProductWorker>();
}
healthChecksBuilder.AddCheck<RpcNodeHealthCheck>(nameof(RpcNodeHealthCheck));
}
services.AddMvc()
.AddJsonOptions(
options => { options.JsonSerializerOptions.ReferenceHandler = ReferenceHandler.IgnoreCycles; }
);
services
.AddHostedService<RpcNodeCheckService>()
.AddSingleton<RpcNodeHealthCheck>();
services.AddHealthChecks()
.AddDbContextCheck<MarketContext>()
.AddCheck<RpcNodeHealthCheck>(nameof(RpcNodeHealthCheck));
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
Expand Down
29 changes: 28 additions & 1 deletion MarketService/Receiver.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
using System.IO.Compression;
using Bencodex;
using Bencodex.Types;
using Lib9c.Renderers;
using Libplanet.Types.Blocks;
using MessagePack;
using Microsoft.Extensions.Options;
using Nekoyume.Action;
using Nekoyume.Shared.Hubs;

namespace MarketService;
Expand All @@ -11,14 +16,36 @@ public class Receiver : IActionEvaluationHubReceiver
public Block PreviousTip;
private readonly ILogger<Receiver> _logger;
private readonly Codec _codec = new Codec();
private readonly ActionRenderer _actionRenderer;
private readonly WorkerOptions _workerOptions;

public Receiver(ILogger<Receiver> logger)
public Receiver(ILogger<Receiver> logger, ActionRenderer actionRenderer, IOptions<WorkerOptions> options)
{
_logger = logger;
_actionRenderer = actionRenderer;
_workerOptions = options.Value;
}

public void OnRender(byte[] evaluation)
{
if (_workerOptions.SyncProduct || _workerOptions.SyncShop)
{
using (var cp = new MemoryStream(evaluation))
{
using (var decompressed = new MemoryStream())
{
using (var df = new DeflateStream(cp, CompressionMode.Decompress))
{
df.CopyTo(decompressed);
decompressed.Seek(0, SeekOrigin.Begin);
var dec = decompressed.ToArray();
var ev = MessagePackSerializer.Deserialize<NCActionEvaluation>(dec)
.ToActionEvaluation();
_actionRenderer.ActionRenderSubject.OnNext(ev);
}
}
}
}
}

public void OnUnrender(byte[] evaluation)
Expand Down
Loading

0 comments on commit f4b7bca

Please sign in to comment.