Upgraded NPS logic KB-5727
varshamahuli97 committed Jul 5, 2024
1 parent ed65bb2 commit 27b91e3
Showing 3 changed files with 154 additions and 2 deletions.
@@ -11,13 +11,17 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import DashboardUtil._
import java.time.{Instant, ZoneOffset, ZonedDateTime, LocalDate}
import java.time.format.DateTimeFormatter
import org.ekstep.analytics.framework.{FrameworkContext, StorageConfig}

import java.io.{File, Serializable}
import java.util
import java.sql.Timestamp
import java.util.UUID
import scala.collection.mutable.ListBuffer

object DataUtil extends Serializable {

/*
@@ -432,7 +436,6 @@ object DataUtil extends Serializable {

val df = userDF.join(joinOrgDF, Seq("userOrgID"), "left")
show(df, "userOrgDataFrame")

df
}

@@ -1592,6 +1595,17 @@ object DataUtil extends Serializable {
df
}

def npsUpgradedTriggerC1DataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
val query = """SELECT userID as userid FROM \"nps-users-data-upgraded\" where __time >= CURRENT_TIMESTAMP - INTERVAL '15' DAY"""
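// Note: Scala triple-quoted strings keep backslash escapes literal, so the text handed
// to druidDFOption contains \" sequences; assuming the helper embeds the SQL in a JSON
// request body (where \" unescapes to a plain quote), the query Druid sees is:
//   SELECT userID as userid FROM "nps-users-data-upgraded"
//   WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '15' DAY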
var df = druidDFOption(query, conf.sparkDruidRouterHost, limit = 1000000).orNull
if(df == null) return emptySchemaDataFrame(Schema.npsUserIds)
df = df.na.drop(Seq("userid"))
df
}

def npsTriggerC2DataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
val query = """(SELECT DISTINCT(userID) as userid FROM \"dashboards-user-course-program-progress\" WHERE __time = (SELECT MAX(__time) FROM \"dashboards-user-course-program-progress\") AND courseCompletedTimestamp >= TIMESTAMP_TO_MILLIS(__time + INTERVAL '5:30' HOUR TO MINUTE - INTERVAL '3' MONTH) / 1000.0 AND category IN ('Course','Program') AND courseStatus IN ('Live', 'Retired') AND dbCompletionStatus = 2) UNION ALL (SELECT uid as userid FROM (SELECT SUM(total_time_spent) AS totalTime, uid FROM \"summary-events\" WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '3' MONTH AND dimensions_type='app' GROUP BY 2) WHERE totalTime >= 7200)"""
var df = druidDFOption(query, conf.sparkDruidRouterHost, limit = 1000000).orNull
@@ -1600,13 +1614,43 @@ object DataUtil extends Serializable {
df
}

def npsUpgradedTriggerC2DataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
// 15-day window: start of day 15 days ago up to start of today
val currentDate = LocalDate.now()
val currentTimestamp = Timestamp.valueOf(currentDate.atStartOfDay())
val fifteenDaysAgoDate = currentDate.minusDays(15)
val fifteenDaysAgoTimestamp = Timestamp.valueOf(fifteenDaysAgoDate.atStartOfDay())

var enrolmentDF = cache.load("enrolment")
show(enrolmentDF, "enrolmentDF")
// users who enrolled in or completed a course within the window
enrolmentDF = enrolmentDF.filter(col("completedon").between(fifteenDaysAgoTimestamp, currentTimestamp) ||
col("enrolled_date").between(fifteenDaysAgoTimestamp, currentTimestamp))
.select("userid").distinct()
enrolmentDF
}
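// Illustrative: if today is 2024-07-05, the filter window above is
// [2024-06-20 00:00:00, 2024-07-05 00:00:00] in the JVM's default time zone,
// since Timestamp.valueOf(LocalDateTime) applies the local zone.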

def npsTriggerC3DataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
var df = mongodbTableAsDataFrame(conf.mongoDatabase, conf.mongoDBCollection)
if (df == null) return emptySchemaDataFrame(Schema.npsUserIds)
df = df.na.drop(Seq("userid"))
df
}

def npsUpgradedTriggerC3DataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
val timeUUIDToTimestampMills = udf((timeUUID: String) => (UUID.fromString(timeUUID).timestamp() - 0x01b21dd213814000L) / 10000)
val currentDate = current_timestamp()
// Calculate start milliseconds of current date
val currentStartMillis = unix_timestamp(date_trunc("day", currentDate)) * 1000
// Calculate start milliseconds of 15 days ago
val fifteenDaysAgo = currentDate - expr("interval 15 days")
val fifteenDaysAgoStartMillis = unix_timestamp(date_trunc("day", fifteenDaysAgo)) * 1000
val ratingsDF = cache.load("rating")
.withColumn("rated_on", timeUUIDToTimestampMills(col("createdon")))
// interpolating the Column objects into a SQL string would compare rated_on against a
// string literal of the expression text; compare against the computed columns instead
.where(col("rated_on").between(fifteenDaysAgoStartMillis, currentStartMillis))
.select("userid").distinct()
ratingsDF
}
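// UUID v1 timestamps count 100-ns ticks since 1582-10-15T00:00:00Z;
// 0x01b21dd213814000L (= 122192928000000000) is 1970-01-01 expressed in those ticks,
// so (uuid.timestamp() - offset) / 10000 yields Unix epoch milliseconds.
// Illustrative round trip: a time-UUID minted at 2024-07-05T00:00:00Z carries ticks
// 0x01b21dd213814000L + 1720137600000L * 10000, which converts back to 1720137600000.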

def userFeedFromCassandraDataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
var df = cassandraTableAsDataFrame(conf.cassandraUserFeedKeyspace, conf.cassandraUserFeedTable)
.select(col("userid").alias("userid"))
@@ -1616,6 +1660,15 @@
df
}

def userUpgradedFeedFromCassandraDataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
var df = cassandraTableAsDataFrame(conf.cassandraUserFeedKeyspace, conf.cassandraUserFeedTable)
// check for null before transforming, otherwise a missing table would NPE on select
if (df == null) return emptySchemaDataFrame(Schema.npsUserIds)
df = df.select(col("userid"))
.where(col("category") === "NPS2")
df = df.na.drop(Seq("userid"))
df
}
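// Assumption: "NPS2" is the feed category written by the upgraded flow (see
// NpsUpgradeModel below), so this lookup only suppresses users who already have an
// upgraded-NPS feed entry, not entries from the original NPS feed.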

def acbpDetailsDF()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
val df = cache.load("acbp")
.select(
@@ -0,0 +1,16 @@
package org.ekstep.analytics.dashboard.survey.npsUpgrade

import org.apache.spark.SparkContext
import org.ekstep.analytics.framework.util.JobLogger
import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobDriver}

object NpsUpgradeJob extends optional.Application with IJob {
implicit val className = "org.ekstep.analytics.dashboard.survey.npsUpgrade.NpsUpgradeJob"
override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = {
implicit val sparkContext: SparkContext = sc.orNull
JobLogger.log("Started executing Job")
JobDriver.run("batch", config, NpsUpgradeModel)
JobLogger.log("Job Completed.")
}
}

@@ -0,0 +1,83 @@
package org.ekstep.analytics.dashboard.survey.npsUpgrade

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, current_date, expr, lit}
import org.apache.spark.sql.types.{DateType, StringType}
import DashboardUtil._
import DataUtil._
import org.ekstep.analytics.dashboard.{AbsDashboardModel, DashboardConfig}
import org.ekstep.analytics.framework.FrameworkContext


object NpsUpgradeModel extends AbsDashboardModel {
implicit val className: String = "org.ekstep.analytics.dashboard.survey.npsUpgrade.NpsUpgradeModel"

override def name() = "NpsUpgradeModel"
def processData(timestamp: Long) (implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {

val usersSubmittedRejectedNPSDF = npsUpgradedTriggerC1DataFrame() // users from Druid who have received the popup in the last 15 days
val usersEnrolledCompletedCourseDF = npsUpgradedTriggerC2DataFrame() // users from Cassandra who have enrolled in or completed at least 1 course in the last 15 days
val usersRatedCourseDF = npsUpgradedTriggerC3DataFrame() // users who have rated at least one course in the last 15 days

val usersSubmittedRejectedNPSDFCount = usersSubmittedRejectedNPSDF.count()
println(s"DataFrame Count for set of users who have submitted the form in last 15 days: $usersSubmittedRejectedNPSDFCount")
val usersEnrolledCompletedCourseDFCount = usersEnrolledCompletedCourseDF.count()
println(s"DataFrame Count for set of users who have enrolled in or completed at least 1 course in last 15 days: $usersEnrolledCompletedCourseDFCount")
val usersRatedCourseDFCount = usersRatedCourseDF.count()
println(s"DataFrame Count for set of users who have rated at least 1 course in last 15 days: $usersRatedCourseDFCount")

var df = usersEnrolledCompletedCourseDF.union(usersRatedCourseDF)
df = df.dropDuplicates("userid")
df = df.na.drop(Seq("userid"))

var filteredDF = df.except(usersSubmittedRejectedNPSDF)
filteredDF = filteredDF.na.drop(Seq("userid"))

val totalCount = df.count()
println(s"DataFrame Count for users who are eligible: $totalCount")

val filteredCount = filteredDF.count()
println(s"DataFrame Count for set of users who are eligible and have not filled the form: $filteredCount")

// check if the feed for these users is already there
val cassandraDF = userUpgradedFeedFromCassandraDataFrame()
val existingFeedCount = cassandraDF.count()
println(s"DataFrame Count for users who have feed data: $existingFeedCount")
val storeToCassandraDF = filteredDF.except(cassandraDF)
var filteredStoreToCassandraDF = storeToCassandraDF.filter(col("userid").isNotNull && col("userid") =!= "" && col("userid") =!= "''")
filteredStoreToCassandraDF = filteredStoreToCassandraDF.dropDuplicates("userid")
val finalFeedCount = filteredStoreToCassandraDF.count()
println(s"DataFrame Count for final set of users to create feed: $finalFeedCount")

// create an additional dataframe that has the columns of the user_feed table, since these userIDs have to be inserted into the user_feed table

val additionalDF = filteredStoreToCassandraDF.withColumn("category", lit("NPS2"))
.withColumn("id", expr("uuid()").cast(StringType))
.withColumn("createdby", lit("platform_rating"))
.withColumn("createdon", current_date())
.withColumn("action", lit("{\"dataValue\":\"yes\",\"actionData\":{\"formId\":"+conf.platformRatingSurveyId+"}}"))
.withColumn("expireon", lit(null).cast(DateType))
.withColumn("priority", lit(1))
.withColumn("status", lit("unread"))
.withColumn("updatedby", lit(null).cast(StringType))
.withColumn("updatedon", lit(null).cast(DateType))
.withColumn("version", lit("v1"))
show(additionalDF)
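// Illustrative: assuming conf.platformRatingSurveyId = 123, the action column holds
//   {"dataValue":"yes","actionData":{"formId":123}}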

// write the dataframe to cassandra user_feed table
additionalDF.write
.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> conf.cassandraUserFeedKeyspace , "table" -> conf.cassandraUserFeedTable))
.mode("append")
.save()

// write the dataframe to cassandra user_feed_backup table
additionalDF.write
.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "sunbird_notifications" , "table" -> "notification_feed_history"))
.mode("append")
.save()
}
}
