diff --git a/.github/workflows/deploy-docker.yml b/.github/workflows/deploy-docker.yml index df006e3f..c6d85c6b 100644 --- a/.github/workflows/deploy-docker.yml +++ b/.github/workflows/deploy-docker.yml @@ -3,6 +3,7 @@ name: Publish Docker image on: release: types: [published] + pull_request: jobs: push_to_registry: @@ -31,24 +32,25 @@ jobs: fi - name: Log in to Docker Hub + if: ${{ github.event_name == 'release' && github.event.action == 'published' }} uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Extract metadata (tags, labels) for Docker + if: ${{ github.event_name == 'release' && github.event.action == 'published' }} id: meta uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 with: - images: multiversx/elastic-indexer + images: multiversx/sovereign-elastic-indexer - name: Build and push Docker image id: push uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671 with: context: . - file: ./Dockerfile - push: true + file: ./Dockerfile-sovereign + push: ${{ github.event_name == 'release' && github.event.action == 'published' }} tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} - diff --git a/Dockerfile-sovereign b/Dockerfile-sovereign new file mode 100644 index 00000000..c8b72a1a --- /dev/null +++ b/Dockerfile-sovereign @@ -0,0 +1,26 @@ +FROM golang:1.20.7 as builder + +RUN apt-get update && apt-get install -y + +WORKDIR /multiversx +COPY . . + +WORKDIR /multiversx/cmd/elasticindexer + +RUN go build -o elasticindexer + +# ===== SECOND STAGE ====== +FROM ubuntu:22.04 +RUN apt-get update && apt-get install -y + +RUN useradd -m -u 1000 appuser +USER appuser + +COPY --from=builder --chown=appuser /multiversx/cmd/elasticindexer /multiversx + +EXPOSE 22111 + +WORKDIR /multiversx + +ENTRYPOINT ["./elasticindexer", "--sovereign"] +CMD ["--log-level", "*:DEBUG"] diff --git a/Makefile b/Makefile index a000f850..075cc715 100644 --- a/Makefile +++ b/Makefile @@ -36,9 +36,16 @@ integration-tests-open-search: INDEXER_IMAGE_NAME="elasticindexer" INDEXER_IMAGE_TAG="latest" DOCKER_FILE=Dockerfile +SOVEREIGN_DOCKER_FILE=Dockerfile-sovereign docker-build: docker build \ -t ${INDEXER_IMAGE_NAME}:${INDEXER_IMAGE_TAG} \ -f ${DOCKER_FILE} \ . + +docker-sovereign-build: + docker build \ + -t ${INDEXER_IMAGE_NAME}:${INDEXER_IMAGE_TAG} \ + -f ${SOVEREIGN_DOCKER_FILE} \ + . diff --git a/client/disabled/elasticClient.go b/client/disabled/elasticClient.go new file mode 100644 index 00000000..41ccc6c0 --- /dev/null +++ b/client/disabled/elasticClient.go @@ -0,0 +1,78 @@ +package disabled + +import ( + "bytes" + "context" +) + +type elasticClient struct{} + +// NewDisabledElasticClient - +func NewDisabledElasticClient() *elasticClient { + return &elasticClient{} +} + +// DoBulkRequest - +func (ec *elasticClient) DoBulkRequest(_ context.Context, _ *bytes.Buffer, _ string) error { + return nil +} + +// DoQueryRemove - +func (ec *elasticClient) DoQueryRemove(_ context.Context, _ string, _ *bytes.Buffer) error { + return nil +} + +// DoMultiGet - +func (ec *elasticClient) DoMultiGet(_ context.Context, _ []string, _ string, _ bool, _ interface{}) error { + return nil +} + +// DoScrollRequest - +func (ec *elasticClient) DoScrollRequest(_ context.Context, _ string, _ []byte, _ bool, _ func(responseBytes []byte) error) error { + return nil +} + +// DoCountRequest - +func (ec *elasticClient) DoCountRequest(_ context.Context, _ string, _ []byte) (uint64, error) { + return 0, nil +} + +// UpdateByQuery - +func (ec *elasticClient) UpdateByQuery(_ context.Context, _ string, _ *bytes.Buffer) error { + return nil +} + +// PutMappings - +func (ec *elasticClient) PutMappings(_ string, _ *bytes.Buffer) error { + return nil +} + +// CheckAndCreateIndex - +func (ec *elasticClient) CheckAndCreateIndex(_ string) error { + return nil +} + +// CheckAndCreateAlias - +func (ec *elasticClient) CheckAndCreateAlias(_ string, _ string) error { + return nil +} + +// CheckAndCreateTemplate - +func (ec *elasticClient) CheckAndCreateTemplate(_ string, _ *bytes.Buffer) error { + return nil +} + +// CheckAndCreatePolicy - +func (ec *elasticClient) CheckAndCreatePolicy(_ string, _ *bytes.Buffer) error { + return nil +} + +// IsEnabled - +func (ec *elasticClient) IsEnabled() bool { + return false +} + +// IsInterfaceNil - returns true if there is no value under the interface +func (ec *elasticClient) IsInterfaceNil() bool { + return ec == nil +} diff --git a/client/disabled/elasticClient_test.go b/client/disabled/elasticClient_test.go new file mode 100644 index 00000000..c33e8716 --- /dev/null +++ b/client/disabled/elasticClient_test.go @@ -0,0 +1,31 @@ +package disabled + +import ( + "bytes" + "context" + "testing" + + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/stretchr/testify/require" +) + +func TestDisabledElasticClient_MethodsShouldNotPanic(t *testing.T) { + t.Parallel() + + ec := NewDisabledElasticClient() + require.False(t, check.IfNil(ec)) + + require.NotPanics(t, func() { + _ = ec.DoBulkRequest(context.Background(), new(bytes.Buffer), "") + _ = ec.DoQueryRemove(context.Background(), "", new(bytes.Buffer)) + _ = ec.DoMultiGet(context.Background(), make([]string, 0), "", true, nil) + _ = ec.DoScrollRequest(context.Background(), "", []byte(""), true, nil) + _, _ = ec.DoCountRequest(context.Background(), "", []byte("")) + _ = ec.UpdateByQuery(context.Background(), "", new(bytes.Buffer)) + _ = ec.PutMappings("", new(bytes.Buffer)) + _ = ec.CheckAndCreateIndex("") + _ = ec.CheckAndCreateAlias("", "") + _ = ec.CheckAndCreateTemplate("", new(bytes.Buffer)) + _ = ec.CheckAndCreatePolicy("", new(bytes.Buffer)) + }) +} diff --git a/client/elasticClientCommon.go b/client/elasticClientCommon.go index 553ca048..80ab9790 100644 --- a/client/elasticClientCommon.go +++ b/client/elasticClientCommon.go @@ -6,11 +6,14 @@ import ( "fmt" "io" "io/ioutil" + "math" "net/http" "net/url" "strings" + "time" "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/multiversx/mx-chain-es-indexer-go/data" "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" ) @@ -268,3 +271,11 @@ func parseResponse(res *esapi.Response, dest interface{}, errorHandler responseE return nil } + +// RetryBackOff returns elastic retry backoff duration +func RetryBackOff(attempt int) time.Duration { + d := time.Duration(math.Exp2(float64(attempt))) * time.Second + log.Debug("elastic: retry backoff", "attempt", attempt, "sleep duration", d) + + return d +} diff --git a/client/mainChainElasticClient.go b/client/mainChainElasticClient.go new file mode 100644 index 00000000..042504e9 --- /dev/null +++ b/client/mainChainElasticClient.go @@ -0,0 +1,35 @@ +package client + +import ( + "github.com/multiversx/mx-chain-core-go/core/check" + + "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc" +) + +type mainChainElasticClient struct { + elasticproc.DatabaseClientHandler + indexingEnabled bool +} + +// NewMainChainElasticClient creates a new sovereign elastic client +func NewMainChainElasticClient(esClient elasticproc.DatabaseClientHandler, indexingEnabled bool) (*mainChainElasticClient, error) { + if check.IfNil(esClient) { + return nil, dataindexer.ErrNilDatabaseClient + } + + return &mainChainElasticClient{ + esClient, + indexingEnabled, + }, nil +} + +// IsEnabled returns true if main chain elastic client is enabled +func (mcec *mainChainElasticClient) IsEnabled() bool { + return mcec.indexingEnabled +} + +// IsInterfaceNil returns true if there is no value under the interface +func (mcec *mainChainElasticClient) IsInterfaceNil() bool { + return mcec == nil +} diff --git a/client/mainChainElasticClient_test.go b/client/mainChainElasticClient_test.go new file mode 100644 index 00000000..c674cb32 --- /dev/null +++ b/client/mainChainElasticClient_test.go @@ -0,0 +1,42 @@ +package client + +import ( + "fmt" + "testing" + + "github.com/elastic/go-elasticsearch/v7" + "github.com/stretchr/testify/require" + + "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" +) + +func TestNewMainChainElasticClient(t *testing.T) { + t.Run("nil elastic client, should error", func(t *testing.T) { + mainChainESClient, err := NewMainChainElasticClient(nil, true) + require.Error(t, err, dataindexer.ErrNilDatabaseClient) + require.True(t, mainChainESClient.IsInterfaceNil()) + }) + t.Run("valid elastic client, should work", func(t *testing.T) { + esClient, err := NewElasticClient(elasticsearch.Config{ + Addresses: []string{"http://localhost:9200"}, + }) + require.Nil(t, err) + require.NotNil(t, esClient) + + mainChainESClient, err := NewMainChainElasticClient(esClient, true) + require.NoError(t, err) + require.Equal(t, "*client.mainChainElasticClient", fmt.Sprintf("%T", mainChainESClient)) + }) +} + +func TestMainChainElasticClient_IsEnabled(t *testing.T) { + esClient, err := NewElasticClient(elasticsearch.Config{ + Addresses: []string{"http://localhost:9200"}, + }) + require.Nil(t, err) + require.NotNil(t, esClient) + + mainChainESClient, err := NewMainChainElasticClient(esClient, true) + require.NoError(t, err) + require.Equal(t, true, mainChainESClient.IsEnabled()) +} diff --git a/cmd/elasticindexer/config/config.toml b/cmd/elasticindexer/config/config.toml index b63329c7..5fdf7122 100644 --- a/cmd/elasticindexer/config/config.toml +++ b/cmd/elasticindexer/config/config.toml @@ -4,6 +4,7 @@ "receipts", "scresults", "accountsesdt", "accountsesdthistory", "epochinfo", "scdeploys", "tokens", "tags", "logs", "delegators", "operations", "esdts", "values", "events" ] + esdt-prefix = "" [config.address-converter] length = 32 type = "bech32" diff --git a/cmd/elasticindexer/config/prefs.toml b/cmd/elasticindexer/config/prefs.toml index 2685e4b8..bc6a2743 100644 --- a/cmd/elasticindexer/config/prefs.toml +++ b/cmd/elasticindexer/config/prefs.toml @@ -23,3 +23,11 @@ username = "" password = "" bulk-request-max-size-in-bytes = 4194304 # 4MB + + # Configuration for main chain elastic cluster + # Used by the sovereign chain indexer to index incoming new tokens properties + [config.main-chain-elastic-cluster] + enabled = true + url = "http://localhost:9201" + username = "" + password = "" diff --git a/config/config.go b/config/config.go index 5d4e7812..203bd6e7 100644 --- a/config/config.go +++ b/config/config.go @@ -4,6 +4,7 @@ package config type Config struct { Config struct { AvailableIndices []string `toml:"available-indices"` + ESDTPrefix string `toml:"esdt-prefix"` AddressConverter struct { Length int `toml:"length"` Type string `toml:"type"` @@ -52,6 +53,12 @@ type ClusterConfig struct { Password string `toml:"password"` BulkRequestMaxSizeInBytes int `toml:"bulk-request-max-size-in-bytes"` } `toml:"elastic-cluster"` + MainChainCluster struct { + Enabled bool `toml:"enabled"` + URL string `toml:"url"` + UserName string `toml:"username"` + Password string `toml:"password"` + } `toml:"main-chain-elastic-cluster"` } `toml:"config"` } diff --git a/data/tokens.go b/data/tokens.go index 967e13ee..01f39dce 100644 --- a/data/tokens.go +++ b/data/tokens.go @@ -45,6 +45,18 @@ type SourceToken struct { CurrentOwner string `json:"currentOwner"` } +// ResponseTokenInfo is the structure for the tokens info response +type ResponseTokenInfo struct { + Docs []ResponseTokenInfoDB `json:"docs"` +} + +// ResponseTokenInfoDB is the structure for the token info response +type ResponseTokenInfoDB struct { + Found bool `json:"found"` + ID string `json:"_id"` + Source TokenInfo `json:"_source"` +} + // TokenInfo is a structure that is needed to store information about a token type TokenInfo struct { Name string `json:"name,omitempty"` diff --git a/docker-compose.yml b/docker-compose.yml index 970f0b40..27e689c1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,6 +15,22 @@ services: ports: - "9200:9200" - "9300:9300" + main-chain-elasticsearch: + container_name: es-container2 + image: docker.elastic.co/elasticsearch/elasticsearch:7.16.1 + environment: + - "discovery.type=single-node" + - "xpack.security.enabled=false" + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ulimits: + memlock: + soft: -1 + hard: -1 + networks: + - es-net + ports: + - "9201:9200" + - "9301:9300" kibana: container_name: kb-container image: docker.elastic.co/kibana/kibana:7.16.1 diff --git a/factory/runType/interface.go b/factory/runType/interface.go index 1114ff4d..4b96663b 100644 --- a/factory/runType/interface.go +++ b/factory/runType/interface.go @@ -1,12 +1,13 @@ package runType import ( + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions" ) // RunTypeComponentsCreator is the interface for creating run type components type RunTypeComponentsCreator interface { - Create() *runTypeComponents + Create() (*runTypeComponents, error) IsInterfaceNil() bool } @@ -28,6 +29,7 @@ type RunTypeComponentsHandler interface { type RunTypeComponentsHolder interface { TxHashExtractorCreator() transactions.TxHashExtractor RewardTxDataCreator() transactions.RewardTxDataHandler + IndexTokensHandlerCreator() elasticproc.IndexTokensHandler Create() error Close() error CheckSubcomponents() error diff --git a/factory/runType/runTypeComponents.go b/factory/runType/runTypeComponents.go index b70512b6..4bccd66b 100644 --- a/factory/runType/runTypeComponents.go +++ b/factory/runType/runTypeComponents.go @@ -1,12 +1,14 @@ package runType import ( + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions" ) type runTypeComponents struct { - txHashExtractor transactions.TxHashExtractor - rewardTxData transactions.RewardTxDataHandler + txHashExtractor transactions.TxHashExtractor + rewardTxData transactions.RewardTxDataHandler + indexTokensHandler elasticproc.IndexTokensHandler } // Close does nothing diff --git a/factory/runType/runTypeComponentsFactory.go b/factory/runType/runTypeComponentsFactory.go index 8ec94df3..8ac863d6 100644 --- a/factory/runType/runTypeComponentsFactory.go +++ b/factory/runType/runTypeComponentsFactory.go @@ -1,6 +1,7 @@ package runType import ( + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/tokens" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions" ) @@ -12,11 +13,12 @@ func NewRunTypeComponentsFactory() *runTypeComponentsFactory { } // Create will create the run type components -func (rtcf *runTypeComponentsFactory) Create() *runTypeComponents { +func (rtcf *runTypeComponentsFactory) Create() (*runTypeComponents, error) { return &runTypeComponents{ - txHashExtractor: transactions.NewTxHashExtractor(), - rewardTxData: transactions.NewRewardTxData(), - } + txHashExtractor: transactions.NewTxHashExtractor(), + rewardTxData: transactions.NewRewardTxData(), + indexTokensHandler: tokens.NewDisabledIndexTokensHandler(), + }, nil } // IsInterfaceNil returns true if there is no value under the interface diff --git a/factory/runType/runTypeComponentsHandler.go b/factory/runType/runTypeComponentsHandler.go index 6b0fe9e9..113be479 100644 --- a/factory/runType/runTypeComponentsHandler.go +++ b/factory/runType/runTypeComponentsHandler.go @@ -5,6 +5,8 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" + elasticIndexer "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions" ) @@ -34,7 +36,10 @@ func NewManagedRunTypeComponents(rtc RunTypeComponentsCreator) (*managedRunTypeC // Create will create the managed components func (mrtc *managedRunTypeComponents) Create() error { - rtc := mrtc.factory.Create() + rtc, err := mrtc.factory.Create() + if err != nil { + return err + } mrtc.mutRunTypeCoreComponents.Lock() mrtc.runTypeComponents = rtc @@ -75,6 +80,9 @@ func (mrtc *managedRunTypeComponents) CheckSubcomponents() error { if check.IfNil(mrtc.rewardTxData) { return transactions.ErrNilRewardTxDataHandler } + if check.IfNil(mrtc.indexTokensHandler) { + return elasticIndexer.ErrNilIndexTokensHandler + } return nil } @@ -102,6 +110,18 @@ func (mrtc *managedRunTypeComponents) RewardTxDataCreator() transactions.RewardT return mrtc.runTypeComponents.rewardTxData } +// IndexTokensHandlerCreator returns the index tokens handler +func (mrtc *managedRunTypeComponents) IndexTokensHandlerCreator() elasticproc.IndexTokensHandler { + mrtc.mutRunTypeCoreComponents.Lock() + defer mrtc.mutRunTypeCoreComponents.Unlock() + + if check.IfNil(mrtc.runTypeComponents) { + return nil + } + + return mrtc.runTypeComponents.indexTokensHandler +} + // IsInterfaceNil returns true if the interface is nil func (mrtc *managedRunTypeComponents) IsInterfaceNil() bool { return mrtc == nil diff --git a/factory/runType/runTypeComponents_test.go b/factory/runType/runTypeComponents_test.go index 07580802..9f929ea2 100644 --- a/factory/runType/runTypeComponents_test.go +++ b/factory/runType/runTypeComponents_test.go @@ -21,8 +21,9 @@ func TestRunTypeComponentsFactory_Create(t *testing.T) { rtcf := NewRunTypeComponentsFactory() require.NotNil(t, rtcf) - rtc := rtcf.Create() + rtc, err := rtcf.Create() require.NotNil(t, rtc) + require.NoError(t, err) } func TestRunTypeComponentsFactory_Close(t *testing.T) { @@ -31,8 +32,9 @@ func TestRunTypeComponentsFactory_Close(t *testing.T) { rtcf := NewRunTypeComponentsFactory() require.NotNil(t, rtcf) - rtc := rtcf.Create() + rtc, err := rtcf.Create() require.NotNil(t, rtc) + require.NoError(t, err) require.NoError(t, rtc.Close()) } diff --git a/factory/runType/sovereignRunTypeComponentsFactory.go b/factory/runType/sovereignRunTypeComponentsFactory.go index 874a1b29..4bafdf1a 100644 --- a/factory/runType/sovereignRunTypeComponentsFactory.go +++ b/factory/runType/sovereignRunTypeComponentsFactory.go @@ -1,21 +1,69 @@ package runType import ( + "net/http" + + "github.com/elastic/go-elasticsearch/v7" + + "github.com/multiversx/mx-chain-es-indexer-go/client" + "github.com/multiversx/mx-chain-es-indexer-go/client/disabled" + "github.com/multiversx/mx-chain-es-indexer-go/client/logging" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/factory" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/tokens" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions" ) -type sovereignRunTypeComponentsFactory struct{} +type sovereignRunTypeComponentsFactory struct { + mainChainElastic factory.ElasticConfig + esdtPrefix string +} // NewSovereignRunTypeComponentsFactory will return a new instance of sovereign run type components factory -func NewSovereignRunTypeComponentsFactory() *sovereignRunTypeComponentsFactory { - return &sovereignRunTypeComponentsFactory{} +func NewSovereignRunTypeComponentsFactory(mainChainElastic factory.ElasticConfig, esdtPrefix string) *sovereignRunTypeComponentsFactory { + return &sovereignRunTypeComponentsFactory{ + mainChainElastic: mainChainElastic, + esdtPrefix: esdtPrefix, + } } // Create will create the run type components -func (srtcf *sovereignRunTypeComponentsFactory) Create() *runTypeComponents { +func (srtcf *sovereignRunTypeComponentsFactory) Create() (*runTypeComponents, error) { + mainChainElasticClient, err := createMainChainElasticClient(srtcf.mainChainElastic) + if err != nil { + return nil, err + } + + sovIndexTokensHandler, err := tokens.NewSovereignIndexTokensHandler(mainChainElasticClient, srtcf.esdtPrefix) + if err != nil { + return nil, err + } + return &runTypeComponents{ - txHashExtractor: transactions.NewSovereignTxHashExtractor(), - rewardTxData: transactions.NewSovereignRewardTxData(), + txHashExtractor: transactions.NewSovereignTxHashExtractor(), + rewardTxData: transactions.NewSovereignRewardTxData(), + indexTokensHandler: sovIndexTokensHandler, + }, nil +} + +func createMainChainElasticClient(mainChainElastic factory.ElasticConfig) (elasticproc.MainChainDatabaseClientHandler, error) { + if mainChainElastic.Enabled { + argsEsClient := elasticsearch.Config{ + Addresses: []string{mainChainElastic.Url}, + Username: mainChainElastic.UserName, + Password: mainChainElastic.Password, + Logger: &logging.CustomLogger{}, + RetryOnStatus: []int{http.StatusConflict}, + RetryBackoff: client.RetryBackOff, + } + esClient, err := client.NewElasticClient(argsEsClient) + if err != nil { + return nil, err + } + + return client.NewMainChainElasticClient(esClient, mainChainElastic.Enabled) + } else { + return disabled.NewDisabledElasticClient(), nil } } diff --git a/factory/runType/sovereignRunTypeComponentsFactory_test.go b/factory/runType/sovereignRunTypeComponentsFactory_test.go index 71d909ba..4bb3581b 100644 --- a/factory/runType/sovereignRunTypeComponentsFactory_test.go +++ b/factory/runType/sovereignRunTypeComponentsFactory_test.go @@ -4,16 +4,19 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/factory" ) func TestSovereignRunTypeComponentsFactory_CreateAndClose(t *testing.T) { t.Parallel() - srtcf := NewSovereignRunTypeComponentsFactory() + srtcf := NewSovereignRunTypeComponentsFactory(factory.ElasticConfig{}, "sov") require.False(t, srtcf.IsInterfaceNil()) - srtc := srtcf.Create() + srtc, err := srtcf.Create() require.NotNil(t, srtc) + require.NoError(t, err) require.NoError(t, srtc.Close()) } diff --git a/factory/wsIndexerFactory.go b/factory/wsIndexerFactory.go index d9d1264c..74bc00e4 100644 --- a/factory/wsIndexerFactory.go +++ b/factory/wsIndexerFactory.go @@ -11,6 +11,7 @@ import ( "github.com/multiversx/mx-chain-es-indexer-go/config" "github.com/multiversx/mx-chain-es-indexer-go/core" + esFactory "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/factory" "github.com/multiversx/mx-chain-es-indexer-go/process/factory" "github.com/multiversx/mx-chain-es-indexer-go/process/wsindexer" ) @@ -76,8 +77,16 @@ func createDataIndexer( return nil, err } + mainChainElastic := esFactory.ElasticConfig{ + Enabled: clusterCfg.Config.MainChainCluster.Enabled, + Url: clusterCfg.Config.MainChainCluster.URL, + UserName: clusterCfg.Config.MainChainCluster.UserName, + Password: clusterCfg.Config.MainChainCluster.Password, + } + return factory.NewIndexer(factory.ArgsIndexerFactory{ Sovereign: cfg.Sovereign, + MainChainElastic: mainChainElastic, UseKibana: clusterCfg.Config.ElasticCluster.UseKibana, Denomination: cfg.Config.Economics.Denomination, BulkRequestMaxSize: clusterCfg.Config.ElasticCluster.BulkRequestMaxSizeInBytes, diff --git a/integrationtests/consts.go b/integrationtests/consts.go index 8593fcf6..32b4bd1b 100644 --- a/integrationtests/consts.go +++ b/integrationtests/consts.go @@ -6,5 +6,7 @@ const ( //nolint esURL = "http://localhost:9200" //nolint + esMainChainURL = "http://localhost:9201" + //nolint addressPrefix = "erd" ) diff --git a/integrationtests/incomingSCR_test.go b/integrationtests/incomingSCR_test.go new file mode 100644 index 00000000..b6931a0d --- /dev/null +++ b/integrationtests/incomingSCR_test.go @@ -0,0 +1,280 @@ +//go:build integrationtests + +package integrationtests + +import ( + "context" + "encoding/hex" + "encoding/json" + "math/big" + "testing" + + "github.com/multiversx/mx-chain-core-go/core" + dataBlock "github.com/multiversx/mx-chain-core-go/data/block" + "github.com/multiversx/mx-chain-core-go/data/esdt" + "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-core-go/data/smartContractResult" + "github.com/multiversx/mx-chain-core-go/data/transaction" + "github.com/stretchr/testify/require" + + indexerData "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc" +) + +type esToken struct { + Identifier string + Value *big.Int + NumDecimals int64 +} + +type esNft struct { + Collection string + Nonce uint64 + Data esdt.ESDigitalToken +} + +func createTokens() ([]esToken, []esNft) { + tokens := []esToken{} + token1 := esToken{ + Identifier: "TKN18-1a2b3c", + Value: big.NewInt(123), + NumDecimals: 18, + } + tokens = append(tokens, token1) + token2 := esToken{ + Identifier: "TKN12-1c2b3a", + Value: big.NewInt(333), + NumDecimals: 12, + } + tokens = append(tokens, token2) + + nfts := []esNft{} + nft := esNft{ + Collection: "NFT-abc123", + Nonce: 1, + Data: esdt.ESDigitalToken{ + Type: uint32(core.NonFungibleV2), + Value: big.NewInt(1), + Properties: []byte("3032"), + TokenMetaData: &esdt.MetaData{ + Nonce: 1, + Name: []byte("NFT"), + Creator: []byte("creator"), + Royalties: uint32(2500), + }, + }, + } + nfts = append(nfts, nft) + + return tokens, nfts +} + +func TestCrossChainTokensIndexingFromMainChain(t *testing.T) { + setLogLevelDebug() + + mainChainEsClient, err := createMainChainESClient(esMainChainURL, true) + require.Nil(t, err) + + tokens, nfts := createTokens() + createTokensInSourceEs(t, mainChainEsClient, tokens, nfts) + + esClient, err := createESClient(esURL) + require.Nil(t, err) + + esProc, err := CreateSovereignElasticProcessor(esClient, mainChainEsClient) + require.Nil(t, err) + + allTokens := getAllTokensIDs(tokens, nfts) + allTokens = append(allTokens, getAllNftIDs(nfts)...) + genericResponse := &GenericResponse{} + err = esClient.DoMultiGet(context.Background(), allTokens, indexerData.TokensIndex, true, genericResponse) + require.Nil(t, err) + for _, token := range genericResponse.Docs { + require.False(t, token.Found) + } + + scrHash := []byte("scrHash") + header := &dataBlock.Header{ + Round: 10, + TimeStamp: 2500, + } + body := &dataBlock.Body{ + MiniBlocks: dataBlock.MiniBlockSlice{ + { + Type: dataBlock.SmartContractResultBlock, + SenderShardID: core.MainChainShardId, + ReceiverShardID: core.SovereignChainShardId, + TxHashes: [][]byte{scrHash}, + }, + }, + } + + pool := &outport.TransactionPool{ + SmartContractResults: map[string]*outport.SCRInfo{ + hex.EncodeToString(scrHash): {SmartContractResult: &smartContractResult.SmartContractResult{ + Nonce: 11, + Value: big.NewInt(0), + GasLimit: 0, + SndAddr: decodeAddress("erd1qqqqqqqqqqqqqqqpqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqzllls8a5w6u"), + RcvAddr: decodeAddress("erd1kzrfl2tztgzjpeedwec37c8npcr0a2ulzh9lhmj7xufyg23zcxuqxcqz0s"), + Data: createMultiEsdtTransferData(tokens, nfts), + OriginalTxHash: nil, + }, FeeInfo: &outport.FeeInfo{}}, + }, + } + err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, nil, 1)) + require.Nil(t, err) + + genericResponse = &GenericResponse{} + err = esClient.DoMultiGet(context.Background(), []string{hex.EncodeToString(scrHash)}, indexerData.ScResultsIndex, true, genericResponse) + require.Nil(t, err) + require.JSONEq(t, + readExpectedResult("./testdata/incomingSCR/incoming-scr.json"), + string(genericResponse.Docs[0].Source), + ) + + genericResponse = &GenericResponse{} + err = esClient.DoMultiGet(context.Background(), allTokens, indexerData.TokensIndex, true, genericResponse) + require.Nil(t, err) + for _, token := range genericResponse.Docs { + require.True(t, token.Found) + } +} + +func createTokensInSourceEs(t *testing.T, esClient elasticproc.DatabaseClientHandler, tokens []esToken, nfts []esNft) { + esProc, err := CreateElasticProcessor(esClient) + require.Nil(t, err) + + body := &dataBlock.Body{} + header := &dataBlock.Header{ + Round: 50, + TimeStamp: 5040, + ShardID: core.MetachainShardId, + } + + address1 := "erd1k04pxr6c0gvlcx4rd5fje0a4uy33axqxwz0fpcrgtfdy3nrqauqqgvxprv" + + // create issue token and nft collection events + events := make([]*transaction.Event, 0) + for _, token := range tokens { + events = append(events, &transaction.Event{ + Address: decodeAddress(address1), + Identifier: []byte("issue"), + Topics: [][]byte{[]byte(token.Identifier), []byte("TKN"), []byte("TKN"), []byte(core.FungibleESDT), big.NewInt(token.NumDecimals).Bytes()}, + }) + } + for _, nft := range nfts { + events = append(events, &transaction.Event{ + Address: decodeAddress(address1), + Identifier: []byte("issueNonFungible"), + Topics: [][]byte{[]byte(nft.Collection), []byte("NFT"), []byte("NFT"), []byte(core.ESDTType(nft.Data.Type).String())}, + }) + } + + pool := &outport.TransactionPool{ + Logs: []*outport.LogData{ + { + TxHash: hex.EncodeToString([]byte("txHash1")), + Log: &transaction.Log{ + Address: decodeAddress(address1), + Events: events, + }, + }, + }, + } + + err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, nil, testNumOfShards)) + require.Nil(t, err) + + genericResponse := &GenericResponse{} + allTokens := getAllTokensIDs(tokens, nfts) + err = esClient.DoMultiGet(context.Background(), allTokens, indexerData.TokensIndex, true, genericResponse) + require.Nil(t, err) + for _, token := range genericResponse.Docs { + require.True(t, token.Found) + } + + // create nft event + events = make([]*transaction.Event, 0) + for _, nft := range nfts { + nftDataBytes, _ := json.Marshal(nft.Data) + + events = append(events, &transaction.Event{ + Address: decodeAddress(address1), + Identifier: []byte(core.BuiltInFunctionESDTNFTCreate), + Topics: [][]byte{[]byte(nft.Collection), big.NewInt(0).SetUint64(nft.Nonce).Bytes(), nft.Data.Value.Bytes(), []byte(nftDataBytes)}, + }) + } + + header = &dataBlock.Header{ + Round: 51, + TimeStamp: 5600, + ShardID: 0, + } + + pool = &outport.TransactionPool{ + Logs: []*outport.LogData{ + { + TxHash: hex.EncodeToString([]byte("txHash2")), + Log: &transaction.Log{ + Address: decodeAddress(address1), + Events: events, + }, + }, + }, + } + + err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, nil, testNumOfShards)) + require.Nil(t, err) + + allNfts := getAllNftIDs(nfts) + err = esClient.DoMultiGet(context.Background(), allNfts, indexerData.TokensIndex, true, genericResponse) + require.Nil(t, err) + for _, token := range genericResponse.Docs { + require.True(t, token.Found) + } +} + +func getAllTokensIDs(tokens []esToken, nfts []esNft) []string { + allTokens := make([]string, 0) + for _, token := range tokens { + allTokens = append(allTokens, token.Identifier) + } + for _, nft := range nfts { + allTokens = append(allTokens, nft.Collection) + } + return allTokens +} + +func getAllNftIDs(nfts []esNft) []string { + allNfts := make([]string, 0) + for _, nft := range nfts { + nonceBytes := big.NewInt(0).SetUint64(nft.Nonce).Bytes() + nonceHex := hex.EncodeToString(nonceBytes) + nftIdentifier := nft.Collection + "-" + nonceHex + + allNfts = append(allNfts, nftIdentifier) + + } + return allNfts +} + +func createMultiEsdtTransferData(tokens []esToken, nfts []esNft) []byte { + data := []byte(core.BuiltInFunctionMultiESDTNFTTransfer + + "@" + hex.EncodeToString(big.NewInt(int64(len(tokens)+len(nfts))).Bytes())) + for _, token := range tokens { + data = append(data, []byte( + "@"+hex.EncodeToString([]byte(token.Identifier))+ + "@"+ + "@"+hex.EncodeToString(token.Value.Bytes()))...) + } + for _, nft := range nfts { + nftDataBytes, _ := json.Marshal(nft.Data) + data = append(data, []byte( + "@"+hex.EncodeToString([]byte(nft.Collection))+ + "@"+hex.EncodeToString(big.NewInt(0).SetUint64(nft.Nonce).Bytes())+ + "@"+hex.EncodeToString(nftDataBytes))...) + } + + return data +} diff --git a/integrationtests/testdata/incomingSCR/incoming-scr.json b/integrationtests/testdata/incomingSCR/incoming-scr.json new file mode 100644 index 00000000..5f4c142e --- /dev/null +++ b/integrationtests/testdata/incomingSCR/incoming-scr.json @@ -0,0 +1,43 @@ +{ + "miniBlockHash": "71e255368d7a6686a57a1acb8845953fc54e5a1cfde395acd09df58cb61d5abb", + "nonce": 11, + "gasLimit": 0, + "gasPrice": 0, + "value": "0", + "valueNum": 0, + "sender": "erd1qqqqqqqqqqqqqqqpqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqzllls8a5w6u", + "receiver": "erd1kzrfl2tztgzjpeedwec37c8npcr0a2ulzh9lhmj7xufyg23zcxuqxcqz0s", + "senderShard": 4294967293, + "receiverShard": 0, + "data": "TXVsdGlFU0RUTkZUVHJhbnNmZXJAMDNANTQ0YjRlMzEzODJkMzE2MTMyNjIzMzYzQEA3YkA1NDRiNGUzMTMyMmQzMTYzMzI2MjMzNjFAQDAxNGRANGU0NjU0MmQ2MTYyNjMzMTMyMzNAMDFAN2IyMjU0Nzk3MDY1MjIzYTMyMmMyMjU2NjE2Yzc1NjUyMjNhMzEyYzIyNTA3MjZmNzA2NTcyNzQ2OTY1NzMyMjNhMjI0ZDdhNDE3YTRkNjczZDNkMjIyYzIyNGQ2NTc0NjE0NDYxNzQ2MTIyM2E3YjIyNGU2ZjZlNjM2NTIyM2EzMTJjMjI0ZTYxNmQ2NTIyM2EyMjU0NmI1YTU1MjIyYzIyNDM3MjY1NjE3NDZmNzIyMjNhMjI1OTMzNGE2YzU5NTg1Mjc2NjM2NzNkM2QyMjJjMjI1MjZmNzk2MTZjNzQ2OTY1NzMyMjNhMzIzNTMwMzAyYzIyNDg2MTczNjgyMjNhNmU3NTZjNmMyYzIyNTU1MjQ5NzMyMjNhNmU3NTZjNmMyYzIyNDE3NDc0NzI2OTYyNzU3NDY1NzMyMjNhNmU3NTZjNmM3ZDJjMjI1MjY1NzM2NTcyNzY2NTY0MjIzYTZlNzU2YzZjN2Q=", + "prevTxHash": "", + "originalTxHash": "", + "callType": "0", + "timestamp": 2500, + "tokens": [ + "TKN18-1a2b3c", + "TKN12-1c2b3a", + "NFT-abc123-01" + ], + "esdtValues": [ + "123", + "333", + "1" + ], + "esdtValuesNum": [ + 1.23e-16, + 3.33e-16, + 1e-18 + ], + "receivers": [ + "erd1kzrfl2tztgzjpeedwec37c8npcr0a2ulzh9lhmj7xufyg23zcxuqxcqz0s", + "erd1kzrfl2tztgzjpeedwec37c8npcr0a2ulzh9lhmj7xufyg23zcxuqxcqz0s", + "erd1kzrfl2tztgzjpeedwec37c8npcr0a2ulzh9lhmj7xufyg23zcxuqxcqz0s" + ], + "receiversShardIDs": [ + 0, + 0, + 0 + ], + "operation": "MultiESDTNFTTransfer" +} diff --git a/integrationtests/utils.go b/integrationtests/utils.go index 06239e0c..3a85f7eb 100644 --- a/integrationtests/utils.go +++ b/integrationtests/utils.go @@ -20,12 +20,14 @@ import ( "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/factory" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/tokens" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions" ) var ( log = logger.GetOrCreate("integration-tests") pubKeyConverter, _ = pubkeyConverter.NewBech32PubkeyConverter(32, addressPrefix) + sovEsdtPrefix = "sov" ) // nolint @@ -41,6 +43,12 @@ func createESClient(url string) (elasticproc.DatabaseClientHandler, error) { }) } +// nolint +func createMainChainESClient(url string, enabled bool) (elasticproc.MainChainDatabaseClientHandler, error) { + esClient, _ := createESClient(url) + return client.NewMainChainElasticClient(esClient, enabled) +} + // nolint func decodeAddress(address string) []byte { decoded, err := pubKeyConverter.Decode(address) @@ -62,9 +70,35 @@ func CreateElasticProcessor( EnabledIndexes: []string{dataindexer.TransactionsIndex, dataindexer.LogsIndex, dataindexer.AccountsESDTIndex, dataindexer.ScResultsIndex, dataindexer.ReceiptsIndex, dataindexer.BlockIndex, dataindexer.AccountsIndex, dataindexer.TokensIndex, dataindexer.TagsIndex, dataindexer.EventsIndex, dataindexer.OperationsIndex, dataindexer.DelegatorsIndex, dataindexer.ESDTsIndex, dataindexer.SCDeploysIndex, dataindexer.MiniblocksIndex, dataindexer.ValuesIndex}, - Denomination: 18, - TxHashExtractor: transactions.NewTxHashExtractor(), - RewardTxData: transactions.NewRewardTxData(), + Denomination: 18, + TxHashExtractor: transactions.NewTxHashExtractor(), + RewardTxData: transactions.NewRewardTxData(), + IndexTokensHandler: tokens.NewDisabledIndexTokensHandler(), + } + + return factory.CreateElasticProcessor(args) +} + +// CreateSovereignElasticProcessor - +func CreateSovereignElasticProcessor( + esClient elasticproc.DatabaseClientHandler, + mainEsClient elasticproc.MainChainDatabaseClientHandler, +) (dataindexer.ElasticProcessor, error) { + sovIndexTokens, _ := tokens.NewSovereignIndexTokensHandler(mainEsClient, sovEsdtPrefix) + + args := factory.ArgElasticProcessorFactory{ + Marshalizer: &mock.MarshalizerMock{}, + Hasher: &mock.HasherMock{}, + AddressPubkeyConverter: pubKeyConverter, + ValidatorPubkeyConverter: mock.NewPubkeyConverterMock(32), + DBClient: esClient, + EnabledIndexes: []string{dataindexer.TransactionsIndex, dataindexer.LogsIndex, dataindexer.AccountsESDTIndex, dataindexer.ScResultsIndex, + dataindexer.ReceiptsIndex, dataindexer.BlockIndex, dataindexer.AccountsIndex, dataindexer.TokensIndex, dataindexer.TagsIndex, dataindexer.EventsIndex, + dataindexer.OperationsIndex, dataindexer.DelegatorsIndex, dataindexer.ESDTsIndex, dataindexer.SCDeploysIndex, dataindexer.MiniblocksIndex, dataindexer.ValuesIndex}, + Denomination: 18, + TxHashExtractor: transactions.NewSovereignTxHashExtractor(), + RewardTxData: transactions.NewSovereignRewardTxData(), + IndexTokensHandler: sovIndexTokens, } return factory.CreateElasticProcessor(args) diff --git a/integrationtests/valuesIndex_test.go b/integrationtests/valuesIndex_test.go index 966163c3..0fbd7ae3 100644 --- a/integrationtests/valuesIndex_test.go +++ b/integrationtests/valuesIndex_test.go @@ -11,6 +11,7 @@ import ( "github.com/multiversx/mx-chain-es-indexer-go/mock" indexerData "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/factory" ) @@ -30,6 +31,7 @@ func TestCheckVersionIsIndexer(t *testing.T) { EnabledIndexes: []string{indexerData.ValuesIndex}, TxHashExtractor: &mock.TxHashExtractorMock{}, RewardTxData: &mock.RewardTxDataMock{}, + IndexTokensHandler: &elasticproc.IndexTokenHandlerMock{}, } _, err = factory.CreateElasticProcessor(args) diff --git a/mock/databaseWriterStub.go b/mock/databaseWriterStub.go index 1df608a8..5a0153ce 100644 --- a/mock/databaseWriterStub.go +++ b/mock/databaseWriterStub.go @@ -86,6 +86,11 @@ func (dwm *DatabaseWriterStub) CheckAndCreatePolicy(_ string, _ *bytes.Buffer) e return nil } +// IsEnabled - +func (dwm *DatabaseWriterStub) IsEnabled() bool { + return false +} + // IsInterfaceNil returns true if there is no value under the interface func (dwm *DatabaseWriterStub) IsInterfaceNil() bool { return dwm == nil diff --git a/process/dataindexer/errors.go b/process/dataindexer/errors.go index 9b4fbd0f..9495e93f 100644 --- a/process/dataindexer/errors.go +++ b/process/dataindexer/errors.go @@ -88,3 +88,6 @@ var ErrNilOperationsHandler = errors.New("nil operations handler") // ErrNilBlockContainerHandler signals that a nil block container handler has been provided var ErrNilBlockContainerHandler = errors.New("nil bock container handler") + +// ErrNilIndexTokensHandler signals that a nil index tokens handler has been provided +var ErrNilIndexTokensHandler = errors.New("nil index tokens handler") diff --git a/process/elasticproc/check.go b/process/elasticproc/check.go index 9284376e..d15a19fc 100644 --- a/process/elasticproc/check.go +++ b/process/elasticproc/check.go @@ -2,6 +2,7 @@ package elasticproc import ( "github.com/multiversx/mx-chain-core-go/core/check" + elasticIndexer "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" ) @@ -39,6 +40,9 @@ func checkArguments(arguments *ArgElasticProcessor) error { if check.IfNilReflect(arguments.OperationsProc) { return elasticIndexer.ErrNilOperationsHandler } + if check.IfNilReflect(arguments.IndexTokensHandler) { + return elasticIndexer.ErrNilIndexTokensHandler + } return nil } diff --git a/process/elasticproc/elasticProcessor.go b/process/elasticproc/elasticProcessor.go index 89634a95..e0568504 100644 --- a/process/elasticproc/elasticProcessor.go +++ b/process/elasticproc/elasticProcessor.go @@ -14,6 +14,8 @@ import ( "github.com/multiversx/mx-chain-core-go/data/alteredAccount" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-chain-es-indexer-go/core/request" "github.com/multiversx/mx-chain-es-indexer-go/data" elasticIndexer "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" @@ -21,7 +23,6 @@ import ( "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/tags" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/tokeninfo" "github.com/multiversx/mx-chain-es-indexer-go/templates" - logger "github.com/multiversx/mx-chain-logger-go" ) var ( @@ -57,6 +58,7 @@ type ArgElasticProcessor struct { LogsAndEventsProc DBLogsAndEventsHandler OperationsProc OperationsHandler Version string + IndexTokensHandler IndexTokensHandler } type elasticProcessor struct { @@ -73,6 +75,7 @@ type elasticProcessor struct { validatorsProc DBValidatorsHandler logsAndEventsProc DBLogsAndEventsHandler operationsProc OperationsHandler + indexTokensHandler IndexTokensHandler } // NewElasticProcessor handles Elasticsearch operations such as initialization, adding, modifying or removing data @@ -94,6 +97,7 @@ func NewElasticProcessor(arguments *ArgElasticProcessor) (*elasticProcessor, err logsAndEventsProc: arguments.LogsAndEventsProc, operationsProc: arguments.OperationsProc, bulkRequestMaxSize: arguments.BulkRequestMaxSize, + indexTokensHandler: arguments.IndexTokensHandler, } err = ei.init(arguments.UseKibana, arguments.IndexTemplates, arguments.IndexPolicies, arguments.ExtraMappings) @@ -503,6 +507,11 @@ func (ei *elasticProcessor) SaveTransactions(obh *outport.OutportBlockWithHeader return err } + err = ei.indexTokensHandler.IndexCrossChainTokens(ei.elasticClient, preparedResults.ScResults, buffers) + if err != nil { + return err + } + return ei.doBulkRequests("", buffers.Buffers(), obh.ShardID) } diff --git a/process/elasticproc/elasticProcessor_test.go b/process/elasticproc/elasticProcessor_test.go index b01180ac..424970cb 100644 --- a/process/elasticproc/elasticProcessor_test.go +++ b/process/elasticproc/elasticProcessor_test.go @@ -31,15 +31,16 @@ import ( func newElasticsearchProcessor(elasticsearchWriter DatabaseClientHandler, arguments *ArgElasticProcessor) *elasticProcessor { return &elasticProcessor{ - elasticClient: elasticsearchWriter, - enabledIndexes: arguments.EnabledIndexes, - blockProc: arguments.BlockProc, - transactionsProc: arguments.TransactionsProc, - miniblocksProc: arguments.MiniblocksProc, - accountsProc: arguments.AccountsProc, - validatorsProc: arguments.ValidatorsProc, - statisticsProc: arguments.StatisticsProc, - logsAndEventsProc: arguments.LogsAndEventsProc, + elasticClient: elasticsearchWriter, + enabledIndexes: arguments.EnabledIndexes, + blockProc: arguments.BlockProc, + transactionsProc: arguments.TransactionsProc, + miniblocksProc: arguments.MiniblocksProc, + accountsProc: arguments.AccountsProc, + validatorsProc: arguments.ValidatorsProc, + statisticsProc: arguments.StatisticsProc, + logsAndEventsProc: arguments.LogsAndEventsProc, + indexTokensHandler: arguments.IndexTokensHandler, } } @@ -80,14 +81,15 @@ func createMockElasticProcessorArgs() *ArgElasticProcessor { EnabledIndexes: map[string]struct{}{ dataindexer.BlockIndex: {}, dataindexer.TransactionsIndex: {}, dataindexer.MiniblocksIndex: {}, dataindexer.ValidatorsIndex: {}, dataindexer.RoundsIndex: {}, dataindexer.AccountsIndex: {}, dataindexer.RatingIndex: {}, dataindexer.AccountsHistoryIndex: {}, }, - ValidatorsProc: vp, - StatisticsProc: statistics.NewStatisticsProcessor(), - TransactionsProc: &mock.DBTransactionProcessorStub{}, - MiniblocksProc: mp, - AccountsProc: acp, - BlockProc: bp, - LogsAndEventsProc: lp, - OperationsProc: op, + ValidatorsProc: vp, + StatisticsProc: statistics.NewStatisticsProcessor(), + TransactionsProc: &mock.DBTransactionProcessorStub{}, + MiniblocksProc: mp, + AccountsProc: acp, + BlockProc: bp, + LogsAndEventsProc: lp, + OperationsProc: op, + IndexTokensHandler: &IndexTokenHandlerMock{}, } } diff --git a/process/elasticproc/factory/elasticProcessorFactory.go b/process/elasticproc/factory/elasticProcessorFactory.go index 54819439..2471885f 100644 --- a/process/elasticproc/factory/elasticProcessorFactory.go +++ b/process/elasticproc/factory/elasticProcessorFactory.go @@ -19,6 +19,14 @@ import ( "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/validators" ) +// ElasticConfig holds the elastic search settings +type ElasticConfig struct { + Enabled bool + Url string + UserName string + Password string +} + // ArgElasticProcessorFactory is struct that is used to store all components that are needed to create an elastic processor factory type ArgElasticProcessorFactory struct { Marshalizer marshal.Marshalizer @@ -34,6 +42,7 @@ type ArgElasticProcessorFactory struct { ImportDB bool TxHashExtractor transactions.TxHashExtractor RewardTxData transactions.RewardTxDataHandler + IndexTokensHandler elasticproc.IndexTokensHandler } // CreateElasticProcessor will create a new instance of ElasticProcessor @@ -132,6 +141,7 @@ func CreateElasticProcessor(arguments ArgElasticProcessorFactory) (dataindexer.E OperationsProc: operationsProc, ImportDB: arguments.ImportDB, Version: arguments.Version, + IndexTokensHandler: arguments.IndexTokensHandler, } return elasticproc.NewElasticProcessor(args) diff --git a/process/elasticproc/factory/elasticProcessorFactory_test.go b/process/elasticproc/factory/elasticProcessorFactory_test.go index 2e4dd081..f7271737 100644 --- a/process/elasticproc/factory/elasticProcessorFactory_test.go +++ b/process/elasticproc/factory/elasticProcessorFactory_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/require" "github.com/multiversx/mx-chain-es-indexer-go/mock" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc" ) func TestCreateElasticProcessor(t *testing.T) { @@ -21,6 +22,7 @@ func TestCreateElasticProcessor(t *testing.T) { UseKibana: false, TxHashExtractor: &mock.TxHashExtractorMock{}, RewardTxData: &mock.RewardTxDataMock{}, + IndexTokensHandler: &elasticproc.IndexTokenHandlerMock{}, } ep, err := CreateElasticProcessor(args) diff --git a/process/elasticproc/indexTokenHandlerMock.go b/process/elasticproc/indexTokenHandlerMock.go new file mode 100644 index 00000000..e60f36fb --- /dev/null +++ b/process/elasticproc/indexTokenHandlerMock.go @@ -0,0 +1,23 @@ +package elasticproc + +import ( + "github.com/multiversx/mx-chain-es-indexer-go/data" +) + +// IndexTokenHandlerMock - +type IndexTokenHandlerMock struct { + IndexCrossChainTokensCalled func(elasticClient DatabaseClientHandler, scrs []*data.ScResult, buffSlice *data.BufferSlice) error +} + +// IndexCrossChainTokens - +func (ithh *IndexTokenHandlerMock) IndexCrossChainTokens(elasticClient DatabaseClientHandler, scrs []*data.ScResult, buffSlice *data.BufferSlice) error { + if ithh.IndexCrossChainTokensCalled != nil { + return ithh.IndexCrossChainTokensCalled(elasticClient, scrs, buffSlice) + } + return nil +} + +// IsInterfaceNil returns true if there is no value under the interface +func (ithh *IndexTokenHandlerMock) IsInterfaceNil() bool { + return ithh == nil +} diff --git a/process/elasticproc/interface.go b/process/elasticproc/interface.go index fe4b565f..f80e3809 100644 --- a/process/elasticproc/interface.go +++ b/process/elasticproc/interface.go @@ -8,10 +8,18 @@ import ( "github.com/multiversx/mx-chain-core-go/data/alteredAccount" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-es-indexer-go/data" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/tokeninfo" ) +// MainChainDatabaseClientHandler defines the actions that sovereign database client handler should do +type MainChainDatabaseClientHandler interface { + DatabaseClientHandler + IsEnabled() bool + IsInterfaceNil() bool +} + // DatabaseClientHandler defines the actions that a component that handles requests should do type DatabaseClientHandler interface { DoBulkRequest(ctx context.Context, buff *bytes.Buffer, index string) error @@ -120,3 +128,9 @@ type OperationsHandler interface { ProcessTransactionsAndSCRs(txs []*data.Transaction, scrs []*data.ScResult, isImportDB bool, shardID uint32) ([]*data.Transaction, []*data.ScResult) SerializeSCRs(scrs []*data.ScResult, buffSlice *data.BufferSlice, index string, shardID uint32) error } + +// IndexTokensHandler defines what index tokens handler should be able to do +type IndexTokensHandler interface { + IndexCrossChainTokens(handler DatabaseClientHandler, scrs []*data.ScResult, buffSlice *data.BufferSlice) error + IsInterfaceNil() bool +} diff --git a/process/elasticproc/tokens/disabledIndexTokensHandler.go b/process/elasticproc/tokens/disabledIndexTokensHandler.go new file mode 100644 index 00000000..18fc63f8 --- /dev/null +++ b/process/elasticproc/tokens/disabledIndexTokensHandler.go @@ -0,0 +1,23 @@ +package tokens + +import ( + "github.com/multiversx/mx-chain-es-indexer-go/data" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc" +) + +type disabledTndexTokensHandler struct{} + +// NewDisabledIndexTokensHandler creates a new disabled index tokens handler +func NewDisabledIndexTokensHandler() *disabledTndexTokensHandler { + return &disabledTndexTokensHandler{} +} + +// IndexCrossChainTokens should do nothing and return no error +func (dit *disabledTndexTokensHandler) IndexCrossChainTokens(_ elasticproc.DatabaseClientHandler, _ []*data.ScResult, _ *data.BufferSlice) error { + return nil +} + +// IsInterfaceNil returns true if there is no value under the interface +func (dit *disabledTndexTokensHandler) IsInterfaceNil() bool { + return dit == nil +} diff --git a/process/elasticproc/tokens/disabledIndexTokensHandler_test.go b/process/elasticproc/tokens/disabledIndexTokensHandler_test.go new file mode 100644 index 00000000..6b490a81 --- /dev/null +++ b/process/elasticproc/tokens/disabledIndexTokensHandler_test.go @@ -0,0 +1,22 @@ +package tokens + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewIndexTokensHandler(t *testing.T) { + t.Parallel() + + ith := NewDisabledIndexTokensHandler() + require.False(t, ith.IsInterfaceNil()) +} + +func TestIndexTokensHandler_IndexCrossChainTokens(t *testing.T) { + t.Parallel() + + ith := NewDisabledIndexTokensHandler() + err := ith.IndexCrossChainTokens(nil, nil, nil) + require.NoError(t, err) +} diff --git a/process/elasticproc/tokens/sovereignIndexTokensHandler.go b/process/elasticproc/tokens/sovereignIndexTokensHandler.go new file mode 100644 index 00000000..1a1e1385 --- /dev/null +++ b/process/elasticproc/tokens/sovereignIndexTokensHandler.go @@ -0,0 +1,143 @@ +package tokens + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-core-go/data/esdt" + + "github.com/multiversx/mx-chain-es-indexer-go/data" + indexerdata "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/converters" +) + +type sovereignIndexTokensHandler struct { + mainChainElasticClient elasticproc.MainChainDatabaseClientHandler + esdtPrefix string +} + +// NewSovereignIndexTokensHandler creates a new sovereign index tokens handler +func NewSovereignIndexTokensHandler(mainChainElasticClient elasticproc.MainChainDatabaseClientHandler, esdtPrefix string) (*sovereignIndexTokensHandler, error) { + return &sovereignIndexTokensHandler{ + mainChainElasticClient: mainChainElasticClient, + esdtPrefix: esdtPrefix, + }, nil +} + +// IndexCrossChainTokens will index the new tokens properties +func (sit *sovereignIndexTokensHandler) IndexCrossChainTokens(elasticClient elasticproc.DatabaseClientHandler, scrs []*data.ScResult, buffSlice *data.BufferSlice) error { + if !sit.mainChainElasticClient.IsEnabled() { + return nil + } + + newTokens, err := sit.getNewTokensFromSCRs(elasticClient, scrs) + if err != nil { + return err + } + + if len(newTokens) == 0 { // no new tokens + return nil + } + + // get tokens from main chain elastic db + mainChainTokens := &data.ResponseTokenInfo{} + err = sit.mainChainElasticClient.DoMultiGet(context.Background(), newTokens, indexerdata.TokensIndex, true, mainChainTokens) + if err != nil { + return err + } + + return sit.serializeNewTokens(mainChainTokens.Docs, buffSlice) +} + +func (sit *sovereignIndexTokensHandler) getNewTokensFromSCRs(elasticClient elasticproc.DatabaseClientHandler, scrs []*data.ScResult) ([]string, error) { + receivedTokensIDs := make([]string, 0) + for _, scr := range scrs { + if scr.SenderShard == core.MainChainShardId { + receivedTokensIDs = append(receivedTokensIDs, sit.extractNewSovereignTokens(scr.Tokens)...) + } + } + + if len(receivedTokensIDs) == 0 { + return make([]string, 0), nil + } + + responseTokens := &data.ResponseTokens{} + err := elasticClient.DoMultiGet(context.Background(), receivedTokensIDs, indexerdata.TokensIndex, true, responseTokens) + if err != nil { + return nil, err + } + + newTokens := make([]string, 0) + for _, token := range responseTokens.Docs { + if !token.Found { + newTokens = append(newTokens, token.ID) + } + } + + return newTokens, nil +} + +func (sit *sovereignIndexTokensHandler) extractNewSovereignTokens(tokens []string) []string { + receivedTokensIDs := make([]string, 0) + for _, token := range tokens { + tokenPrefix, hasPrefix := esdt.IsValidPrefixedToken(token) + if !hasPrefix || tokenPrefix != sit.esdtPrefix { + receivedTokensIDs = append(receivedTokensIDs, token) + } + if tokenCollection := getTokenCollection(hasPrefix, token); tokenCollection != "" { + receivedTokensIDs = append(receivedTokensIDs, tokenCollection) + } + } + + return receivedTokensIDs +} + +func getTokenCollection(hasPrefix bool, tokenIdentifier string) string { + tokenSplit := strings.Split(tokenIdentifier, "-") + if !hasPrefix && len(tokenSplit) == 3 { + return tokenSplit[0] + "-" + tokenSplit[1] + } + if hasPrefix && len(tokenSplit) == 4 { + return tokenSplit[1] + "-" + tokenSplit[2] + } + return "" +} + +func (sit *sovereignIndexTokensHandler) serializeNewTokens(responseTokensInfo []data.ResponseTokenInfoDB, buffSlice *data.BufferSlice) error { + for _, responseToken := range responseTokensInfo { + token, identifier := formatToken(responseToken) + + meta := []byte(fmt.Sprintf(`{ "index" : { "_index":"%s", "_id" : "%s" } }%s`, indexerdata.TokensIndex, converters.JsonEscape(identifier), "\n")) + serializedTokenData, err := json.Marshal(token) + if err != nil { + return err + } + + err = buffSlice.PutData(meta, serializedTokenData) + if err != nil { + return err + } + } + + return nil +} + +func formatToken(token data.ResponseTokenInfoDB) (data.TokenInfo, string) { + token.Source.OwnersHistory = nil + token.Source.Properties = nil + + identifier := token.Source.Identifier // for NFTs + if identifier == "" { + identifier = token.Source.Token // for tokens/collections + } + return token.Source, identifier +} + +// IsInterfaceNil returns true if there is no value under the interface +func (sit *sovereignIndexTokensHandler) IsInterfaceNil() bool { + return sit == nil +} diff --git a/process/elasticproc/tokens/sovereignIndexTokensHandler_test.go b/process/elasticproc/tokens/sovereignIndexTokensHandler_test.go new file mode 100644 index 00000000..b881289b --- /dev/null +++ b/process/elasticproc/tokens/sovereignIndexTokensHandler_test.go @@ -0,0 +1,45 @@ +package tokens + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/multiversx/mx-chain-es-indexer-go/client/disabled" + "github.com/multiversx/mx-chain-es-indexer-go/data" + "github.com/multiversx/mx-chain-es-indexer-go/mock" +) + +const ( + prefix = "sov" +) + +func TestSovereignNewIndexTokensHandler(t *testing.T) { + t.Parallel() + + t.Run("valid disabled config, should work", func(t *testing.T) { + sith, err := NewSovereignIndexTokensHandler(disabled.NewDisabledElasticClient(), prefix) + require.NoError(t, err) + require.Equal(t, "*disabled.elasticClient", fmt.Sprintf("%T", sith.mainChainElasticClient)) + }) + t.Run("valid config, should work", func(t *testing.T) { + sith, err := NewSovereignIndexTokensHandler(&mock.DatabaseWriterStub{}, prefix) + require.NoError(t, err) + require.Equal(t, "*mock.DatabaseWriterStub", fmt.Sprintf("%T", sith.mainChainElasticClient)) + }) +} + +func TestSovereignIndexTokensHandler_IndexCrossChainTokens(t *testing.T) { + t.Parallel() + + sith, err := NewSovereignIndexTokensHandler(disabled.NewDisabledElasticClient(), prefix) + require.NoError(t, err) + require.NotNil(t, sith) + + // should skip indexing + err = sith.IndexCrossChainTokens(nil, make([]*data.ScResult, 0), data.NewBufferSlice(0)) + require.NoError(t, err) + + // actual indexing is tested in TestCrossChainTokensIndexingFromMainChain +} diff --git a/process/factory/indexerFactory.go b/process/factory/indexerFactory.go index 295039aa..5def543a 100644 --- a/process/factory/indexerFactory.go +++ b/process/factory/indexerFactory.go @@ -2,9 +2,7 @@ package factory import ( "fmt" - "math" "net/http" - "time" "github.com/elastic/go-elasticsearch/v7" "github.com/multiversx/mx-chain-core-go/core" @@ -33,6 +31,8 @@ type ArgsIndexerFactory struct { UseKibana bool ImportDB bool Sovereign bool + ESDTPrefix string + MainChainElastic factory.ElasticConfig Denomination int BulkRequestMaxSize int Url string @@ -58,7 +58,7 @@ func NewIndexer(args ArgsIndexerFactory) (dataindexer.Indexer, error) { } if args.Sovereign { - args.RunTypeComponents, err = createManagedRunTypeComponents(runType.NewSovereignRunTypeComponentsFactory()) + args.RunTypeComponents, err = createManagedRunTypeComponents(runType.NewSovereignRunTypeComponentsFactory(args.MainChainElastic, args.ESDTPrefix)) } else { args.RunTypeComponents, err = createManagedRunTypeComponents(runType.NewRunTypeComponentsFactory()) } @@ -99,13 +99,6 @@ func createManagedRunTypeComponents(factory runType.RunTypeComponentsCreator) (r return managedRunTypeComponents, nil } -func retryBackOff(attempt int) time.Duration { - d := time.Duration(math.Exp2(float64(attempt))) * time.Second - log.Debug("elastic: retry backoff", "attempt", attempt, "sleep duration", d) - - return d -} - func createElasticProcessor(args ArgsIndexerFactory) (dataindexer.ElasticProcessor, error) { databaseClient, err := createElasticClient(args) if err != nil { @@ -126,6 +119,7 @@ func createElasticProcessor(args ArgsIndexerFactory) (dataindexer.ElasticProcess Version: args.Version, TxHashExtractor: args.RunTypeComponents.TxHashExtractorCreator(), RewardTxData: args.RunTypeComponents.RewardTxDataCreator(), + IndexTokensHandler: args.RunTypeComponents.IndexTokensHandlerCreator(), } return factory.CreateElasticProcessor(argsElasticProcFac) @@ -138,7 +132,7 @@ func createElasticClient(args ArgsIndexerFactory) (elasticproc.DatabaseClientHan Password: args.Password, Logger: &logging.CustomLogger{}, RetryOnStatus: []int{http.StatusConflict}, - RetryBackoff: retryBackOff, + RetryBackoff: client.RetryBackOff, } if check.IfNil(args.StatusMetrics) { diff --git a/scripts/script.sh b/scripts/script.sh index 11a1bdb4..166780fc 100755 --- a/scripts/script.sh +++ b/scripts/script.sh @@ -1,4 +1,5 @@ IMAGE_NAME=elastic-container +MAIN_CHAIN_IMAGE_NAME=main-chain-elastic-container DEFAULT_ES_VERSION=7.16.2 PROMETHEUS_CONTAINER_NAME=prometheus_container GRAFANA_CONTAINER_NAME=grafana_container @@ -16,9 +17,13 @@ start() { docker pull docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION} docker rm ${IMAGE_NAME} 2> /dev/null + docker rm ${MAIN_CHAIN_IMAGE_NAME} 2> /dev/null docker run -d --name "${IMAGE_NAME}" -p 9200:9200 -p 9300:9300 \ -e "discovery.type=single-node" -e "xpack.security.enabled=false" -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \ docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION} + docker run -d --name "${MAIN_CHAIN_IMAGE_NAME}" -p 9201:9200 -p 9301:9300 \ + -e "discovery.type=single-node" -e "xpack.security.enabled=false" -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \ + docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION} # Wait elastic cluster to start echo "Waiting Elasticsearch cluster to start..." @@ -27,6 +32,7 @@ start() { stop() { docker stop "${IMAGE_NAME}" + docker stop "${MAIN_CHAIN_IMAGE_NAME}" } delete() { @@ -41,6 +47,7 @@ delete() { IMAGE_OPEN_SEARCH=open-container +MAIN_CHAIN_IMAGE_OPEN_SEARCH=main-chain-open-container DEFAULT_OPEN_SEARCH_VERSION=1.2.4 start_open_search() { @@ -52,9 +59,13 @@ start_open_search() { docker pull opensearchproject/opensearch:${OPEN_VERSION} docker rm ${IMAGE_OPEN_SEARCH} 2> /dev/null + docker rm ${MAIN_CHAIN_IMAGE_OPEN_SEARCH} 2> /dev/null docker run -d --name "${IMAGE_OPEN_SEARCH}" -p 9200:9200 -p 9600:9600 \ -e "discovery.type=single-node" -e "plugins.security.disabled=true" -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \ opensearchproject/opensearch:${OPEN_VERSION} + docker run -d --name "${MAIN_CHAIN_IMAGE_OPEN_SEARCH}" -p 9201:9200 -p 9601:9600 \ + -e "discovery.type=single-node" -e "plugins.security.disabled=true" -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \ + opensearchproject/opensearch:${OPEN_VERSION} }