Skip to content

Commit

Permalink
feat: auto resume failed but recoverable tasks (#67)
Browse files Browse the repository at this point in the history
* feat: auto resume failed but recoverable tasks

* fix: make program not crash even the data is wrong

* feat: resume is actually schedule_resume

* test: runresume command
  • Loading branch information
xyb authored Jul 11, 2023
1 parent 362301f commit 38b477a
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 6 deletions.
1 change: 1 addition & 0 deletions baidupcsleecher/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
RUNNER_SLEEP_SECONDS = int(getenv("RUNNER_SLEEP_SECONDS", "5"))
SAMPLE_SIZE = int(getenv("SAMPLE_SIZE", "10240"))
FULL_DOWNLOAD_IMMEDIATELY = bool(int(getenv("FULL_DOWNLOAD_IMMEDIATELY", 0)))
RETRY_TIMES_LIMIT = int(getenv("RETRY_TIMES_LIMIT", 5))
# shared link transfer policy: always, if_not_present
TRANSFER_POLICY = getenv("TRANSFER_POLICY", "if_not_present")
PAN_BAIDU_BDUSS = getenv("PAN_BAIDU_BDUSS", "")
Expand Down
1 change: 1 addition & 0 deletions entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ echo
python manage.py runtransfer &
python manage.py runsamplingdownloader &
python manage.py runleecher &
python manage.py runresume &
echo

wait -n
Expand Down
37 changes: 37 additions & 0 deletions task/management/commands/runresume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging
from time import sleep

from django.conf import settings
from django.core.management.base import BaseCommand

from task.models import Task

logger = logging.getLogger("runresume")


class Command(BaseCommand):
help = "resume failed but recoverable tasks."

def add_arguments(self, parser):
parser.add_argument(
"--once",
action="store_true",
help="resume all failed tasks once and exit immediately.",
)

def resume_once(self):
for task in Task.filter_failed():
if not task.recoverable:
continue
if task.retry_times >= settings.RETRY_TIMES_LIMIT:
continue
task.schedule_resume()

def handle(self, *args, **options):
logger.info("auto resume failed but recoverable tasks.")
while True:
self.resume_once()
if options["once"]:
logger.info("auto resume tasks once and exit now.")
return
sleep(settings.RUNNER_SLEEP_SECONDS)
14 changes: 9 additions & 5 deletions task/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,22 +222,25 @@ def get_resume_method_name(self):
"downloading_files": "restart_downloading",
}
step_name = self.get_current_step()
return resume_methods[step_name]
if step_name:
return resume_methods[step_name]

def inc_retry_times(self):
self.retry_times += 1
self.save()

def resume(self):
def schedule_resume(self):
if not self.failed:
return
method = getattr(self, self.get_resume_method_name())
method()
method_name = self.get_resume_method_name()
if method_name:
method = getattr(self, method_name)
method()

@classmethod
def schedule_resume_failed(cls):
for task in cls.filter_failed():
task.resume()
task.schedule_resume()

@property
def done(self):
Expand Down Expand Up @@ -267,4 +270,5 @@ def recoverable(self):
if "error_code: 105," in e or "啊哦,链接错误没找到文件,请打开正确的分享链接" in e:
return False

# assume task is not recoverable by default to avoid flood requests
return False
64 changes: 64 additions & 0 deletions task/tests/test_commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from django.conf import settings
from django.core.management import call_command
from django.test import TestCase

from task.management.commands.runresume import Command as ResumeCommand
from task.models import Task


class ResumeCommandTest(TestCase):
def setUp(self):
self.task = Task.objects.create(shared_id="foo", shared_password="foo")
self.task.status = Task.Status.SAMPLING_DOWNLOADED
self.task.started_at = self.task.created_at
self.task.transfer_completed_at = self.task.created_at
self.task.sample_downloaded_at = self.task.created_at
self.task.failed = True
self.task.message = "BaiduPCS._request"
self.task.save()

def test_resume(self):
assert self.task.retry_times == 0

ResumeCommand().resume_once()

task = Task.objects.get(pk=self.task.id)
assert task.status == Task.Status.TRANSFERRED
assert not task.failed
assert task.retry_times == 1

def test_resume_not_recoverable_task(self):
self.task.message = "error_code: 105,"
self.task.save()
assert self.task.retry_times == 0

ResumeCommand().resume_once()

task = Task.objects.get(pk=self.task.id)
assert task.retry_times == 0
assert task.failed
assert task.status == Task.Status.SAMPLING_DOWNLOADED

def test_resume_too_many_times(self):
self.task.retry_times = settings.RETRY_TIMES_LIMIT + 1
self.task.save()
assert self.task.retry_times == settings.RETRY_TIMES_LIMIT + 1
assert self.task.get_resume_method_name() == "restart_downloading"

ResumeCommand().resume_once()

task = Task.objects.get(pk=self.task.id)
assert task.retry_times == settings.RETRY_TIMES_LIMIT + 1
assert task.get_resume_method_name() == "restart_downloading"
assert task.failed
assert task.status == Task.Status.SAMPLING_DOWNLOADED

def test_run_command(self):
assert self.task.retry_times == 0

call_command("runresume", "--once")

task = Task.objects.get(pk=self.task.id)
assert task.status == Task.Status.TRANSFERRED
assert not task.failed
assert task.retry_times == 1
22 changes: 22 additions & 0 deletions task/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def test_steps1_failed(self):
]
assert self.task.get_current_step() == "waiting_assign"
assert not self.task.done
assert self.task.get_resume_method_name() == "restart"

def test_steps2_failed(self):
self.task.status = self.task.Status.STARTED
Expand All @@ -103,6 +104,7 @@ def test_steps2_failed(self):
]
assert self.task.get_current_step() == "transferring"
assert not self.task.done
assert self.task.get_resume_method_name() == "restart"

def test_steps3_failed(self):
self.task.status = self.task.Status.TRANSFERRED
Expand All @@ -118,6 +120,7 @@ def test_steps3_failed(self):
]
assert self.task.get_current_step() == "downloading_samplings"
assert not self.task.done
assert self.task.get_resume_method_name() == "restart_downloading"

def test_steps4_failed(self):
self.task.status = self.task.Status.SAMPLING_DOWNLOADED
Expand All @@ -134,6 +137,25 @@ def test_steps4_failed(self):
]
assert self.task.get_current_step() == "downloading_files"
assert not self.task.done
assert self.task.get_resume_method_name() == "restart_downloading"

def test_steps5_failed_but_yes_it_should_not_happend(self):
self.task.status = self.task.Status.SAMPLING_DOWNLOADED
self.task.started_at = self.task.created_at
self.task.transfer_completed_at = self.task.created_at
self.task.sample_downloaded_at = self.task.created_at
self.task.full_downloaded_at = self.task.created_at
self.task.failed = True

assert list(self.task.get_steps()) == [
("waiting_assign", "done"),
("transferring", "done"),
("downloading_samplings", "done"),
("downloading_files", "done"),
]
assert self.task.get_current_step() is None
assert not self.task.done
assert self.task.get_resume_method_name() is None

def test_filter_failed_but_nothing(self):
assert list(Task.filter_failed()) == []
Expand Down
2 changes: 1 addition & 1 deletion task/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def restart(self, request, pk=None):
@action(methods=["post"], detail=True)
def resume(self, request, pk=None):
task = self.get_object()
task.resume()
task.schedule_resume()
return Response({"status": task.status})

def get_serializer(self, *args, **kwargs):
Expand Down

0 comments on commit 38b477a

Please sign in to comment.