diff --git a/CHANGES.md b/CHANGES.md index a75760a..d7b62a6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,6 +12,24 @@ ## develop +## 2024.2.0 + +- [CHANGE] retry 設定を削除し、リトライ回数を指定する max_retry 設定を追加する + - リトライしない場合は、max_retry を設定ファイルから削除するか、または、max_retry = 0 を設定する + - デフォルト値: 0 (リトライ無し) + - @Hexa +- [ADD] サービス接続時のエラーによるリトライまでの時間間隔を指定する retry_interval_ms 設定(ミリ秒間隔)を追加する + - デフォルト値: 100 (100 ms) + - @Hexa +- [ADD] サービス接続時の特定のエラー発生時に、リトライする仕組みを追加する + - @Hexa +- [ADD] ハンドラーにリトライ回数を管理するメソッドを追加する + - @Hexa +- [CHANGE] aws への接続時に、時間をおいて再接続できる可能性がある HTTP ステータスコードが 429 の応答の場合は、指定されたリトライ設定に応じて、再接続を試みるように変更する + - @Hexa +- [CHANGE] aws、または、gcp への接続後にリトライ回数が max_retry を超えた場合は、{"type": "error", "reason": string} をクライアントへ送信する + - @Hexa + ## 2024.1.0 - [UPDATE] go.mod の Go のバージョンを 1.22.0 にあげる diff --git a/VERSION b/VERSION index a73a851..7e08acf 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2024.1.0 +2024.2.0 diff --git a/amazon_transcribe.go b/amazon_transcribe.go index 0b0e4da..ce61cbc 100644 --- a/amazon_transcribe.go +++ b/amazon_transcribe.go @@ -99,9 +99,16 @@ func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribe if reqErr, ok := err.(awserr.RequestFailure); ok { code := reqErr.StatusCode() message := reqErr.Message() + + var retry bool + if code == http.StatusTooManyRequests { + retry = true + } + return nil, &SuzuError{ Code: code, Message: message, + Retry: retry, } } return nil, err diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index 32214b2..dd3717d 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "io" + "sync" "github.com/aws/aws-sdk-go/service/transcribestreamingservice" zlog "github.com/rs/zerolog/log" @@ -22,6 +23,8 @@ type AmazonTranscribeHandler struct { SampleRate uint32 ChannelCount uint16 LanguageCode string + RetryCount int + mu sync.Mutex OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error } @@ -34,6 +37,7 @@ func NewAmazonTranscribeHandler(config Config, channelID, connectionID string, s SampleRate: sampleRate, ChannelCount: channelCount, LanguageCode: languageCode, + RetryCount: 0, OnResultFunc: onResultFunc.(func(context.Context, io.WriteCloser, string, string, string, any) error), } } @@ -67,6 +71,24 @@ func (ar *AwsResult) SetMessage(message string) *AwsResult { return ar } +func (h *AmazonTranscribeHandler) UpdateRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount++ + return h.RetryCount +} + +func (h *AmazonTranscribeHandler) GetRetryCount() int { + return h.RetryCount +} + +func (h *AmazonTranscribeHandler) ResetRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount = 0 + return h.RetryCount +} + func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount)) @@ -153,17 +175,18 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) } if err := stream.Err(); err != nil { + zlog.Error(). + Err(err). + Str("channel_id", h.ChannelID). + Str("connection_id", h.ConnectionID). + Int("retry_count", h.GetRetryCount()). + Send() + // 復帰が不可能なエラー以外は再接続を試みる switch err.(type) { case *transcribestreamingservice.LimitExceededException: - zlog.Error(). - Err(err). - Str("channel_id", h.ChannelID). - Str("connection_id", h.ConnectionID). - Send() - - // リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if !*at.Config.Retry { + // リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する + if (at.Config.MaxRetry < 1) || (at.Config.MaxRetry <= h.GetRetryCount()) { if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { zlog.Error(). Err(err). diff --git a/config.go b/config.go index 4fee353..45e47e8 100644 --- a/config.go +++ b/config.go @@ -27,6 +27,9 @@ const ( // 100ms DefaultTimeToWaitForOpusPacketMs = 100 + + // リトライ間隔 100ms + DefaultRetryIntervalMs = 100 ) type Config struct { @@ -46,7 +49,8 @@ type Config struct { HTTP2MaxReadFrameSize uint32 `ini:"http2_max_read_frame_size"` HTTP2IdleTimeout uint32 `ini:"http2_idle_timeout"` - Retry *bool `ini:"retry"` + MaxRetry int `ini:"max_retry"` + RetryIntervalMs int `ini:"retry_interval_ms"` ExporterHTTPS bool `ini:"exporter_https"` ExporterListenAddr string `ini:"exporter_listen_addr"` @@ -160,12 +164,11 @@ func setDefaultsConfig(config *Config) { config.TimeToWaitForOpusPacketMs = DefaultTimeToWaitForOpusPacketMs } - // 未指定の場合は true - if config.Retry == nil { - defaultRetry := true - config.Retry = &defaultRetry + if config.RetryIntervalMs == 0 { + config.RetryIntervalMs = DefaultRetryIntervalMs } } + func validateConfig(config *Config) error { var err error // アドレスとして正しいことを確認する @@ -213,4 +216,6 @@ func ShowConfig(config *Config) { zlog.Info().Str("exporter_listen_addr", config.ExporterListenAddr).Msg("CONF") zlog.Info().Int("exporter_listen_port", config.ExporterListenPort).Msg("CONF") + zlog.Info().Int("max_retry", config.MaxRetry).Msg("CONF") + zlog.Info().Int("retry_interval_ms", config.RetryIntervalMs).Msg("CONF") } diff --git a/config_example.ini b/config_example.ini index be8bf5b..90ee00d 100644 --- a/config_example.ini +++ b/config_example.ini @@ -44,8 +44,10 @@ audio_channel_count = 1 # 受信した音声データの保存先ファイルです dump_file = ./dump.jsonl -# サーバからの切断時に再接続を試みます -retry = true +# サーバからの切断時またはハンドラー個別で指定した条件でのリトライ回数を指定します +max_retry = 0 +# リトライ間隔(ミリ秒)です +retry_interval_ms = 100 # aws の場合は IsPartial が false, gcp の場合は IsFinal が true の場合の最終的な結果のみを返す指定 final_result_only = true diff --git a/errors.go b/errors.go index 34a3551..ce7ecd8 100644 --- a/errors.go +++ b/errors.go @@ -3,8 +3,13 @@ package suzu type SuzuError struct { Code int Message string + Retry bool } func (e *SuzuError) Error() string { return e.Message } + +func (e *SuzuError) IsRetry() bool { + return e.Retry +} diff --git a/go.mod b/go.mod index e8a58ac..0e65414 100644 --- a/go.mod +++ b/go.mod @@ -3,19 +3,19 @@ module github.com/shiguredo/suzu go 1.22.0 require ( - cloud.google.com/go/speech v1.21.1 - github.com/aws/aws-sdk-go v1.50.20 + cloud.google.com/go/speech v1.22.0 + github.com/aws/aws-sdk-go v1.50.30 github.com/labstack/echo-contrib v0.15.0 github.com/labstack/echo/v4 v4.11.4 github.com/pion/randutil v0.1.0 github.com/pion/rtp v1.8.3 github.com/rs/zerolog v1.32.0 - github.com/stretchr/testify v1.8.4 - golang.org/x/exp v0.0.0-20240213143201-ec583247a57a + github.com/stretchr/testify v1.9.0 + golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 golang.org/x/net v0.21.0 golang.org/x/sync v0.6.0 - google.golang.org/api v0.165.0 - google.golang.org/grpc v1.61.1 + google.golang.org/api v0.167.0 + google.golang.org/grpc v1.62.0 google.golang.org/protobuf v1.32.0 gopkg.in/ini.v1 v1.67.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 @@ -23,9 +23,9 @@ require ( require ( cloud.google.com/go v0.112.0 // indirect - cloud.google.com/go/compute v1.23.3 // indirect + cloud.google.com/go/compute v1.24.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect - cloud.google.com/go/longrunning v0.5.4 // indirect + cloud.google.com/go/longrunning v0.5.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -37,7 +37,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect - github.com/googleapis/gax-go/v2 v2.12.0 // indirect + github.com/googleapis/gax-go/v2 v2.12.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/labstack/gommon v0.4.2 // indirect @@ -52,8 +52,8 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.48.0 // indirect go.opentelemetry.io/otel v1.23.0 // indirect go.opentelemetry.io/otel/metric v1.23.0 // indirect go.opentelemetry.io/otel/trace v1.23.0 // indirect @@ -63,8 +63,8 @@ require ( golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/appengine v1.6.8 // indirect - google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 // indirect + google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240221002015-b0ce06bbee7c // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index e0ea80e..e948900 100644 --- a/go.sum +++ b/go.sum @@ -7,14 +7,22 @@ cloud.google.com/go v0.112.0 h1:tpFCD7hpHFlQ8yPwT3x+QeXqc2T6+n6T+hmABHfDUSM= cloud.google.com/go v0.112.0/go.mod h1:3jEEVwZ/MHU4djK5t5RHuKOA/GbLddgTdVubX1qnPD4= cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk= cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= +cloud.google.com/go/compute v1.23.4 h1:EBT9Nw4q3zyE7G45Wvv3MzolIrCJEuHys5muLY0wvAw= +cloud.google.com/go/compute v1.23.4/go.mod h1:/EJMj55asU6kAFnuZET8zqgwgJ9FvXWXOkkfQZa4ioI= +cloud.google.com/go/compute v1.24.0 h1:phWcR2eWzRJaL/kOiJwfFsPs4BaKq1j6vnpZrc1YlVg= +cloud.google.com/go/compute v1.24.0/go.mod h1:kw1/T+h/+tK2LJK0wiPPx1intgdAM3j/g3hFDlscY40= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/longrunning v0.5.4 h1:w8xEcbZodnA2BbW6sVirkkoC+1gP8wS57EUUgGS0GVg= cloud.google.com/go/longrunning v0.5.4/go.mod h1:zqNVncI0BOP8ST6XQD1+VcvuShMmq7+xFSzOL++V0dI= +cloud.google.com/go/longrunning v0.5.5 h1:GOE6pZFdSrTb4KAiKnXsJBtlE6mEyaW44oKyMILWnOg= +cloud.google.com/go/longrunning v0.5.5/go.mod h1:WV2LAxD8/rg5Z1cNW6FJ/ZpX4E4VnDnoTk0yawPBB7s= cloud.google.com/go/speech v1.21.0 h1:qkxNao58oF8ghAHE1Eghen7XepawYEN5zuZXYWaUTA4= cloud.google.com/go/speech v1.21.0/go.mod h1:wwolycgONvfz2EDU8rKuHRW3+wc9ILPsAWoikBEWavY= cloud.google.com/go/speech v1.21.1 h1:nuFc+Kj5B8de75nN4FdPyUbI2SiBoHZG6BLurXL56Q0= cloud.google.com/go/speech v1.21.1/go.mod h1:E5GHZXYQlkqWQwY5xRSLHw2ci5NMQNG52FfMU1aZrIA= +cloud.google.com/go/speech v1.22.0 h1:AWpbl2POalAOvO5uudJoaknkFNhATuBVODozDXyTD1Q= +cloud.google.com/go/speech v1.22.0/go.mod h1:d7pmrSKyrD12c7dRrjqgA/U0eeyZs0i4VpvOlpJXEBA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/aws/aws-sdk-go v1.49.4 h1:qiXsqEeLLhdLgUIyfr5ot+N/dGPWALmtM1SetRmbUlY= github.com/aws/aws-sdk-go v1.49.4/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= @@ -36,6 +44,10 @@ github.com/aws/aws-sdk-go v1.50.15 h1:wEMnPfEQQFaoIJwuO18zq/vtG4Ft7NxQ3r9xlEi/8z github.com/aws/aws-sdk-go v1.50.15/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/aws/aws-sdk-go v1.50.20 h1:xfAnSDVf/azIWTVQXQODp89bubvCS85r70O3nuQ4dnE= github.com/aws/aws-sdk-go v1.50.20/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go v1.50.25 h1:vhiHtLYybv1Nhx3Kv18BBC6L0aPJHaG9aeEsr92W99c= +github.com/aws/aws-sdk-go v1.50.25/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go v1.50.30 h1:2OelKH1eayeaH7OuL1Y9Ombfw4HK+/k0fEnJNWjyLts= +github.com/aws/aws-sdk-go v1.50.30/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -102,6 +114,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfF github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= +github.com/googleapis/gax-go/v2 v2.12.1 h1:9F8GV9r9ztXyAi00gsMQHNoF51xPZm8uj1dpYt2ZETM= +github.com/googleapis/gax-go/v2 v2.12.1/go.mod h1:61M8vcyyXR2kqKFxKrfA22jaA8JGF7Dc8App1U3H6jc= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= @@ -163,6 +177,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= @@ -175,10 +191,14 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.4 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 h1:UNQQKPfTDe1J81ViolILjTKPr9WetKW6uei2hFgJmFs= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0/go.mod h1:r9vWsPS/3AQItv3OSlEJ/E4mbrhUbbw18meOjArPtKQ= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0 h1:P+/g8GpuJGYbOp2tAdKrIPUX9JO02q8Q0YNlHolpibA= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0/go.mod h1:tIKj3DbO8N9Y2xo52og3irLsPI4GW02DSMtrVgNMgxg= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 h1:sv9kVfal0MK0wBMCOGr+HeJm9v803BkJxGrk2au7j08= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0/go.mod h1:SK2UL73Zy1quvRPonmOmRDiWk1KBV3LyIeeIxcEApWw= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.48.0 h1:doUP+ExOpH3spVTLS0FcWGLnQrPct/hD/bCPbDRUEAU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.48.0/go.mod h1:rdENBZMT2OE6Ne/KLwpiXudnAsbdrdBaqBvTN8M8BgA= go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= go.opentelemetry.io/otel v1.22.0 h1:xS7Ku+7yTFvDfDraDIJVpw7XPyuHlB9MCiqqX5mcJ6Y= @@ -225,6 +245,8 @@ golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 h1:/RIbNt/Zr7rVhIkQhooTxCxFc golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= golang.org/x/exp v0.0.0-20240213143201-ec583247a57a h1:HinSgX1tJRX3KsL//Gxynpw5CTOAIPhgL4W8PNiIpVE= golang.org/x/exp v0.0.0-20240213143201-ec583247a57a/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= +golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ= +golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -315,6 +337,8 @@ google.golang.org/api v0.163.0 h1:4BBDpPaSH+H28NhnX+WwjXxbRLQ7TWuEKp4BQyEjxvk= google.golang.org/api v0.163.0/go.mod h1:6SulDkfoBIg4NFmCuZ39XeeAgSHCPecfSUuDyYlAHs0= google.golang.org/api v0.165.0 h1:zd5d4JIIIaYYsfVy1HzoXYZ9rWCSBxxAglbczzo7Bgc= google.golang.org/api v0.165.0/go.mod h1:2OatzO7ZDQsoS7IFf3rvsE17/TldiU3F/zxFHeqUB5o= +google.golang.org/api v0.167.0 h1:CKHrQD1BLRii6xdkatBDXyKzM0mkawt2QP+H3LtPmSE= +google.golang.org/api v0.167.0/go.mod h1:4FcBc686KFi7QI/U51/2GKKevfZMpM17sCdibqe/bSA= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= @@ -334,6 +358,10 @@ google.golang.org/genproto v0.0.0-20240116215550-a9fa1716bcac h1:ZL/Teoy/ZGnzyrq google.golang.org/genproto v0.0.0-20240116215550-a9fa1716bcac/go.mod h1:+Rvu7ElI+aLzyDQhpHMFMMltsD6m7nqpuWDd2CwJw3k= google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe h1:USL2DhxfgRchafRvt/wYyyQNzwgL7ZiURcozOE/Pkvo= google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro= +google.golang.org/genproto v0.0.0-20240205150955-31a09d347014 h1:g/4bk7P6TPMkAUbUhquq98xey1slwvuVJPosdBqYJlU= +google.golang.org/genproto v0.0.0-20240205150955-31a09d347014/go.mod h1:xEgQu1e4stdSSsxPDK8Azkrk/ECl5HvdPf6nbZrTS5M= +google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y= +google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:mqHbVIp48Muh7Ywss/AD6I5kNVKZMmAa/QEW58Gxp2s= google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f h1:2yNACc1O40tTnrsbk9Cv6oxiW8pxI/pXj0wRtdlYmgY= google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f/go.mod h1:Uy9bTZJqmfrw2rIBxgGLnamc78euZULUBrLZ9XTITKI= google.golang.org/genproto/googleapis/api v0.0.0-20231211222908-989df2bf70f3 h1:EWIeHfGuUf00zrVZGEgYFxok7plSAXBGcH7NNdMAWvA= @@ -344,6 +372,10 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917 h1: google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917/go.mod h1:CmlNWB9lSezaYELKS5Ym1r44VrrbPUa7JTvw+6MbpJ0= google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe h1:0poefMBYvYbs7g5UkjS6HcxBPaTRAmznle9jnxYoAI8= google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA= +google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014 h1:x9PwdEgd11LgK+orcck69WVRo7DezSO4VUMPI4xpc8A= +google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014/go.mod h1:rbHMSEDyoYX62nRVLOCc4Qt1HbsdytAYoVwgjiOhF3I= +google.golang.org/genproto/googleapis/api v0.0.0-20240221002015-b0ce06bbee7c h1:9g7erC9qu44ks7UK4gDNlnk4kOxZG707xKm4jVniy6o= +google.golang.org/genproto/googleapis/api v0.0.0-20240221002015-b0ce06bbee7c/go.mod h1:5iCWqnniDlqZHrd3neWVTOwvh/v6s3232omMecelax8= google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE= google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM= google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 h1:/jFB8jK5R3Sq3i/lmeZO0cATSzFfZaJq1J2Euan3XKU= @@ -356,6 +388,10 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 h1:FSL3lRCkhaPFxqi0s9o+V4UI2WTzAVOvkgbd4kVV4Wg= google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014/go.mod h1:SaPjaZGWb0lPqs6Ittu0spdfrOArqji4ZdeP5IC/9N4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 h1:hZB7eLIaYlW9qXRfCq/qDaPdbeY3757uARz5Vvfv+cY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:YUWgXUFRPfoYK1IHMuxH5K6nPEXSCzIMljnQ59lLRCk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c h1:NUsgEN92SQQqzfA+YtqYNqYmB3DMMYLlIwUZAQFVFbo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= @@ -369,6 +405,8 @@ google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0= google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY= google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= +google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk= +google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/handler.go b/handler.go index c348a42..cec4245 100644 --- a/handler.go +++ b/handler.go @@ -125,14 +125,13 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte return echo.NewHTTPError(http.StatusInternalServerError) } - retryCount := 0 - // サーバへの接続・結果の送信処理 // サーバへの再接続が期待できる限りは、再接続を試みる for { zlog.Info(). Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). + Int("retry_count", serviceHandler.GetRetryCount()). Msg("NEW-REQUEST") reader, err := serviceHandler.Handle(ctx, r) @@ -143,10 +142,20 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Str("connection_id", h.SoraConnectionID). Send() if err, ok := err.(*SuzuError); ok { + if err.IsRetry() { + if s.config.MaxRetry > serviceHandler.GetRetryCount() { + serviceHandler.UpdateRetryCount() + + // 連続のリトライを避けるために少し待つ + time.Sleep(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) + + // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする + continue + } + } // SuzuError の場合はその Status Code を返す return c.NoContent(err.Code) } - // SuzuError 以外の場合は 500 を返す return echo.NewHTTPError(http.StatusInternalServerError, err) } @@ -168,37 +177,42 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Send() return err } else if errors.Is(err, ErrServerDisconnected) { - if *s.config.Retry { - // サーバから切断されたが再度接続できる可能性があるため、接続を試みる - retryCount += 1 - - zlog.Debug(). + if s.config.MaxRetry < 1 { + // サーバから切断されたが再接続させない設定の場合 + zlog.Error(). Err(err). Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). - Int("retry_count", retryCount). Send() + return err + } + + if s.config.MaxRetry > serviceHandler.GetRetryCount() { + // サーバから切断されたが再度接続できる可能性があるため、接続を試みる + + serviceHandler.UpdateRetryCount() + + // TODO: 必要な場合は連続のリトライを避けるために少し待つ処理を追加する + break } else { - // サーバから切断されたが再接続させない設定の場合 zlog.Error(). Err(err). Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). Send() - return err + // max_retry を超えた場合は終了 + return c.NoContent(http.StatusOK) } } - zlog.Error(). - Err(err). - Str("channel_id", h.SoraChannelID). - Str("connection_id", h.SoraConnectionID). - Send() // サーバから切断されたが再度の接続が期待できない場合 return err } + // 1 度でも接続結果を受け取れた場合はリトライ回数をリセットする + serviceHandler.ResetRetryCount() + // メッセージが空でない場合はクライアントに結果を送信する if n > 0 { if _, err := c.Response().Write(buf[:n]); err != nil { diff --git a/packet_dump_handler.go b/packet_dump_handler.go index 3d8b1a7..1ed7e94 100644 --- a/packet_dump_handler.go +++ b/packet_dump_handler.go @@ -5,6 +5,7 @@ import ( "encoding/json" "io" "os" + "sync" "time" ) @@ -20,6 +21,8 @@ type PacketDumpHandler struct { SampleRate uint32 ChannelCount uint16 LanguageCode string + RetryCount int + mu sync.Mutex OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error } @@ -46,6 +49,24 @@ type PacketDumpResult struct { Payload []byte `json:"payload"` } +func (h *PacketDumpHandler) UpdateRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount++ + return h.RetryCount +} + +func (h *PacketDumpHandler) GetRetryCount() int { + return h.RetryCount +} + +func (h *PacketDumpHandler) ResetRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount = 0 + return h.RetryCount +} + func (h *PacketDumpHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { c := h.Config filename := c.DumpFile diff --git a/service_handler.go b/service_handler.go index 0064f9f..0a8b722 100644 --- a/service_handler.go +++ b/service_handler.go @@ -16,6 +16,9 @@ var ( type serviceHandlerInterface interface { Handle(context.Context, io.Reader) (*io.PipeReader, error) + UpdateRetryCount() int + GetRetryCount() int + ResetRetryCount() int } type newServiceHandlerFunc func(Config, string, string, uint32, uint16, string, any) serviceHandlerInterface diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index e49dea0..5c85fd6 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "strings" + "sync" zlog "github.com/rs/zerolog/log" @@ -25,6 +26,8 @@ type SpeechToTextHandler struct { SampleRate uint32 ChannelCount uint16 LanguageCode string + RetryCount int + mu sync.Mutex OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error } @@ -70,6 +73,24 @@ func (gr *GcpResult) SetMessage(message string) *GcpResult { return gr } +func (h *SpeechToTextHandler) UpdateRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount++ + return h.RetryCount +} + +func (h *SpeechToTextHandler) GetRetryCount() int { + return h.RetryCount +} + +func (h *SpeechToTextHandler) ResetRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount = 0 + return h.RetryCount +} + func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { stt := NewSpeechToText(h.Config, h.LanguageCode, int32(h.SampleRate), int32(h.ChannelCount)) @@ -141,8 +162,8 @@ func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io Int32("code", status.GetCode()). Send() - // リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if !*stt.Config.Retry { + // リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する + if (stt.Config.MaxRetry < 1) || (stt.Config.MaxRetry <= h.GetRetryCount()) { if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { zlog.Error(). Err(err). diff --git a/test_handler.go b/test_handler.go index a8c3912..e264439 100644 --- a/test_handler.go +++ b/test_handler.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "sync" zlog "github.com/rs/zerolog/log" ) @@ -21,6 +22,8 @@ type TestHandler struct { SampleRate uint32 ChannelCount uint16 LanguageCode string + RetryCount int + mu sync.Mutex OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error } @@ -52,6 +55,24 @@ func NewTestResult(channelID, message string) TestResult { } } +func (h *TestHandler) UpdateRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount++ + return h.RetryCount +} + +func (h *TestHandler) GetRetryCount() int { + return h.RetryCount +} + +func (h *TestHandler) ResetRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount = 0 + return h.RetryCount +} + func (h *TestHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { r, w := io.Pipe() diff --git a/test_handler_test.go b/test_handler_test.go index 57541ff..506a6c4 100644 --- a/test_handler_test.go +++ b/test_handler_test.go @@ -243,17 +243,6 @@ func TestSpeechHandler(t *testing.T) { }) t.Run("packet read error", func(t *testing.T) { - logger := log.Logger - defer func() { - log.Logger = logger - }() - - pr, pw, err := os.Pipe() - if err != nil { - t.Fatal(err) - } - log.Logger = zerolog.New(pw).With().Caller().Timestamp().Logger() - r := iotest.ErrReader(errors.New("packet read error")) e := echo.New() @@ -271,14 +260,11 @@ func TestSpeechHandler(t *testing.T) { assert.Equal(t, "packet read error", err.Error()) } - pw.Close() - - var buf bytes.Buffer - n, err := buf.ReadFrom(pr) + line, err := rec.Body.ReadBytes([]byte("\n")[0]) if err != nil { t.Fatal(err) } - assert.Contains(t, buf.String()[:n], "packet read error") + assert.Contains(t, string(line), "packet read error") }) t.Run("silent packet", func(t *testing.T) {