Skip to content

Commit

Permalink
Merge pull request #65 from covalenthq/pinnerv2
Browse files Browse the repository at this point in the history
ipfs-pinner support to generate client side cids and do car file uploads
  • Loading branch information
noslav authored Apr 16, 2022
2 parents 70fb2b2 + b74b559 commit dd7ac8c
Show file tree
Hide file tree
Showing 8 changed files with 1,760 additions and 112 deletions.
39 changes: 19 additions & 20 deletions cmd/bspagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"github.com/covalenthq/bsp-agent/internal/config"
"github.com/covalenthq/bsp-agent/internal/event"
"github.com/covalenthq/bsp-agent/internal/handler"
st "github.com/covalenthq/bsp-agent/internal/storage"
"github.com/covalenthq/bsp-agent/internal/types"
"github.com/covalenthq/bsp-agent/internal/utils"
"github.com/covalenthq/bsp-agent/internal/websocket"
pinner "github.com/covalenthq/ipfs-pinner"
pinapi "github.com/covalenthq/ipfs-pinner"
pincore "github.com/covalenthq/ipfs-pinner/core"
)

const (
Expand Down Expand Up @@ -79,7 +81,7 @@ func parseFlags() {
flag.IntVar(&blockDivisorFlag, "block-divisor", utils.LookupEnvOrInt("BlockDivisor", blockDivisorFlag), "integer divisor that allows for selecting only block numbers divisible by this number")
flag.IntVar(&consumerPendingTimeoutFlag, "consumer-timeout", utils.LookupEnvOrInt("ConsumerPendingTimeout", consumerPendingTimeoutFlag), "number of seconds to wait before pending messages consumer timeout")
flag.StringVar(&logFolderFlag, "log-folder", utils.LookupEnvOrString("LogFolder", logFolderFlag), "Location where the log files should be placed")
flag.StringVar(&ipfsServiceFlag, "ipfs-service", utils.LookupEnvOrString("IpfsService", ipfsServiceFlag), "Allowed values are 'pinata' and 'others'")
flag.StringVar(&ipfsServiceFlag, "ipfs-service", utils.LookupEnvOrString("IpfsService", ipfsServiceFlag), "Allowed values are 'web3.storage', 'pinata' and 'others'")

flag.Parse()
}
Expand Down Expand Up @@ -133,7 +135,7 @@ func main() {
if err != nil {
log.Fatalf("unable to get redis client from redis URL flag: %v", err)
}
storageClient, err := utils.NewStorageClient(gcpSvcAccountFlag)
gcpStorageClient, err := utils.NewGCPStorageClient(gcpSvcAccountFlag)
if err != nil {
log.Printf("unable to get gcp storage client; --gcp-svc-account flag not set or set incorrectly: %v, storing BSP files locally: %v", err, utils.LookupEnvOrString("BinaryFilePath", binaryFilePathFlag))
}
Expand All @@ -150,25 +152,22 @@ func main() {
log.Fatalf("unable to generate avro codec for block-replica: %v", err)
}

var pinclient *pinner.Client
if ipfsServiceFlag == pinner.Pinata.String() {
req := pinner.NewClientRequest(pinner.Pinata).BearerToken(config.IPFSConfig.JWTToken)
pinclient = pinner.NewClient(req)
} else if ipfsServiceFlag != "" {
log.Fatalf("Only pinata IPFS service supported for now")
pinnode, err := st.GetPinnerNode(pincore.PinningService(ipfsServiceFlag), config.IPFSConfig.ServiceToken)
if err != nil {
log.Fatalf("error creating pinner node: %v", err)
}

if websocketURLsFlag != "" {
websocketsURLs := strings.Split(websocketURLsFlag, " ")
for _, url := range websocketsURLs {
go websocket.ConsumeWebsocketsEvents(&config.EthConfig, url, replicaCodec, ethClient, storageClient, binaryFilePathFlag, replicaBucketFlag, proofChainFlag)
go websocket.ConsumeWebsocketsEvents(&config.EthConfig, url, replicaCodec, ethClient, gcpStorageClient, binaryFilePathFlag, replicaBucketFlag, proofChainFlag)
}
} else {
var consumerName string = uuid.NewV4().String()
log.Printf("Initializing Consumer: %v | Redis Stream: %v | Consumer Group: %v", consumerName, streamKey, consumerGroup)
createConsumerGroup(redisClient, streamKey, consumerGroup)
go consumeEvents(config, replicaCodec, redisClient, pinclient, storageClient, ethClient, consumerName, streamKey, consumerGroup)
go consumePendingEvents(config, replicaCodec, redisClient, pinclient, storageClient, ethClient, consumerName, streamKey, consumerGroup)
go consumeEvents(config, replicaCodec, redisClient, pinnode, gcpStorageClient, ethClient, consumerName, streamKey, consumerGroup)
go consumePendingEvents(config, replicaCodec, redisClient, pinnode, gcpStorageClient, ethClient, consumerName, streamKey, consumerGroup)
}

// Gracefully disconnect
Expand All @@ -187,8 +186,8 @@ func main() {
}
}

if storageClient != nil {
err = storageClient.Close()
if gcpStorageClient != nil {
err = gcpStorageClient.Close()
if err != nil {
log.Error("error in closing storage client: ", err)
}
Expand All @@ -205,7 +204,7 @@ func createConsumerGroup(redisClient *redis.Client, streamKey, consumerGroup str
}
}

func consumeEvents(config *config.Config, avroCodecs *goavro.Codec, redisClient *redis.Client, pinClient *pinner.Client, storageClient *storage.Client, ethClient *ethclient.Client, consumerName, streamKey, consumerGroup string) {
func consumeEvents(config *config.Config, avroCodecs *goavro.Codec, redisClient *redis.Client, pinnode pinapi.PinnerNode, gcpStorageClient *storage.Client, ethClient *ethclient.Client, consumerName, streamKey, consumerGroup string) {
for {
log.Debug("New sequential stream unit: ", time.Now().Format(time.RFC3339))
streams, err := redisClient.XReadGroup(&redis.XReadGroupArgs{
Expand All @@ -223,14 +222,14 @@ func consumeEvents(config *config.Config, avroCodecs *goavro.Codec, redisClient

for _, stream := range streams[0].Messages {
waitGrp.Add(1)
go processStream(config, avroCodecs, redisClient, storageClient, pinClient, ethClient, stream, streamKey, consumerGroup)
go processStream(config, avroCodecs, redisClient, gcpStorageClient, pinnode, ethClient, stream, streamKey, consumerGroup)
}
waitGrp.Wait()
}
}

// consume pending messages from redis
func consumePendingEvents(config *config.Config, avroCodecs *goavro.Codec, redisClient *redis.Client, pinClient *pinner.Client, storageClient *storage.Client, ethClient *ethclient.Client, consumerName, streamKey, consumerGroup string) {
func consumePendingEvents(config *config.Config, avroCodecs *goavro.Codec, redisClient *redis.Client, pinnode pinapi.PinnerNode, gcpStorageClient *storage.Client, ethClient *ethclient.Client, consumerName, streamKey, consumerGroup string) {
timeout := time.After(time.Second * time.Duration(consumerPendingTimeoutFlag))
ticker := time.Tick(time.Second * time.Duration(consumerPendingTimeTicker))
for {
Expand Down Expand Up @@ -269,7 +268,7 @@ func consumePendingEvents(config *config.Config, avroCodecs *goavro.Codec, redis
}
for _, stream := range streams {
waitGrp.Add(1)
go processStream(config, avroCodecs, redisClient, storageClient, pinClient, ethClient, stream, streamKey, consumerGroup)
go processStream(config, avroCodecs, redisClient, gcpStorageClient, pinnode, ethClient, stream, streamKey, consumerGroup)
}
waitGrp.Wait()
}
Expand All @@ -278,7 +277,7 @@ func consumePendingEvents(config *config.Config, avroCodecs *goavro.Codec, redis
}
}

func processStream(config *config.Config, replicaCodec *goavro.Codec, redisClient *redis.Client, storageClient *storage.Client, pinClient *pinner.Client, ethClient *ethclient.Client, stream redis.XMessage, streamKey, consumerGroup string) {
func processStream(config *config.Config, replicaCodec *goavro.Codec, redisClient *redis.Client, gcpStorageClient *storage.Client, pinnode pinapi.PinnerNode, ethClient *ethclient.Client, stream redis.XMessage, streamKey, consumerGroup string) {
ctx := context.Background()
hash := stream.Values["hash"].(string)
decodedData, err := snappy.Decode(nil, []byte(stream.Values["data"].(string)))
Expand Down Expand Up @@ -311,7 +310,7 @@ func processStream(config *config.Config, replicaCodec *goavro.Codec, redisClien
replicationSegment.Elements = uint64(segmentLength)
replicaSegmentName = fmt.Sprint(replica.Data.NetworkId) + "-" + fmt.Sprint(replicationSegment.StartBlock) + objectType
// avro encode, prove and upload
_, err := handler.EncodeProveAndUploadReplicaSegment(ctx, &config.EthConfig, pinClient, replicaCodec, &replicationSegment, objectReplica, storageClient, ethClient, binaryFilePathFlag, replicaBucketFlag, replicaSegmentName, proofChainFlag)
_, err := handler.EncodeProveAndUploadReplicaSegment(ctx, &config.EthConfig, pinnode, replicaCodec, &replicationSegment, objectReplica, gcpStorageClient, ethClient, binaryFilePathFlag, replicaBucketFlag, replicaSegmentName, proofChainFlag)
if err != nil {
log.Error("failed to avro encode, prove and upload block-result segment with err: ", err)
panic(err)
Expand Down
Loading

0 comments on commit dd7ac8c

Please sign in to comment.