From 7c853c04db03dcfef5607bd7bd7fa2f7730d309e Mon Sep 17 00:00:00 2001 From: wrench Date: Tue, 28 Nov 2023 11:40:21 +0530 Subject: [PATCH] feat: add multi-client support --- README.md | 33 ++++++++++- bot/client.go | 2 +- bot/workers.go | 144 +++++++++++++++++++++++++++++++++++++++++++++ commands/stream.go | 4 +- config/config.go | 14 +++++ go.mod | 8 +-- go.sum | 8 +++ main.go | 3 +- routes/stream.go | 6 +- utils/helpers.go | 15 ++--- utils/reader.go | 6 +- 11 files changed, 221 insertions(+), 22 deletions(-) create mode 100644 bot/workers.go diff --git a/README.md b/README.md index e11ac4a4..d3cd1fb8 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,9 @@ +
  • Contributing
  • Contact me
  • @@ -73,10 +76,13 @@ An example of `.env` file: ```sh API_ID=452525 API_HASH=esx576f8738x883f3sfzx83 -BOT_TOKEN=55838383:yourtbottokenhere +BOT_TOKEN=55838383:yourbottokenhere BIN_CHANNEL=-10045145224562 PORT=8080 HOST=http://yourserverip +# (if you want to set up multiple bots) +MULTI_TOKEN1=55838373:yourworkerbottokenhere +MULTI_TOKEN2=55838355:yourworkerbottokenhere ``` ### Required Vars @@ -97,7 +103,30 @@ In addition to the mandatory variables, you can also set the following optional - `HOST` : A Fully Qualified Domain Name if present or use your server IP. (eg. `https://example.com` or `http://14.1.154.2:8080`) -- `HASH_LENGTH` : This is the custom hash length for generated URLs. The hash length must be greater than 5 and less than or equal to 32. The default value is 6. +- `HASH_LENGTH` : Custom hash length for generated URLs. The hash length must be greater than 5 and less than or equal to 32. The default value is 6. + +- `USE_SESSION_FILE` : Use session files for worker client(s). This speeds up the worker bot startups. (default: `false`) + +### Use Multiple Bots to speed up + +> **Note** +> What it multi-client feature and what it does?
    +> This feature shares the Telegram API requests between worker bots to speed up download speed when many users are using the server and to avoid the flood limits that are set by Telegram.
    + +> **Note** +> You can add up to 50 bots since 50 is the max amount of bot admins you can set in a Telegram Channel. + +To enable multi-client, generate new bot tokens and add it as your `.env` with the following key names. + +`MULTI_TOKEN1`: Add your first bot token here. + +`MULTI_TOKEN2`: Add your second bot token here. + +you may also add as many as bots you want. (max limit is 50) +`MULTI_TOKEN3`, `MULTI_TOKEN4`, etc. + +> **Warning** +> Don't forget to add all these worker bots to the `BIN_CHANNEL` for the proper functioning ## Contributing diff --git a/bot/client.go b/bot/client.go index 2f440dbc..bf885cf9 100644 --- a/bot/client.go +++ b/bot/client.go @@ -20,7 +20,7 @@ func StartClient(log *zap.Logger) (*gotgproto.Client, error) { BotToken: config.ValueOf.BotToken, }, &gotgproto.ClientOpts{ - Session: sessionMaker.NewSession("fsb", sessionMaker.Session), + Session: sessionMaker.SqliteSession("fsb"), DisableCopyright: true, }, ) diff --git a/bot/workers.go b/bot/workers.go new file mode 100644 index 00000000..e18bfb3b --- /dev/null +++ b/bot/workers.go @@ -0,0 +1,144 @@ +package bot + +import ( + "EverythingSuckz/fsb/config" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/celestix/gotgproto" + "github.com/celestix/gotgproto/sessionMaker" + "github.com/gotd/td/tg" + "go.uber.org/zap" +) + +type Worker struct { + ID int + Client *gotgproto.Client + Self *tg.User + log *zap.Logger +} + +func (w *Worker) String() string { + return fmt.Sprintf("{Worker (%d|@%s)}", w.ID, w.Self.Username) +} + +type BotWorkers struct { + Bots []*Worker + starting int + index int + mut sync.Mutex + log *zap.Logger +} + +var Workers *BotWorkers = &BotWorkers{ + log: nil, + Bots: make([]*Worker, 0), +} + +func (w *BotWorkers) Init(log *zap.Logger) { + w.log = log.Named("Workers") +} + +func (w *BotWorkers) AddDefaultClient(client *gotgproto.Client, self *tg.User) { + if w.Bots == nil { + w.Bots = make([]*Worker, 0) + } + w.incStarting() + w.Bots = append(w.Bots, &Worker{ + Client: client, + ID: w.starting, + Self: self, + }) + w.log.Sugar().Info("Default bot loaded") +} + +func (w *BotWorkers) incStarting() { + w.mut.Lock() + defer w.mut.Unlock() + w.starting++ +} + +func (w *BotWorkers) Add(token string) (err error) { + w.incStarting() + var botID int = w.starting + client, err := startWorker(w.log, token, botID) + if err != nil { + return err + } + w.log.Sugar().Infof("Bot @%s loaded with ID %d", client.Self.Username, botID) + w.Bots = append(w.Bots, &Worker{ + Client: client, + ID: botID, + Self: client.Self, + log: w.log, + }) + return nil +} + +func GetNextWorker() *Worker { + Workers.mut.Lock() + defer Workers.mut.Unlock() + index := (Workers.index + 1) % len(Workers.Bots) + Workers.index = index + worker := Workers.Bots[index] + Workers.log.Sugar().Infof("Using worker %d", worker.ID) + return worker +} + +func StartWorkers(log *zap.Logger) { + log.Sugar().Info("Starting workers") + Workers.Init(log) + if config.ValueOf.UseSessionFile { + log.Sugar().Info("Using session file for workers") + newpath := filepath.Join(".", "sessions") + err := os.MkdirAll(newpath, os.ModePerm) + if err != nil { + log.Error("Failed to create sessions directory", zap.Error(err)) + return + } + } + c := make(chan struct{}) + for i := 0; i < len(config.ValueOf.MultiTokens); i++ { + go func(i int) { + err := Workers.Add(config.ValueOf.MultiTokens[i]) + if err != nil { + log.Error("Failed to start worker", zap.Error(err)) + return + } + c <- struct{}{} + }(i) + } + // wait for all workers to start + log.Sugar().Info("Waiting for all workers to start") + for i := 0; i < len(config.ValueOf.MultiTokens); i++ { + <-c + } +} + +func startWorker(l *zap.Logger, botToken string, index int) (*gotgproto.Client, error) { + log := l.Named("Worker").Sugar() + log.Infof("Starting worker with index - %d", index) + var sessionType *sessionMaker.SqliteSessionConstructor + if config.ValueOf.UseSessionFile { + sessionType = sessionMaker.SqliteSession(fmt.Sprintf("sessions/worker-%d", index)) + } else { + sessionType = sessionMaker.SqliteSession(":memory:") + } + client, err := gotgproto.NewClient( + int(config.ValueOf.ApiID), + config.ValueOf.ApiHash, + gotgproto.ClientType{ + BotToken: botToken, + }, + &gotgproto.ClientOpts{ + Session: sessionType, + DisableCopyright: true, + }, + ) + if err != nil { + return nil, err + } + return client, nil +} diff --git a/commands/stream.go b/commands/stream.go index b2b9a78d..026073b5 100644 --- a/commands/stream.go +++ b/commands/stream.go @@ -27,11 +27,11 @@ func sendLink(ctx *ext.Context, u *ext.Update) error { return dispatcher.EndGroups } chatId := u.EffectiveChat().GetID() - peerChatId := storage.GetPeerById(chatId) + peerChatId := ctx.PeerStorage.GetPeerById(chatId) if peerChatId.Type != int(storage.TypeUser) { return dispatcher.EndGroups } - peer := storage.GetPeerById(config.ValueOf.LogChannelID) + peer := ctx.PeerStorage.GetPeerById(config.ValueOf.LogChannelID) switch storage.EntityType(peer.Type) { case storage.TypeChat: return dispatcher.EndGroups diff --git a/config/config.go b/config/config.go index d87510fd..6bde978b 100644 --- a/config/config.go +++ b/config/config.go @@ -1,6 +1,9 @@ package config import ( + "os" + "reflect" + "regexp" "strconv" "strings" @@ -20,13 +23,24 @@ type config struct { Port int `envconfig:"PORT" default:"8080"` Host string `envconfig:"HOST" default:"http://localhost:8080"` HashLength int `envconfig:"HASH_LENGTH" default:"6"` + UseSessionFile bool `envconfig:"USE_SESSION_FILE" default:"true"` + MultiTokens []string } +var botTokenRegex = regexp.MustCompile(`MULTI\_TOKEN[\d+]=(.*)`) + func (c *config) setupEnvVars() { err := envconfig.Process("", c) if err != nil { panic(err) } + val := reflect.ValueOf(c).Elem() + for _, env := range os.Environ() { + if strings.HasPrefix(env, "MULTI_TOKEN") { + c.MultiTokens = append(c.MultiTokens, botTokenRegex.FindStringSubmatch(env)[1]) + } + } + val.FieldByName("MultiTokens").Set(reflect.ValueOf(c.MultiTokens)) } func Load(log *zap.Logger) { diff --git a/go.mod b/go.mod index f80e83e9..d7287636 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module EverythingSuckz/fsb go 1.21.3 require ( - github.com/celestix/gotgproto v1.0.0-beta13 + github.com/celestix/gotgproto v1.0.0-beta13.0.20231124171805-6c04fae60b80 github.com/gin-gonic/gin v1.9.1 github.com/gotd/td v0.89.0 github.com/joho/godotenv v1.5.1 @@ -58,9 +58,9 @@ require ( github.com/ugorji/go/codec v1.2.11 // indirect go.uber.org/zap v1.26.0 golang.org/x/arch v0.3.0 // indirect - golang.org/x/crypto v0.15.0 // indirect - golang.org/x/net v0.18.0 // indirect - golang.org/x/sys v0.14.0 // indirect + golang.org/x/crypto v0.16.0 // indirect + golang.org/x/net v0.19.0 // indirect + golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 diff --git a/go.sum b/go.sum index 6d3bf0f9..7da29ab7 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= github.com/celestix/gotgproto v1.0.0-beta13 h1:5BlGUJMwJmXrWD9RhBbHRuJhbPkv5CJd04x/sDCpYeg= github.com/celestix/gotgproto v1.0.0-beta13/go.mod h1:WHwqFwgXEpFN/2ReP+vVnxCs2IvULaRK7n0N5ouVmDw= +github.com/celestix/gotgproto v1.0.0-beta13.0.20231124171805-6c04fae60b80 h1:OhekKvJhPQx7jkPVomt7rD8tAlnel4l/sR6X7D9Pkpw= +github.com/celestix/gotgproto v1.0.0-beta13.0.20231124171805-6c04fae60b80/go.mod h1:sPTsFAhN6apWcxCLc07LFEkb9wuoQa9L7JxXl6znLY4= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= @@ -124,10 +126,14 @@ golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k= golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA= golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= +golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= +golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20230116083435-1de6713980de h1:DBWn//IJw30uYCgERoxCg84hWtA97F4wMiKOIh00Uf0= golang.org/x/exp v0.0.0-20230116083435-1de6713980de/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= @@ -135,6 +141,8 @@ golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/main.go b/main.go index 29b489bf..c1db528b 100644 --- a/main.go +++ b/main.go @@ -26,13 +26,14 @@ func main() { mainLogger.Info("Starting server") config.Load(log) router := getRouter(log) - + _, err := bot.StartClient(log) if err != nil { log.Info(err.Error()) return } cache.InitCache(log) + bot.StartWorkers(log) mainLogger.Info("Server started", zap.Int("port", config.ValueOf.Port)) mainLogger.Info("File Stream Bot", zap.String("version", versionString)) err = router.Run(fmt.Sprintf(":%d", config.ValueOf.Port)) diff --git a/routes/stream.go b/routes/stream.go index 2eee3e48..2fbaf277 100644 --- a/routes/stream.go +++ b/routes/stream.go @@ -43,7 +43,9 @@ func getStreamRoute(ctx *gin.Context) { var start, end int64 rangeHeader := r.Header.Get("Range") - file, err := utils.FileFromMessage(ctx, bot.Bot.Client, messageID) + worker := bot.GetNextWorker() + + file, err := utils.FileFromMessage(ctx, worker.Client, messageID) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -100,7 +102,7 @@ func getStreamRoute(ctx *gin.Context) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - lr, _ := utils.NewTelegramReader(ctx, bot.Bot.Client, file.Location, start, end, contentLength) + lr, _ := utils.NewTelegramReader(ctx, worker.Client, file.Location, start, end, contentLength) if _, err := io.CopyN(w, lr, contentLength); err != nil { log.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).Error(err.Error()) } diff --git a/utils/helpers.go b/utils/helpers.go index cad92d23..ac0022e9 100644 --- a/utils/helpers.go +++ b/utils/helpers.go @@ -8,11 +8,12 @@ import ( "errors" "fmt" - "github.com/gotd/td/telegram" + "github.com/celestix/gotgproto" "github.com/gotd/td/tg" + "go.uber.org/zap" ) -func GetTGMessage(ctx context.Context, client *telegram.Client, messageID int) (*tg.Message, error) { +func GetTGMessage(ctx context.Context, client *gotgproto.Client, messageID int) (*tg.Message, error) { inputMessageID := tg.InputMessageClass(&tg.InputMessageID{ID: messageID}) channel, err := GetChannelById(ctx, client) if err != nil { @@ -58,16 +59,16 @@ func FileFromMedia(media tg.MessageMediaClass) (*types.File, error) { return nil, fmt.Errorf("unexpected type %T", media) } -func FileFromMessage(ctx context.Context, client *telegram.Client, messageID int) (*types.File, error) { - key := fmt.Sprintf("file:%d", messageID) +func FileFromMessage(ctx context.Context, client *gotgproto.Client, messageID int) (*types.File, error) { + key := fmt.Sprintf("file:%d:%d", messageID, client.Self.ID) log := Logger.Named("GetMessageMedia") var cachedMedia types.File err := cache.GetCache().Get(key, &cachedMedia) if err == nil { - log.Sugar().Debug("Using cached media message properties") + log.Debug("Using cached media message properties", zap.Int("messageID", messageID), zap.Int64("clientID", client.Self.ID)) return &cachedMedia, nil } - log.Sugar().Debug("Fetching file properties from message ID") + log.Debug("Fetching file properties from message ID", zap.Int("messageID", messageID), zap.Int64("clientID", client.Self.ID)) message, err := GetTGMessage(ctx, client, messageID) if err != nil { return nil, err @@ -88,7 +89,7 @@ func FileFromMessage(ctx context.Context, client *telegram.Client, messageID int // TODO: add photo support } -func GetChannelById(ctx context.Context, client *telegram.Client) (*tg.InputChannel, error) { +func GetChannelById(ctx context.Context, client *gotgproto.Client) (*tg.InputChannel, error) { channel := &tg.InputChannel{} inputChannel := &tg.InputChannel{ ChannelID: config.ValueOf.LogChannelID, diff --git a/utils/reader.go b/utils/reader.go index aaac6998..3a4c102e 100644 --- a/utils/reader.go +++ b/utils/reader.go @@ -5,7 +5,7 @@ import ( "fmt" "io" - "github.com/gotd/td/telegram" + "github.com/celestix/gotgproto" "github.com/gotd/td/tg" "go.uber.org/zap" ) @@ -13,7 +13,7 @@ import ( type telegramReader struct { ctx context.Context log *zap.Logger - client *telegram.Client + client *gotgproto.Client location *tg.InputDocumentFileLocation start int64 end int64 @@ -31,7 +31,7 @@ func (*telegramReader) Close() error { func NewTelegramReader( ctx context.Context, - client *telegram.Client, + client *gotgproto.Client, location *tg.InputDocumentFileLocation, start int64, end int64,