Skip to content

Commit

Permalink
feat: 记录每一次变更的事件
Browse files Browse the repository at this point in the history
  • Loading branch information
Ambition9186 committed Oct 12, 2024
1 parent cffe7a7 commit 8a19e2d
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 4 deletions.
57 changes: 55 additions & 2 deletions client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,20 @@ func (r *Release) compareRelease() (bool, error) {
logger.Warn("can not find metadata file, maybe you should exec pull command first")
return false, nil
}
if lastMetadata.ReleaseID == r.ReleaseID && util.StrSlicesEqual(lastMetadata.ConfigMatches, r.AppMate.Match) {

lastChangeEventData, err := eventmeta.GetLatestChangeEventFromFile(r.AppDir)
if err != nil {
logger.Error("get metadata file failed", logger.ErrAttr(err))
return false, err
}

if lastChangeEventData == nil {
logger.Warn("can not find change event file, maybe you should exec pull command first")
return false, err
}

if lastMetadata.ReleaseID == r.ReleaseID && util.StrSlicesEqual(lastMetadata.ConfigMatches, r.AppMate.Match) &&
lastChangeEventData.ReleaseID == r.ReleaseID && lastChangeEventData.Status == eventmeta.EventStatusSuccess {
r.AppMate.CurrentReleaseID = r.ReleaseID
logger.Info("current release is consistent with the received release, skip", slog.Any("releaseID", r.ReleaseID))
return true, nil
Expand Down Expand Up @@ -412,8 +425,27 @@ func (r *Release) Execute(steps ...Function) error {
}

// sendVersionChangeMessaging 发送客户端版本变更信息
func (r *Release) sendVersionChangeMessaging(bd *sfs.BasicData) error {
func (r *Release) sendVersionChangeMessaging(bd *sfs.BasicData) (err error) {
r.AppMate.FailedDetailReason = util.TruncateString(r.AppMate.FailedDetailReason, 1024)

defer func(r *Release) {
// 在上报完消息后记录变更的ID以及成功还是失败
if r.AppMate.ReleaseChangeStatus != sfs.Processing {
err = r.recordChangeEvent()
}
}(r)

changeEvent, err := eventmeta.GetLatestChangeEventFromFile(r.AppDir)
if err != nil {
return err
}

if r.ClientMode != sfs.Pull {
if changeEvent != nil && changeEvent.ReleaseID > 0 {
r.AppMate.CurrentReleaseID = changeEvent.ReleaseID
}
}

pullPayload := sfs.VersionChangePayload{
BasicData: bd,
Application: r.AppMate,
Expand Down Expand Up @@ -610,3 +642,24 @@ func updateFiles(filesDir string, files []*ConfigItemFile, successDownloads *int

return nil
}

// recordChangeEvent 记录变更事件
func (r *Release) recordChangeEvent() error {
var eventStatus eventmeta.EventStatus
if r.AppMate.ReleaseChangeStatus == sfs.Success {
eventStatus = eventmeta.EventStatusSuccess
} else {
eventStatus = eventmeta.EventStatusFailed
}
metadata := &eventmeta.ChangeEvent{
ReleaseID: r.ReleaseID,
Status: eventStatus,
}
err := eventmeta.RecordChangeEvent(r.AppDir, metadata)
if err != nil {
logger.Error("record change event failed", logger.ErrAttr(err))
return err
}
return nil
}

Check failure on line 665 in client/types.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/TencentBlueKing/bscp-go) (gci)
3 changes: 1 addition & 2 deletions client/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,7 @@ func (w *watcher) OnReleaseChange(event *sfs.ReleaseChangeEvent) { // nolint
for _, subscriber := range w.subscribers {
if subscriber.App == pl.Instance.App &&
subscriber.UID == pl.Instance.Uid &&
reflect.DeepEqual(subscriber.Labels, pl.Instance.Labels) &&
subscriber.CurrentReleaseID != pl.ReleaseMeta.ReleaseID {
reflect.DeepEqual(subscriber.Labels, pl.Instance.Labels) {

// 更新心跳数据需要cursorID
subscriber.CursorID = cursorID
Expand Down
96 changes: 96 additions & 0 deletions internal/util/eventmeta/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,99 @@ func GetLatestMetadataFromFile(tempDir string) (*EventMeta, bool, error) {

return metadata, true, nil
}

// ChangeEvent 记录变更事件的结构体
type ChangeEvent struct {
// ReleaseID release id
ReleaseID uint32 `json:"releaseID"`
// Status event status
Status EventStatus `json:"status"`
}

// RecordChangeEvent 记录变更的事件
func RecordChangeEvent(tempDir string, eventData *ChangeEvent) error {
if tempDir == "" {
return sfs.WrapPrimaryError(sfs.UnknownFailed,
sfs.SecondaryError{SpecificFailedReason: sfs.FilePathNotFound,
Err: errors.New("the file path for record change event is empty")})
}

// prepare temp dir, make sure it exists
if err := os.MkdirAll(tempDir, os.ModePerm); err != nil {
return sfs.WrapPrimaryError(sfs.UnknownFailed,
sfs.SecondaryError{SpecificFailedReason: sfs.NewFolderFailed, Err: err})
}

metaFilePath := filepath.Join(tempDir, "changeevent.json")

metaFile, err := os.OpenFile(metaFilePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return sfs.WrapPrimaryError(sfs.UnknownFailed,
sfs.SecondaryError{SpecificFailedReason: sfs.OpenFileFailed,
Err: fmt.Errorf("open record change event file failed, err: %s", err.Error())})
}
defer metaFile.Close()

b, err := json.Marshal(eventData)
if err != nil {
return sfs.WrapPrimaryError(sfs.UpdateMetadataFailed,
sfs.SecondaryError{SpecificFailedReason: sfs.SerializationFailed,
Err: fmt.Errorf("marshal metadata failed, err: %s", err.Error())})
}
compress := bytes.NewBuffer([]byte{})
if err := json.Compact(compress, b); err != nil {
return sfs.WrapPrimaryError(sfs.UpdateMetadataFailed,
sfs.SecondaryError{SpecificFailedReason: sfs.FormattingFailed,
Err: fmt.Errorf("compress metadata failed, err: %s", err.Error())})
}
if _, err := metaFile.WriteString(compress.String() + "\n"); err != nil {
return sfs.WrapPrimaryError(sfs.UpdateMetadataFailed,
sfs.SecondaryError{SpecificFailedReason: sfs.WriteFileFailed,
Err: fmt.Errorf("record change event failed, err: %s", err.Error())})
}

logger.Info("record change event success", slog.String("event", compress.String()))

return nil
}

// GetLatestChangeEventFromFile get the last data from the file that change event
func GetLatestChangeEventFromFile(tempDir string) (*ChangeEvent, error) {
if tempDir == "" {
return nil, sfs.WrapPrimaryError(sfs.UnknownFailed,
sfs.SecondaryError{SpecificFailedReason: sfs.DataEmpty,
Err: errors.New("the file path for record change event is empty")})
}

metaFilePath := filepath.Join(tempDir, "changeevent.json")

metaFile, err := os.Open(metaFilePath)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, sfs.WrapPrimaryError(sfs.UpdateMetadataFailed,
sfs.SecondaryError{SpecificFailedReason: sfs.OpenFileFailed,
Err: err})
}
defer metaFile.Close()
var lastLine string
scanner := bufio.NewScanner(metaFile)
for scanner.Scan() {
lastLine = scanner.Text()
}
if err := scanner.Err(); err != nil {
return nil, sfs.WrapPrimaryError(sfs.UpdateMetadataFailed,
sfs.SecondaryError{SpecificFailedReason: sfs.ReadFileFailed,
Err: err})
}

metadata := &ChangeEvent{}
if err := json.Unmarshal([]byte(lastLine), metadata); err != nil {
return nil, sfs.WrapPrimaryError(sfs.UpdateMetadataFailed,
sfs.SecondaryError{SpecificFailedReason: sfs.SerializationFailed,
Err: fmt.Errorf("unmarshal metadata failed, err: %s", err.Error())})
}

return metadata, nil
}

0 comments on commit 8a19e2d

Please sign in to comment.