Skip to content

Commit

Permalink
Fix #215. Fix hangs on startup after raft compaction
Browse files Browse the repository at this point in the history
Because raft compaction compacts the entire log into one snapshot, commands in
the log aren't replayed on startup; instead, the entire state is loaded.
This prevented AddPotentialServer from being called, which in turn
meant nothing was put in the channel.
  • Loading branch information
jvshahid committed Jan 30, 2014
1 parent 3d0b527 commit 25cb03e
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 4 deletions.
10 changes: 10 additions & 0 deletions src/api/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ func (self *HttpServer) Serve(listener net.Listener) {
// healthcheck
self.registerEndpoint(p, "get", "/ping", self.ping)

// force a raft log compaction
self.registerEndpoint(p, "post", "/raft/force_compaction", self.forceRaftCompaction)

// fetch current list of available interfaces
self.registerEndpoint(p, "get", "/interfaces", self.listInterfaces)

Expand Down Expand Up @@ -267,6 +270,13 @@ func TimePrecisionFromString(s string) (TimePrecision, error) {
return 0, fmt.Errorf("Unknown time precision %s", s)
}

// forceRaftCompaction handles POST /raft/force_compaction. It requires
// cluster-admin credentials (enforced by tryAsClusterAdmin) and asks the
// coordinator to take a raft snapshot, truncating the log.
func (self *HttpServer) forceRaftCompaction(w libhttp.ResponseWriter, r *libhttp.Request) {
	self.tryAsClusterAdmin(w, r, func(user common.User) (int, interface{}) {
		// Previously the error was discarded and the endpoint always
		// reported 200 OK; surface compaction failures to the client.
		if err := self.coordinator.ForceCompaction(user); err != nil {
			return libhttp.StatusInternalServerError, err.Error()
		}
		return libhttp.StatusOK, "OK"
	})
}

// sendCrossOriginHeader answers CORS preflight requests with a bare 200 OK.
// NOTE(review): only the status is written here; the actual
// Access-Control-* headers are presumably added by middleware — confirm.
func (self *HttpServer) sendCrossOriginHeader(w libhttp.ResponseWriter, r *libhttp.Request) {
w.WriteHeader(libhttp.StatusOK)
}
Expand Down
16 changes: 16 additions & 0 deletions src/coordinator/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type ClusterConfiguration struct {
ClusterVersion uint32
config *configuration.Configuration
addedLocalServerWait chan bool
addedLocalServer bool
}

type ContinuousQuery struct {
Expand Down Expand Up @@ -307,6 +308,7 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
log.Info("Added the local server")
self.localServerId = server.Id
self.addedLocalServerWait <- true
self.addedLocalServer = true
}
}

Expand Down Expand Up @@ -569,9 +571,23 @@ func (self *ClusterConfiguration) Recovery(b []byte) error {
server.Connect()
}
}

self.hasRunningServers = data.HasRunningServers
self.localServerId = data.LocalServerId
self.ClusterVersion = data.ClusterVersion

if self.addedLocalServer {
return nil
}

for _, server := range self.servers {
if server.ProtobufConnectionString != self.config.ProtobufConnectionString() {
continue
}
self.addedLocalServerWait <- true
self.addedLocalServer = true
break
}

return nil
}
8 changes: 8 additions & 0 deletions src/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsens
return coordinator
}

// ForceCompaction triggers an immediate raft log compaction on behalf of
// the given user. Only cluster admins are allowed to do this; any other
// user gets a permission error and no compaction is attempted.
func (self *CoordinatorImpl) ForceCompaction(user common.User) error {
	if user.IsClusterAdmin() {
		return self.raftServer.ForceLogCompaction()
	}
	return fmt.Errorf("Insufficient permission to force a log compaction")
}

// Distributes the query across the cluster and combines the results. Yields as they come in ensuring proper order.
// TODO: make this work even if there is a downed server in the cluster
func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error {
Expand Down
2 changes: 1 addition & 1 deletion src/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (self *CoordinatorSuite) TestCanSnapshot(c *C) {
}
size, err := GetFileSize(server.raftServer.LogPath())
c.Assert(err, IsNil)
server.forceLogCompaction()
server.ForceLogCompaction()
newSize, err := GetFileSize(server.raftServer.LogPath())
c.Assert(err, IsNil)
c.Assert(newSize < size, Equals, true)
Expand Down
3 changes: 3 additions & 0 deletions src/coordinator/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Coordinator interface {
DropDatabase(user common.User, db string) error
DropSeries(user common.User, db, series string) error
CreateDatabase(user common.User, db string, replicationFactor uint8) error
ForceCompaction(user common.User) error
ListDatabases(user common.User) ([]*Database, error)
ListSeries(user common.User, database string) ([]*protocol.Series, error)
ReplicateWrite(request *protocol.Request) error
Expand Down Expand Up @@ -91,6 +92,8 @@ type ClusterConsensus interface {

// When a cluster is turned on for the first time.
CreateRootUser() error

ForceLogCompaction() error
}

type RequestHandler interface {
Expand Down
8 changes: 5 additions & 3 deletions src/coordinator/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,13 @@ const (
MAX_SIZE = 10 * MEGABYTE
)

func (s *RaftServer) forceLogCompaction() {
// ForceLogCompaction takes a raft snapshot, which compacts the log down to
// a single state. A failure is logged and also returned so callers (e.g.
// the HTTP force-compaction endpoint) can react to it.
func (s *RaftServer) ForceLogCompaction() error {
	if err := s.raftServer.TakeSnapshot(); err != nil {
		log.Error("Cannot take snapshot: %s", err)
		return err
	}
	return nil
}

func (s *RaftServer) CompactLog() {
Expand All @@ -306,9 +308,9 @@ func (s *RaftServer) CompactLog() {
if size < MAX_SIZE {
continue
}
s.forceLogCompaction()
s.ForceLogCompaction()
case <-forceCompactionTicker:
s.forceLogCompaction()
s.ForceLogCompaction()
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions src/integration/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,33 @@ func (self *ServerSuite) TearDownSuite(c *C) {
}
}

// TestRestartAfterCompaction covers issue #215: after a raft log
// compaction the server recovers from a snapshot instead of replaying
// log commands, and must still come back up serving its data.
func (self *ServerSuite) TestRestartAfterCompaction(c *C) {
data := `
[{
"points": [[1]],
"name": "test_restart_after_compaction",
"columns": ["val"]
}]
`
// Write one point and confirm it is queryable before compacting.
self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)

collection := self.serverProcesses[0].Query("test_rep", "select * from test_restart_after_compaction", false, c)
c.Assert(collection.Members, HasLen, 1)
series := collection.GetSeries("test_restart_after_compaction", c)
c.Assert(series.Points, HasLen, 1)
// Compact the raft log via the admin endpoint, then bounce the server.
resp := self.serverProcesses[0].Post("/raft/force_compaction?u=root&p=root", "", c)
c.Assert(resp.StatusCode, Equals, http.StatusOK)
self.serverProcesses[0].Stop()
// Sleeps give the process time to shut down / finish startup recovery;
// before the fix, startup would hang here waiting on the local-server channel.
time.Sleep(time.Second)
self.serverProcesses[0].Start()
time.Sleep(time.Second * 3)

// The data written before compaction must survive the restart.
collection = self.serverProcesses[0].Query("test_rep", "select * from test_restart_after_compaction", false, c)
c.Assert(collection.Members, HasLen, 1)
series = collection.GetSeries("test_restart_after_compaction", c)
c.Assert(series.Points, HasLen, 1)
}

// For issue #140 https://github.com/influxdb/influxdb/issues/140
func (self *ServerSuite) TestRestartServers(c *C) {
data := `
Expand Down

0 comments on commit 25cb03e

Please sign in to comment.