Skip to content

Commit

Permalink
✨ feat: support query cache (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
lwnmengjing committed May 10, 2024
1 parent 8f13aa6 commit bcd184a
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 11 deletions.
16 changes: 13 additions & 3 deletions core/logger/writer/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"io"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -55,11 +58,18 @@ func (p *LokiWriter) Write(data []byte) (n int, err error) {

func (p *LokiWriter) write() {
entries := make([]logproto.Entry, 0)
defer func() {
_ = p.send(entries)
}()

done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case <-done:
err := p.send(entries)
if err != nil {
slog.Error("application exit, send to loki failed", slog.String("error", err.Error()))
err = nil
}
return
case <-time.After(p.opts.lokiInterval):
// send to loki
if len(entries) > 0 {
Expand Down
15 changes: 12 additions & 3 deletions core/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
"errors"
"fmt"
"log/slog"
"os"
"os/signal"
"sync"
"syscall"
)

// Server server
Expand Down Expand Up @@ -44,7 +47,7 @@ func New(opts ...Option) Manager {
return s
}

// Add add runnable
// Add runnable
func (e *Server) Add(r ...Runnable) {
if e.services == nil {
e.services = make(map[string]Runnable)
Expand All @@ -57,7 +60,7 @@ func (e *Server) Add(r ...Runnable) {
}
}

// Start start runnable
// Start runnable
func (e *Server) Start(ctx context.Context) (err error) {
//e.mux.Lock()
//defer e.mux.Unlock()
Expand Down Expand Up @@ -87,12 +90,18 @@ func (e *Server) Start(ctx context.Context) (err error) {
e.startRunnable(e.services[k])
e.setStarted(k)
}
done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
e.waitForRunnable.Wait()
select {
case <-ctx.Done():
return nil
case err := <-e.errChan:
case err = <-e.errChan:
return err
case <-done:
// 优雅退出
fmt.Println("received SIGINT, shutting down server")
return nil
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/response/actions/gorm/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ package gorm
*/

import (
"context"
"net/http"

"github.com/gin-gonic/gin"
)

var CleanCacheFromTag func(ctx context.Context, tag string) error

// Base action
type Base struct {
opts *Options
Expand Down
6 changes: 5 additions & 1 deletion pkg/response/actions/gorm/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package gorm
*/

import (
"context"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -129,7 +130,7 @@ func (e *Control) update(c *gin.Context) {
api.Err(http.StatusUnprocessableEntity)
return
}
query := gormdb.DB.Where(e.opts.Key, id)
query := gormdb.DB.WithContext(context.WithValue(c, "gorm:cache:tag", m.TableName())).Where(e.opts.Key, id)
if e.opts.Scope != nil {
query = query.Scopes(e.opts.Scope(c, m))
}
Expand Down Expand Up @@ -168,6 +169,9 @@ func (e *Control) update(c *gin.Context) {
api.Err(http.StatusInternalServerError)
return
}
if CleanCacheFromTag != nil {
_ = CleanCacheFromTag(c, m.TableName())
}
if e.opts.AfterUpdate != nil {
err = e.opts.AfterUpdate(c, query, m)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion pkg/response/actions/gorm/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package gorm
*/

import (
"context"
"fmt"
"net/http"

Expand Down Expand Up @@ -80,7 +81,8 @@ func (e *Delete) delete(c *gin.Context, ids ...string) {
return
}
}
query := gormdb.DB.WithContext(c).Where(fmt.Sprintf("%s IN ?", e.opts.Key), ids)
query := gormdb.DB.WithContext(context.WithValue(c, "gorm:cache:tag", e.opts.Model.TableName())).
Where(fmt.Sprintf("%s IN ?", e.opts.Key), ids)
if e.opts.Scope != nil {
query = query.Scopes(e.opts.Scope(c, e.opts.Model))
}
Expand All @@ -90,6 +92,9 @@ func (e *Delete) delete(c *gin.Context, ids ...string) {
api.Err(http.StatusInternalServerError)
return
}
if CleanCacheFromTag != nil {
_ = CleanCacheFromTag(c, e.opts.Model.TableName())
}
if e.opts.AfterDelete != nil {
if err = e.opts.AfterDelete(c, gormdb.DB, e.opts.Model); err != nil {
api.AddError(err).Log.ErrorContext(c, "AfterDelete error", "error", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/response/actions/gorm/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package gorm
*/

import (
"context"
"errors"
"net/http"

Expand Down Expand Up @@ -63,7 +64,7 @@ func (e *Get) get(c *gin.Context, key string) {
api := response.Make(c)
m := pkg.TablerDeepCopy(e.opts.Model)
preloads := c.QueryArray("preloads[]")
query := gormdb.DB.Model(m).Where("id = ?", c.Param(key))
query := gormdb.DB.WithContext(context.WithValue(c, "gorm:cache:tag", m.TableName())).Model(m).Where("id = ?", c.Param(key))

if e.opts.BeforeGet != nil {
if err := e.opts.BeforeGet(c, query, m); err != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/response/actions/gorm/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package gorm
*/

import (
"context"
"errors"
"net/http"
"strings"
Expand Down Expand Up @@ -68,9 +69,10 @@ func (e *Search) search(c *gin.Context) {
api.Err(http.StatusUnprocessableEntity)
return
}
db := gormdb.DB
m := pkg.TablerDeepCopy(e.opts.Model)

db := gormdb.DB.WithContext(context.WithValue(c, "gorm:cache:tag", m.TableName()))

if e.opts.BeforeSearch != nil {
if err := e.opts.BeforeSearch(c, db, m); err != nil {
api.AddError(err).Log.Error("BeforeSearch error", "error", err)
Expand All @@ -87,7 +89,7 @@ func (e *Search) search(c *gin.Context) {
if e.opts.Scope != nil {
scopes = append(scopes, e.opts.Scope(c, m))
}
query := db.WithContext(c).Model(m).Scopes(scopes...)
query := db.Model(m).Scopes(scopes...)
//if err := query.Limit(-1).Offset(-1).Count(&count).Error; err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
// api.AddError(err).Log.ErrorContext(c, "Search error", "error", err)
// api.Err(http.StatusInternalServerError)
Expand Down

0 comments on commit bcd184a

Please sign in to comment.