
[BUG] Reading data from OpenSearch is pulling all the data and not pushing down the aggregates #302

Open
raviprakashshahi opened this issue Jul 7, 2023 · 1 comment
Labels
bug Something isn't working

Comments


raviprakashshahi commented Jul 7, 2023

What is the bug?

I am querying OpenSearch 2.3 from a Glue job. The job does not push down the aggregate step; instead it pulls all the data from OpenSearch after applying the filter and performs the groupBy and count in memory. The data size is very large, so I expect the aggregation to be pushed down to OpenSearch instead.

Filter pushdown is working but aggregate pushdown is not working.

The relevant configs are enabled:

```scala
.option("opensearch.pushdown.aggregation.enabled", "true")
.option("opensearch.internal.spark.sql.pushdown", "true")
```

```scala
val result2 = sparkSession.read.format("opensearch").load(indexName)
  .filter(col("status").equalTo("active"))
  .groupBy("id")
  .agg(count("*").alias("count"))
```

I also tried passing the query directly instead of using Spark aggregate functions; this likewise pulls all the data from OpenSearch and doesn't honor the query.

```scala
conf.set("opensearch.query",
  """{"size": 0, "query": {"bool": {"filter": [{"term": {"status": "active"}}]}}, "aggs": {"id": {"terms": {"field": "id", "size": 20}}}}""")

sparkSession.read.format("opensearch").load(indexName)
```
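As a side note, the request body above can be sanity-checked outside Spark before handing it to the connector. A minimal Python sketch (the field and index details are taken from the snippet above; note that the OpenSearch `_search` API expects `"size": 0`, not `"search": 0`, to suppress raw hits and return only aggregation buckets):

```python
import json

# The same request body as in the snippet above: a filtered query plus a
# terms aggregation on "id", with "size": 0 so no raw hits are returned.
body = {
    "size": 0,
    "query": {"bool": {"filter": [{"term": {"status": "active"}}]}},
    "aggs": {"id": {"terms": {"field": "id", "size": 20}}},
}

# Serialize and re-parse to confirm the body is well-formed JSON
# before passing it via conf.set("opensearch.query", ...).
parsed = json.loads(json.dumps(body))
print(parsed["aggs"]["id"]["terms"]["field"])  # -> id
```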

The Spark physical plan being generated:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#11], functions=[count(1)], output=[id#11, count#65L])
   +- Exchange hashpartitioning(id#11, 36), ENSURE_REQUIREMENTS, [id=#14]
      +- HashAggregate(keys=[id#11], functions=[partial_count(1)], output=[id#11, count#69L])
         +- Project [id#11]
            +- Filter (isnotnull(status#14) AND (status#14 = active))
               +- Scan OpenSearchRelation(Map(opensearch.resource -> ****),org.apache.spark.sql.SQLContext@27c827ec,None) [id#11,status#14] PushedFilters: [IsNotNull(status), EqualTo(status,active)], ReadSchema: struct<id:string,status:string>
```

What is the expected behavior?

I expect the aggregate function to be pushed down to OpenSearch instead of all matching rows being pulled and aggregated in the Spark executors.
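For reference, the server-side terms aggregation being requested is semantically the same group-by-count that Spark currently performs in memory; the difference is only where the work happens. A small self-contained Python illustration over invented sample documents (the document values here are made up for the example):

```python
from collections import Counter

# Invented sample documents standing in for the index contents.
docs = [
    {"id": "a", "status": "active"},
    {"id": "a", "status": "active"},
    {"id": "b", "status": "active"},
    {"id": "b", "status": "inactive"},
    {"id": "c", "status": "active"},
]

# Equivalent of: filter(status = active), then a terms aggregation
# (bucket-and-count) on "id". With pushdown, OpenSearch would compute
# these per-id counts and return only the buckets, not the raw documents.
counts = Counter(d["id"] for d in docs if d["status"] == "active")
print(dict(counts))  # {'a': 2, 'b': 1, 'c': 1}
```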

What is your host/environment?

Glue: 3 (Spark 3.0, Scala 2.12)
OpenSearch: 2.3
opensearch-hadoop connector: opensearch-spark-30_2.12-3.0.0-SNAPSHOT.jar

@raviprakashshahi raviprakashshahi added bug Something isn't working untriaged labels Jul 7, 2023
@wbeckler

This makes sense. If anyone is interested in making this change, I think it would be welcome.
