Skip to content

Commit

Permalink
Merge pull request Sunbird-Obsrv#115 from abhishekpnt/dp-4.8.14
Browse files Browse the repository at this point in the history
KB-5761: Syntax fix
  • Loading branch information
shishirsuman092 authored Jul 11, 2024
2 parents 96bf17e + 03dccd3 commit 179382d
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.ekstep.analytics.dashboard.DashboardUtil._
import org.ekstep.analytics.dashboard.DataUtil._
import org.ekstep.analytics.framework._
import org.joda.time.DateTime
import org.apache.spark.sql.functions.col

import java.text.SimpleDateFormat
import java.time.LocalDate
Expand Down Expand Up @@ -95,13 +96,13 @@ object DashboardSyncModel extends AbsDashboardModel {
val toJsonStringUDF = udf((userID: String, fullName: String, userOrgName: String, designation: String, userProfileImgUrl: String, total_points: Long, rank: Int) => {
s"""{"userID":"$userID","fullName":"$fullName","userOrgName":"$userOrgName","designation":"$designation","userProfileImgUrl":"$userProfileImgUrl","total_points":$total_points,"rank":$rank}"""
})
val windowSpec = Window.partitionBy("userOrgID").orderBy($"total_points".desc)
val windowSpec = Window.partitionBy("userOrgID").orderBy(col("total_points").desc)
val rankedDF = kPointsWithUserOrgDF.withColumn("rank", rank().over(windowSpec))
val top10LearnersByMDODF = rankedDF.filter($"rank" <= 10)
val top10LearnersByMDODF = rankedDF.filter(col("rank") <= 10)
val jsonStringDF = top10LearnersByMDODF.withColumn("json_details", toJsonStringUDF(
$"userID", $"fullName", $"userOrgName", $"designation", $"userProfileImgUrl", $"total_points", $"rank"
)).groupBy("userOrgID").agg(collect_list($"json_details").as("top_learners"))
val resultDF = jsonStringDF.select($"userOrgID", to_json(struct($"top_learners")).alias("top_learners"))
col("userID"), col("fullName"), col("userOrgName"), col("designation"), col("userProfileImgUrl"), col("total_points"), col("rank")
)).groupBy("userOrgID").agg(collect_list(col("json_details")).as("top_learners"))
val resultDF = jsonStringDF.select(col("userOrgID"), to_json(struct(col("top_learners"))).alias("top_learners"))

Redis.dispatchDataFrame[String]("dashboard_top_10_learners_on_kp_by_user_org", resultDF, "userOrgID", "top_learners")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import DashboardUtil._
import java.time.{Instant, ZoneOffset, ZonedDateTime, LocalDate}
import java.format.DateTimeFormatter

import java.time.{Instant, LocalDate, ZoneOffset, ZonedDateTime}
import org.ekstep.analytics.framework.{FrameworkContext, StorageConfig}

import java.io.{File, Serializable}
import java.sql.Timestamp
import java.util
import java.util.UUID
import scala.collection.mutable.ListBuffer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ object NpsUpgradeJob extends optional.Application with IJob{
override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit ={
implicit val sparkContext: SparkContext = sc.getOrElse(null);
JobLogger.log("Started executing Job")
JobDriver.run("batch", config, NpsUprgadeModel)
JobDriver.run("batch", config, NpsUpgradeModel)
JobLogger.log("Job Completed.")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, current_date, expr, lit}
import org.apache.spark.sql.types.{DateType, StringType}
import DashboardUtil._
import DataUtil._
import org.ekstep.analytics.dashboard.DashboardUtil._
import org.ekstep.analytics.dashboard.DataUtil._
import org.ekstep.analytics.dashboard.{AbsDashboardModel, DashboardConfig}
import org.ekstep.analytics.framework.FrameworkContext

Expand Down

0 comments on commit 179382d

Please sign in to comment.