Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce whitelist filtering #22

Merged
merged 19 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions Blumchen.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Microsoft Visual Studio Solution File, Format Version 12.00

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.9.34622.214
MinimumVisualStudioVersion = 10.0.40219.1
Expand All @@ -11,8 +12,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Blumchen", "src\Blumchen\Bl
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{D0AB0FF4-C8A1-4B4B-A682-64F353A2D248}"
ProjectSection(SolutionItems) = preProject
src\Directory.Build.props = src\Directory.Build.props
.github\workflows\build.dotnet.yml = .github\workflows\build.dotnet.yml
src\Directory.Build.props = src\Directory.Build.props
.github\workflows\publish-nuget.yml = .github\workflows\publish-nuget.yml
EndProjectSection
EndProject
Expand All @@ -31,15 +32,17 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tests", "src\Tests\Tests.cs
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "pgAdmin", "pgAdmin", "{C050E9E8-3FB6-4581-953F-31826E385FB4}"
ProjectSection(SolutionItems) = preProject
docker\pgAdmin\servers.json = docker\pgAdmin\servers.json
docker\pgAdmin\pgpass = docker\pgAdmin\pgpass
docker\pgAdmin\servers.json = docker\pgAdmin\servers.json
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postgres", "postgres", "{8AAAA344-B5FD-48D9-B2BA-379E374448D4}"
ProjectSection(SolutionItems) = preProject
docker\postgres\init.sql = docker\postgres\init.sql
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{A4044484-FE08-4399-8239-14AABFA30AD7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -70,11 +73,14 @@ Global
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {9A868C51-0460-4700-AF33-E1A921192614}
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{06A60918-32EB-4C38-B369-7E1D76B809A0} = {A4044484-FE08-4399-8239-14AABFA30AD7}
{F2878625-0919-4C26-8DC9-58CD8FA34050} = {A4044484-FE08-4399-8239-14AABFA30AD7}
{F81E2D5B-FC59-4396-A911-56BE65E4FE80} = {A4044484-FE08-4399-8239-14AABFA30AD7}
{C050E9E8-3FB6-4581-953F-31826E385FB4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5}
{8AAAA344-B5FD-48D9-B2BA-379E374448D4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {9A868C51-0460-4700-AF33-E1A921192614}
EndGlobalSection
EndGlobal
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Main logic is placed in [EventsSubscription](./src/Blumchen/Subscriptions/Subscr
```shell
docker-compose up
```
2. Run(order doesn't matter) Publisher and Subscriber apps from vs-studio and follow Publisher instructions.
2. Run(order doesn't matter) Publisher and Subscriber apps, under 'demo' folder, from vs-studio, and follow Publisher instructions.

## Testing (against default docker instance)

Expand Down
13 changes: 11 additions & 2 deletions src/Blumchen/Blumchen.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<VersionPrefix>0.0.1</VersionPrefix>
<VersionPrefix>0.1.0</VersionPrefix>
<TargetFramework>net8.0</TargetFramework>
<GenerateAssemblyTitleAttribute>true</GenerateAssemblyTitleAttribute>
<GenerateAssemblyDescriptionAttribute>true</GenerateAssemblyDescriptionAttribute>
Expand All @@ -25,9 +25,18 @@
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<RootNamespace>Blumchen</RootNamespace>
</PropertyGroup>
<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>Tests</_Parameter1>
</AssemblyAttribute>
</ItemGroup>

<ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2023.3.0" />
<PackageReference Include="JetBrains.Annotations" Version="2023.3.0" >
<PrivateAssets>all</PrivateAssets>
<ExcludeAssets>none</ExcludeAssets>
<IncludeAssets>all</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
<PackageReference Include="Npgsql" Version="8.0.3" />
</ItemGroup>
Expand Down
53 changes: 28 additions & 25 deletions src/Blumchen/Database/Run.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,17 @@ namespace Blumchen.Database;

public static class Run
{
public static async Task Execute(
private static async Task Execute(
this NpgsqlDataSource dataSource,
string sql,
CancellationToken ct)
{
await using var command = dataSource.CreateCommand(sql);
await command.ExecuteNonQueryAsync(ct);
await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}

public static async Task EnsureTableExists(this NpgsqlDataSource dataSource, string tableName, CancellationToken ct)
{
var sql = @$"
CREATE TABLE IF NOT EXISTS {tableName} (
id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
message_type VARCHAR(250) NOT NULL,
data JSONB NOT NULL
);
";
await dataSource.Execute(sql, ct);
}
public static async Task EnsureTableExists(this NpgsqlDataSource dataSource, TableDescriptorBuilder.MessageTable tableDescriptor, CancellationToken ct)
=> await dataSource.Execute(tableDescriptor.ToString(), ct).ConfigureAwait(false);

public static async Task<bool> Exists(
this NpgsqlDataSource dataSource,
Expand All @@ -38,31 +29,43 @@ public static async Task<bool> Exists(
object[] parameters,
CancellationToken ct)
{
await using var command = dataSource.CreateCommand(
var command = dataSource.CreateCommand(
$"SELECT EXISTS(SELECT 1 FROM {table} WHERE {where})"
);
await using var command1 = command.ConfigureAwait(false);
foreach (var parameter in parameters) command.Parameters.AddWithValue(parameter);

return ((await command.ExecuteScalarAsync(ct)) as bool?) == true;
return await command.ExecuteScalarAsync(ct).ConfigureAwait(false) as bool? == true;
}

internal static async IAsyncEnumerable<IEnvelope> QueryTransactionSnapshot(
this NpgsqlConnection connection,
internal static async IAsyncEnumerable<IEnvelope> QueryTransactionSnapshot(this NpgsqlConnection connection,
string snapshotName,
string tableName,
TableDescriptorBuilder.MessageTable tableDescriptor,
ISet<string> registeredTypesKeys,
IReplicationDataMapper dataMapper,
[EnumeratorCancellation] CancellationToken ct)
{
await using var transaction = await connection.BeginTransactionAsync(IsolationLevel.RepeatableRead, ct);
var transaction = await connection.BeginTransactionAsync(IsolationLevel.RepeatableRead, ct).ConfigureAwait(false);
await using var transaction1 = transaction.ConfigureAwait(false);

await using var command =
var command =
new NpgsqlCommand($"SET TRANSACTION SNAPSHOT '{snapshotName}';", connection, transaction);
await command.ExecuteScalarAsync(ct);
await using var command1 = command.ConfigureAwait(false);
await command.ExecuteScalarAsync(ct).ConfigureAwait(false);
var whereClause = registeredTypesKeys.Count > 0
? $" WHERE {tableDescriptor.MessageType.Name} IN({PublicationFilter(registeredTypesKeys)})"
: null;
var cmd = new NpgsqlCommand($"SELECT * FROM {tableDescriptor.Name}{whereClause}", connection, transaction);
await using var cmd1 = cmd.ConfigureAwait(false);
var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
await using var reader1 = reader.ConfigureAwait(false);

await using var cmd = new NpgsqlCommand($"SELECT * FROM {tableName}", connection, transaction);
await using var reader = await cmd.ExecuteReaderAsync(ct);
while (await reader.ReadAsync(ct).ConfigureAwait(false))
yield return await dataMapper.ReadFromSnapshot(reader, ct).ConfigureAwait(false);
yield break;

while (await reader.ReadAsync(ct))
yield return await dataMapper.ReadFromSnapshot(reader, ct);
static string PublicationFilter(ICollection<string> input) => string.Join(", ", input.Select(s => $"'{s}'"));
}
}


76 changes: 76 additions & 0 deletions src/Blumchen/MessageTableOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using Blumchen.Subscriptions;
using NpgsqlTypes;

namespace Blumchen;

#pragma warning disable CS1591
public record TableDescriptorBuilder
{
private MessageTable TableDescriptor { get; set; } = new();

public MessageTable Build() => TableDescriptor.Build();

public TableDescriptorBuilder Name(string eventsTable)
{
TableDescriptor = new MessageTable(eventsTable);
return this;
}

public TableDescriptorBuilder Id(string name)
{
TableDescriptor = TableDescriptor with { Id = new Column.Id(name) };
return this;
}

public TableDescriptorBuilder MessageData(string name, MimeType mime)
{
TableDescriptor = TableDescriptor with { Data = new Column.Data(name), MimeType = mime };
return this;
}

public TableDescriptorBuilder MessageType(string name, int dimension = 250)
{
TableDescriptor = TableDescriptor with { MessageType = new Column.MessageType(name, dimension) };
return this;
}

public record MessageTable(string Name = MessageTable.DefaultName)
{
internal const string DefaultName = "outbox";
public Column.Id Id { get; internal init; } = Column.Id.Default();
public Column.MessageType MessageType { get; internal init; } = Column.MessageType.Default();
public Column.Data Data { get; internal init; } = Column.Data.Default();
public MimeType MimeType { get; internal init; } = new MimeType.Json();
public MessageTable Build() => this;

public override string ToString() => @$"
CREATE TABLE IF NOT EXISTS {Name} (
{Id} PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
{MessageType} NOT NULL,
{Data} NOT NULL
);";
}

public record Column(string Name, NpgsqlDbType Type)
{
public override string ToString() => $"{Name} {Type}";

public record Id(string Name): Column(Name, NpgsqlDbType.Bigint)
{
public override string ToString() => base.ToString();
internal static readonly Func<Id> Default = () => new("id");
}

public record MessageType(string Name, int Dimension): Column(Name, NpgsqlDbType.Varchar)
{
internal static readonly Func<MessageType> Default = () => new("message_type", 250);
public override string ToString() => $"{base.ToString()}({Dimension})";
}

public record Data(string Name): Column(Name, NpgsqlDbType.Jsonb)
{
internal static readonly Func<Data> Default = () => new("data");
public override string ToString() => base.ToString();
}
}
}
87 changes: 87 additions & 0 deletions src/Blumchen/Publications/MessageAppender.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System.Collections;
using Blumchen.Serialization;
using Npgsql;

namespace Blumchen.Publications;
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

public static class MessageAppender
{
public static async Task AppendAsync<T>(T @input
, (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) resolver
, NpgsqlConnection connection
, NpgsqlTransaction transaction
, CancellationToken ct
) where T : class
{
switch (@input)
{
case null:
throw new ArgumentNullException(nameof(@input));
case IEnumerable inputs:
await AppendBatchAsyncOfT(inputs, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false);
break;
default:
await AppendAsyncOfT(input, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false);
break;
}
}

private static async Task AppendAsyncOfT<T>(T input
, TableDescriptorBuilder.MessageTable tableDescriptor
, IJsonTypeResolver typeResolver
, NpgsqlConnection connection
, NpgsqlTransaction transaction
, CancellationToken ct) where T : class
{
var (typeName, jsonTypeInfo) = typeResolver.Resolve(typeof(T));
var data = JsonSerialization.ToJson(@input, jsonTypeInfo);
var command = new NpgsqlCommand(
$"INSERT INTO {tableDescriptor.Name}({tableDescriptor.MessageType.Name}, {tableDescriptor.Data.Name}) values ('{typeName}', '{data}')",
connection,
transaction
);
await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}

public static async Task AppendAsync<T>(T input
, (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver resolver) options
, string connectionString
, CancellationToken ct)
where T: class
{
var type = typeof(T);
var (typeName, jsonTypeInfo) = options.resolver.Resolve(type);
var data = JsonSerialization.ToJson(input, jsonTypeInfo);

var connection = new NpgsqlConnection(connectionString);
await using var connection1 = connection.ConfigureAwait(false);
await connection.OpenAsync(ct).ConfigureAwait(false);
var command = connection.CreateCommand();
command.CommandText =
$"INSERT INTO {options.tableDescriptor.Name}({options.tableDescriptor.MessageType.Name}, {options.tableDescriptor.Data.Name}) values ('{typeName}', '{data}')";
await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}

private static async Task AppendBatchAsyncOfT<T>(T inputs
, TableDescriptorBuilder.MessageTable tableDescriptor
, IJsonTypeResolver resolver
, NpgsqlConnection connection
, NpgsqlTransaction transaction
, CancellationToken ct) where T : class, IEnumerable
{
var batch = new NpgsqlBatch(connection, transaction);
foreach (var input in inputs)
{
var (typeName, jsonTypeInfo) = resolver.Resolve(input.GetType());
var batchCommand = batch.CreateBatchCommand();
var data = JsonSerialization.ToJson(input, jsonTypeInfo);


batchCommand.CommandText =
$"INSERT INTO {tableDescriptor.Name}({tableDescriptor.MessageType.Name}, {tableDescriptor.Data.Name}) values ('{typeName}', '{data}')";
batch.BatchCommands.Add(batchCommand);
}
await batch.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}
}
54 changes: 54 additions & 0 deletions src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System.Text.Json.Serialization;
using Blumchen.Serialization;
using JetBrains.Annotations;
using static Blumchen.TableDescriptorBuilder;

namespace Blumchen.Publications;

#pragma warning disable CS1591
public class PublisherSetupOptionsBuilder
{
private INamingPolicy? _namingPolicy;
private JsonSerializerContext? _jsonSerializerContext;
private static readonly TableDescriptorBuilder TableDescriptorBuilder = new();
private MessageTable? _tableDescriptor;

[UsedImplicitly]
public PublisherSetupOptionsBuilder NamingPolicy(INamingPolicy namingPolicy)
{
_namingPolicy = namingPolicy;
return this;
}

[UsedImplicitly]
public PublisherSetupOptionsBuilder JsonContext(JsonSerializerContext jsonSerializerContext)
{
_jsonSerializerContext = jsonSerializerContext;
return this;
}

[UsedImplicitly]
public PublisherSetupOptionsBuilder WithTable(Func<TableDescriptorBuilder, TableDescriptorBuilder> builder)
{
_tableDescriptor = builder(TableDescriptorBuilder).Build();
return this;
}

public (MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) Build()
{
ArgumentNullException.ThrowIfNull(_jsonSerializerContext);
ArgumentNullException.ThrowIfNull(_namingPolicy);

_tableDescriptor ??= TableDescriptorBuilder.Build();
var jsonTypeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy);
using var typeEnum = _jsonSerializerContext.GetType()
.GetCustomAttributesData()
.Where(attributeData => attributeData.AttributeType == typeof(JsonSerializableAttribute))
.Select(att => att.ConstructorArguments.Single())
.Select(ca => ca.Value).OfType<Type>().GetEnumerator();
while (typeEnum.MoveNext())
jsonTypeResolver.WhiteList(typeEnum.Current);

return (_tableDescriptor,jsonTypeResolver);
}
}
Loading
Loading