Skip to content

Commit

Permalink
Merge pull request #35126 from dimagi/ze/trigger-es-index-geospatial-…
Browse files Browse the repository at this point in the history
…enable

Trigger Geopoint ES Index on Geospatial Feature Flag Enable
  • Loading branch information
zandre-eng authored Nov 29, 2024
2 parents 90631dd + fad623c commit d677f0d
Show file tree
Hide file tree
Showing 19 changed files with 392 additions and 83 deletions.
4 changes: 4 additions & 0 deletions corehq/apps/geospatial/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,7 @@

# Case property to identify cases assigned through disbursement on the Case Management Page
ASSIGNED_VIA_DISBURSEMENT_CASE_PROPERTY = 'commcare_assigned_via_disbursement'
ES_INDEX_TASK_HELPER_BASE_KEY = 'geo_cases_index_cases'

DEFAULT_QUERY_LIMIT = 10_000
DEFAULT_CHUNK_SIZE = 100
45 changes: 41 additions & 4 deletions corehq/apps/geospatial/es.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from math import ceil

from corehq.apps.case_search.const import CASE_PROPERTIES_PATH
from corehq.apps.es import filters
from corehq.apps.es import filters, queries, CaseSearchES
from corehq.apps.es.aggregations import (
FilterAggregation,
GeoBoundsAggregation,
GeohashGridAggregation,
NestedAggregation,
)
from corehq.apps.es.case_search import PROPERTY_GEOPOINT_VALUE, PROPERTY_KEY
from corehq.apps.geospatial.const import MAX_GEOHASH_DOC_COUNT
from corehq.apps.es.case_search import (
CASE_PROPERTIES_PATH,
PROPERTY_GEOPOINT_VALUE,
PROPERTY_KEY,
)
from corehq.apps.geospatial.const import MAX_GEOHASH_DOC_COUNT, DEFAULT_QUERY_LIMIT

CASE_PROPERTIES_AGG = 'case_properties'
CASE_PROPERTY_AGG = 'case_property'
Expand Down Expand Up @@ -131,3 +134,37 @@ def mid(lower, upper):
"""
assert lower <= upper
return ceil(lower + (upper - lower) / 2)


def case_query_for_missing_geopoint_val(
domain, geo_case_property, case_type=None, offset=0, sort_by=None
):
query = (
CaseSearchES()
.domain(domain)
.filter(_geopoint_value_missing_for_property(geo_case_property))
.size(DEFAULT_QUERY_LIMIT)
)
if case_type:
query = query.case_type(case_type)
if sort_by:
query.sort(sort_by)
if offset:
query.start(offset)
return query


def _geopoint_value_missing_for_property(geo_case_property_name):
"""
Query to find docs with missing 'geopoint_value' for the given case property.
"""
return queries.nested(
CASE_PROPERTIES_PATH,
queries.filtered(
queries.match_all(),
filters.AND(
filters.term(PROPERTY_KEY, geo_case_property_name),
filters.missing(PROPERTY_GEOPOINT_VALUE)
)
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,10 @@

from django.core.management.base import BaseCommand

from dimagi.utils.chunked import chunked

from corehq.apps.es import CaseSearchES, case_search_adapter, filters, queries
from corehq.apps.es.case_search import (
CASE_PROPERTIES_PATH,
PROPERTY_GEOPOINT_VALUE,
PROPERTY_KEY,
)
from corehq.apps.es.client import manager
from corehq.apps.geospatial.const import DEFAULT_QUERY_LIMIT
from corehq.apps.geospatial.es import case_query_for_missing_geopoint_val
from corehq.apps.geospatial.management.commands.index_utils import process_batch
from corehq.apps.geospatial.utils import get_geo_case_property
from corehq.form_processor.models import CommCareCase
from corehq.util.log import with_progress_bar

DEFAULT_QUERY_LIMIT = 10_000
DEFAULT_CHUNK_SIZE = 100


class Command(BaseCommand):
Expand All @@ -25,64 +14,27 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('domain')
parser.add_argument('--case_type', required=False)
parser.add_argument('--query_limit', required=False)
parser.add_argument('--chunk_size', required=False)

def handle(self, *args, **options):
domain = options['domain']
case_type = options.get('case_type')
query_limit = options.get('query_limit')
chunk_size = options.get('chunk_size')
index_case_docs(domain, query_limit, chunk_size, case_type)
index_case_docs(domain, case_type)


def index_case_docs(domain, query_limit=DEFAULT_QUERY_LIMIT, chunk_size=DEFAULT_CHUNK_SIZE, case_type=None):
def index_case_docs(domain, case_type=None):
geo_case_property = get_geo_case_property(domain)
query = _es_case_query(domain, geo_case_property, case_type)
query = case_query_for_missing_geopoint_val(domain, geo_case_property, case_type)
count = query.count()
print(f'{count} case(s) to process')
batch_count = 1
if query_limit:
batch_count = math.ceil(count / query_limit)
batch_count = math.ceil(count / DEFAULT_QUERY_LIMIT)
print(f"Cases will be processed in {batch_count} batches")
for i in range(batch_count):
print(f'Processing {i+1}/{batch_count}')
query = _es_case_query(domain, geo_case_property, case_type, size=query_limit)
case_ids = query.get_ids()
_index_case_ids(domain, case_ids, chunk_size)


def _index_case_ids(domain, case_ids, chunk_size):
for case_id_chunk in chunked(with_progress_bar(case_ids), chunk_size):
case_chunk = CommCareCase.objects.get_cases(list(case_id_chunk), domain)
case_search_adapter.bulk_index(case_chunk)
manager.index_refresh(case_search_adapter.index_name)


def _es_case_query(domain, geo_case_property, case_type=None, size=None):
query = (
CaseSearchES()
.domain(domain)
.filter(_geopoint_value_missing_for_property(geo_case_property))
)
if case_type:
query = query.case_type(case_type)
if size:
query = query.size(size)
return query


def _geopoint_value_missing_for_property(geo_case_property_name):
"""
Query to find docs with missing 'geopoint_value' for the given case property.
"""
return queries.nested(
CASE_PROPERTIES_PATH,
queries.filtered(
queries.match_all(),
filters.AND(
filters.term(PROPERTY_KEY, geo_case_property_name),
filters.missing(PROPERTY_GEOPOINT_VALUE)
)
process_batch(
domain,
geo_case_property,
total_count=count,
case_type=case_type,
with_progress=True,
offset=i * DEFAULT_QUERY_LIMIT,
)
)
33 changes: 33 additions & 0 deletions corehq/apps/geospatial/management/commands/index_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from dimagi.utils.chunked import chunked

from corehq.apps.es import case_search_adapter
from corehq.apps.geospatial.es import case_query_for_missing_geopoint_val
from corehq.form_processor.models import CommCareCase
from corehq.util.log import with_progress_bar
from corehq.apps.geospatial.const import DEFAULT_CHUNK_SIZE, DEFAULT_QUERY_LIMIT


def process_batch(
domain,
geo_case_property,
total_count,
case_type=None,
with_progress=False,
offset=0,
):
sort_by = 'opened_on' if total_count > DEFAULT_QUERY_LIMIT else None
query = case_query_for_missing_geopoint_val(
domain, geo_case_property, case_type, offset=offset, sort_by=sort_by
)
case_ids = query.get_ids()
_index_case_ids(domain, case_ids, with_progress)


def _index_case_ids(domain, case_ids, with_progress):
if with_progress:
ids = with_progress_bar(case_ids)
else:
ids = case_ids
for case_id_chunk in chunked(ids, DEFAULT_CHUNK_SIZE):
case_chunk = CommCareCase.objects.get_cases(list(case_id_chunk), domain)
case_search_adapter.bulk_index(case_chunk)
4 changes: 4 additions & 0 deletions corehq/apps/geospatial/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from corehq.util.quickcache import quickcache

from .dispatchers import CaseManagementMapDispatcher
from corehq.apps.geospatial.const import ES_INDEX_TASK_HELPER_BASE_KEY
from .es import (
BUCKET_CASES_AGG,
CASE_PROPERTIES_AGG,
Expand All @@ -38,6 +39,7 @@
geojson_to_es_geoshape,
get_geo_case_property,
validate_geometry,
get_celery_task_tracker,
)


Expand All @@ -59,12 +61,14 @@ class BaseCaseMapReport(ProjectReport, CaseListMixin, XpathCaseSearchFilterMixin
def template_context(self):
# Whatever is specified here can be accessed through initial_page_data
context = super(BaseCaseMapReport, self).template_context
celery_task_tracker = get_celery_task_tracker(self.domain, task_slug=ES_INDEX_TASK_HELPER_BASE_KEY)
context.update({
'mapbox_access_token': settings.MAPBOX_ACCESS_TOKEN,
'saved_polygons': [
{'id': p.id, 'name': p.name, 'geo_json': p.geo_json}
for p in GeoPolygon.objects.filter(domain=self.domain).all()
],
'task_status': celery_task_tracker.get_status(),
})
return context

Expand Down
5 changes: 2 additions & 3 deletions corehq/apps/geospatial/static/geospatial/js/models.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ hqDefine('geospatial/js/models', [
const SELECTED_FEATURE_ID_QUERY_PARAM = 'selected_feature_id';
const DEFAULT_CENTER_COORD = [-20.0, -0.0];
const DISBURSEMENT_LAYER_PREFIX = 'route-';
const saveGeoPolygonUrl = initialPageData.reverse('geo_polygons');
const reassignCasesUrl = initialPageData.reverse('reassign_cases');
const unexpectedErrorMessage = gettext(
"Oops! Something went wrong!" +
" Please report an issue if the problem persists."
Expand Down Expand Up @@ -808,6 +806,7 @@ hqDefine('geospatial/js/models', [
if (!validateSavedPolygonName(name)) {
return;
}
const saveGeoPolygonUrl = initialPageData.reverse('geo_polygons');

if (!clearDisbursementBeforeProceeding()) {
return;
Expand Down Expand Up @@ -1062,7 +1061,7 @@ hqDefine('geospatial/js/models', [
'case_id_to_owner_id': caseIdToOwnerId,
'include_related_cases': self.includeRelatedCases(),
};

const reassignCasesUrl = initialPageData.reverse('reassign_cases');
self.assignmentAjaxInProgress(true);
$.ajax({
type: 'post',
Expand Down
56 changes: 55 additions & 1 deletion corehq/apps/geospatial/tasks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
import math

from dimagi.utils.logging import notify_exception

from corehq.apps.celery import task
from corehq.apps.geospatial.const import (
ES_INDEX_TASK_HELPER_BASE_KEY,
DEFAULT_QUERY_LIMIT,
)
from corehq.apps.geospatial.es import case_query_for_missing_geopoint_val
from corehq.apps.geospatial.utils import (
CeleryTaskTracker,
get_celery_task_tracker,
get_flag_assigned_cases_config,
CeleryTaskTracker,
update_cases_owner,
get_geo_case_property,
)
from corehq.apps.geospatial.management.commands.index_utils import process_batch

from settings import MAX_GEOSPATIAL_INDEX_DOC_LIMIT


@task(queue="background_queue", ignore_result=True)
Expand All @@ -14,3 +28,43 @@ def geo_cases_reassignment_update_owners(domain, case_owner_updates_dict, task_k
finally:
celery_task_tracker = CeleryTaskTracker(task_key)
celery_task_tracker.mark_completed()


@task(queue='geospatial_queue', ignore_result=True)
def index_es_docs_with_location_props(domain):
celery_task_tracker = get_celery_task_tracker(domain, ES_INDEX_TASK_HELPER_BASE_KEY)
geo_case_prop = get_geo_case_property(domain)
query = case_query_for_missing_geopoint_val(domain, geo_case_prop)
doc_count = query.count()
if doc_count > MAX_GEOSPATIAL_INDEX_DOC_LIMIT:
celery_task_tracker.mark_as_error(error_slug='TOO_MANY_CASES')
return

celery_task_tracker.mark_requested()
batch_count = math.ceil(doc_count / DEFAULT_QUERY_LIMIT)
try:
for i in range(batch_count):
process_batch(
domain,
geo_case_prop,
offset=i * DEFAULT_QUERY_LIMIT,
total_count=doc_count
)
current_batch = i + 1
processed_count = min(current_batch * DEFAULT_QUERY_LIMIT, doc_count)
celery_task_tracker.update_progress(
current=processed_count,
total=doc_count
)
except Exception as e:
celery_task_tracker.mark_as_error(error_slug='CELERY')
notify_exception(
None,
'Something went wrong while indexing ES docs with location props.',
details={
'error': str(e),
'domain': domain
}
)
else:
celery_task_tracker.mark_completed()
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{% load hq_shared_tags %}

{% block reportcontent %}
{% include 'geospatial/partials/index_alert.html' %}
<div class="row panel">
<div class="col col-md-2">
<span id="lock-groups-controls">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{% load i18n %}

{% block reportcontent %}
{% include 'geospatial/partials/index_alert.html' %}
<div class="panel panel-default" id="user-filters-panel">
<div class="panel-body collapse in" aria-expanded="true">
<legend>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
{% initial_page_data 'case_types_with_gps' case_types_with_gps %}
{% initial_page_data 'couch_user_username' couch_user_username %}

{% include 'geospatial/partials/index_alert.html' %}
<ul id="tabs-list" class="nav nav-tabs">
<li data-bind="click: onclickAction" class="active"><a data-toggle="tab" href="#tabs-cases">{% trans 'Update Case Data' %}</a></li>
<li data-bind="click: onclickAction"><a data-toggle="tab" href="#tabs-users">{% trans 'Update Mobile Worker Data' %}</a></li>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{% load i18n %}

{% if task_status.status == 'ERROR' %}
<div class="alert alert-danger">
<p>
{% if task_status.error_slug == 'TOO_MANY_CASES' %}
{% blocktrans %}
Existing cases in the domain were not processed to be available in Microplanning reports
because there were too many to be processed. New or updated cases will still be available
for use for Microplanning. Please reach out to support if you need support with existing cases.
{% endblocktrans %}
{% elif task_status.error_slug == 'CELERY' %}
{% blocktrans %}
Oops! Something went wrong while processing existing cases to be available in Microplanning
reports. Please reach out to support.
{% endblocktrans %}
{% endif %}
</p>
</div>
{% elif task_status.status == 'ACTIVE' %}
<div class="alert alert-info">
<p>
{% blocktrans %}
Existing cases in the domain are being processed to be available in
Microplanning reports. Please be patient.
{% endblocktrans %}
({{ task_status.progress}}%)
</p>
</div>
{% endif %}
1 change: 1 addition & 0 deletions corehq/apps/geospatial/templates/geospatial/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
{% initial_page_data 'road_network_algorithm_slug' road_network_algorithm_slug %}

<form id="geospatial-config-form" class="form-horizontal disable-on-submit ko-template" method="post">
{% include 'geospatial/partials/index_alert.html' %}
{% crispy form %}
</form>
{% endblock %}
Loading

0 comments on commit d677f0d

Please sign in to comment.