Skip to content

Commit

Permalink
Merge pull request #859 from go-kivik/serverNext
Browse files Browse the repository at this point in the history
More server endpoints
  • Loading branch information
flimzy authored Dec 30, 2023
2 parents 8e596d7 + 92c90db commit 00509ca
Show file tree
Hide file tree
Showing 8 changed files with 651 additions and 215 deletions.
6 changes: 5 additions & 1 deletion updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,11 @@ func (f *DBUpdates) Seq() string {
return f.curVal.(*driver.DBUpdate).Seq
}

// DBUpdates begins polling for database updates.
// DBUpdates begins polling for database updates. Canceling the context will
// close the iterator. The iterator will also close automatically if there are
// no more updates, when an error occurs, or when the [DBUpdates.Close] method
// is called. The [DBUpdates.Err] method should be consulted to determine if
// there was an error during iteration.
func (c *Client) DBUpdates(ctx context.Context, options ...Option) *DBUpdates {
updater, ok := c.driverClient.(driver.DBUpdater)
if !ok {
Expand Down
47 changes: 40 additions & 7 deletions x/server/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,54 @@ import (
"encoding/json"
"mime"
"net/http"

"github.com/go-kivik/kivik/v4/internal"
)

// bind binds the request to v if it is of type application/json or
// application/x-www-form-urlencoded.
func (s *Server) bind(r *http.Request, v interface{}) error {
defer r.Body.Close()
switch r.Method {
case http.MethodPatch, http.MethodPost, http.MethodPut:
// continue
default:
// simple query parsing
return s.bindForm(r, v)
}
ct, _, _ := mime.ParseMediaType(r.Header.Get("Content-Type"))
switch ct {
case "application/json":
defer r.Body.Close()
return json.NewDecoder(r.Body).Decode(v)
case "application/x-www-form-urlencoded":
defer r.Body.Close()
if err := r.ParseForm(); err != nil {
return err
if err := json.NewDecoder(r.Body).Decode(v); err != nil {
return &internal.Error{Status: http.StatusBadRequest, Err: err}
}
return s.formDecoder.Decode(r.Form, v)
return nil
case "application/x-www-form-urlencoded":
return s.bindForm(r, v)
default:
return &couchError{status: http.StatusUnsupportedMediaType, Err: "bad_content_type", Reason: "Content-Type must be 'application/x-www-form-urlencoded' or 'application/json'"}
}
}

func (s *Server) bindForm(r *http.Request, v interface{}) error {
defer r.Body.Close()
if err := r.ParseForm(); err != nil {
return &internal.Error{Status: http.StatusBadRequest, Err: err}
}
if err := s.formDecoder.Decode(r.Form, v); err != nil {
return &internal.Error{Status: http.StatusBadRequest, Err: err}
}
return nil
}

// bindJSON works like bind, but for endpoints that require application/json.
func (s *Server) bindJSON(r *http.Request, v interface{}) error {
defer r.Body.Close()
ct, _, _ := mime.ParseMediaType(r.Header.Get("Content-Type"))
switch ct {
case "application/json":
return json.NewDecoder(r.Body).Decode(v)
default:
return &couchError{status: http.StatusUnsupportedMediaType, Err: "bad_content_type", Reason: "Content-Type must be 'application/json'"}
}
}
51 changes: 51 additions & 0 deletions x/server/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

//go:build !js
// +build !js

package server

import (
"net/http"

"gitlab.com/flimzy/httpe"
)

func (s *Server) clusterStatus() httpe.HandlerWithError {
return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
status, err := s.client.ClusterStatus(r.Context(), options(r))
if err != nil {
return err
}
return serveJSON(w, http.StatusOK, map[string]string{
"state": status,
})
})
}

func (s *Server) clusterSetup() httpe.HandlerWithError {
return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
var req struct {
Action string `json:"action"`
}
if err := s.bindJSON(r, &req); err != nil {
return err
}
if err := s.client.ClusterSetup(r.Context(), req.Action); err != nil {
return err
}
return serveJSON(w, http.StatusOK, map[string]bool{
"ok": true,
})
})
}
100 changes: 100 additions & 0 deletions x/server/cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

//go:build !js
// +build !js

package server

import (
"net/http"
"strings"
"testing"

"github.com/go-kivik/kivik/v4"
"github.com/go-kivik/kivik/v4/mockdb"
)

func Test_clusterStatus(t *testing.T) {
tests := serverTests{
{
name: "cluster status, unauthorized",
method: http.MethodGet,
path: "/_cluster_setup",
wantStatus: http.StatusUnauthorized,
wantJSON: map[string]interface{}{
"error": "unauthorized",
"reason": "User not authenticated",
},
},
{
name: "cluster status, success",
client: func() *kivik.Client {
client, mock, err := mockdb.New()
if err != nil {
t.Fatal(err)
}
mock.ExpectClusterStatus().
WillReturn("chicken")
return client
}(),
method: http.MethodGet,
path: "/_cluster_setup",
authUser: userAdmin,
wantStatus: http.StatusOK,
wantJSON: map[string]string{
"state": "chicken",
},
},
}

tests.Run(t)
}

func TestClusterSetup(t *testing.T) {
tests := serverTests{
{
name: "cluster status, unauthorized",
method: http.MethodPost,
path: "/_cluster_setup",
wantStatus: http.StatusUnauthorized,
wantJSON: map[string]string{
"error": "unauthorized",
"reason": "User not authenticated",
},
},
{
name: "cluster status, success",
client: func() *kivik.Client {
client, mock, err := mockdb.New()
if err != nil {
t.Fatal(err)
}
mock.ExpectClusterSetup().
WithAction("chicken").
WillReturnError(nil)
return client
}(),
method: http.MethodPost,
authUser: userAdmin,
path: "/_cluster_setup",
headers: map[string]string{"Content-Type": "application/json"},
body: strings.NewReader(`{"action":"chicken"}`),
wantStatus: http.StatusOK,
wantJSON: map[string]bool{
"ok": true,
},
},
}

tests.Run(t)
}
95 changes: 95 additions & 0 deletions x/server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
package server

import (
"encoding/json"
"net/http"
"strconv"
"time"

"github.com/go-chi/chi/v5"
"gitlab.com/flimzy/httpe"

"github.com/go-kivik/kivik/v4/driver"
)

func (s *Server) db() httpe.HandlerWithError {
Expand Down Expand Up @@ -72,3 +77,93 @@ func (s *Server) deleteDB() httpe.HandlerWithError {
})
})
}

const defaultHeartbeat = heartbeat(60 * time.Second)

type heartbeat time.Duration

func (h *heartbeat) UnmarshalText(text []byte) error {
var value heartbeat
if string(text) == "true" {
value = defaultHeartbeat
} else {
ms, err := strconv.Atoi(string(text))
if err != nil {
return err
}
value = heartbeat(ms) * heartbeat(time.Millisecond)
}
*h = value
return nil
}

func (s *Server) dbUpdates() httpe.HandlerWithError {
return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
req := struct {
Heartbeat heartbeat `form:"heartbeat"`
}{
Heartbeat: defaultHeartbeat,
}
if err := s.bind(r, &req); err != nil {
return err
}
ticker := time.NewTicker(time.Duration(req.Heartbeat))
updates := s.client.DBUpdates(r.Context(), options(r))

if err := updates.Err(); err != nil {
return err
}

defer updates.Close()

w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)

if _, err := w.Write([]byte(`{"results":[`)); err != nil {
return err
}

nextUpdate := make(chan *driver.DBUpdate)
go func() {
for updates.Next() {
nextUpdate <- &driver.DBUpdate{
DBName: updates.DBName(),
Type: updates.Type(),
Seq: updates.Seq(),
}
}
close(nextUpdate)
}()

var lastSeq string
loop:
for {
select {
case <-ticker.C:
if _, err := w.Write([]byte("\n")); err != nil {
return err
}
case update, ok := <-nextUpdate:
if !ok {
break loop
}
ticker.Reset(time.Duration(req.Heartbeat))
if lastSeq != "" {
if _, err := w.Write([]byte(",")); err != nil {
return err
}
}
lastSeq = update.Seq
if err := json.NewEncoder(w).Encode(update); err != nil {
return err
}
}
}

if _, err := w.Write([]byte(`],"last_seq":"` + lastSeq + "\"}")); err != nil {
return err
}

return updates.Err()
})
}
Loading

0 comments on commit 00509ca

Please sign in to comment.