-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.go
121 lines (99 loc) · 4.58 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package flare
import (
"context"
"fmt"
"net/http"
"connectrpc.com/connect"
"connectrpc.com/grpcreflect"
"github.com/tommyo/flare/proto"
"github.com/tommyo/flare/proto/protoconnect"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
var _ protoconnect.SparkConnectServiceHandler = &Server{}
type Server struct {
conf *Config
sessions *SessionStore
}
// TODO revisit
func (s *Server) Build(sessions *SessionStore) *http.Server {
s.sessions = sessions
mux := http.NewServeMux()
mux.Handle(protoconnect.NewSparkConnectServiceHandler(s))
reflector := grpcreflect.NewStaticReflector(protoconnect.SparkConnectServiceName)
mux.Handle(grpcreflect.NewHandlerV1(reflector))
mux.Handle(grpcreflect.NewHandlerV1Alpha(reflector))
return &http.Server{
Addr: s.conf.String("addr"),
Handler: h2c.NewHandler(mux, &http2.Server{}),
}
}
// AddArtifacts implements protoconnect.SparkConnectServiceHandler.
// This should mimic the behavior of the SparkConnectAddArtifactsHandler from the Scala codebase.
func (s *Server) AddArtifacts(ctx context.Context, stream *connect.ClientStream[proto.AddArtifactsRequest]) (*connect.Response[proto.AddArtifactsResponse], error) {
fmt.Printf("%v, %v\n", ctx, stream)
// stagingDir is a unique temp jetstream.kv path for staging results
// TODO do we need more than 1 per session?
// stagingDir := fmt.Sprintf("staging.%s", stream.Co)
res := &proto.AddArtifactsResponse{}
return connect.NewResponse(res), nil
}
// AnalyzePlan implements protoconnect.SparkConnectServiceHandler.
func (s *Server) AnalyzePlan(ctx context.Context, req *connect.Request[proto.AnalyzePlanRequest]) (*connect.Response[proto.AnalyzePlanResponse], error) {
fmt.Printf("%v, %v\n", ctx, req)
res := &proto.AnalyzePlanResponse{}
return connect.NewResponse(res), nil
}
// ArtifactStatus implements protoconnect.SparkConnectServiceHandler.
func (s *Server) ArtifactStatus(ctx context.Context, req *connect.Request[proto.ArtifactStatusesRequest]) (*connect.Response[proto.ArtifactStatusesResponse], error) {
fmt.Printf("%v\n", req)
res := &proto.ArtifactStatusesResponse{}
return connect.NewResponse(res), nil
}
// Config implements protoconnect.SparkConnectServiceHandler.
func (s *Server) Config(ctx context.Context, req *connect.Request[proto.ConfigRequest]) (*connect.Response[proto.ConfigResponse], error) {
fmt.Printf("%v, %v\n", ctx, req)
res := &proto.ConfigResponse{}
return connect.NewResponse(res), nil
}
// ExecutePlan implements protoconnect.SparkConnectServiceHandler.
func (s *Server) ExecutePlan(ctx context.Context, req *connect.Request[proto.ExecutePlanRequest], stream *connect.ServerStream[proto.ExecutePlanResponse]) error {
fmt.Printf("%v, %v, %v\n", ctx, req, stream)
return nil
}
// FetchErrorDetails implements protoconnect.SparkConnectServiceHandler.
func (s *Server) FetchErrorDetails(ctx context.Context, req *connect.Request[proto.FetchErrorDetailsRequest]) (*connect.Response[proto.FetchErrorDetailsResponse], error) {
fmt.Printf("%v, %v\n", ctx, req)
res := &proto.FetchErrorDetailsResponse{}
return connect.NewResponse(res), nil
}
// Interrupt implements protoconnect.SparkConnectServiceHandler.
func (s *Server) Interrupt(ctx context.Context, req *connect.Request[proto.InterruptRequest]) (*connect.Response[proto.InterruptResponse], error) {
fmt.Printf("%v, %v\n", ctx, req)
res := &proto.InterruptResponse{}
return connect.NewResponse(res), nil
}
// ReattachExecute implements protoconnect.SparkConnectServiceHandler.
func (s *Server) ReattachExecute(ctx context.Context, req *connect.Request[proto.ReattachExecuteRequest], stream *connect.ServerStream[proto.ExecutePlanResponse]) error {
fmt.Printf("%v, %v, %v\n", ctx, req, stream)
return nil
}
// ReleaseExecute implements protoconnect.SparkConnectServiceHandler.
func (s *Server) ReleaseExecute(ctx context.Context, req *connect.Request[proto.ReleaseExecuteRequest]) (*connect.Response[proto.ReleaseExecuteResponse], error) {
fmt.Printf("%v, %v\n", ctx, req)
res := &proto.ReleaseExecuteResponse{}
return connect.NewResponse(res), nil
}
// ReleaseSession implements protoconnect.SparkConnectServiceHandler.
func (s *Server) ReleaseSession(ctx context.Context, req *connect.Request[proto.ReleaseSessionRequest]) (*connect.Response[proto.ReleaseSessionResponse], error) {
fmt.Printf("%v, %v\n", ctx, req)
serverSessionId := s.sessions.CloseSession(req.Msg.GetUserContext().UserId, req.Msg.SessionId)
res := &proto.ReleaseSessionResponse{
SessionId: req.Msg.SessionId,
ServerSideSessionId: serverSessionId,
}
return connect.NewResponse(res), nil
}
func NewServer(conf *Config) *Server {
return &Server{conf: conf}
}