Skip to content

Commit

Permalink
re #149 reuse code from EventSubscriptionAPI
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergei Egorov authored and bsideup committed Sep 14, 2015
1 parent ddfe126 commit 6ba64c6
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 24 deletions.
17 changes: 12 additions & 5 deletions api/event_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package api

import (
"encoding/json"
"github.com/QubitProducts/bamboo/configuration"
eb "github.com/QubitProducts/bamboo/services/event_bus"
"io"
"io/ioutil"
"log"
"net/http"

"github.com/QubitProducts/bamboo/configuration"
eb "github.com/QubitProducts/bamboo/services/event_bus"
)

type EventSubscriptionAPI struct {
Expand All @@ -16,15 +17,21 @@ type EventSubscriptionAPI struct {
}

func (sub *EventSubscriptionAPI) Callback(w http.ResponseWriter, r *http.Request) {
var event eb.MarathonEvent

payload, _ := ioutil.ReadAll(r.Body)

sub.Notify(payload)

io.WriteString(w, "Got it!")
}

func (sub *EventSubscriptionAPI) Notify(payload []byte) {

var event eb.MarathonEvent
err := json.Unmarshal(payload, &event)

if err != nil {
log.Printf("Unable to decode JSON Marathon Event request: %s \n", string(payload))
}

sub.EventBus.Publish(event)
io.WriteString(w, "Got it!")
}
25 changes: 6 additions & 19 deletions main/bamboo/bamboo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"bufio"
"encoding/json"
"flag"
"io"
"io/ioutil"
Expand Down Expand Up @@ -78,11 +77,6 @@ func main() {
// Handle gracefully exit
registerOSSignals()

if conf.Marathon.UseEventStream {
// Listen events stream from Marathon
listenToEventStream(conf, eventBus)
}

// Start server
initServer(&conf, zkConn, eventBus)
}
Expand Down Expand Up @@ -112,7 +106,10 @@ func initServer(conf *configuration.Configuration, conn *zk.Conn, eventBus *even
// Static pages
router.Use(martini.Static(path.Join(executableFolder(), "webapp")))

if !conf.Marathon.UseEventStream {
if conf.Marathon.UseEventStream {
// Listen events stream from Marathon
listenToEventStream(conf, eventSubAPI)
} else {
registerMarathonEvent(conf)
}
router.RunOnAddr(serverBindPort)
Expand Down Expand Up @@ -183,7 +180,7 @@ func listenToZookeeper(conf configuration.Configuration, eventBus *event_bus.Eve
return serviceConn
}

func listenToEventStream(conf configuration.Configuration, eventBus *event_bus.EventBus) {
func listenToEventStream(conf *configuration.Configuration, sub api.EventSubscriptionAPI) {
client := &http.Client{}
client.Timeout = 0 * time.Second

Expand Down Expand Up @@ -230,17 +227,7 @@ func listenToEventStream(conf configuration.Configuration, eventBus *event_bus.E
}

line = line[6:]

var event event_bus.MarathonEvent
err = json.Unmarshal([]byte(line), &event)

if err != nil {
errorMsg := "Unable to decode JSON Marathon Event request: %s\n"
log.Printf(errorMsg, line)
continue
}

eventBus.Publish(event)
sub.Notify([]byte(line))
}

log.Println("Event stream connection was closed. Re-opening...")
Expand Down

0 comments on commit 6ba64c6

Please sign in to comment.