description |
---|
Pipelines were added to the Nethermind code base in order to make building plugins around blockchain analysis faster and easier. |
Please note that the pipeline source code is still a work in progress and there will be significant changes in the coming weeks!
Nethermind under Nethermind.Pipeline
project provides couple of interfaces and classes used in creating pipelines.
IPipelineElement
- base building block of the pipeline.IPipelineElement<TOut>
- generic interface used for emitting data of typeTOut
. to the next element (most of the time this is the first element in the collection).- IPipelineElement<TIn, TOut> - interface used for both emitting and subscribing to the data from the previous block, so
TIn
is the type of data from the previous block andTOut
is the type of data that the element wants to give to the next element. IPipeline
- collection ofIPipelineElement
used to store our pipeline in the memory.IPipelineBuilder<TSource, TOutput>
- when we want to create our pipeline the fastest way will be to use implementation of this interface (implemented inPipelineBuilder.cs
). Builder collects all of the elements we want to create the pipeline with and creates the flow of the data between them. Keep in mind that once the pipeline is created it's immutable.
Publishers are just classes implementing IPipelineElement<TIn, TOut>
(so they are able to subscribe and emit data) with the additional functionality of publishing the data to either WebSockets or as logs to the file. We add them to the pipeline the same way as normal elements - by using PipelineBuilder
- WebSocketsPublisher
- LogPublisher
We will build a Nethermind plugin (see nethermind plugins) which observes newly created blocks and see whether there are any transactions to a specific contract address.
Firstly we need to create our pipeline elements. With this case there will only be 3 of them - source, processor and publisher but there is nothing standing in the way in adding more elements.
public class NewTransactionsSource<TIn> : IPipelineElement<TIn> where TIn : Transaction
{
private Block _block;
private IBlockProcessor _blockProcessor;
public Action<TIn> Emit { private get; set; }
public NewTransactionsSource(IBlockProcessor blockProcessor)
{
_blockProcessor = blockProcessor;
}
public void OnNewBlockProcessed(BlockProcessedEventArgs args)
{
var block = args.Block;
foreach(var tx in block.Transactions)
{
Emit((TIn)tx);
}
}
}
public class TransactionProcessorElement<TIn, TOut> : IPipelineElement<TIn, TOut>
where TIn : Transaction
where TOut : TIn
{
private Address _address = new Address("0xFca59Cd816aB1eaD66534D82bc21E7515cE441CF");
public Action<TOut> Emit { private get; set; }
public void SubscribeToData(TIn data)
{
if(data.To == _address)
{
Emit((TOut)data);
}
}
}
After that create plugin file and build your pipeline with IPipelineBuilder
public class PipelineExamplePlugin : INethermindPlugin
{
public string Name => "Pipeline example";
public string Description => "Example on how to deal with Nethermind's pipeline";
public string Author => "Nethermind team";
private INethermindApi _api;
private IPipeline _pipeline;
public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
public Task Init(INethermindApi nethermindApi)
{
_api = nethermindApi;
var sourceElement = new NewTransactionsSource<Transaction>(_api.MainBlockProcessor);
var processorElement = new TransactionProcessorElement<Transaction, Transaction>();
var webSocketsPublisher = new WebSocketsPublisher<Transaction, Transaction>("example-publisher", _api.EthereumJsonSerializer);
_api.WebSocketsManager.AddModule(webSocketsPublisher);
var pipelineBuilder = new PipelineBuilder<Transaction, Transaction>(sourceElement);
_pipeline = pipelineBuilder.AddElement(processorElement).AddElement(webSocketsPublisher).Build();
return Task.CompletedTask;
}
public Task InitNetworkProtocol()
{
return Task.CompletedTask;
}
public Task InitRpcModules()
{
return Task.CompletedTask;
}
}
With that, all you need is to add the plugin to your node and watch how transactions are being streamed through web sockets.