Skip to content

Commit

Permalink
upgrade to use google protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 authored and shivaji-kharse committed Oct 14, 2024
1 parent ecbc356 commit 42b8fa8
Show file tree
Hide file tree
Showing 63 changed files with 7,856 additions and 25,701 deletions.
3 changes: 2 additions & 1 deletion algo/uidlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/dgraph/v24/codec"
"github.com/dgraph-io/dgraph/v24/protos/pb"
Expand Down Expand Up @@ -450,7 +451,7 @@ func BenchmarkListIntersectRatio(b *testing.B) {
compressedUids := codec.Encode(v1, 256)

fmt.Printf("len: %d, compressed: %d, bytes/int: %f\n",
len(v1), compressedUids.Size(), float64(compressedUids.Size())/float64(len(v1)))
len(v1), proto.Size(compressedUids), float64(proto.Size(compressedUids))/float64(len(v1)))
b.Run(fmt.Sprintf(":IntersectWith:ratio=%d:size=%d:overlap=%.2f:", r, sz, overlap),
func(b *testing.B) {
for k := 0; k < b.N; k++ {
Expand Down
5 changes: 3 additions & 2 deletions codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/dustin/go-humanize"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/dgraph/v24/protos/pb"
"github.com/dgraph-io/dgraph/v24/x"
Expand Down Expand Up @@ -276,7 +277,7 @@ func benchmarkUidPackEncode(b *testing.B, blockSize int) {
var data []byte
for i := 0; i < b.N; i++ {
pack := Encode(uids, blockSize)
out, err := pack.Marshal()
out, err := proto.Marshal(pack)
FreePack(pack)
if err != nil {
b.Fatalf("Error marshaling uid pack: %s", err.Error())
Expand Down Expand Up @@ -311,7 +312,7 @@ func benchmarkUidPackDecode(b *testing.B, blockSize int) {
b.Logf("Dataset Len=%d. Size: %s", len(uids), humanize.Bytes(sz))

pack := Encode(uids, blockSize)
data, err := pack.Marshal()
data, err := proto.Marshal(pack)
defer FreePack(pack)
x.Check(err)
b.Logf("Output size: %s. Compression: %.2f",
Expand Down
3 changes: 2 additions & 1 deletion conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
otrace "go.opencensus.io/trace"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/dgo/v240/protos/api"
Expand Down Expand Up @@ -598,7 +599,7 @@ func (n *Node) proposeConfChange(ctx context.Context, conf raftpb.ConfChange) er
func (n *Node) addToCluster(ctx context.Context, rc *pb.RaftContext) error {
pid := rc.Id
rc.SnapshotTs = 0
rcBytes, err := rc.Marshal()
rcBytes, err := proto.Marshal(rc)
x.Check(err)

cc := raftpb.ConfChange{
Expand Down
1 change: 1 addition & 0 deletions conn/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (p *proposals) Done(key uint64, err error) {
type RaftServer struct {
m sync.RWMutex
node *Node
pb.RaftServer
}

// UpdateNode safely updates the node.
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/alpha/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (
"sync/atomic"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/golang/glog"
"github.com/pkg/errors"
"google.golang.org/grpc/metadata"
jsonpb "google.golang.org/protobuf/encoding/protojson"

"github.com/dgraph-io/dgo/v240/protos/api"
"github.com/dgraph-io/dgraph/v24/dql"
Expand Down Expand Up @@ -600,7 +600,7 @@ func alterHandler(w http.ResponseWriter, r *http.Request) {
}

op := &api.Operation{}
if err := jsonpb.UnmarshalString(string(b), op); err != nil {
if err := jsonpb.Unmarshal(b, op); err != nil {
op.Schema = string(b)
}

Expand Down
3 changes: 2 additions & 1 deletion dgraph/cmd/alpha/login_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/http"

"github.com/golang/glog"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/dgo/v240/protos/api"
"github.com/dgraph-io/dgraph/v24/edgraph"
Expand Down Expand Up @@ -48,7 +49,7 @@ func loginHandler(w http.ResponseWriter, r *http.Request) {
}

jwt := &api.Jwt{}
if err := jwt.Unmarshal(resp.Json); err != nil {
if err := proto.Unmarshal(resp.Json, jwt); err != nil {
x.SetStatusWithData(w, x.Error, err.Error())
}

Expand Down
11 changes: 6 additions & 5 deletions dgraph/cmd/bulk/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
farm "github.com/dgryski/go-farm"
"github.com/golang/glog"
"github.com/golang/snappy"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/dgo/v240/protos/api"
"github.com/dgraph-io/dgraph/v24/chunker"
Expand Down Expand Up @@ -90,7 +91,7 @@ type MapEntry []byte
// }

func mapEntrySize(key []byte, p *pb.Posting) int {
return 8 + 4 + 4 + len(key) + p.Size() // UID + keySz + postingSz + len(key) + size(p)
return 8 + 4 + 4 + len(key) + proto.Size(p) // UID + keySz + postingSz + len(key) + size(p)
}

func marshalMapEntry(dst []byte, uid uint64, key []byte, p *pb.Posting) {
Expand All @@ -100,14 +101,14 @@ func marshalMapEntry(dst []byte, uid uint64, key []byte, p *pb.Posting) {
binary.BigEndian.PutUint64(dst[0:8], uid)
binary.BigEndian.PutUint32(dst[8:12], uint32(len(key)))

psz := p.Size()
psz := proto.Size(p)
binary.BigEndian.PutUint32(dst[12:16], uint32(psz))

n := copy(dst[16:], key)

if psz > 0 {
pbuf := dst[16+n:]
_, err := p.MarshalToSizedBuffer(pbuf[:psz])
_, err := x.MarshalToSizedBuffer(pbuf, p)
x.Check(err)
}

Expand Down Expand Up @@ -205,8 +206,8 @@ func (m *mapper) writeMapEntriesToFile(cbuf *z.Buffer, shardIdx int) {
x.Check(err)
}

// Write the header to the map file.
headerBuf, err := header.Marshal()
// Write the header to the map file.s
headerBuf, err := proto.Marshal(header)
x.Check(err)
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(headerBuf)))
Expand Down
9 changes: 5 additions & 4 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/golang/glog"
"github.com/golang/snappy"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4"
bo "github.com/dgraph-io/badger/v4/options"
Expand Down Expand Up @@ -234,7 +235,7 @@ func newMapIterator(filename string) (*pb.MapHeader, *mapIterator) {

x.Check2(io.ReadFull(reader, headerBuf))
header := &pb.MapHeader{}
err = header.Unmarshal(headerBuf)
err = proto.Unmarshal(headerBuf, header)
x.Check(err)

itr := &mapIterator{
Expand Down Expand Up @@ -351,7 +352,7 @@ func (r *reducer) startWriting(ci *countIndexer, writerCh chan *encodeRequest, c
kv := &bpb.KV{}
err := kvBuf.SliceIterate(func(s []byte) error {
kv.Reset()
x.Check(kv.Unmarshal(s))
x.Check(proto.Unmarshal(s, kv))
if lastStreamId == kv.StreamId {
return nil
}
Expand Down Expand Up @@ -639,7 +640,7 @@ func (r *reducer) toList(req *encodeRequest) {
enc.Add(uid)
if pbuf := me.Plist(); len(pbuf) > 0 {
p := getPosting()
x.Check(p.Unmarshal(pbuf))
x.Check(proto.Unmarshal(pbuf, p))
pl.Postings = append(pl.Postings, p)
}
}
Expand Down Expand Up @@ -678,7 +679,7 @@ func (r *reducer) toList(req *encodeRequest) {
}
}

shouldSplit := pl.Size() > (1<<20)/2 && len(pl.Pack.Blocks) > 1
shouldSplit := proto.Size(pl) > (1<<20)/2 && len(pl.Pack.Blocks) > 1
if shouldSplit {
// Give ownership of pl.Pack away to list. Rollup would deallocate the Pack.
// We do rollup at math.MaxUint64 so that we don't change the allocated
Expand Down
5 changes: 3 additions & 2 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"strings"

"github.com/spf13/cobra"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgraph/v24/ee"
Expand Down Expand Up @@ -325,7 +326,7 @@ func run() {
}

var bulkMeta pb.BulkMeta
if err = bulkMeta.Unmarshal(bulkMetaData); err != nil {
if err = proto.Unmarshal(bulkMetaData, &bulkMeta); err != nil {
fmt.Fprintln(os.Stderr, "Error deserializing bulk meta file")
os.Exit(1)
}
Expand All @@ -343,7 +344,7 @@ func run() {
SchemaMap: loader.schema.schemaMap,
Types: loader.schema.types,
}
bulkMetaData, err := bulkMeta.Marshal()
bulkMetaData, err := proto.Marshal(&bulkMeta)
if err != nil {
fmt.Fprintln(os.Stderr, "Error serializing bulk meta file")
os.Exit(1)
Expand Down
6 changes: 4 additions & 2 deletions dgraph/cmd/bulk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"math"
"sync"

"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgraph/v24/posting"
"github.com/dgraph-io/dgraph/v24/protos/pb"
Expand Down Expand Up @@ -173,7 +175,7 @@ func (s *schemaStore) write(db *badger.DB, preds []string) {
continue
}
k := x.SchemaKey(pred)
v, err := sch.Marshal()
v, err := proto.Marshal(sch)
x.Check(err)
// Write schema and types always at timestamp 1, s.state.writeTs may not be equal to 1
// if bulk loader was restarted or other similar scenarios.
Expand All @@ -183,7 +185,7 @@ func (s *schemaStore) write(db *badger.DB, preds []string) {
// Write all the types as all groups should have access to all the types.
for _, typ := range s.types {
k := x.TypeKey(typ.TypeName)
v, err := typ.Marshal()
v, err := proto.Marshal(typ)
x.Check(err)
x.Check(w.SetAt(k, v, posting.BitSchemaPosting, 1))
}
Expand Down
13 changes: 7 additions & 6 deletions dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/dustin/go-humanize"
"github.com/spf13/cobra"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4"
bpb "github.com/dgraph-io/badger/v4/pb"
Expand Down Expand Up @@ -301,7 +302,7 @@ func showAllPostingsAt(db *badger.DB, readTs uint64) {
val, err := item.ValueCopy(nil)
x.Check(err)
var plist pb.PostingList
x.Check(plist.Unmarshal(val))
x.Check(proto.Unmarshal(val, &plist))

x.AssertTrue(len(plist.Postings) <= 1)
var num int
Expand Down Expand Up @@ -408,21 +409,21 @@ func history(lookup []byte, itr *badger.Iterator) {
fmt.Fprintln(&buf)
if meta&posting.BitDeltaPosting > 0 {
plist := &pb.PostingList{}
x.Check(plist.Unmarshal(val))
x.Check(proto.Unmarshal(val, plist))
for _, p := range plist.Postings {
appendPosting(&buf, p)
}
}
if meta&posting.BitCompletePosting > 0 {
var plist pb.PostingList
x.Check(plist.Unmarshal(val))
x.Check(proto.Unmarshal(val, &plist))

for _, p := range plist.Postings {
appendPosting(&buf, p)
}

fmt.Fprintf(&buf, " Num uids = %d. Size = %d\n",
codec.ExactLen(plist.Pack), plist.Pack.Size())
codec.ExactLen(plist.Pack), proto.Size(plist.Pack))
dec := codec.Decoder{Pack: plist.Pack}
for uids := dec.Seek(0, codec.SeekStart); len(uids) > 0; uids = dec.Next() {
for _, uid := range uids {
Expand Down Expand Up @@ -525,7 +526,7 @@ func lookup(db *badger.DB) {
x.Check(err)

var s pb.SchemaUpdate
x.Check(s.Unmarshal(schemaBytes))
x.Check(proto.Unmarshal(schemaBytes, &s))
fmt.Fprintf(&buf, "Value: %+v\n", s)
} else {
fmt.Fprintf(&buf, "Key: %x", item.Key())
Expand Down Expand Up @@ -671,7 +672,7 @@ func printKeys(db *badger.DB) {
var count int
err := buf.SliceIterate(func(s []byte) error {
var kv bpb.KV
if err := kv.Unmarshal(s); err != nil {
if err := proto.Unmarshal(s, &kv); err != nil {
return err
}
x.Check2(w.Write(kv.Value))
Expand Down
13 changes: 7 additions & 6 deletions dgraph/cmd/debug/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
humanize "github.com/dustin/go-humanize"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/dgraph/v24/protos/pb"
"github.com/dgraph-io/dgraph/v24/raftwal"
Expand Down Expand Up @@ -53,13 +54,13 @@ func printEntry(es raftpb.Entry, pending map[uint64]bool, isZero bool) {
var err error
if isZero {
var zpr pb.ZeroProposal
if err = zpr.Unmarshal(es.Data[8:]); err == nil {
if err = proto.Unmarshal(es.Data[8:], &zpr); err == nil {
printZeroProposal(&buf, &zpr)
return
}
} else {
var pr pb.Proposal
if err = pr.Unmarshal(es.Data[8:]); err == nil {
if err = proto.Unmarshal(es.Data[8:], &pr); err == nil {
printAlphaProposal(&buf, &pr, pending)
return
}
Expand All @@ -82,9 +83,9 @@ func printBasic(store RaftStore) (uint64, uint64) {
fmt.Printf("Snapshot Metadata: %+v\n", snap.Metadata)
var ds pb.Snapshot
var zs pb.ZeroSnapshot
if err := ds.Unmarshal(snap.Data); err == nil {
if err := proto.Unmarshal(snap.Data, &ds); err == nil {
fmt.Printf("Snapshot Alpha: %+v\n", ds)
} else if err := zs.Unmarshal(snap.Data); err == nil {
} else if err := proto.Unmarshal(snap.Data, &zs); err == nil {
for gid, group := range zs.State.GetGroups() {
fmt.Printf("\nGROUP: %d\n", gid)
for _, member := range group.GetMembers() {
Expand Down Expand Up @@ -151,7 +152,7 @@ func overwriteSnapshot(store *raftwal.DiskStorage) error {

var dsnap pb.Snapshot
if len(snap.Data) > 0 {
x.Check(dsnap.Unmarshal(snap.Data))
x.Check(proto.Unmarshal(snap.Data, &dsnap))
}
fmt.Printf("Previous snapshot: %+v\n", dsnap)

Expand Down Expand Up @@ -190,7 +191,7 @@ func overwriteSnapshot(store *raftwal.DiskStorage) error {
dsnap.ReadTs = uint64(readTs)

fmt.Printf("Setting snapshot to: %+v\n", dsnap)
data, err := dsnap.Marshal()
data, err := proto.Marshal(&dsnap)
x.Check(err)
if err = store.CreateSnapshot(dsnap.Index, &cs, data); err != nil {
fmt.Printf("Created snapshot with error: %v\n", err)
Expand Down
Loading

0 comments on commit 42b8fa8

Please sign in to comment.