Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fixes and API client experience enhancements #28

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,18 @@ public CounterController(ICounterState counterState, IRaftClientRequestHandler c
[ProducesResponseType(typeof(int), StatusCodes.Status200OK)]
public async Task<int?> Increment(CancellationToken cancellationToken)
{
var result = await _clientRequestHandler.OnClientRequest(new IncrementCounterCommand(), cancellationToken);
return (int?)result;
int? result = null;
try
{
result = (int?)await _clientRequestHandler.OnClientRequest(new IncrementCounterCommand(), cancellationToken);
result ??= -1;
}
catch (Exception ex)
{
throw;
}

return result;
}

[HttpPost("[action]")]
Expand All @@ -46,4 +56,20 @@ public CounterController(ICounterState counterState, IRaftClientRequestHandler c
var result = await _clientRequestHandler.OnClientRequest(new ResetCounterCommand(), cancellationToken);
return (int?)result;
}

[HttpPost("[action]")]
[ProducesResponseType(typeof(bool), StatusCodes.Status200OK)]
public async Task<bool?> LockRequest([FromBody] LockRequestCommand command, CancellationToken cancellationToken)
{
var result = await _clientRequestHandler.OnClientRequest(command, cancellationToken);
return (bool?)result;
}

[HttpPost("[action]")]
[ProducesResponseType(typeof(bool), StatusCodes.Status200OK)]
public async Task<bool?> LockRelease([FromBody] LockReleaseCommand command, CancellationToken cancellationToken)
{
var result = await _clientRequestHandler.OnClientRequest(command, cancellationToken);
return (bool?)result;
}
}
2 changes: 2 additions & 0 deletions src/Samples/SlimCluster.Samples.Service/Kube-ApplySample.ps1
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# run with tty console, interactive mode, and remove container after program ends
kubectl apply -f service.yaml
kubectl apply -f configmap.yaml
kubectl apply -f persistent-volume-claim.yaml
kubectl apply -f deployment.yaml
2 changes: 2 additions & 0 deletions src/Samples/SlimCluster.Samples.Service/Kube-DeleteSample.ps1
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
kubectl delete -f deployment.yaml
#kubectl delete -f service.yaml
#kubectl delete -f persistent-volume-claim.yaml
#kubectl delete -f persistent-volume.yaml
38 changes: 30 additions & 8 deletions src/Samples/SlimCluster.Samples.Service/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using SlimCluster.Consensus.Raft;
using SlimCluster.Consensus.Raft.Logs;
using SlimCluster.Membership.Swim;
using SlimCluster.Persistence;
using SlimCluster.Persistence.LocalFile;
using SlimCluster.Samples.ConsoleApp;
using SlimCluster.Samples.ConsoleApp.State.Logs;
Expand All @@ -22,6 +23,10 @@
// doc:fragment:ExampleStartup
builder.Services.AddSlimCluster(cfg =>
{

//System.IO.File.WriteAllText("/data/pvc/Testing123-" + Guid.NewGuid().ToString(), "Testing");
//System.IO.File.WriteAllText("/data/Testing123-" + Guid.NewGuid().ToString(), "Testing");

cfg.ClusterId = "MyCluster";
// This will use the machine name, in Kubernetes this will be the pod name
cfg.NodeId = Environment.MachineName;
Expand All @@ -37,7 +42,10 @@
cfg.AddJsonSerialization();

// Cluster state will saved into the local json file in between node restarts
cfg.AddPersistenceUsingLocalFile("cluster-state.json");
var tempFilePath = Path.Combine(Path.GetTempPath(), "cluster-state.json");
Console.WriteLine($"Path.GetTempPath(): {Path.GetTempPath()}");
Console.WriteLine($"tempFilePath: {tempFilePath}");
cfg.AddPersistenceUsingLocalFile("cluster-state.json", Newtonsoft.Json.Formatting.Indented);

// Setup Swim Cluster Membership
cfg.AddSwimMembership(opts =>
Expand All @@ -49,12 +57,15 @@
cfg.AddRaftConsensus(opts =>
{
opts.NodeCount = 3;

// Use custom values or remove and use defaults
opts.LeaderTimeout = TimeSpan.FromSeconds(5);
opts.LeaderPingInterval = TimeSpan.FromSeconds(2);
opts.ElectionTimeoutMin = TimeSpan.FromSeconds(3);
opts.ElectionTimeoutMax = TimeSpan.FromSeconds(6);
//opts.LeaderTimeout = TimeSpan.FromSeconds(5f);
opts.HeartbeatInterval = TimeSpan.FromSeconds(2f);
opts.ElectionTimeoutMin = TimeSpan.FromSeconds(3f);
opts.ElectionTimeoutMax = TimeSpan.FromSeconds(6f);
opts.RequestTimeout = TimeSpan.FromSeconds(10f);
opts.FifoLockTimeout = TimeSpan.FromSeconds(30f);

// Can set a different log serializer, by default ISerializer is used (in our setup its JSON)
// opts.LogSerializerType = typeof(JsonSerializer);
});
Expand All @@ -68,7 +79,9 @@

// Raft app specific implementation
builder.Services.AddSingleton<ILogRepository, InMemoryLogRepository>(); // For now, store the logs in memory only
builder.Services.AddSingleton<IStateMachine, CounterStateMachine>(); // This is app specific machine that implements a distributed counter
builder.Services.AddSingleton<CounterStateMachine>();
builder.Services.AddSingleton<IStateMachine>(provider => provider.GetRequiredService<CounterStateMachine>()); // This is app specific machine that implements a distributed counter
builder.Services.AddSingleton<IDurableComponent>(provider => provider.GetRequiredService<CounterStateMachine>());
builder.Services.AddSingleton<ISerializationTypeAliasProvider, CommandSerializationTypeAliasProvider>(); // App specific state/logs command types for the replicated state machine

// Requires packages: SlimCluster.Membership.Swim, SlimCluster.Consensus.Raft, SlimCluster.Serialization.Json, SlimCluster.Transport.Ip, SlimCluster.Persistence.LocalFile, SlimCluster.AspNetCore
Expand All @@ -81,11 +94,20 @@

var app = builder.Build();

if (app.Services.GetService<IClusterPersistenceService>() is null)
{
Console.WriteLine("IClusterPersistenceService is not available.");
}
else
{
Console.WriteLine("IClusterPersistenceService is available.");
}

// Configure the HTTP request pipeline.
app.UseSwagger();
app.UseSwaggerUI();

app.UseHttpsRedirection();
//app.UseHttpsRedirection();

app.UseAuthorization();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace SlimCluster.Samples.ConsoleApp.State.Logs;

using SlimCluster.Consensus.Raft;
using SlimCluster.Serialization;

internal class CommandSerializationTypeAliasProvider : ISerializationTypeAliasProvider
Expand All @@ -9,5 +10,7 @@ internal class CommandSerializationTypeAliasProvider : ISerializationTypeAliasPr
["dec"] = typeof(DecrementCounterCommand),
["inc"] = typeof(IncrementCounterCommand),
["rst"] = typeof(ResetCounterCommand),
["lrq"] = typeof(LockRequestCommand),
["lrl"] = typeof(LockReleaseCommand),
};
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
namespace SlimCluster.Samples.ConsoleApp.State.StateMachine;

using SlimCluster.Consensus.Raft;
using SlimCluster.Persistence;
using SlimCluster.Samples.ConsoleApp.State.Logs;

using static SlimCluster.Samples.ConsoleApp.State.StateMachine.CounterStateMachine;

/// <summary>
/// Counter state machine that processes counter commands. Everything is stored in memory.
/// </summary>
public class CounterStateMachine : IStateMachine, ICounterState
public class CounterStateMachine : IStateMachine, ICounterState, IDurableComponent
{
private int _index = 0;
private int _counter = 0;
Expand All @@ -18,7 +21,15 @@ public class CounterStateMachine : IStateMachine, ICounterState
/// </summary>
public int Counter => _counter;

public Task<object?> Apply(object command, int index)
public List<LockRequestCommand> _locks = new();

//public CounterStateMachine(LoggerFactory loggerFactory)
//{
// _logger=loggerFactory.CreateLogger<CounterStateMachine>();
//}
//private ILogger<CounterStateMachine> _logger;

public async Task<object?> Apply(object command, int index)
{
// Note: This is thread safe - there is ever going to be only one task at a time calling Apply

Expand All @@ -27,22 +38,70 @@ public class CounterStateMachine : IStateMachine, ICounterState
throw new InvalidOperationException($"The State Machine can only apply next command at index ${_index + 1}");
}

int? result = command switch
//await Task.Delay(20000);

object? result = null;

switch (command)
{
IncrementCounterCommand => ++_counter,
DecrementCounterCommand => --_counter,
ResetCounterCommand => _counter = 0,
_ => throw new NotImplementedException($"The command type ${command?.GetType().Name} is not supported")
};
case LockRequestCommand lockRequest:

result = lockRequest.Result;

if(lockRequest.Result)
{
var existing = _locks.SingleOrDefault(lock_ => lock_.Name.Equals(lockRequest.Name, StringComparison.CurrentCultureIgnoreCase));
if (existing is null)
{
_locks.Add(lockRequest);
}
// Change owner
else if(!existing.Owner.Equals(lockRequest.Owner, StringComparison.CurrentCultureIgnoreCase))
{
existing.Owner = lockRequest.Owner;
}
}
break;
case LockReleaseCommand lockRelease:

result = lockRelease.Result;
if (lockRelease.Result)
{
var existing = _locks.SingleOrDefault(lock_ => lock_.Name.Equals(lockRelease.Name, StringComparison.CurrentCultureIgnoreCase) && lock_.Owner.Equals(lockRelease.Owner, StringComparison.CurrentCultureIgnoreCase));
if (existing is not null)
{
_locks.Remove(existing);
}
}
break;
default:

result = command switch
{
IncrementCounterCommand => ++_counter,
DecrementCounterCommand => --_counter,
ResetCounterCommand => _counter = 0,
_ => throw new NotImplementedException($"The command type ${command?.GetType().Name} is not supported")
};
break;
}

_index = index;

return Task.FromResult<object?>(result);
return result;
}

// For now we don't support snapshotting
public Task Restore() => throw new NotImplementedException();

// For now we don't support snapshotting
public Task Snapshot() => throw new NotImplementedException();

public void OnStatePersist(IStateWriter state)
{
state.Set("counter", _counter);
state.Set("locks", _locks);
}

public void OnStateRestore(IStateReader state) => throw new NotImplementedException();
}
8 changes: 8 additions & 0 deletions src/Samples/SlimCluster.Samples.Service/configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: my-config
data:
config-file.txt: |
key1=value1
key2=value2
33 changes: 32 additions & 1 deletion src/Samples/SlimCluster.Samples.Service/deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
apiVersion: apps/v1
kind: Deployment
kind: StatefulSet
metadata:
name: sc-service
labels:
Expand All @@ -25,3 +25,34 @@ spec:
env:
- name: ASPNETCORE_URLS
value: "http://+:8080"
volumeMounts:
# ConfigMap
- mountPath: /etc/config # path in pod in which ConfigMap's config-file.txt will be written
name: config-volume
# Ephemeral not supported during dev
# - mountPath: /etc/data # path in pod in which the emphemeral volume is mounted
# name: my-ephemeral-volume
- mountPath: /data/pvc
name: pvc-volume
- mountPath: /data
name: my-volume
volumes:
- name: config-volume
configMap:
name: my-config
# - name: my-ephemeral-volume
# csi:
# driver: my-csi-driver
# volumeAttributes:
# size: "1Gi"
- name: pvc-volume
persistentVolumeClaim:
claimName: local-pvc
volumeClaimTemplates:
- metadata:
name: my-volume
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 1Gi
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: local-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
2 changes: 1 addition & 1 deletion src/Samples/SlimCluster.Samples.Service/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
labels:
run: sc-service
spec:
type: NodePort
type: LoadBalancer
ports:
- port: 8080
protocol: TCP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ public static ClusterConfiguration AddAspNetCore(this ClusterConfiguration cfg,
{
cfg.PostConfigurationActions.Add(services =>
{
services.AddHttpClient<RequestDelegatingClient>();
// This client is used for followers to redirect requests to primaries.
services.AddHttpClient<RequestDelegatingClient>(client => {
// If the leader fails then followers will attempt to contact it and it needs to fail quickly because the leader's IP may change to
// a new node and therefore the target IP for this client call may no longer be valid.
// TODO: Should be an should be an option.
// ElectionTimeoutMax = 6?
client.Timeout = TimeSpan.FromSeconds(60);
});

services.Configure(options);
});
Expand Down
Loading