Skip to content

Commit

Permalink
Added pool for grpc connections
Browse files Browse the repository at this point in the history
  • Loading branch information
labkode committed Dec 29, 2015
1 parent a06ce42 commit db72728
Show file tree
Hide file tree
Showing 137 changed files with 3,053 additions and 17,592 deletions.
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ ENV CLAWIO_LOCALFS_META_DATADIR /tmp/localfs
ENV CLAWIO_LOCALFS_META_TMPDIR /tmp/localfs
ENV CLAWIO_LOCALFS_META_PORT 57001
ENV CLAWIO_LOCALFS_META_PROP "service-localfs-prop:57003"
ENV CLAWIO_LOCALFS_META_PROPMAXACTIVE 1024
ENV CLAWIO_LOCALFS_META_PROPMAXIDLE 1024
ENV CLAWIO_LOCALFS_META_PROPMAXCONCURRENCY 1024
ENV CLAWIO_SHAREDSECRET secret

ADD . /go/src/github.com/clawio/service-localfs-meta
Expand Down
27 changes: 15 additions & 12 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions environ
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ export CLAWIO_LOCALFS_META_DATADIR=/tmp/localfs
export CLAWIO_LOCALFS_META_TMPDIR=/tmp/localfs
export CLAWIO_LOCALFS_META_PORT=57001
export CLAWIO_LOCALFS_META_PROP="service-localfs-prop:57003"
export CLAWIO_LOCALFS_META_PROPMAXACTIVE=1024
export CLAWIO_LOCALFS_META_PROPMAXIDLE=1024
export CLAWIO_LOCALFS_META_PROPMAXCONCURRENCY=1024
export CLAWIO_SHAREDSECRET=secret
54 changes: 43 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,26 @@ import (
)

const (
serviceID = "CLAWIO_LOCALFS_META"
dataDirEnvar = serviceID + "_DATADIR"
tmpDirEnvar = serviceID + "_TMPDIR"
portEnvar = serviceID + "_PORT"
propEnvar = serviceID + "_PROP"
sharedSecretEnvar = "CLAWIO_SHAREDSECRET"
serviceID = "CLAWIO_LOCALFS_META"
dataDirEnvar = serviceID + "_DATADIR"
tmpDirEnvar = serviceID + "_TMPDIR"
portEnvar = serviceID + "_PORT"
propEnvar = serviceID + "_PROP"
propMaxActiveEnvar = serviceID + "_PROPMAXACTIVE"
propMaxIdleEnvar = serviceID + "_PROPMAXIDLE"
propMaxConcurrencyEnvar = serviceID + "_PROPMAXCONCURRENCY"
sharedSecretEnvar = "CLAWIO_SHAREDSECRET"
)

type environ struct {
dataDir string
tmpDir string
port int
prop string
sharedSecret string
dataDir string
tmpDir string
port int
prop string
propMaxActive int
propMaxIdle int
propMaxConcurrency int
sharedSecret string
}

func getEnviron() (*environ, error) {
Expand All @@ -37,7 +43,27 @@ func getEnviron() (*environ, error) {
return nil, err
}
e.port = port

e.prop = os.Getenv(propEnvar)

propMaxActive, err := strconv.Atoi(os.Getenv(propMaxActiveEnvar))
if err != nil {
return nil, err
}
e.propMaxActive = propMaxActive

propMaxIdle, err := strconv.Atoi(os.Getenv(propMaxIdleEnvar))
if err != nil {
return nil, err
}
e.propMaxIdle = propMaxIdle

propMaxConcurrency, err := strconv.Atoi(os.Getenv(propMaxConcurrencyEnvar))
if err != nil {
return nil, err
}
e.propMaxConcurrency = propMaxConcurrency

e.sharedSecret = os.Getenv(sharedSecretEnvar)
return e, nil
}
Expand All @@ -46,6 +72,9 @@ func printEnviron(e *environ) {
log.Infof("%s=%s", tmpDirEnvar, e.tmpDir)
log.Infof("%s=%d", portEnvar, e.port)
log.Infof("%s=%s", propEnvar, e.prop)
log.Infof("%s=%d", propMaxActiveEnvar, e.propMaxActive)
log.Infof("%s=%d", propMaxIdleEnvar, e.propMaxIdle)
log.Infof("%s=%d", propMaxConcurrencyEnvar, e.propMaxConcurrency)
log.Infof("%s=%s", sharedSecretEnvar, "******")
}

Expand All @@ -66,6 +95,9 @@ func main() {
p.tmpDir = env.tmpDir
p.prop = env.prop
p.sharedSecret = env.sharedSecret
p.propMaxActive = env.propMaxActive
p.propMaxIdle = env.propMaxIdle
p.propMaxConcurrency = env.propMaxConcurrency

// Create data and tmp dirs
if err := os.MkdirAll(p.dataDir, 0644); err != nil {
Expand Down
122 changes: 100 additions & 22 deletions server.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package main

import (
"fmt"
authlib "github.com/clawio/service-auth/lib"
pb "github.com/clawio/service-localfs-meta/proto/metadata"
proppb "github.com/clawio/service-localfs-meta/proto/propagator"
"github.com/dropbox/godropbox/resource_pool"
rus "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand All @@ -25,18 +27,53 @@ var (
)

type newServerParams struct {
dataDir string
tmpDir string
prop string
sharedSecret string
dataDir string
tmpDir string
prop string
propMaxActive int
propMaxIdle int
propMaxConcurrency int
sharedSecret string
}

func newServer(p *newServerParams) *server {
return &server{p}
poolOptions := resource_pool.Options{}
poolOptions.MaxActiveHandles = int32(p.propMaxActive)
poolOptions.MaxIdleHandles = uint32(p.propMaxIdle)
poolOptions.OpenMaxConcurrency = p.propMaxConcurrency
poolOptions.Open = func(resourceLocation string) (interface{}, error) {
con, err := grpc.Dial(resourceLocation, grpc.WithInsecure())
if err != nil {
rus.Error(err)
return nil, err
}
return con, nil
}
poolOptions.Close = func(handle interface{}) error {
con, ok := handle.(*grpc.ClientConn)
if !ok {
err := fmt.Errorf("connection handle is %+v but expected %+v", handle, "*grpc.ClientConn")
rus.Error(err)
return err
}
err := con.Close()
if err != nil {
rus.Error(err)
return err
}
return nil
}
pool := resource_pool.NewSimpleResourcePool(poolOptions)
pool.Register(p.prop)
s := &server{}
s.p = p
s.grpcPool = pool
return s
}

type server struct {
p *newServerParams
p *newServerParams
grpcPool resource_pool.ResourcePool
}

func (s *server) Home(ctx context.Context, req *pb.HomeReq) (*pb.Void, error) {
Expand Down Expand Up @@ -79,13 +116,20 @@ func (s *server) Home(ctx context.Context, req *pb.HomeReq) (*pb.Void, error) {

log.Infof("user physical home is %s", pp)

con, err := grpc.Dial(s.p.prop, grpc.WithInsecure())
resource, err := s.grpcPool.Get("")
if err != nil {
log.Error(err)
return &pb.Void{}, err
}
defer con.Close()

defer resource.Release()

handle, err := resource.Handle()
if err != nil {
log.Error(err)
return &pb.Void{}, err
}
con := handle.(*grpc.ClientConn)
log.Infof("created connection to %s", s.p.prop)

client := proppb.NewPropClient(con)
Expand Down Expand Up @@ -197,14 +241,21 @@ func (s *server) Mkdir(ctx context.Context, req *pb.MkdirReq) (*pb.Void, error)

log.Infof("created dir %s", pp)

con, err := grpc.Dial(s.p.prop, grpc.WithInsecure())
resource, err := s.grpcPool.Get("")
if err != nil {
log.Error(err)
return &pb.Void{}, err
}
defer con.Close()

log.Infof("created connection to prop")
defer resource.Release()

handle, err := resource.Handle()
if err != nil {
log.Error(err)
return &pb.Void{}, err
}
con := handle.(*grpc.ClientConn)
log.Infof("created connection to %s", s.p.prop)

client := proppb.NewPropClient(con)

Expand Down Expand Up @@ -298,14 +349,21 @@ func (s *server) Stat(ctx context.Context, req *pb.StatReq) (*pb.Metadata, error

log.Infof("stated parent %s", pp)

con, err := grpc.Dial(s.p.prop, grpc.WithInsecure())
resource, err := s.grpcPool.Get("")
if err != nil {
log.Error(err)
return &pb.Metadata{}, err
}
defer con.Close()

log.Infof("created connection to prop")
defer resource.Release()

handle, err := resource.Handle()
if err != nil {
log.Error(err)
return &pb.Metadata{}, err
}
con := handle.(*grpc.ClientConn)
log.Infof("created connection to %s", s.p.prop)

client := proppb.NewPropClient(con)

Expand Down Expand Up @@ -469,13 +527,20 @@ func (s *server) Cp(ctx context.Context, req *pb.CpReq) (*pb.Void, error) {
log.Infof("copied from file %s to file %s", psrc, pdst)
}

con, err := grpc.Dial(s.p.prop, grpc.WithInsecure())
resource, err := s.grpcPool.Get("")
if err != nil {
log.Error(err)
return &pb.Void{}, err
}
defer con.Close()

defer resource.Release()

handle, err := resource.Handle()
if err != nil {
log.Error(err)
return &pb.Void{}, err
}
con := handle.(*grpc.ClientConn)
log.Infof("created connection to %s", s.p.prop)

client := proppb.NewPropClient(con)
Expand Down Expand Up @@ -561,14 +626,21 @@ func (s *server) Mv(ctx context.Context, req *pb.MvReq) (*pb.Void, error) {

log.Infof("renamed from %s to %s", psrc, pdst)

con, err := grpc.Dial(s.p.prop, grpc.WithInsecure())
resource, err := s.grpcPool.Get("")
if err != nil {
log.Error(err)
return &pb.Void{}, err
}
defer con.Close()

log.Infof("created connection to prop")
defer resource.Release()

handle, err := resource.Handle()
if err != nil {
log.Error(err)
return &pb.Void{}, err
}
con := handle.(*grpc.ClientConn)
log.Infof("created connection to %s", s.p.prop)

client := proppb.NewPropClient(con)

Expand Down Expand Up @@ -645,15 +717,21 @@ func (s *server) Rm(ctx context.Context, req *pb.RmReq) (*pb.Void, error) {

log.Infof("removed %s", pp)

con, err := grpc.Dial(s.p.prop, grpc.WithInsecure())
resource, err := s.grpcPool.Get("")
if err != nil {
log.Error(err)
return &pb.Void{}, err
}
defer con.Close()

log.Infof("created connection to prop")
defer resource.Release()

handle, err := resource.Handle()
if err != nil {
log.Error(err)
return &pb.Void{}, err
}
con := handle.(*grpc.ClientConn)
log.Infof("created connection to %s", s.p.prop)
client := proppb.NewPropClient(con)

in := &proppb.RmReq{}
Expand Down
Binary file removed service-localfs-meta
Binary file not shown.
Loading

0 comments on commit db72728

Please sign in to comment.