Skip to content

Commit

Permalink
Reorganize lessons
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian Herrera committed Jul 21, 2024
1 parent 1eedf38 commit 2a05a5a
Show file tree
Hide file tree
Showing 116 changed files with 399 additions and 2,488 deletions.
6 changes: 6 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Dependabot configuration: requests update checks for the "github-actions" ecosystem.
version: 2
updates:
# Single update entry, targeting GitHub Actions dependencies.
- package-ecosystem: "github-actions"
# Location to inspect, given relative to the repository root.
directory: "/"
schedule:
# Check for new versions once a day.
interval: "daily"
16 changes: 16 additions & 0 deletions .github/workflows/check-dependencies-updates.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Scheduled workflow that runs Scala Steward to check the project's dependencies for updates.
on:
schedule:
# Cron fields: minute 0, hour 6, any day of month, any month, weekdays 1-5 (Monday to Friday).
- cron: '0 6 * * 1-5'

name: 🍄 Check dependencies updates

# Write access to contents and pull requests is granted to the workflow token.
# NOTE(review): presumably needed so update branches and PRs can be created — confirm.
permissions:
contents: write
pull-requests: write

jobs:
scala-steward:
runs-on: ubuntu-22.04
name: Check Scala project dependencies updates with Scala Steward
steps:
# NOTE(review): no `with:` inputs (e.g. authentication settings) are passed to the action;
# confirm against the action's documentation that its defaults are enough for this repository.
- uses: scala-steward-org/scala-steward-action@v2
25 changes: 25 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Continuous integration: compiles, style-checks and tests the project.
name: CI

# Runs on every push to `main` and on every pull request.
on:
push:
branches:
- main
pull_request:

# The workflow token only gets read access to the repository contents.
permissions:
contents: read

jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
# Installs a Zulu JDK 21 and enables the action's `sbt` dependency cache.
- uses: actions/setup-java@v4
with:
distribution: 'zulu'
java-version: '21'
cache: 'sbt'
# NOTE(review): no step installs sbt itself; the `sbt` commands below assume the
# runner image already provides it — confirm for the image behind `ubuntu-latest`.
- name: 👌 Run "pre-push" tasks (compile and style-check)
run: sbt prep
- name: ✅ Run test
run: sbt test
16 changes: 16 additions & 0 deletions .github/workflows/update-github-dependency-graph.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Runs the sbt dependency submission action on every push to `main`.
name: Update GitHub Dependency Graph

on:
push:
branches:
- main

# Write access to contents is granted to the workflow token.
# NOTE(review): presumably required by the dependency submission step — confirm.
permissions:
contents: write

jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
# Submits the sbt build's dependencies (per the action name) for the checked-out commit.
- uses: scalacenter/sbt-dependency-submission@v3
13 changes: 10 additions & 3 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
version=2.5.2
version = 3.8.2
runner.dialect = scala213
style = default
maxColumn = 120
continuationIndent.callSite = 2
align.preset = more
maxColumn = 80
importSelectors = singleLine
runner.optimizer.forceConfigStyleMinArgCount = 1
rewrite.rules = [SortImports]
importSelectors = singleLine
project.excludeFilters = ["target/"]
project.git = true # Only format files tracked by git
40 changes: 5 additions & 35 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,38 +1,8 @@
scalaVersion := "2.12.12"
version := "0.1.0-SNAPSHOT"
name := "spark-for-programmers-course"
organization := "com.codely"
Settings.settings

val sparkVesion = "3.5.0"
libraryDependencies := Dependencies.all

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVesion,
"org.apache.spark" %% "spark-sql" % sparkVesion,
"org.apache.spark" %% "spark-hive" % sparkVesion,
"org.apache.spark" %% "spark-streaming" % sparkVesion,
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVesion,
"io.delta" %% "delta-spark" % "3.1.0",
// "com.amazonaws" % "aws-java-sdk-bundle" % "1.11.375",
"org.apache.hadoop" % "hadoop-aws" % "3.2.2",
"com.rabbitmq" % "amqp-client" % "5.12.0",
"com.typesafe" % "config" % "1.4.1",
//"org.apache.hadoop" % "hadoop-common" % "3.3.1",
"org.scalatest" %% "scalatest" % "3.2.18" % Test,
"org.scalatest" %% "scalatest-flatspec" % "3.2.18" % Test,
"com.dimafeng" %% "testcontainers-scala" % "0.40.12" % Test,
"com.dimafeng" %% "testcontainers-scala-kafka" % "0.40.12" % Test,
"com.dimafeng" %% "testcontainers-scala-postgresql" % "0.41.4" % Test,
"org.postgresql" % "postgresql" % "9.4.1207" % Test,
"org.mockito" %% "mockito-scala" % "1.16.42" % Test
)

assembly / mainClass := Some(
"com.codely.lesson_07_spark_optimize_and_monitoring.video_01__deploy_application.DeploySparkApp"
)

assembly / assemblyMergeStrategy := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") =>
MergeStrategy.first
case _ => MergeStrategy.first
SbtAliases.aliases.flatMap {
case (alias, command) =>
addCommandAlias(alias, command)
}
8 changes: 8 additions & 0 deletions doc/hooks/install-hooks.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/sh

# Installs the project's git hooks by replacing `.git/hooks` with a symlink to `doc/hooks`.
# WARNING: any hook previously present in `.git/hooks` is deleted.

# Abort on the first failing command. In particular, never reach the `rm -rf` below
# if moving to the repository root failed, as it would wipe the hooks of whatever
# repository happens to be the current directory.
set -e

# This script lives in doc/hooks, so the repository root is two levels up.
cd "$(dirname "$0")/../.."

rm -rf .git/hooks

# The link target is resolved relative to `.git/`, hence `../doc/hooks`.
ln -s ../doc/hooks .git/hooks

# Hooks only need to be executable: no root privileges and no world-writable scripts.
chmod -R 755 doc/hooks/*
50 changes: 50 additions & 0 deletions doc/hooks/pre-push
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/bin/bash

# Pre-push hook: runs the `prep` SBT task and cancels the push when it fails.
# Non-staged changes are stashed while the check runs, so they do not influence
# the result, and are restored afterwards.
# Install it with the `install-hooks.sh` script
# Based on: https://gist.github.com/cvogt/2676ed6c6d1abafa3d6a

PATH=$PATH:/usr/local/bin:/usr/local/sbin

echo ""
echo "Running pre-push hook… (you can omit this with --no-verify, but don't)"

echo "* Moving to the project directory…"
# Ask git for the working tree root instead of deriving it from the hook location,
# so this works no matter how the hook is linked or invoked.
# If git cannot tell (e.g. no working tree), stay in the current directory.
if DIR=$(git rev-parse --show-toplevel 2> /dev/null)
then
  cd "$DIR" || exit 1
fi

echo "* Stashing non-staged changes so we avoid checking them…"
# `git diff --quiet` exits with 0 when there are no non-staged changes in tracked files.
git diff --quiet
hadNoNonStagedChanges=$?

if [ "$hadNoNonStagedChanges" -ne 0 ]
then
  # --keep-index leaves the staged changes in place; -u also stashes untracked files.
  git stash --keep-index -u > /dev/null
fi

echo "* Checking pre push conditions ('prep' SBT task)…"
# Output is discarded on purpose; only the exit status decides whether the push goes on.
sbt prep > /dev/null
canPush=$?

if [ "$canPush" -ne 0 ]
then
  echo " [KO] Error :("
fi

echo "* Applying the stash with the non-staged changes…"
if [ "$hadNoNonStagedChanges" -ne 0 ]
then
  # Restored in the background after a short delay (behaviour inherited from the original gist).
  sleep 1 && git stash pop --index > /dev/null & # sleep because otherwise commit fails when this leads to a merge conflict
fi

# Final result
echo ""

if [ "$canPush" -eq 0 ]
then
  echo "[OK] Your code will be pushed young Padawan"
  exit 0
else
  echo "[KO] Cancelling push due to test code style error (run 'sbt prep' for more information)"
  exit 1
fi
20 changes: 20 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import sbt._

/** Library dependencies of the build, split into production and test scopes. */
object Dependencies {
  // Single place to change the version shared by every Spark module.
  private val sparkVersion = "3.5.0"

  /** Builds the coordinates of a Spark module (cross-versioned) at `sparkVersion`. */
  private def spark(module: String): ModuleID =
    "org.apache.spark" %% module % sparkVersion

  // NOTE(review): spark-core and spark-sql are `Provided` while spark-streaming and
  // spark-hive are regular compile dependencies — confirm this asymmetry is intended.
  private val prod = Seq(
    "com.github.nscala-time" %% "nscala-time" % "2.32.0",
    "com.lihaoyi"            %% "pprint"      % "0.9.0",
    spark("spark-core") % Provided,
    spark("spark-sql") % Provided,
    spark("spark-streaming"),
    spark("spark-hive"),
    "io.delta"          %% "delta-spark" % "3.1.0",
    "org.apache.hadoop"  % "hadoop-aws"  % "3.2.2"
  )

  // Every entry here is only available in the Test configuration.
  private val test = Seq(
    "org.scalatest" %% "scalatest"     % "3.2.19"  % Test,
    "org.mockito"   %% "mockito-scala" % "1.16.42" % Test
  )

  /** All dependencies: production ones first, then test ones. */
  val all: Seq[ModuleID] = prod ++ test
}
15 changes: 15 additions & 0 deletions project/SbtAliases.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/** Short sbt command aliases, as `(alias, command)` pairs. */
object SbtAliases {
  // Running tests.
  // NOTE(review): `testShowFailed` is not defined anywhere in the visible build — confirm it exists.
  private val testing = Seq(
    ("t", "test"),
    ("to", "testOnly"),
    ("tq", "testQuick"),
    ("tsf", "testShowFailed")
  )

  // Compiling production and test sources.
  private val compiling = Seq(
    ("c", "compile"),
    ("tc", "Test / compile")
  )

  // Formatting, or checking the format of, production and test files with ScalaFmt.
  private val formatting = Seq(
    ("f", "scalafmt"),
    ("fc", "scalafmtCheck"),
    ("tf", "Test / scalafmt"),
    ("tfc", "Test / scalafmtCheck")
  )

  // All the needed tasks before pushing to the repository:
  // compile, compile test, format check in prod and test.
  private val prePush = ("prep", ";c;tc;fc;tfc")

  val aliases: Seq[(String, String)] =
    (testing ++ compiling ++ formatting) :+ prePush
}
44 changes: 44 additions & 0 deletions project/Settings.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import sbt.Keys._
import sbt.io.syntax._
import sbt.{Compile, Test, TestFrameworks, Tests, Configuration => _}

/** Project-wide sbt settings: identity, source layout, compiler flags and test configuration. */
object Settings {
  // Compiler flags applied to every configuration.
  private val compilerFlags = Seq(
    "-deprecation", // Warnings deprecation
    "-feature",     // Advise features
    "-unchecked",   // More warnings. Strict
    "-Xlint",       // More warnings when compiling
    "-Ywarn-dead-code",
    "-Ywarn-unused"
  )

  val settings = Seq(
    // Project identity
    name             := "spark-for-devs-course",
    version          := "0.1.0-SNAPSHOT",
    scalaVersion     := "2.12.12",
    organization     := "com.codely",
    organizationName := "com.codely, Inc.",
    // NOTE(review): "https://com.codely" looks like a reversed domain — confirm the intended URL.
    organizationHomepage := Some(url("https://com.codely")),
    // Custom folders path (remove the `/scala` default subdirectory)
    Compile / scalaSource := baseDirectory.value / "src" / "main",
    Test / scalaSource    := baseDirectory.value / "src" / "test",
    // Compiler options
    scalacOptions ++= compilerFlags,
    // Check against early initialization only in tests because it's expensive
    Test / scalacOptions += "-Xcheckinit",
    // NOTE(review): only `Test / fork` is enabled in this file; confirm whether this JVM
    // option is also expected to reach non-forked tasks.
    javaOptions += "-Duser.timezone=UTC",
    // Test options: tests run sequentially, in a forked JVM
    Test / parallelExecution  := false,
    Test / testForkedParallel := false,
    Test / fork               := true,
    Test / testOptions ++= Seq(
      // Save test reports
      Tests.Argument(TestFrameworks.ScalaTest, "-u", "target/test-reports"),
      // Show full stack traces and time spent in each test
      Tests.Argument("-oDF")
    )
  )
}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.9.8
sbt.version = 1.10.1
1 change: 0 additions & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object FromCSVToSQL extends App {
.getOrCreate()

val pathNetflixFile =
"src/main/scala/com/codely/lesson_01__discover_apache_spark/video_01__from_excel_to_sql/data/netflix_titles.csv"
"src/main/com/codely/lesson_01__discover_apache_spark/video_01__from_excel_to_sql/data/netflix_titles.csv"

spark.read
.csv(pathNetflixFile)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.codely.lesson_01__discover_apache_spark.video_03__intro_domain_events_analysis
package com.codely.lesson_01__discover_apache_spark.video_04__intro_domain_events_analysis

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, desc, explode, lit, month}
Expand All @@ -15,7 +15,7 @@ private object HighPriceProductsPurchased extends App {
spark.sparkContext.setLogLevel("WARN")

val purchasedCompletedFilePath =
"src/main/scala/com/codely/lesson_01__discover_apache_spark/video_03__intro_domain_events_analysis/data/purchasecompleted.json"
"src/main/com/codely/lesson_01__discover_apache_spark/video_04__intro_domain_events_analysis/data/purchasecompleted.json"

spark.read
.format("json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object PlatformAccessAnalysis extends App {

// 2. Read data
val accessEventFilePath =
"src/main/scala/com/codely/lesson_01__discover_apache_spark/z_practical_exercise/data/accessevent.json"
"src/main/com/codely/lesson_01__discover_apache_spark/z_practical_exercise/data/accessevent.json"
val accessEventDF = spark.read.json(accessEventFilePath)

accessEventDF.show()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ object CartActivityAnalysis extends App {

val addedToCartDF =
spark.read.json(
"src/main/scala/com/codely/lesson_02__analyze_domain_events/video_01__analysis_products_added_to_cart/data/addedToCart.json"
"src/main/com/codely/lesson_02__analyze_domain_events/video_01__analysis_products_added_to_cart/data/addedToCart.json"
)

addedToCartDF
.select(col("userId"), col("timestamp"), col("products"))
addedToCartDF.select("userId", "timestamp", "products")
addedToCartDF.selectExpr("userId", "timestamp", "products")

import spark.implicits._
addedToCartDF.select($"userId", $"timestamp", $"products")
addedToCartDF.select('userId, 'timestamp, 'products)
addedToCartDF.select("userId", "timestamp", "products")
addedToCartDF.selectExpr("userId", "timestamp", "products")

addedToCartDF.filter(size(col("products")) === 1)
addedToCartDF.filter("size(products) == 1")
Expand Down Expand Up @@ -71,20 +71,22 @@ object CartActivityAnalysis extends App {
expr("(products[0].quantity * products[0].price) as Total")
)

addedToCartDF
.filter("size(products) == 1")
.select(
col("timestamp").as("EventPublished"),
col("userId"),
col("products"),
expr(
"(products[0].quantity * products[0].price) as Total"
val onlyOneProductAddedToCartDF =
addedToCartDF
.filter("size(products) == 1")
.select(
col("timestamp").as("EventPublished"),
col("userId"),
col("products"),
expr(
"(products[0].quantity * products[0].price) as Total"
)
)
)
.withColumn(
"date",
to_date(col("EventPublished"), "yyyy-MM-dd'T'HH:mm:ss'Z'")
)
.drop("EventPublished")
.show(false)
.withColumn(
"date",
to_date(col("EventPublished"), "yyyy-MM-dd'T'HH:mm:ss'Z'")
)
.drop("EventPublished")

onlyOneProductAddedToCartDF.show(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ object UserActivityAnalysis extends App {
private def readJson(path: String): DataFrame = spark.read.json(path)

val viewedDF = readJson(
"src/main/scala/com/codely/lesson_02__analyze_domain_events/video_02_user_activity_analysis/data/productViewed.json"
"src/main/com/codely/lesson_02__analyze_domain_events/video_02_user_activity_analysis/data/productViewed.json"
)

val addedToCartDF = readJson(
"src/main/scala/com/codely/lesson_02__analyze_domain_events/video_02_user_activity_analysis/data/addedToCart.json"
"src/main/com/codely/lesson_02__analyze_domain_events/video_02_user_activity_analysis/data/addedToCart.json"
)

/* It fails due to the fact that the columns are not in the same order
/*
viewedDF
.union(addedToCartDF)
.show(false)
*/

/* It fails due to the fact there are missing columns
/*
viewedDF
.unionByName(addedToCartDF)
.show(false)
Expand Down
Loading

0 comments on commit 2a05a5a

Please sign in to comment.