Skip to content

Commit

Permalink
feat: supports batch operation of tasks by filter (#753)
Browse files Browse the repository at this point in the history
  • Loading branch information
monkeyWie authored Sep 21, 2024
1 parent 2dc7048 commit bdb8ed1
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 105 deletions.
210 changes: 141 additions & 69 deletions pkg/download/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/pkgerrors"
"github.com/virtuald/go-paniclog"
"math"
gohttp "net/http"
"net/url"
"os"
Expand Down Expand Up @@ -371,66 +372,104 @@ func (d *Downloader) Create(rrId string, opts *base.Options) (taskId string, err
return d.doCreate(fetcher, opts)
}

func (d *Downloader) Pause(id string) (err error) {
task := d.GetTask(id)
if task == nil {
func (d *Downloader) Pause(filter *TaskFilter) (err error) {
if filter == nil || filter.IsEmpty() {
return d.pauseAll()
}

filter.NotStatuses = []base.Status{base.DownloadStatusPause, base.DownloadStatusError, base.DownloadStatusDone}
pauseTasks := d.GetTasksByFilter(filter)
if len(pauseTasks) == 0 {
return ErrTaskNotFound
}

if err = d.doPause(task); err != nil {
return
for _, task := range pauseTasks {
if err = d.doPause(task); err != nil {
return
}
}
d.notifyRunning()

return
}

func (d *Downloader) Continue(id string) (err error) {
task := d.GetTask(id)
if task == nil {
func (d *Downloader) pauseAll() (err error) {
func() {
d.lock.Lock()
defer d.lock.Unlock()

// Clear wait tasks
d.waitTasks = d.waitTasks[:0]
}()

for _, task := range d.tasks {
if err = d.doPause(task); err != nil {
return
}
}

return
}

// Continue specific tasks, if continue tasks will exceed maxRunning, it needs pause some running tasks before that
func (d *Downloader) Continue(filter *TaskFilter) (err error) {
if filter == nil || filter.IsEmpty() {
return d.continueAll()
}

filter.NotStatuses = []base.Status{base.DownloadStatusRunning, base.DownloadStatusDone}
continueTasks := d.GetTasksByFilter(filter)
if len(continueTasks) == 0 {
return ErrTaskNotFound
}

realContinueTasks := make([]*Task, 0)
func() {
d.lock.Lock()
defer d.lock.Unlock()

continueCount := len(continueTasks)
remainRunningCount := d.remainRunningCount()
if remainRunningCount == 0 {
for _, t := range d.tasks {
if t.Status == base.DownloadStatusRunning {
if err = d.doPause(t); err != nil {
needRunningCount := int(math.Min(float64(d.cfg.MaxRunning), float64(continueCount)))
needPauseCount := needRunningCount - remainRunningCount
if needPauseCount > 0 {
pausedCount := 0
for _, task := range d.tasks {
if task.Status == base.DownloadStatusRunning {
if err = d.doPause(task); err != nil {
return
}
t.Status = base.DownloadStatusWait
d.waitTasks = append(d.waitTasks, t)
task.Status = base.DownloadStatusWait
d.waitTasks = append(d.waitTasks, task)
pausedCount++
}
if pausedCount == needPauseCount {
break
}
}
}
}()

return d.doStart(task)
}

func (d *Downloader) PauseAll() (err error) {
func() {
d.lock.Lock()
defer d.lock.Unlock()

// Clear wait tasks
d.waitTasks = d.waitTasks[:0]
for _, task := range continueTasks {
if len(realContinueTasks) < needRunningCount {
realContinueTasks = append(realContinueTasks, task)
} else {
task.Status = base.DownloadStatusWait
d.waitTasks = append(d.waitTasks, task)
}
}
}()

for _, task := range d.tasks {
if err = d.doPause(task); err != nil {
for _, task := range realContinueTasks {
if err = d.doStart(task); err != nil {
return
}
}

return
}

func (d *Downloader) ContinueAll() (err error) {
// continueAll continue all tasks but does not affect tasks already running
func (d *Downloader) continueAll() (err error) {
continuedTasks := make([]*Task, 0)

func() {
Expand All @@ -451,49 +490,34 @@ func (d *Downloader) ContinueAll() (err error) {
}()

for _, task := range continuedTasks {
tt := task
if err = d.doStart(tt); err != nil {
if err = d.doStart(task); err != nil {
return
}
}

return
}

func (d *Downloader) Delete(id string, force bool) (err error) {
task := d.GetTask(id)
if task == nil {
return ErrTaskNotFound
func (d *Downloader) ContinueBatch(filter *TaskFilter) (err error) {
if filter == nil || filter.IsEmpty() {
return d.continueAll()
}

func() {
d.lock.Lock()
defer d.lock.Unlock()

for i, t := range d.tasks {
if t.ID == id {
d.tasks = append(d.tasks[:i], d.tasks[i+1:]...)
break
}
}
for i, t := range d.waitTasks {
if t.ID == id {
d.waitTasks = append(d.waitTasks[:i], d.waitTasks[i+1:]...)
break
}
continueTasks := d.GetTasksByFilter(filter)
for _, task := range continueTasks {
if err = d.doStart(task); err != nil {
return
}
}()

err = d.doDelete(task, force)
if err != nil {
return
}
d.notifyRunning()
return
}

func (d *Downloader) DeleteByStatues(statues []base.Status, force bool) (err error) {
deleteTasks := d.GetTasksByStatues(statues)
func (d *Downloader) Delete(filter *TaskFilter, force bool) (err error) {
if filter == nil || filter.IsEmpty() {
return d.deleteAll()
}

deleteTasks := d.GetTasksByFilter(filter)
if len(deleteTasks) == 0 {
return
}
Expand Down Expand Up @@ -535,6 +559,23 @@ func (d *Downloader) DeleteByStatues(statues []base.Status, force bool) (err err
return
}

func (d *Downloader) deleteAll() (err error) {
func() {
d.lock.Lock()
defer d.lock.Unlock()

d.tasks = make([]*Task, 0)
d.waitTasks = make([]*Task, 0)
}()

for _, task := range d.tasks {
if err = d.doDelete(task, true); err != nil {
return
}
}
return
}

func (d *Downloader) Stats(id string) (sr any, err error) {
task := d.GetTask(id)
if task == nil {
Expand All @@ -557,9 +598,6 @@ func (d *Downloader) Stats(id string) (sr any, err error) {

func (d *Downloader) doDelete(task *Task, force bool) (err error) {
err = func() error {
d.lock.Lock()
defer d.lock.Unlock()

if err := d.storage.Delete(bucketTask, task.ID); err != nil {
return err
}
Expand Down Expand Up @@ -587,18 +625,18 @@ func (d *Downloader) doDelete(task *Task, force bool) (err error) {
task = nil
return nil
}()

if err != nil {
d.Logger.Error().Stack().Err(err).Msgf("delete task failed, task id: %s", task.ID)
}

return
}

func (d *Downloader) Close() error {
d.closed.Store(true)

closeArr := []func() error{
d.PauseAll,
d.pauseAll,
}
for _, fm := range d.cfg.FetchManagers {
closeArr = append(closeArr, fm.Close)
Expand Down Expand Up @@ -660,18 +698,52 @@ func (d *Downloader) GetTasks() []*Task {
return d.tasks
}

func (d *Downloader) GetTasksByStatues(statues []base.Status) []*Task {
if len(statues) == 0 {
// GetTasksByFilter get tasks by filter, if filter is nil, return all tasks
// return tasks and if match all tasks
func (d *Downloader) GetTasksByFilter(filter *TaskFilter) []*Task {
if filter == nil || filter.IsEmpty() {
return d.tasks
}
tasks := make([]*Task, 0)
for _, task := range d.tasks {
for _, status := range statues {

idMatch := func(task *Task) bool {
if len(filter.IDs) == 0 {
return true
}
for _, id := range filter.IDs {
if task.ID == id {
return true
}
}
return false
}
statusMatch := func(task *Task) bool {
if len(filter.Statuses) == 0 {
return true
}
for _, status := range filter.Statuses {
if task.Status == status {
tasks = append(tasks, task)
break
return true
}
}
return false
}
notStatusMatch := func(task *Task) bool {
if len(filter.NotStatuses) == 0 {
return true
}
for _, status := range filter.NotStatuses {
if task.Status == status {
return false
}
}
return true
}

tasks := make([]*Task, 0)
for _, task := range d.tasks {
if idMatch(task) && statusMatch(task) && notStatusMatch(task) {
tasks = append(tasks, task)
}
}
return tasks
}
Expand Down
Loading

0 comments on commit bdb8ed1

Please sign in to comment.