Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tech: Minor code improvements #628

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: "3"
services:

amqp:
Expand Down
3 changes: 2 additions & 1 deletion exodus/exodus/core/static_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ def download_fdroid_apk(storage, handle, tmp_dir, apk_name, apk_tmp):

try:
r = requests.get(url)
open(apk_tmp, 'wb').write(r.content)
with open(apk_tmp, 'wb') as f:
f.write(r.content)
except Exception:
return False

Expand Down
4 changes: 2 additions & 2 deletions exodus/reports/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def update_fdroid_data():
with tempfile.NamedTemporaryFile() as f:
try:
r = requests.get('{}/index.xml'.format(settings.FDROID_MIRROR))
open(f.name, 'wb').write(r.content)
f.write(r.content)

storage_helper = RemoteStorageHelper()
storage_helper.put_file(f.name, 'fdroid_index.xml')
Expand All @@ -81,7 +81,7 @@ def update_fdroid_data():
with tempfile.NamedTemporaryFile() as f:
try:
r = requests.get('{}/index-v1.jar'.format(settings.FDROID_MIRROR))
open(f.name, 'wb').write(r.content)
f.write(r.content)

zip_file = zipfile.ZipFile(f.name)
zip_file.extract('index-v1.json', '/tmp')
Expand Down
246 changes: 117 additions & 129 deletions exodus/restful_api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,57 +25,55 @@
@authentication_classes((TokenAuthentication,))
@permission_classes((IsAuthenticated,))
def get_report_infos(request, r_id):
if request.method == 'GET':
try:
report = Report.objects.get(pk=r_id)
except Report.DoesNotExist:
raise Http404('No report found')

certificate = None
if hasattr(report.application, 'apk'):
certificates = Certificate.objects.filter(apk=report.application.apk)
certificate = certificates.first()

obj = {
'creation_date': report.creation_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
'report_id': report.id,
'handle': report.application.handle,
'certificate': certificate,
'apk_dl_link': '',
}
if request.user.is_staff:
obj['apk_dl_link'] = '/api/apk/{}/'.format(report.id)

serializer = ReportInfosSerializer(obj, many=False)
return JsonResponse(serializer.data, safe=True)
try:
report = Report.objects.get(pk=r_id)
except Report.DoesNotExist:
raise Http404('No report found')

certificate = None
if hasattr(report.application, 'apk'):
certificates = Certificate.objects.filter(apk=report.application.apk)
certificate = certificates.first()

obj = {
'creation_date': report.creation_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
'report_id': report.id,
'handle': report.application.handle,
'certificate': certificate,
'apk_dl_link': '',
}
if request.user.is_staff:
obj['apk_dl_link'] = '/api/apk/{}/'.format(report.id)

serializer = ReportInfosSerializer(obj, many=False)
return JsonResponse(serializer.data, safe=True)


@csrf_exempt
@api_view(['GET'])
@authentication_classes((TokenAuthentication,))
@permission_classes((IsAuthenticated, IsAdminUser))
def get_apk(request, r_id):
if request.method == 'GET':
try:
report = Report.objects.get(pk=r_id)
except Report.DoesNotExist:
raise Http404('No report found')

apk_path = report.apk_file

minioClient = Minio(
settings.MINIO_STORAGE_ENDPOINT,
access_key=settings.MINIO_STORAGE_ACCESS_KEY,
secret_key=settings.MINIO_STORAGE_SECRET_KEY,
secure=settings.MINIO_STORAGE_USE_HTTPS
)
try:
data = minioClient.get_object(settings.MINIO_STORAGE_MEDIA_BUCKET_NAME, apk_path)
return HttpResponse(
data.data, content_type=data.getheader('Content-Type'))
except Exception as err:
print(err)
return HttpResponse(status=500)
try:
report = Report.objects.get(pk=r_id)
except Report.DoesNotExist:
raise Http404('No report found')

apk_path = report.apk_file

minioClient = Minio(
settings.MINIO_STORAGE_ENDPOINT,
access_key=settings.MINIO_STORAGE_ACCESS_KEY,
secret_key=settings.MINIO_STORAGE_SECRET_KEY,
secure=settings.MINIO_STORAGE_USE_HTTPS
)
try:
data = minioClient.get_object(settings.MINIO_STORAGE_MEDIA_BUCKET_NAME, apk_path)
return HttpResponse(
data.data, content_type=data.getheader('Content-Type'))
except Exception as err:
print(err)
return HttpResponse(status=500)


def _get_reports_list(report_list):
Expand Down Expand Up @@ -127,94 +125,88 @@ def _get_tracker_list():
@authentication_classes((TokenAuthentication,))
@permission_classes((IsAuthenticated,))
def get_all_reports(request):
if request.method == 'GET':
report_list = Report.objects.order_by('-creation_date')[:500]
applications = _get_reports_list(report_list)
trackers = _get_tracker_list()
return JsonResponse(
{
'applications': applications,
'trackers': trackers
}
)
report_list = Report.objects.order_by('-creation_date')[:500]
applications = _get_reports_list(report_list)
trackers = _get_tracker_list()
return JsonResponse(
{
'applications': applications,
'trackers': trackers
}
)


@csrf_exempt
@api_view(['GET'])
@authentication_classes(())
@permission_classes(())
def get_all_trackers(request):
if request.method == 'GET':
trackers = _get_tracker_list()
return JsonResponse({'trackers': trackers})
trackers = _get_tracker_list()
return JsonResponse({'trackers': trackers})


@csrf_exempt
@api_view(['GET'])
@authentication_classes((TokenAuthentication,))
@permission_classes((IsAuthenticated,))
def get_all_applications(request):
if request.method == 'GET':
try:
if request.GET.get('tracker'):
tracker_id = request.GET.get('tracker')
applications = Application.objects.filter(report__found_trackers__id=tracker_id).order_by('handle', '-source').distinct('handle', 'source')
else:
applications = Application.objects.order_by('handle', '-source').distinct('handle', 'source')
if request.GET.get('option', 'full') == 'short':
serializer = ApplicationShortSerializer(applications, many=True)
else:
serializer = ApplicationSerializer(applications, many=True)
return JsonResponse({'applications': serializer.data}, safe=False)
except Application.DoesNotExist:
return JsonResponse({}, safe=True)
try:
if request.GET.get('tracker'):
tracker_id = request.GET.get('tracker')
applications = Application.objects.filter(report__found_trackers__id=tracker_id).order_by('handle', '-source').distinct('handle', 'source')
else:
applications = Application.objects.order_by('handle', '-source').distinct('handle', 'source')
if request.GET.get('option', 'full') == 'short':
serializer = ApplicationShortSerializer(applications, many=True)
else:
serializer = ApplicationSerializer(applications, many=True)
return JsonResponse({'applications': serializer.data}, safe=False)
except Application.DoesNotExist:
return JsonResponse({}, safe=True)


@csrf_exempt
@api_view(['GET'])
@authentication_classes((TokenAuthentication,))
@permission_classes((IsAuthenticated,))
def search_strict_handle(request, handle):
if request.method == 'GET':
try:
reports = Report.objects.filter(application__handle=handle).order_by('-creation_date')
except Report.DoesNotExist:
return JsonResponse({}, safe=True)
return JsonResponse(_get_reports_list(reports))
try:
reports = Report.objects.filter(application__handle=handle).order_by('-creation_date')
except Report.DoesNotExist:
return JsonResponse({}, safe=True)
return JsonResponse(_get_reports_list(reports))


@csrf_exempt
@api_view(['GET'])
@authentication_classes(())
@permission_classes(())
def search_latest_report(request, handle):
if request.method == 'GET':
try:
report = Report.objects.filter(application__handle=handle).order_by('-creation_date').first()
if not report:
raise Report.DoesNotExist
except Report.DoesNotExist:
return JsonResponse({}, safe=True)
obj = {
'id': report.id,
'name': report.application.name,
'creation_date': report.creation_date
}
return JsonResponse(obj)
try:
report = Report.objects.filter(application__handle=handle).order_by('-creation_date').first()
if not report:
raise Report.DoesNotExist
except Report.DoesNotExist:
return JsonResponse({}, safe=True)
obj = {
'id': report.id,
'name': report.application.name,
'creation_date': report.creation_date
}
return JsonResponse(obj)


@csrf_exempt
@api_view(['GET'])
@authentication_classes((TokenAuthentication,))
@permission_classes((IsAuthenticated,))
def get_report_details(request, r_id):
if request.method == 'GET':
try:
report = Report.objects.get(pk=r_id)
except Report.DoesNotExist:
raise Http404('No reports found')
serializer = ReportSerializer(report, many=False)
return JsonResponse(serializer.data, safe=True)
try:
report = Report.objects.get(pk=r_id)
except Report.DoesNotExist:
raise Http404('No reports found')
serializer = ReportSerializer(report, many=False)
return JsonResponse(serializer.data, safe=True)


def _get_applications(input, limit):
Expand Down Expand Up @@ -268,56 +260,52 @@ def search(request):
@authentication_classes((TokenAuthentication,))
@permission_classes((IsAuthenticated,))
def search_strict_handle_details(request, handle):
if request.method == 'GET':
try:
reports = Report.objects.filter(application__handle=handle)
details = []
for report in reports:
app = report.application
details.append({
'handle': app.handle,
'app_name': app.name,
'uaid': app.app_uid,
'version_name': app.version,
'version_code': app.version_code,
'source': app.source,
'icon_hash': app.icon_phash,
'apk_hash': app.apk.sum,
'created': report.creation_date,
'updated': report.updated_at,
'report': report.id,
'creator': app.creator,
'downloads': app.downloads,
'trackers': [t.id for t in report.found_trackers.all()],
'permissions': sorted([p.name for p in app.permission_set.all()])
})
except Report.DoesNotExist:
return JsonResponse({}, safe=True)
return JsonResponse(details, safe=False)
try:
reports = Report.objects.filter(application__handle=handle)
details = []
for report in reports:
app = report.application
details.append({
'handle': app.handle,
'app_name': app.name,
'uaid': app.app_uid,
'version_name': app.version,
'version_code': app.version_code,
'source': app.source,
'icon_hash': app.icon_phash,
'apk_hash': app.apk.sum,
'created': report.creation_date,
'updated': report.updated_at,
'report': report.id,
'creator': app.creator,
'downloads': app.downloads,
'trackers': [t.id for t in report.found_trackers.all()],
'permissions': sorted([p.name for p in app.permission_set.all()])
})
except Report.DoesNotExist:
return JsonResponse({}, safe=True)
return JsonResponse(details, safe=False)


@csrf_exempt
@api_view(['GET'])
@authentication_classes((TokenAuthentication,))
@permission_classes((IsAuthenticated,))
def get_trackers_count(request):
if request.method == 'GET':
return JsonResponse({'count': Tracker.objects.count()})
return JsonResponse({'count': Tracker.objects.count()})


@csrf_exempt
@api_view(['GET'])
@authentication_classes((TokenAuthentication,))
@permission_classes((IsAuthenticated,))
def get_reports_count(request):
if request.method == 'GET':
return JsonResponse({'count': Report.objects.count()})
return JsonResponse({'count': Report.objects.count()})


@csrf_exempt
@api_view(['GET'])
@authentication_classes((TokenAuthentication,))
@permission_classes((IsAuthenticated,))
def get_applications_count(request):
if request.method == 'GET':
return JsonResponse({'count': Application.objects.distinct('handle').count()})
return JsonResponse({'count': Application.objects.distinct('handle').count()})
Loading