From c2a25863cfc95afd8fcb8ef6ffc70a07cf5d4465 Mon Sep 17 00:00:00 2001
From: Yogesh Ojha
Date: Sat, 20 Jul 2024 08:02:15 +0530
Subject: [PATCH 1/3] remove temporary fix for celery beat

---
 web/reNgine/settings.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/web/reNgine/settings.py b/web/reNgine/settings.py
index 4d33fd84c..0924a6391 100644
--- a/web/reNgine/settings.py
+++ b/web/reNgine/settings.py
@@ -154,10 +154,6 @@
 USE_L10N = True
 USE_TZ = True
 
-# Temporary fix for celery beat crash
-# See https://github.com/yogeshojha/rengine/issues/971
-DJANGO_CELERY_BEAT_TZ_AWARE = False
-
 MEDIA_URL = '/media/'
 MEDIA_ROOT = '/usr/src/scan_results/'
 FILE_UPLOAD_MAX_MEMORY_SIZE = 100000000

From 8c9187cb6e6a9344642e332a6f0e4629875d90ad Mon Sep 17 00:00:00 2001
From: Yogesh Ojha
Date: Sat, 20 Jul 2024 08:03:37 +0530
Subject: [PATCH 2/3] refactor initiate_scan to accept initiated_by user id,
 for scheduled scans

---
 web/reNgine/tasks.py | 257 +++++++++++++++++++++++--------------------
 1 file changed, 137 insertions(+), 120 deletions(-)

diff --git a/web/reNgine/tasks.py b/web/reNgine/tasks.py
index bf44e3132..5f2bc3247 100644
--- a/web/reNgine/tasks.py
+++ b/web/reNgine/tasks.py
@@ -57,6 +57,7 @@ def initiate_scan(
     results_dir=RENGINE_RESULTS,
     imported_subdomains=[],
     out_of_scope_subdomains=[],
+    initiated_by_id=None,
     url_filter=''):
     """Initiate a new scan.
 
@@ -68,137 +69,153 @@ def initiate_scan(
         results_dir (str): Results directory.
         imported_subdomains (list): Imported subdomains.
         out_of_scope_subdomains (list): Out-of-scope subdomains.
-        url_filter (str): URL path. Default: ''
+        url_filter (str): URL path. Default: ''.
+        initiated_by_id (int): ID of the user who initiated the scan. Default: None.
     """
+    logger.info('Initiating scan on celery')
+    scan = None
+    try:
+        # Get scan engine
+        engine_id = engine_id or (scan.scan_type.id if scan else None)  # scan history engine_id
+        engine = EngineType.objects.get(pk=engine_id)
+
+        # Get YAML config
+        config = yaml.safe_load(engine.yaml_configuration)
+        enable_http_crawl = config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
+        gf_patterns = config.get(GF_PATTERNS, [])
+
+        # Get domain and set last_scan_date
+        domain = Domain.objects.get(pk=domain_id)
+        domain.last_scan_date = timezone.now()
+        domain.save()
+
+        # Get path filter
+        url_filter = url_filter.rstrip('/')
+
+        # For live scans the ScanHistory object already exists and its id is
+        # passed in as scan_history_id, so there is no need to create one
+
+        if scan_type == SCHEDULED_SCAN: # scheduled
+            # we need to create a ScanHistory object for each scheduled scan
+            scan_history_id = create_scan_object(
+                host_id=domain_id,
+                engine_id=engine_id,
+                initiated_by_id=initiated_by_id,
+            )
-
-    # Get scan history
-    scan = ScanHistory.objects.get(pk=scan_history_id)
-
-    # Get scan engine
-    engine_id = engine_id or scan.scan_type.id # scan history engine_id
-    engine = EngineType.objects.get(pk=engine_id)
-
-    # Get YAML config
-    config = yaml.safe_load(engine.yaml_configuration)
-    enable_http_crawl = config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
-    gf_patterns = config.get(GF_PATTERNS, [])
-
-    # Get domain and set last_scan_date
-    domain = Domain.objects.get(pk=domain_id)
-    domain.last_scan_date = timezone.now()
-    domain.save()
-
-    # Get path filter
-    url_filter = url_filter.rstrip('/')
-
-    # Get or create ScanHistory() object
-    if scan_type == LIVE_SCAN: # immediate
         scan = ScanHistory.objects.get(pk=scan_history_id)
         scan.scan_status = RUNNING_TASK
-    elif scan_type == SCHEDULED_SCAN: # scheduled
-        scan = ScanHistory()
-        scan.scan_status = INITIATED_TASK
-    scan.scan_type = engine
-    scan.celery_ids = [initiate_scan.request.id]
-    scan.domain = domain
-    scan.start_scan_date = timezone.now()
-    scan.tasks = engine.tasks
-    scan.results_dir = f'{results_dir}/{domain.name}_{scan.id}'
-    add_gf_patterns = gf_patterns and 'fetch_url' in engine.tasks
-    if add_gf_patterns:
-        scan.used_gf_patterns = ','.join(gf_patterns)
-    scan.save()
-
-    # Create scan results dir
-    os.makedirs(scan.results_dir)
-
-    # Build task context
-    ctx = {
-        'scan_history_id': scan_history_id,
-        'engine_id': engine_id,
-        'domain_id': domain.id,
-        'results_dir': scan.results_dir,
-        'url_filter': url_filter,
-        'yaml_configuration': config,
-        'out_of_scope_subdomains': out_of_scope_subdomains
-    }
-    ctx_str = json.dumps(ctx, indent=2)
-
-    # Send start notif
-    logger.warning(f'Starting scan {scan_history_id} with context:\n{ctx_str}')
-    send_scan_notif.delay(
-        scan_history_id,
-        subscan_id=None,
-        engine_id=engine_id,
-        status=CELERY_TASK_STATUS_MAP[scan.scan_status])
-
-    # Save imported subdomains in DB
-    save_imported_subdomains(imported_subdomains, ctx=ctx)
-
-    # Create initial subdomain in DB: make a copy of domain as a subdomain so
-    # that other tasks using subdomains can use it.
-    subdomain_name = domain.name
-    subdomain, _ = save_subdomain(subdomain_name, ctx=ctx)
+        scan.scan_type = engine
+        scan.celery_ids = [initiate_scan.request.id]
+        scan.domain = domain
+        scan.start_scan_date = timezone.now()
+        scan.tasks = engine.tasks
+        scan.results_dir = f'{results_dir}/{domain.name}_{scan.id}'
+        add_gf_patterns = gf_patterns and 'fetch_url' in engine.tasks
+        if add_gf_patterns:
+            scan.used_gf_patterns = ','.join(gf_patterns)
+        scan.save()
+
+        # Create scan results dir
+        os.makedirs(scan.results_dir)
+
+        # Build task context
+        ctx = {
+            'scan_history_id': scan_history_id,
+            'engine_id': engine_id,
+            'domain_id': domain.id,
+            'results_dir': scan.results_dir,
+            'url_filter': url_filter,
+            'yaml_configuration': config,
+            'out_of_scope_subdomains': out_of_scope_subdomains
+        }
+        ctx_str = json.dumps(ctx, indent=2)
+
+        # Send start notif
+        logger.warning(f'Starting scan {scan_history_id} with context:\n{ctx_str}')
+        send_scan_notif.delay(
+            scan_history_id,
+            subscan_id=None,
+            engine_id=engine_id,
+            status=CELERY_TASK_STATUS_MAP[scan.scan_status])
+
+        # Save imported subdomains in DB
+        save_imported_subdomains(imported_subdomains, ctx=ctx)
+
+        # Create initial subdomain in DB: make a copy of domain as a subdomain so
+        # that other tasks using subdomains can use it.
+        subdomain_name = domain.name
+        subdomain, _ = save_subdomain(subdomain_name, ctx=ctx)
-
-    # If enable_http_crawl is set, create an initial root HTTP endpoint so that
-    # HTTP crawling can start somewhere
-    http_url = f'{domain.name}{url_filter}' if url_filter else domain.name
-    endpoint, _ = save_endpoint(
-        http_url,
-        ctx=ctx,
-        crawl=enable_http_crawl,
-        is_default=True,
-        subdomain=subdomain
-    )
-    if endpoint and endpoint.is_alive:
-        # TODO: add `root_endpoint` property to subdomain and simply do
-        # subdomain.root_endpoint = endpoint instead
-        logger.warning(f'Found subdomain root HTTP URL {endpoint.http_url}')
-        subdomain.http_url = endpoint.http_url
-        subdomain.http_status = endpoint.http_status
-        subdomain.response_time = endpoint.response_time
-        subdomain.page_title = endpoint.page_title
-        subdomain.content_type = endpoint.content_type
-        subdomain.content_length = endpoint.content_length
-        for tech in endpoint.techs.all():
-            subdomain.technologies.add(tech)
-        subdomain.save()
+
+        # If enable_http_crawl is set, create an initial root HTTP endpoint so that
+        # HTTP crawling can start somewhere
+        http_url = f'{domain.name}{url_filter}' if url_filter else domain.name
+        endpoint, _ = save_endpoint(
+            http_url,
+            ctx=ctx,
+            crawl=enable_http_crawl,
+            is_default=True,
+            subdomain=subdomain
+        )
+        if endpoint and endpoint.is_alive:
+            # TODO: add `root_endpoint` property to subdomain and simply do
+            # subdomain.root_endpoint = endpoint instead
+            logger.warning(f'Found subdomain root HTTP URL {endpoint.http_url}')
+            subdomain.http_url = endpoint.http_url
+            subdomain.http_status = endpoint.http_status
+            subdomain.response_time = endpoint.response_time
+            subdomain.page_title = endpoint.page_title
+            subdomain.content_type = endpoint.content_type
+            subdomain.content_length = endpoint.content_length
+            for tech in endpoint.techs.all():
+                subdomain.technologies.add(tech)
+            subdomain.save()
-
-    # Build Celery tasks, crafted according to the dependency graph below:
-    # subdomain_discovery --> port_scan --> fetch_url --> dir_file_fuzz
-    # osint                                               vulnerability_scan
-    # osint                                               dalfox xss scan
-    #                                                     screenshot
-    #                                                     waf_detection
-    workflow = chain(
-        group(
-            subdomain_discovery.si(ctx=ctx, description='Subdomain discovery'),
-            osint.si(ctx=ctx, description='OS Intelligence')
-        ),
-        port_scan.si(ctx=ctx, description='Port scan'),
-        fetch_url.si(ctx=ctx, description='Fetch URL'),
-        group(
-            dir_file_fuzz.si(ctx=ctx, description='Directories & files fuzz'),
-            vulnerability_scan.si(ctx=ctx, description='Vulnerability scan'),
-            screenshot.si(ctx=ctx, description='Screenshot'),
-            waf_detection.si(ctx=ctx, description='WAF detection')
-        )
-    )
+
+        # Build Celery tasks, crafted according to the dependency graph below:
+        # subdomain_discovery --> port_scan --> fetch_url --> dir_file_fuzz
+        # osint                                               vulnerability_scan
+        # osint                                               dalfox xss scan
+        #                                                     screenshot
+        #                                                     waf_detection
+        workflow = chain(
+            group(
+                subdomain_discovery.si(ctx=ctx, description='Subdomain discovery'),
+                osint.si(ctx=ctx, description='OS Intelligence')
+            ),
+            port_scan.si(ctx=ctx, description='Port scan'),
+            fetch_url.si(ctx=ctx, description='Fetch URL'),
+            group(
+                dir_file_fuzz.si(ctx=ctx, description='Directories & files fuzz'),
+                vulnerability_scan.si(ctx=ctx, description='Vulnerability scan'),
+                screenshot.si(ctx=ctx, description='Screenshot'),
+                waf_detection.si(ctx=ctx, description='WAF detection')
+            )
+        )
-
-    # Build callback
-    callback = report.si(ctx=ctx).set(link_error=[report.si(ctx=ctx)])
+
+        # Build callback
+        callback = report.si(ctx=ctx).set(link_error=[report.si(ctx=ctx)])
-
-    # Run Celery chord
-    logger.info(f'Running Celery workflow with {len(workflow.tasks) + 1} tasks')
-    task = chain(workflow, callback).on_error(callback).delay()
-    scan.celery_ids.append(task.id)
-    scan.save()
+
+        # Run Celery chord
+        logger.info(f'Running Celery workflow with {len(workflow.tasks) + 1} tasks')
+        task = chain(workflow, callback).on_error(callback).delay()
+        scan.celery_ids.append(task.id)
+        scan.save()
-
-    return {
-        'success': True,
-        'task_id': task.id
-    }
+
+        return {
+            'success': True,
+            'task_id': task.id
+        }
+    except Exception as e:
+        logger.exception(e)
+        if scan:
+            scan.scan_status = FAILED_TASK
+            scan.error_message = str(e)
+            scan.save()
+        return {
+            'success': False,
+            'error': str(e)
+        }
 
 
 @app.task(name='initiate_subscan', bind=False, queue='subscan_queue')
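With initiated_by_id carried inside the serialized task kwargs, a scheduled run now creates its own ScanHistory at execution time instead of reusing a row created when the schedule was saved, and the triggering user is still recorded. Because celery beat stores kwargs as JSON (see the json.dumps(kwargs) calls in the next patch), only the user id travels with the task, never the User object itself. A minimal sketch of how the refactored task is enqueued; the ids are hypothetical, and SCHEDULED_SCAN is assumed to be importable from reNgine.definitions, where reNgine keeps its scan-type constants:

    from reNgine.definitions import SCHEDULED_SCAN
    from reNgine.tasks import initiate_scan

    initiate_scan.apply_async(kwargs={
        'scan_history_id': 1,          # placeholder: the scheduled path creates a fresh ScanHistory per run
        'domain_id': 42,               # hypothetical Domain pk
        'engine_id': 3,                # hypothetical EngineType pk
        'scan_type': SCHEDULED_SCAN,
        'initiated_by_id': 7,          # hypothetical User pk, stored on the new ScanHistory
    })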
From 566789352a50923bf9d261b624cb156e14187edc Mon Sep 17 00:00:00 2001
From: Yogesh Ojha
Date: Sat, 20 Jul 2024 08:03:50 +0530
Subject: [PATCH 3/3] fix task names and celery beat errors

---
 web/reNgine/common_func.py | 31 +++++++++++++-
 web/startScan/views.py     | 84 ++++++++++++++++----------------------
 2 files changed, 66 insertions(+), 49 deletions(-)

diff --git a/web/reNgine/common_func.py b/web/reNgine/common_func.py
index 3a2e1b032..6736aecb4 100644
--- a/web/reNgine/common_func.py
+++ b/web/reNgine/common_func.py
@@ -1028,4 +1028,33 @@ def parse_llm_vulnerability_report(report):
     except Exception as e:
         return data
 
-    return data
\ No newline at end of file
+    return data
+
+
+def create_scan_object(host_id, engine_id, initiated_by_id=None):
+    '''
+    Create a ScanHistory object with INITIATED status so that the celery
+    task can pick it up when workers are free.
+    Args:
+        host_id: int: id of Domain model
+        engine_id: int: id of EngineType model
+        initiated_by_id: int: id of User model (optional)
+    '''
+    # get current time
+    current_scan_time = timezone.now()
+    # fetch engine and domain objects
+    engine = EngineType.objects.get(pk=engine_id)
+    domain = Domain.objects.get(pk=host_id)
+    scan = ScanHistory()
+    scan.scan_status = INITIATED_TASK
+    scan.domain = domain
+    scan.scan_type = engine
+    scan.start_scan_date = current_scan_time
+    if initiated_by_id:
+        user = User.objects.get(pk=initiated_by_id)
+        scan.initiated_by = user
+    scan.save()
+    # record the start scan date on the domain model as well
+    domain.start_scan_date = current_scan_time
+    domain.save()
+    return scan.id
\ No newline at end of file
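The companion change below switches every call site to explicit keywords, so the arguments can no longer be silently matched by position (previously request.user, a User object, was passed where the helper now expects an id). A minimal usage sketch with hypothetical ids, mirroring what the views in this patch do:

    from reNgine.common_func import create_scan_object

    # Pre-create the ScanHistory row for a UI-triggered live scan; the
    # returned id is then passed to initiate_scan through its kwargs.
    scan_history_id = create_scan_object(
        host_id=3,             # hypothetical Domain pk
        engine_id=1,           # hypothetical EngineType pk
        initiated_by_id=7,     # hypothetical User pk; optional and may be omitted
    )

Scheduled scans skip this pre-creation entirely: initiate_scan calls the same helper at run time, so every beat tick gets its own ScanHistory row.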
diff --git a/web/startScan/views.py b/web/startScan/views.py
index 22780a3a6..ac08da282 100644
--- a/web/startScan/views.py
+++ b/web/startScan/views.py
@@ -271,9 +271,9 @@ def start_scan_ui(request, slug, domain_id):
 
     # Create ScanHistory object
     scan_history_id = create_scan_object(
-        domain_id,
-        engine_id,
-        request.user
+        host_id=domain_id,
+        engine_id=engine_id,
+        initiated_by_id=request.user.id
     )
     scan = ScanHistory.objects.get(pk=scan_history_id)
 
@@ -286,7 +286,8 @@ def start_scan_ui(request, slug, domain_id):
         'results_dir': '/usr/src/scan_results',
         'imported_subdomains': subdomains_in,
         'out_of_scope_subdomains': subdomains_out,
-        'url_filter': filterPath
+        'url_filter': filterPath,
+        'initiated_by_id': request.user.id
     }
     initiate_scan.apply_async(kwargs=kwargs)
     scan.save()
@@ -328,9 +329,9 @@
     for domain_id in list_of_domains.split(","):
         # Start the celery task
         scan_history_id = create_scan_object(
-            domain_id,
-            engine_id,
-            request.user
+            host_id=domain_id,
+            engine_id=engine_id,
+            initiated_by_id=request.user.id
         )
         # domain = get_object_or_404(Domain, id=domain_id)
 
@@ -340,6 +341,7 @@
             'engine_id': engine_id,
             'scan_type': LIVE_SCAN,
             'results_dir': '/usr/src/scan_results',
+            'initiated_by_id': request.user.id
             # TODO: Add this to multiple scan view
             # 'imported_subdomains': subdomains_in,
             # 'out_of_scope_subdomains': subdomains_out
@@ -535,12 +537,15 @@ def schedule_scan(request, host_id, slug):
            'scan_history_id': 1,
            'scan_type': SCHEDULED_SCAN,
            'imported_subdomains': subdomains_in,
-           'out_of_scope_subdomains': subdomains_out
+           'out_of_scope_subdomains': subdomains_out,
+           'initiated_by_id': request.user.id
         }
-        PeriodicTask.objects.create(interval=schedule,
-                                    name=task_name,
-                                    task='reNgine.tasks.initiate_scan',
-                                    kwargs=json.dumps(kwargs))
+        PeriodicTask.objects.create(
+            interval=schedule,
+            name=task_name,
+            task='initiate_scan',
+            kwargs=json.dumps(kwargs)
+        )
     elif scheduled_mode == 'clocked':
         schedule_time = request.POST['scheduled_time']
         clock, _ = ClockedSchedule.objects.get_or_create(
@@ -551,13 +556,16 @@
            'engine_id': engine.id,
            'scan_type': SCHEDULED_SCAN,
            'imported_subdomains': subdomains_in,
-           'out_of_scope_subdomains': subdomains_out
+           'out_of_scope_subdomains': subdomains_out,
+           'initiated_by_id': request.user.id
         }
-        PeriodicTask.objects.create(clocked=clock,
-                                    one_off=True,
-                                    name=task_name,
-                                    task='reNgine.tasks.initiate_scan',
-                                    kwargs=json.dumps(kwargs))
+        PeriodicTask.objects.create(
+            clocked=clock,
+            one_off=True,
+            name=task_name,
+            task='initiate_scan',
+            kwargs=json.dumps(kwargs)
+        )
     messages.add_message(
         request,
         messages.INFO,
@@ -629,29 +637,6 @@ def change_vuln_status(request, id):
     return HttpResponse('')
 
 
-def create_scan_object(host_id, engine_id, initiated_by):
-    '''
-    create task with pending status so that celery task will execute when
-    threads are free
-    '''
-    # get current time
-    current_scan_time = timezone.now()
-    # fetch engine and domain object
-    engine = EngineType.objects.get(pk=engine_id)
-    domain = Domain.objects.get(pk=host_id)
-    scan = ScanHistory()
-    scan.scan_status = INITIATED_TASK
-    scan.domain = domain
-    scan.scan_type = engine
-    scan.start_scan_date = current_scan_time
-    scan.initiated_by = initiated_by
-    scan.save()
-    # save last scan date for domain model
-    domain.start_scan_date = current_scan_time
-    domain.save()
-    return scan.id
-
-
 @has_permission_decorator(PERM_MODIFY_SYSTEM_CONFIGURATIONS, redirect_url=FOUR_OH_FOUR_URL)
 def delete_all_scan_results(request):
     if request.method == 'POST':
@@ -694,9 +679,9 @@ def start_organization_scan(request, id, slug):
     # Start Celery task for each organization's domains
     for domain in organization.get_domains():
         scan_history_id = create_scan_object(
-            domain.id,
-            engine_id,
-            request.user
+            host_id=domain.id,
+            engine_id=engine_id,
+            initiated_by_id=request.user.id
         )
         scan = ScanHistory.objects.get(pk=scan_history_id)
 
@@ -706,6 +691,7 @@
             'engine_id': engine_id,
             'scan_type': LIVE_SCAN,
             'results_dir': '/usr/src/scan_results',
+            'initiated_by_id': request.user.id,
             # TODO: Add this to multiple scan view
             # 'imported_subdomains': subdomains_in,
             # 'out_of_scope_subdomains': subdomains_out
@@ -773,12 +759,13 @@ def schedule_organization_scan(request, slug, id):
                'engine_id': engine.id,
                'scan_history_id': 0,
                'scan_type': SCHEDULED_SCAN,
-               'imported_subdomains': None
+               'imported_subdomains': None,
+               'initiated_by_id': request.user.id
             })
             PeriodicTask.objects.create(
                 interval=schedule,
                 name=task_name,
-                task='reNgine.tasks.initiate_scan',
+                task='initiate_scan',
                 kwargs=_kwargs
             )
@@ -793,12 +780,13 @@
                'engine_id': engine.id,
                'scan_history_id': 0,
                'scan_type': LIVE_SCAN,
-               'imported_subdomains': None
+               'imported_subdomains': None,
+               'initiated_by_id': request.user.id
             })
             PeriodicTask.objects.create(clocked=clock,
                 one_off=True,
                 name=task_name,
-                task='reNgine.tasks.initiate_scan',
+                task='initiate_scan',
                 kwargs=_kwargs
             )
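The celery beat crashes fixed here trace back to the task path stored on PeriodicTask: beat resolves each entry through Celery's task registry, and the registry key is the explicit name= given at registration (as in the @app.task(name='initiate_subscan', ...) decorator in patch 2), not the dotted module path. A minimal sketch of that lookup; the queue name and decorator arguments are illustrative, mirroring the initiate_subscan registration shown above:

    from celery import Celery

    app = Celery('reNgine')

    @app.task(name='initiate_scan', bind=False, queue='initiate_scan_queue')
    def initiate_scan(**kwargs):
        ...

    # beat looks PeriodicTask.task up by its registered name:
    app.tasks['initiate_scan']                 # found
    app.tasks['reNgine.tasks.initiate_scan']   # NotRegistered, which is what crashed beat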