Skip to content

Commit

Permalink
go fmt + some minor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
cpg committed May 24, 2014
1 parent 7922a53 commit 32e6867
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 95 deletions.
88 changes: 44 additions & 44 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,52 +206,52 @@ func (s *Session) frameReceiver(done chan<- bool, incoming chan<- frame) {
}

func (s *Session) processControlFrame(frame controlFrame) (err error) {

switch frame.kind {
case FRAME_SYN_STREAM:
err = s.processSynStream(frame)
select{
case ns, ok := <-s.new_stream:
// registering a new stream for this session
if ok {
s.streams[ns.id] = ns
} else {
return
}
}
controlflag := 0
// for non-FIN frames wait for data frames
if(!frame.isFIN()){
for controlflag == 0 {
deadline := time.After(3 * time.Second)
select {
case f := <-s.in :
// received a frame
switch fr := f.(type) {
case dataFrame:
//process data frames
err = s.processDataFrame(fr)
if fr.isFIN() {
controlflag=1
}
break
case controlFrame:
err = s.processControlFrame(fr)
}
if err != nil {
return
}
break
case <-deadline:
//unsuccessfully waited for FIN
debug.Println("Waited long enough but no data frames recieved")
controlflag=2
break
}
}
}
return
err = s.processSynStream(frame)
select {
case ns, ok := <-s.new_stream:
// registering a new stream for this session
if ok {
s.streams[ns.id] = ns
} else {
return
}
}
controlflag := 0
// for non-FIN frames wait for data frames
if !frame.isFIN() {
for controlflag == 0 {
deadline := time.After(3 * time.Second)
select {
case f := <-s.in:
// received a frame
switch fr := f.(type) {
case dataFrame:
//process data frames
err = s.processDataFrame(fr)
if fr.isFIN() {
controlflag = 1
}
break
case controlFrame:
err = s.processControlFrame(fr)

}
if err != nil {
return
}
break
case <-deadline:
//unsuccessfully waited for FIN
debug.Println("Waited long enough but no data frames recieved")
controlflag = 2
break
}
}
}
return
case FRAME_SYN_REPLY:
return s.processSynReply(frame)
case FRAME_SETTINGS:
Expand Down
96 changes: 45 additions & 51 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (s *Stream) initiate_stream(frame controlFrame) (err error) {
debug.Println("Processing SYN_STREAM", ss)
if frame.isFIN() {
// call the handler

req := &http.Request{
Method: headers.Get(HEADER_METHOD),
Proto: headers.Get(HEADER_VERSION),
Expand All @@ -320,59 +320,53 @@ func (s *Stream) initiate_stream(frame controlFrame) (err error) {
go s.requestHandler(req)

} else {
/*
// FIXME this will not work with POST, PUT or anything that has a body!
// FIXME if not FIN, need to wait for the data frames to see if there is a body!
// if so, call the handler then!
panic("need to implement non-FIN streams")
*/
var data []byte

endflag := 0
for endflag==0 {
deadline := time.After(3 * time.Second)
select {
case df,ok := <- s.data :
//collecting data
if !ok {
debug.Println("Error collecting data frames",ok)
return
}
data = append(data, df.data...)
if df.isFIN() {
endflag=1
}
break
case <-deadline:
//unsuccessfully waited for FIN
// no activity in a while. Assume that body is completely recieved. Bail
//panic("Waited long enough but no data frames recieved")
debug.Println("Waited long enough but no data frames recieved")
endflag=2
break
for endflag == 0 {
deadline := time.After(3 * time.Second)
select {
case df, ok := <-s.data:
//collecting data
if !ok {
debug.Println("Error collecting data frames", ok)
return
}
data = append(data, df.data...)
if df.isFIN() {
endflag = 1
}
break

case <-deadline:
//unsuccessfully waited for FIN
// no activity in a while. Assume that body is completely recieved. Bail
//panic("Waited long enough but no data frames recieved")
debug.Println("Waited long enough but no data frames recieved")
endflag = 2
break
}
}
if endflag==1 {
//http request if data frames collected sucessfully
// call the handler
contLen,_ := strconv.Atoi(headers.Get(HEADER_CONTENT_LENGTH))
req := &http.Request{
Method: headers.Get(HEADER_METHOD),
Proto: headers.Get(HEADER_VERSION),
Header: headers,
RemoteAddr: s.session.conn.RemoteAddr().String(),
ContentLength: int64(contLen),
Body: &readCloser{bytes.NewReader(data)},
}
req.URL, _ = url.ParseRequestURI(headers.Get(HEADER_PATH))

// Clear the headers in the session now that the request has them
s.headers = make(http.Header)

go s.requestHandler(req)
if endflag == 1 {
//http request if data frames collected sucessfully
// call the handler
contLen, _ := strconv.Atoi(headers.Get(HEADER_CONTENT_LENGTH))
req := &http.Request{
Method: headers.Get(HEADER_METHOD),
Proto: headers.Get(HEADER_VERSION),
Header: headers,
RemoteAddr: s.session.conn.RemoteAddr().String(),
ContentLength: int64(contLen),
Body: &readCloser{bytes.NewReader(data)},
}
req.URL, _ = url.ParseRequestURI(headers.Get(HEADER_PATH))

// Clear the headers in the session now that the request has them
s.headers = make(http.Header)

go s.requestHandler(req)
}

}

return nil
Expand Down Expand Up @@ -434,8 +428,8 @@ func (s *Stream) stream_loop() (err error) {
}
switch cf.kind {
case FRAME_SYN_STREAM:
err = s.initiate_stream(cf)
debug.Println("Goroutines:", runtime.NumGoroutine())
err = s.initiate_stream(cf)
debug.Println("Goroutines:", runtime.NumGoroutine())
case FRAME_SYN_REPLY:
err = s.handleSynReply(cf)
case FRAME_RST_STREAM:
Expand Down

0 comments on commit 32e6867

Please sign in to comment.