diff --git a/src/api/http/api.go b/src/api/http/api.go index 59756020bbe..74b828f8824 100644 --- a/src/api/http/api.go +++ b/src/api/http/api.go @@ -130,6 +130,9 @@ func (self *HttpServer) Serve(listener net.Listener) { // healthcheck self.registerEndpoint(p, "get", "/ping", self.ping) + // force a raft log compaction + self.registerEndpoint(p, "post", "/raft/force_compaction", self.forceRaftCompaction) + // fetch current list of available interfaces self.registerEndpoint(p, "get", "/interfaces", self.listInterfaces) @@ -267,6 +270,13 @@ func TimePrecisionFromString(s string) (TimePrecision, error) { return 0, fmt.Errorf("Unknown time precision %s", s) } +func (self *HttpServer) forceRaftCompaction(w libhttp.ResponseWriter, r *libhttp.Request) { + self.tryAsClusterAdmin(w, r, func(user common.User) (int, interface{}) { + self.coordinator.ForceCompaction(user) + return libhttp.StatusOK, "OK" + }) +} + func (self *HttpServer) sendCrossOriginHeader(w libhttp.ResponseWriter, r *libhttp.Request) { w.WriteHeader(libhttp.StatusOK) } diff --git a/src/coordinator/cluster_configuration.go b/src/coordinator/cluster_configuration.go index 1307760ca19..fb8092762d2 100644 --- a/src/coordinator/cluster_configuration.go +++ b/src/coordinator/cluster_configuration.go @@ -40,6 +40,7 @@ type ClusterConfiguration struct { ClusterVersion uint32 config *configuration.Configuration addedLocalServerWait chan bool + addedLocalServer bool } type ContinuousQuery struct { @@ -307,6 +308,7 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) { log.Info("Added the local server") self.localServerId = server.Id self.addedLocalServerWait <- true + self.addedLocalServer = true } } @@ -569,9 +571,23 @@ func (self *ClusterConfiguration) Recovery(b []byte) error { server.Connect() } } + self.hasRunningServers = data.HasRunningServers self.localServerId = data.LocalServerId self.ClusterVersion = data.ClusterVersion + if self.addedLocalServer { + return nil + } + + for _, server := range self.servers { + if server.ProtobufConnectionString != self.config.ProtobufConnectionString() { + continue + } + self.addedLocalServerWait <- true + self.addedLocalServer = true + break + } + return nil } diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 576fdb435c0..a874f3dfe67 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -78,6 +78,14 @@ func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsens return coordinator } +func (self *CoordinatorImpl) ForceCompaction(user common.User) error { + if !user.IsClusterAdmin() { + return fmt.Errorf("Insufficient permission to force a log compaction") + } + + return self.raftServer.ForceLogCompaction() +} + // Distributes the query across the cluster and combines the results. Yields as they come in ensuring proper order. // TODO: make this work even if there is a downed server in the cluster func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error { diff --git a/src/coordinator/coordinator_test.go b/src/coordinator/coordinator_test.go index eac2efd3513..59e7047fe80 100644 --- a/src/coordinator/coordinator_test.go +++ b/src/coordinator/coordinator_test.go @@ -176,7 +176,7 @@ func (self *CoordinatorSuite) TestCanSnapshot(c *C) { } size, err := GetFileSize(server.raftServer.LogPath()) c.Assert(err, IsNil) - server.forceLogCompaction() + server.ForceLogCompaction() newSize, err := GetFileSize(server.raftServer.LogPath()) c.Assert(err, IsNil) c.Assert(newSize < size, Equals, true) diff --git a/src/coordinator/interface.go b/src/coordinator/interface.go index 40fa4be9c02..14f2c301f7f 100644 --- a/src/coordinator/interface.go +++ b/src/coordinator/interface.go @@ -22,6 +22,7 @@ type Coordinator interface { DropDatabase(user common.User, db string) error DropSeries(user common.User, db, series string) error CreateDatabase(user common.User, db string, replicationFactor uint8) error + ForceCompaction(user common.User) error ListDatabases(user common.User) ([]*Database, error) ListSeries(user common.User, database string) ([]*protocol.Series, error) ReplicateWrite(request *protocol.Request) error @@ -91,6 +92,8 @@ type ClusterConsensus interface { // When a cluster is turned on for the first time. CreateRootUser() error + + ForceLogCompaction() error } type RequestHandler interface { diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go index 5e473b86106..5f73fc97034 100644 --- a/src/coordinator/raft_server.go +++ b/src/coordinator/raft_server.go @@ -282,11 +282,13 @@ const ( MAX_SIZE = 10 * MEGABYTE ) -func (s *RaftServer) forceLogCompaction() { +func (s *RaftServer) ForceLogCompaction() error { err := s.raftServer.TakeSnapshot() if err != nil { log.Error("Cannot take snapshot: %s", err) + return err } + return nil } func (s *RaftServer) CompactLog() { @@ -306,9 +308,9 @@ func (s *RaftServer) CompactLog() { if size < MAX_SIZE { continue } - s.forceLogCompaction() + s.ForceLogCompaction() case <-forceCompactionTicker: - s.forceLogCompaction() + s.ForceLogCompaction() } } } diff --git a/src/integration/server_test.go b/src/integration/server_test.go index b0ab86eb8bb..223ef64a3f3 100644 --- a/src/integration/server_test.go +++ b/src/integration/server_test.go @@ -200,6 +200,33 @@ func (self *ServerSuite) TearDownSuite(c *C) { } } +func (self *ServerSuite) TestRestartAfterCompaction(c *C) { + data := ` + [{ + "points": [[1]], + "name": "test_restart_after_compaction", + "columns": ["val"] + }] + ` + self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c) + + collection := self.serverProcesses[0].Query("test_rep", "select * from test_restart_after_compaction", false, c) + c.Assert(collection.Members, HasLen, 1) + series := collection.GetSeries("test_restart_after_compaction", c) + c.Assert(series.Points, HasLen, 1) + resp := self.serverProcesses[0].Post("/raft/force_compaction?u=root&p=root", "", c) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + self.serverProcesses[0].Stop() + time.Sleep(time.Second) + self.serverProcesses[0].Start() + time.Sleep(time.Second * 3) + + collection = self.serverProcesses[0].Query("test_rep", "select * from test_restart_after_compaction", false, c) + c.Assert(collection.Members, HasLen, 1) + series = collection.GetSeries("test_restart_after_compaction", c) + c.Assert(series.Points, HasLen, 1) +} + // For issue #140 https://github.com/influxdb/influxdb/issues/140 func (self *ServerSuite) TestRestartServers(c *C) { data := ` diff --git a/src/integration/test_config1.toml b/src/integration/test_config1.toml index 2023b388ed0..86e3a2245a2 100644 --- a/src/integration/test_config1.toml +++ b/src/integration/test_config1.toml @@ -7,7 +7,7 @@ [logging] # logging level can be one of "debug", "info", "warn" or "error" -# level = "info" +level = "info" file = "/tmp/influxdb/test/1/influxdb.log" # Configure the admin server diff --git a/src/integration/test_config2.toml b/src/integration/test_config2.toml index 0ae20298ad5..0d3fd76c0c7 100644 --- a/src/integration/test_config2.toml +++ b/src/integration/test_config2.toml @@ -7,7 +7,7 @@ [logging] # logging level can be one of "debug", "info", "warn" or "error" -# level = "info" +level = "info" file = "/tmp/influxdb/test/2/influxdb.log" # Configure the admin server diff --git a/src/integration/test_config3.toml b/src/integration/test_config3.toml index 8ebca7ee1a3..0cabdaf6e1f 100644 --- a/src/integration/test_config3.toml +++ b/src/integration/test_config3.toml @@ -7,7 +7,7 @@ [logging] # logging level can be one of "debug", "info", "warn" or "error" -# level = "info" +level = "info" file = "/tmp/influxdb/test/3/influxdb.log" # Configure the admin server