diff --git a/accountresolver/bundlerreader.go b/accountresolver/bundlerreader.go index 2472049d..9e2fbe62 100644 --- a/accountresolver/bundlerreader.go +++ b/accountresolver/bundlerreader.go @@ -2,6 +2,7 @@ package accountsresolver import ( "context" + "encoding/binary" "fmt" "io" "time" @@ -57,10 +58,13 @@ func (r *BundleReader) PushBlock(block *bstream.Block) error { return fmt.Errorf("unable to marshal proto block: %w", err) } + length := make([]byte, 4) + binary.BigEndian.PutUint32(length, uint32(len(data))) + select { case <-r.ctx.Done(): return nil - case r.blockData <- data: + case r.blockData <- append(length, data...): return nil } } diff --git a/cmd/grrr/main.go b/cmd/grrr/main.go new file mode 100644 index 00000000..31412c43 --- /dev/null +++ b/cmd/grrr/main.go @@ -0,0 +1,42 @@ +package main + +import ( + "fmt" + + "github.com/streamingfast/dbin" + + pbbstream "github.com/streamingfast/pbgo/sf/bstream/v1" + "google.golang.org/protobuf/proto" +) + +func main() { + //data, err := os.ReadFile("/Users/cbillett/devel/sf/0154667200.dbin") + //if err != nil { + // panic(err) + //} + + fr, err := dbin.NewFileReader("/Users/cbillett/devel/sf/0154667200.dbin") + if err != nil { + panic(err) + } + + contentType, version, err := fr.ReadHeader() + if err != nil { + panic(err) + } + fmt.Println("type:", contentType, "version:", version) + + for { + data, err := fr.ReadMessage() + if err != nil { + panic(err) + } + b := &pbbstream.Block{} + err = proto.Unmarshal(data, b) + if err != nil { + panic(err) + } + + fmt.Println(b.Number) + } +}