Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: supports batch operation of tasks by filter #753

Merged
merged 7 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 141 additions & 69 deletions pkg/download/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"github.com/rs/zerolog"
"github.com/rs/zerolog/pkgerrors"
"github.com/virtuald/go-paniclog"
"math"
gohttp "net/http"
"net/url"
"os"
Expand Down Expand Up @@ -371,66 +372,104 @@
return d.doCreate(fetcher, opts)
}

func (d *Downloader) Pause(id string) (err error) {
task := d.GetTask(id)
if task == nil {
func (d *Downloader) Pause(filter *TaskFilter) (err error) {
if filter == nil || filter.IsEmpty() {
return d.pauseAll()

Check warning on line 377 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L377

Added line #L377 was not covered by tests
}

filter.NotStatuses = []base.Status{base.DownloadStatusPause, base.DownloadStatusError, base.DownloadStatusDone}
pauseTasks := d.GetTasksByFilter(filter)
if len(pauseTasks) == 0 {
return ErrTaskNotFound
}

if err = d.doPause(task); err != nil {
return
for _, task := range pauseTasks {
if err = d.doPause(task); err != nil {
return

Check warning on line 388 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L388

Added line #L388 was not covered by tests
}
}
d.notifyRunning()

return
}

func (d *Downloader) Continue(id string) (err error) {
task := d.GetTask(id)
if task == nil {
func (d *Downloader) pauseAll() (err error) {
func() {
d.lock.Lock()
defer d.lock.Unlock()

// Clear wait tasks
d.waitTasks = d.waitTasks[:0]
}()

for _, task := range d.tasks {
if err = d.doPause(task); err != nil {
return

Check warning on line 407 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L407

Added line #L407 was not covered by tests
}
}

return
}

// Continue specific tasks, if continue tasks will exceed maxRunning, it needs pause some running tasks before that
func (d *Downloader) Continue(filter *TaskFilter) (err error) {
if filter == nil || filter.IsEmpty() {
return d.continueAll()

Check warning on line 417 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L417

Added line #L417 was not covered by tests
}

filter.NotStatuses = []base.Status{base.DownloadStatusRunning, base.DownloadStatusDone}
continueTasks := d.GetTasksByFilter(filter)
if len(continueTasks) == 0 {
return ErrTaskNotFound
}

realContinueTasks := make([]*Task, 0)
func() {
d.lock.Lock()
defer d.lock.Unlock()

continueCount := len(continueTasks)
remainRunningCount := d.remainRunningCount()
if remainRunningCount == 0 {
for _, t := range d.tasks {
if t.Status == base.DownloadStatusRunning {
if err = d.doPause(t); err != nil {
needRunningCount := int(math.Min(float64(d.cfg.MaxRunning), float64(continueCount)))
needPauseCount := needRunningCount - remainRunningCount
if needPauseCount > 0 {
pausedCount := 0
for _, task := range d.tasks {
if task.Status == base.DownloadStatusRunning {
if err = d.doPause(task); err != nil {

Check warning on line 439 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L436-L439

Added lines #L436 - L439 were not covered by tests
return
}
t.Status = base.DownloadStatusWait
d.waitTasks = append(d.waitTasks, t)
task.Status = base.DownloadStatusWait
d.waitTasks = append(d.waitTasks, task)
pausedCount++

Check warning on line 444 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L442-L444

Added lines #L442 - L444 were not covered by tests
}
if pausedCount == needPauseCount {

Check warning on line 446 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L446

Added line #L446 was not covered by tests
break
}
}
}
}()

return d.doStart(task)
}

func (d *Downloader) PauseAll() (err error) {
func() {
d.lock.Lock()
defer d.lock.Unlock()

// Clear wait tasks
d.waitTasks = d.waitTasks[:0]
for _, task := range continueTasks {
if len(realContinueTasks) < needRunningCount {
realContinueTasks = append(realContinueTasks, task)
} else {
task.Status = base.DownloadStatusWait
d.waitTasks = append(d.waitTasks, task)

Check warning on line 457 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L456-L457

Added lines #L456 - L457 were not covered by tests
}
}
}()

for _, task := range d.tasks {
if err = d.doPause(task); err != nil {
for _, task := range realContinueTasks {
if err = d.doStart(task); err != nil {
return
}
}

return
}

func (d *Downloader) ContinueAll() (err error) {
// continueAll continue all tasks but does not affect tasks already running
func (d *Downloader) continueAll() (err error) {

Check warning on line 472 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L472

Added line #L472 was not covered by tests
continuedTasks := make([]*Task, 0)

func() {
Expand All @@ -451,49 +490,34 @@
}()

for _, task := range continuedTasks {
tt := task
if err = d.doStart(tt); err != nil {
if err = d.doStart(task); err != nil {

Check warning on line 493 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L493

Added line #L493 was not covered by tests
return
}
}

return
}

func (d *Downloader) Delete(id string, force bool) (err error) {
task := d.GetTask(id)
if task == nil {
return ErrTaskNotFound
func (d *Downloader) ContinueBatch(filter *TaskFilter) (err error) {
if filter == nil || filter.IsEmpty() {
return d.continueAll()

Check warning on line 503 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L501-L503

Added lines #L501 - L503 were not covered by tests
}

func() {
d.lock.Lock()
defer d.lock.Unlock()

for i, t := range d.tasks {
if t.ID == id {
d.tasks = append(d.tasks[:i], d.tasks[i+1:]...)
break
}
}
for i, t := range d.waitTasks {
if t.ID == id {
d.waitTasks = append(d.waitTasks[:i], d.waitTasks[i+1:]...)
break
}
continueTasks := d.GetTasksByFilter(filter)
for _, task := range continueTasks {
if err = d.doStart(task); err != nil {
return

Check warning on line 509 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L506-L509

Added lines #L506 - L509 were not covered by tests
}
}()

err = d.doDelete(task, force)
if err != nil {
return
}
d.notifyRunning()
return
}

func (d *Downloader) DeleteByStatues(statues []base.Status, force bool) (err error) {
deleteTasks := d.GetTasksByStatues(statues)
func (d *Downloader) Delete(filter *TaskFilter, force bool) (err error) {
if filter == nil || filter.IsEmpty() {
return d.deleteAll()
}

deleteTasks := d.GetTasksByFilter(filter)

Check warning on line 520 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L520

Added line #L520 was not covered by tests
if len(deleteTasks) == 0 {
return
}
Expand Down Expand Up @@ -535,6 +559,23 @@
return
}

func (d *Downloader) deleteAll() (err error) {
func() {
d.lock.Lock()
defer d.lock.Unlock()

d.tasks = make([]*Task, 0)
d.waitTasks = make([]*Task, 0)
}()

for _, task := range d.tasks {
if err = d.doDelete(task, true); err != nil {
return

Check warning on line 573 in pkg/download/downloader.go

View check run for this annotation

Codecov / codecov/patch

pkg/download/downloader.go#L572-L573

Added lines #L572 - L573 were not covered by tests
}
}
return
}

func (d *Downloader) Stats(id string) (sr any, err error) {
task := d.GetTask(id)
if task == nil {
Expand All @@ -557,9 +598,6 @@

func (d *Downloader) doDelete(task *Task, force bool) (err error) {
err = func() error {
d.lock.Lock()
defer d.lock.Unlock()

if err := d.storage.Delete(bucketTask, task.ID); err != nil {
return err
}
Expand Down Expand Up @@ -587,18 +625,18 @@
task = nil
return nil
}()

if err != nil {
d.Logger.Error().Stack().Err(err).Msgf("delete task failed, task id: %s", task.ID)
}

return
}

func (d *Downloader) Close() error {
d.closed.Store(true)

closeArr := []func() error{
d.PauseAll,
d.pauseAll,
}
for _, fm := range d.cfg.FetchManagers {
closeArr = append(closeArr, fm.Close)
Expand Down Expand Up @@ -660,18 +698,52 @@
return d.tasks
}

func (d *Downloader) GetTasksByStatues(statues []base.Status) []*Task {
if len(statues) == 0 {
// GetTasksByFilter get tasks by filter, if filter is nil, return all tasks
// return tasks and if match all tasks
func (d *Downloader) GetTasksByFilter(filter *TaskFilter) []*Task {
if filter == nil || filter.IsEmpty() {
return d.tasks
}
tasks := make([]*Task, 0)
for _, task := range d.tasks {
for _, status := range statues {

idMatch := func(task *Task) bool {
if len(filter.IDs) == 0 {
return true
}
for _, id := range filter.IDs {
if task.ID == id {
return true
}
}
return false
}
statusMatch := func(task *Task) bool {
if len(filter.Statuses) == 0 {
return true
}
for _, status := range filter.Statuses {
if task.Status == status {
tasks = append(tasks, task)
break
return true
}
}
return false
}
notStatusMatch := func(task *Task) bool {
if len(filter.NotStatuses) == 0 {
return true
}
for _, status := range filter.NotStatuses {
if task.Status == status {
return false
}
}
return true
}

tasks := make([]*Task, 0)
for _, task := range d.tasks {
if idMatch(task) && statusMatch(task) && notStatusMatch(task) {
tasks = append(tasks, task)
}
}
return tasks
}
Expand Down
Loading
Loading