Skip to content

Commit

Permalink
Merge pull request #215 from influxdb/fix-215-server-fails-restart-af…
Browse files Browse the repository at this point in the history
…ter-raft-compaction

Fix error with server restart after Raft log compaction
  • Loading branch information
pauldix committed Jan 30, 2014
2 parents 982873c + 25cb03e commit dd547a7
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 7 deletions.
10 changes: 10 additions & 0 deletions src/api/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ func (self *HttpServer) Serve(listener net.Listener) {
// healthcheck
self.registerEndpoint(p, "get", "/ping", self.ping)

// force a raft log compaction
self.registerEndpoint(p, "post", "/raft/force_compaction", self.forceRaftCompaction)

// fetch current list of available interfaces
self.registerEndpoint(p, "get", "/interfaces", self.listInterfaces)

Expand Down Expand Up @@ -267,6 +270,13 @@ func TimePrecisionFromString(s string) (TimePrecision, error) {
return 0, fmt.Errorf("Unknown time precision %s", s)
}

func (self *HttpServer) forceRaftCompaction(w libhttp.ResponseWriter, r *libhttp.Request) {
self.tryAsClusterAdmin(w, r, func(user common.User) (int, interface{}) {
self.coordinator.ForceCompaction(user)
return libhttp.StatusOK, "OK"
})
}

func (self *HttpServer) sendCrossOriginHeader(w libhttp.ResponseWriter, r *libhttp.Request) {
w.WriteHeader(libhttp.StatusOK)
}
Expand Down
16 changes: 16 additions & 0 deletions src/coordinator/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type ClusterConfiguration struct {
ClusterVersion uint32
config *configuration.Configuration
addedLocalServerWait chan bool
addedLocalServer bool
}

type ContinuousQuery struct {
Expand Down Expand Up @@ -307,6 +308,7 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
log.Info("Added the local server")
self.localServerId = server.Id
self.addedLocalServerWait <- true
self.addedLocalServer = true
}
}

Expand Down Expand Up @@ -569,9 +571,23 @@ func (self *ClusterConfiguration) Recovery(b []byte) error {
server.Connect()
}
}

self.hasRunningServers = data.HasRunningServers
self.localServerId = data.LocalServerId
self.ClusterVersion = data.ClusterVersion

if self.addedLocalServer {
return nil
}

for _, server := range self.servers {
if server.ProtobufConnectionString != self.config.ProtobufConnectionString() {
continue
}
self.addedLocalServerWait <- true
self.addedLocalServer = true
break
}

return nil
}
8 changes: 8 additions & 0 deletions src/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsens
return coordinator
}

func (self *CoordinatorImpl) ForceCompaction(user common.User) error {
if !user.IsClusterAdmin() {
return fmt.Errorf("Insufficient permission to force a log compaction")
}

return self.raftServer.ForceLogCompaction()
}

// Distributes the query across the cluster and combines the results. Yields as they come in ensuring proper order.
// TODO: make this work even if there is a downed server in the cluster
func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error {
Expand Down
2 changes: 1 addition & 1 deletion src/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (self *CoordinatorSuite) TestCanSnapshot(c *C) {
}
size, err := GetFileSize(server.raftServer.LogPath())
c.Assert(err, IsNil)
server.forceLogCompaction()
server.ForceLogCompaction()
newSize, err := GetFileSize(server.raftServer.LogPath())
c.Assert(err, IsNil)
c.Assert(newSize < size, Equals, true)
Expand Down
3 changes: 3 additions & 0 deletions src/coordinator/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Coordinator interface {
DropDatabase(user common.User, db string) error
DropSeries(user common.User, db, series string) error
CreateDatabase(user common.User, db string, replicationFactor uint8) error
ForceCompaction(user common.User) error
ListDatabases(user common.User) ([]*Database, error)
ListSeries(user common.User, database string) ([]*protocol.Series, error)
ReplicateWrite(request *protocol.Request) error
Expand Down Expand Up @@ -91,6 +92,8 @@ type ClusterConsensus interface {

// When a cluster is turned on for the first time.
CreateRootUser() error

ForceLogCompaction() error
}

type RequestHandler interface {
Expand Down
8 changes: 5 additions & 3 deletions src/coordinator/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,13 @@ const (
MAX_SIZE = 10 * MEGABYTE
)

func (s *RaftServer) forceLogCompaction() {
func (s *RaftServer) ForceLogCompaction() error {
err := s.raftServer.TakeSnapshot()
if err != nil {
log.Error("Cannot take snapshot: %s", err)
return err
}
return nil
}

func (s *RaftServer) CompactLog() {
Expand All @@ -306,9 +308,9 @@ func (s *RaftServer) CompactLog() {
if size < MAX_SIZE {
continue
}
s.forceLogCompaction()
s.ForceLogCompaction()
case <-forceCompactionTicker:
s.forceLogCompaction()
s.ForceLogCompaction()
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions src/integration/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,33 @@ func (self *ServerSuite) TearDownSuite(c *C) {
}
}

func (self *ServerSuite) TestRestartAfterCompaction(c *C) {
data := `
[{
"points": [[1]],
"name": "test_restart_after_compaction",
"columns": ["val"]
}]
`
self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)

collection := self.serverProcesses[0].Query("test_rep", "select * from test_restart_after_compaction", false, c)
c.Assert(collection.Members, HasLen, 1)
series := collection.GetSeries("test_restart_after_compaction", c)
c.Assert(series.Points, HasLen, 1)
resp := self.serverProcesses[0].Post("/raft/force_compaction?u=root&p=root", "", c)
c.Assert(resp.StatusCode, Equals, http.StatusOK)
self.serverProcesses[0].Stop()
time.Sleep(time.Second)
self.serverProcesses[0].Start()
time.Sleep(time.Second * 3)

collection = self.serverProcesses[0].Query("test_rep", "select * from test_restart_after_compaction", false, c)
c.Assert(collection.Members, HasLen, 1)
series = collection.GetSeries("test_restart_after_compaction", c)
c.Assert(series.Points, HasLen, 1)
}

// For issue #140 https://github.com/influxdb/influxdb/issues/140
func (self *ServerSuite) TestRestartServers(c *C) {
data := `
Expand Down
2 changes: 1 addition & 1 deletion src/integration/test_config1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

[logging]
# logging level can be one of "debug", "info", "warn" or "error"
# level = "info"
level = "info"
file = "/tmp/influxdb/test/1/influxdb.log"

# Configure the admin server
Expand Down
2 changes: 1 addition & 1 deletion src/integration/test_config2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

[logging]
# logging level can be one of "debug", "info", "warn" or "error"
# level = "info"
level = "info"
file = "/tmp/influxdb/test/2/influxdb.log"

# Configure the admin server
Expand Down
2 changes: 1 addition & 1 deletion src/integration/test_config3.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

[logging]
# logging level can be one of "debug", "info", "warn" or "error"
# level = "info"
level = "info"
file = "/tmp/influxdb/test/3/influxdb.log"

# Configure the admin server
Expand Down

0 comments on commit dd547a7

Please sign in to comment.