Skip to content

Commit

Permalink
refactor(metric-drain): use caddy access logs
Browse files Browse the repository at this point in the history
Previously we were sending site usage analytics within our web app code.
This worked well for our use case because we could filter, parse, and
send the analytics to our pipe `metric-drain` which would then store the
analytics into our database.

Because we want to enable HTTP caching for pgs we won't always reach our
web app code since usage analytics will terminate at our cache layer.

Instead, we want to record analytics higher in the request stack.  In
this case, we want to record site analytics from Caddy access logs.

Here's how it works:

- `pub` caddy access logs to our pipe `container-drain`
- `auth/web` will `sub` to `container-drain`, filter, deserialize, and
  `pub` to `metric-drain`
- `auth/web` will `sub` to `metric-drain` and store the analytics in our
  database
  • Loading branch information
neurosnap committed Nov 27, 2024
1 parent be23a31 commit 4e0839a
Show file tree
Hide file tree
Showing 15 changed files with 222 additions and 209 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,11 @@ migrate:
$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20240819_add_projects_blocked.sql
$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241028_add_analytics_indexes.sql
$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
.PHONY: migrate

latest:
$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
.PHONY: latest

psql:
Expand Down
174 changes: 162 additions & 12 deletions auth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import (
"log/slog"
"net/http"
"net/url"
"strings"
"time"

"github.com/gorilla/feeds"
"github.com/picosh/pico/db"
"github.com/picosh/pico/db/postgres"
"github.com/picosh/pico/shared"
"github.com/picosh/utils"
"github.com/picosh/utils/pipe"
"github.com/picosh/utils/pipe/metrics"
)

Expand Down Expand Up @@ -578,6 +580,155 @@ func checkoutHandler() http.HandlerFunc {
}
}

// AccessLogReq is the subset of the "request" object in a Caddy JSON
// access log entry that we need for analytics.
type AccessLogReq struct {
	RemoteIP   string `json:"remote_ip"`
	RemotePort string `json:"remote_port"`
	ClientIP   string `json:"client_ip"`
	Method     string `json:"method"`
	Host       string `json:"host"`
	Uri        string `json:"uri"`
	// Caddy encodes each header as a list of values.
	Headers struct {
		UserAgent []string `json:"User-Agent"`
		Referer   []string `json:"Referer"`
	} `json:"headers"`
	// Tls.ServerName (SNI) is used to derive the service namespace.
	Tls struct {
		ServerName string `json:"server_name"`
	} `json:"tls"`
}

// RespHeaders is the subset of response headers from a Caddy access log
// entry that we record (currently just the content type).
type RespHeaders struct {
	ContentType []string `json:"Content-Type"`
}

// CaddyAccessLog models a single Caddy JSON access log line, keeping only
// the fields needed to build an analytics visit.
type CaddyAccessLog struct {
	Request     AccessLogReq `json:"request"`
	Status      int          `json:"status"`
	RespHeaders RespHeaders  `json:"resp_headers"`
}

// deserializeCaddyAccessLog converts a parsed Caddy access log entry into an
// analytics visit record, resolving the owning user and, where applicable,
// the project (pgs namespace) or post (prose namespace) that was requested.
//
// The namespace ("space") is the first label of the TLS server name.  The
// subdomain is derived from the request host: hosts under one of our own
// service domains carry it as a prefix; anything else is treated as a
// custom domain and resolved via shared.GetCustomDomain.
//
// Returns an error when the subdomain cannot be parsed or when the user,
// project, or post lookup fails.
func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
	space, _, _ := strings.Cut(access.Request.Tls.ServerName, ".")
	host := access.Request.Host
	path := access.Request.Uri

	// grab subdomain based on host
	subdomain := ""
	switch {
	case strings.HasSuffix(host, "tuns.sh"):
		subdomain = strings.TrimSuffix(host, ".tuns.sh")
	case strings.HasSuffix(host, "pgs.sh"):
		subdomain = strings.TrimSuffix(host, ".pgs.sh")
	case strings.HasSuffix(host, "prose.sh"):
		subdomain = strings.TrimSuffix(host, ".prose.sh")
	default:
		subdomain = shared.GetCustomDomain(host, space)
	}

	// get user and namespace details from subdomain
	props, err := shared.GetProjectFromSubdomain(subdomain)
	if err != nil {
		return nil, err
	}
	// get user ID
	user, err := dbpool.FindUserForName(props.Username)
	if err != nil {
		return nil, err
	}

	projectID := ""
	postID := ""
	if space == "pgs" {
		// pgs visits are attributed to a project
		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
		if err != nil {
			return nil, err
		}
		projectID = project.ID
	} else if space == "prose" && path != "" && path != "/" {
		// prose visits to a specific slug are attributed to a post; the
		// index page ("" or "/") intentionally has no post ID
		post, err := dbpool.FindPostWithSlug(path, user.ID, space)
		if err != nil {
			return nil, err
		}
		postID = post.ID
	}

	return &db.AnalyticsVisits{
		UserID:      user.ID,
		ProjectID:   projectID,
		PostID:      postID,
		Namespace:   space,
		Host:        host,
		Path:        path,
		IpAddress:   access.Request.ClientIP,
		UserAgent:   strings.Join(access.Request.Headers.UserAgent, " "),
		Referer:     strings.Join(access.Request.Headers.Referer, " "),
		ContentType: strings.Join(access.RespHeaders.ContentType, " "),
		Status:      access.Status,
	}, nil
}

// containerDrainSub subscribes to the `container-drain` pipe, filters for
// Caddy HTTP access log lines, converts them to analytics visits, and
// publishes each visit as JSON to the `metric-drain` pipe.
//
// this feels really stupid because i'm taking container-drain,
// filtering it, and then sending it to metric-drain. The
// metricDrainSub function listens on the metric-drain and saves it.
// So why not just call the necessary functions to save the visit?
// We want to be able to use pipe as a debugging tool which means we
// can manually sub to `metric-drain` and have a nice clean output to view.
func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
	info := shared.NewPicoPipeClient()
	drain := pipe.NewReconnectReadWriteCloser(
		ctx,
		logger,
		info,
		"container drain",
		"sub container-drain -k",
		100,
		-1,
	)

	send := pipe.NewReconnectReadWriteCloser(
		ctx,
		logger,
		info,
		"from container drain to metric drain",
		"pub metric-drain -b=false",
		100,
		-1,
	)

	for {
		// stop recreating scanners once the context is cancelled;
		// otherwise this outer loop would spin forever after shutdown
		select {
		case <-ctx.Done():
			return
		default:
		}

		scanner := bufio.NewScanner(drain)
		for scanner.Scan() {
			line := scanner.Text()
			// only Caddy HTTP access log lines are relevant
			if !strings.Contains(line, "http.log.access") {
				continue
			}
			clean := strings.TrimSpace(line)
			visit, err := accessLogToVisit(dbpool, clean)
			if err != nil {
				// Debug level: non-access-log noise is expected here
				logger.Debug("could not convert access log to a visit", "err", err)
				continue
			}
			jso, err := json.Marshal(visit)
			if err != nil {
				logger.Error("could not marshal json of a visit", "err", err)
				continue
			}
			// best-effort publish; the reconnecting writer handles retries
			_, _ = send.Write(jso)
		}
		if err := scanner.Err(); err != nil {
			logger.Error("container drain scanner error", "err", err)
		}
	}
}

// accessLogToVisit parses a single JSON-encoded Caddy access log line and
// converts it into an analytics visit record.
func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
	var accessLog CaddyAccessLog
	if err := json.Unmarshal([]byte(line), &accessLog); err != nil {
		return nil, err
	}
	return deserializeCaddyAccessLog(dbpool, &accessLog)
}

func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
drain := metrics.ReconnectReadMetrics(
ctx,
Expand All @@ -594,30 +745,26 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
visit := db.AnalyticsVisits{}
err := json.Unmarshal([]byte(line), &visit)
if err != nil {
logger.Error("json unmarshal", "err", err)
logger.Info("could not unmarshal json", "err", err, "line", line)
continue
}

user := slog.Any("userId", visit.UserID)

err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
if err != nil {
if !errors.Is(err, shared.ErrAnalyticsDisabled) {
logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
continue
logger.Info("could not record analytics visit", "reason", err)
}
}

logger.Info("inserting visit", "visit", visit, user)
if visit.ContentType != "" && !strings.HasPrefix(visit.ContentType, "text/html") {
continue
}

logger.Info("inserting visit", "visit", visit)
err = dbpool.InsertVisit(&visit)
if err != nil {
logger.Error("could not insert visit record", "err", err, "visit", visit, user)
logger.Error("could not insert visit record", "err", err)
}
}

if scanner.Err() != nil {
logger.Error("scanner error", "err", scanner.Err())
}
}
}

Expand Down Expand Up @@ -689,6 +836,9 @@ func StartApiServer() {

// gather metrics in the auth service
go metricDrainSub(ctx, db, logger, cfg.Secret)
// convert container logs to access logs
go containerDrainSub(ctx, db, logger)

defer ctx.Done()

apiConfig := &shared.ApiConfig{
Expand Down
23 changes: 12 additions & 11 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,18 @@ type PostAnalytics struct {
}

type AnalyticsVisits struct {
ID string `json:"id"`
UserID string `json:"user_id"`
ProjectID string `json:"project_id"`
PostID string `json:"post_id"`
Namespace string `json:"namespace"`
Host string `json:"host"`
Path string `json:"path"`
IpAddress string `json:"ip_address"`
UserAgent string `json:"user_agent"`
Referer string `json:"referer"`
Status int `json:"status"`
ID string `json:"id"`
UserID string `json:"user_id"`
ProjectID string `json:"project_id"`
PostID string `json:"post_id"`
Namespace string `json:"namespace"`
Host string `json:"host"`
Path string `json:"path"`
IpAddress string `json:"ip_address"`
UserAgent string `json:"user_agent"`
Referer string `json:"referer"`
Status int `json:"status"`
ContentType string `json:"content_type"`
}

type VisitInterval struct {
Expand Down
3 changes: 2 additions & 1 deletion db/postgres/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ func newNullString(s string) sql.NullString {

func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
_, err := me.Db.Exec(
`INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`,
`INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status, content_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
visit.UserID,
newNullString(visit.ProjectID),
newNullString(visit.PostID),
Expand All @@ -997,6 +997,7 @@ func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
visit.UserAgent,
visit.Referer,
visit.Status,
visit.ContentType,
)
return err
}
Expand Down
2 changes: 0 additions & 2 deletions imgs/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ func ImgRequest(w http.ResponseWriter, r *http.Request) {
dbpool := shared.GetDB(r)
logger := shared.GetLogger(r)
username := shared.GetUsernameFromRequest(r)
analytics := shared.GetAnalyticsQueue(r)

user, err := dbpool.FindUserForName(username)
if err != nil {
Expand Down Expand Up @@ -241,7 +240,6 @@ func ImgRequest(w http.ResponseWriter, r *http.Request) {
logger,
dbpool,
st,
analytics,
)
router.ServeAsset(fname, opts, true, anyPerm, w, r)
}
Expand Down
5 changes: 0 additions & 5 deletions pastes/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ type PostPageData struct {
Unlisted bool
}

// TransparencyPageData bundles the data rendered on a transparency page.
// NOTE(review): inferred from the name and field types only — confirm
// against the template that consumes this struct.
type TransparencyPageData struct {
	Site      shared.SitePageData
	Analytics *db.Analytics
}

type Link struct {
URL string
Text string
Expand Down
10 changes: 3 additions & 7 deletions pgs/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/charmbracelet/promwish"
"github.com/charmbracelet/ssh"
"github.com/charmbracelet/wish"
"github.com/picosh/pico/db"
"github.com/picosh/pico/db/postgres"
"github.com/picosh/pico/shared"
"github.com/picosh/pico/shared/storage"
Expand Down Expand Up @@ -81,13 +80,10 @@ func StartSshServer() {
st,
)

ch := make(chan *db.AnalyticsVisits, 100)
go shared.AnalyticsCollect(ch, dbpool, logger)
apiConfig := &shared.ApiConfig{
Cfg: cfg,
Dbpool: dbpool,
Storage: st,
AnalyticsQueue: ch,
Cfg: cfg,
Dbpool: dbpool,
Storage: st,
}

webTunnel := &tunkit.WebTunnelHandler{
Expand Down
3 changes: 1 addition & 2 deletions pgs/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func createHttpHandler(apiConfig *shared.ApiConfig) CtxHttpBridge {
"pubkey", pubkeyStr,
)

props, err := getProjectFromSubdomain(subdomain)
props, err := shared.GetProjectFromSubdomain(subdomain)
if err != nil {
log.Error(err.Error())
return http.HandlerFunc(shared.UnauthorizedHandler)
Expand Down Expand Up @@ -121,7 +121,6 @@ func createHttpHandler(apiConfig *shared.ApiConfig) CtxHttpBridge {
logger,
apiConfig.Dbpool,
apiConfig.Storage,
apiConfig.AnalyticsQueue,
)
tunnelRouter := TunnelWebRouter{routes}
router := http.NewServeMux()
Expand Down
Loading

0 comments on commit 4e0839a

Please sign in to comment.