Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Commit

Permalink
Validate all example and sample applications (#1059)
Browse files Browse the repository at this point in the history
* Run validations for all Maven and sbt samples

* Fallback to dynver when CLOUDFLOW_VERSION env var is set but empty

* Remove duplicated jobs in GH actions

* Read project version from a file

* Split testing core, sbt samples, and maven samples in distinct jobs

* Fix sbt samples compilation issues

* Update scala-maven-plugin

* Do not avoid fail-fast strategy for gh action jobs

* Use ubuntu-20.04 instead of latest

* Do not use a file to store the project version

* Only run scalafmt if there is a configuration file for the sample

* Initialize sbt when getting CF version for Maven samples

* Test examples only on Java 8

Co-authored-by: Andrea Peruffo <[email protected]>
  • Loading branch information
Marcos Pereira and andreaTP authored Jul 30, 2021
1 parent c7e2391 commit 13c0cd6
Show file tree
Hide file tree
Showing 77 changed files with 516 additions and 443 deletions.
25 changes: 10 additions & 15 deletions .github/workflows/build-pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,29 @@ name: Build PRs
on:
pull_request:
jobs:
build-all:
runs-on: ubuntu-latest
run-scripted-tests:
runs-on: ubuntu-20.04
strategy:
matrix:
java: [ '8', '11', '13' ]
steps:
- name: Checkout
uses: actions/checkout@v2

- uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Setup java
uses: joschi/setup-jdk@v2
with:
java-version: ${{ matrix.java }}
architecture: x64

- name: Cache SBT and Coursier cache
uses: coursier/cache-action@v3
- name: Scalafmt check
run: |
cd core
sbt "scalafmtCheckAll; scalafmtSbtCheck"
- name: Build documentation
run: |
sudo snap install yq
cd docs
make html-author-mode
- name: Run integration tests

- name: Run scripted tests
run: |
git config --global user.email "[email protected]"
git config --global user.name "Cloudflow CI"
cd core
sbt -mem 2048 +test
sbt -mem 2048 +publishLocal cloudflow-sbt-plugin/scripted
11 changes: 5 additions & 6 deletions .github/workflows/build-test-tools.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ on:
branches: [main]

jobs:
build:
runs-on: ubuntu-18.04
build-binary:
runs-on: ubuntu-20.04
steps:
- name: Checkout
uses: actions/checkout@v2

- name: Fetch tags
run: git fetch --depth=100 origin +refs/tags/*:refs/tags/*
- uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Checkout GitHub merge
if: github.event.pull_request
Expand Down
5 changes: 2 additions & 3 deletions .github/workflows/build_and_publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ on:
- main
jobs:
build-docs:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- name: Checkout
uses: actions/[email protected]
- uses: actions/checkout@v2
- run: |
git fetch --no-tags --prune --depth=1 origin +refs/heads/*:refs/remotes/origin/*
git fetch --depth=1 origin +refs/tags/*:refs/tags/*
Expand Down
69 changes: 54 additions & 15 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
name: Build and Test

on:
# Trigger the workflow on push or pull request,
# push only for the main branch
Expand All @@ -7,13 +9,11 @@ on:
- main
pull_request:


name: Build and Test
jobs:
build-and-test-docs:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2.3.3
- uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Build documentation
Expand All @@ -23,23 +23,62 @@ jobs:
cd docs
make html-author-mode
grep -r "page unresolved" target/staging/docs/ && exit 1 || echo 'ok'
build-and-test-scala:
runs-on: ubuntu-latest
build-and-test-core:
name: "Cloudflow Core"
runs-on: ubuntu-20.04
strategy:
matrix:
java: [ '8', '11', '13' ]
steps:
- uses: actions/checkout@v2

- name: Setup environment
run: |
make -C .github/workflows/setup setup-local-all
- uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Cache SBT and Coursier cache
uses: coursier/cache-action@v3

- name: Scalafmt check
run: cd core && sbt "scalafmtCheckAll; scalafmtSbtCheck"

- name: build-and-test
env:
SBT_OPTS: -Xms512M -Xmx2048M -Xss2M -XX:MaxMetaspaceSize=1024M
run: |
export PATH="$HOME/opt/sbt/bin:$PATH"
export SBT_OPTS="-Xss256M"
cd scripts
./build-all.sh +test
run: ./scripts/build-core.sh +test

build-and-test-sbt-examples:
name: "sbt samples"
runs-on: ubuntu-20.04
strategy:
matrix:
java: [ '8' ]
steps:

- uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Cache SBT and Coursier cache
uses: coursier/cache-action@v3

- name: test-sbt-examples
run: ./scripts/build-sbt-examples.sh test

build-and-test-mvn-examples:
name: "mvn samples"
runs-on: ubuntu-20.04
strategy:
matrix:
java: [ '8' ]
steps:

- uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Cache SBT and Coursier cache
uses: coursier/cache-action@v3

- name: test-maven-examples
run: ./scripts/build-mvn-examples.sh test
2 changes: 1 addition & 1 deletion .github/workflows/release-drafter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:

jobs:
update_release_draft:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
# Drafts your next Release notes as Pull Requests are merged
# https://github.com/raboof/release-drafter/releases
Expand Down
8 changes: 0 additions & 8 deletions .github/workflows/setup/Makefile

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val latestVersion = {
sys.env.get("CLOUDFLOW_VERSION").fold(
sys.env.get("CLOUDFLOW_VERSION").filter(_.nonEmpty).fold(
sbtdynver.DynVer(None, "-", "v")
.getGitDescribeOutput(new java.util.Date())
.fold(throw new Exception("Failed to retrieve version"))(_.version("-"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,14 @@ class CallRecordSplitSpec extends AnyWordSpec with Matchers with ScalaFutures wi
val streamlet = new CallRecordSplit

val instant = Instant.now.toEpochMilli / 1000
val past = Instant.now.minus(5000, ChronoUnit.DAYS).toEpochMilli / 1000

val cr1 = CallRecord("user-1", "user-2", "f", 10L, instant)
val cr2 = CallRecord("user-1", "user-2", "f", 15L, instant)
val cr3 = CallRecord("user-1", "user-2", "f", 18L, instant)

val source = Source(Vector(cr1, cr2, cr3))

val in = testkit.inletFromSource(streamlet.in, source)
val in = testkit.inletFromSource(streamlet.in, source)
val left = testkit.outletAsTap(streamlet.left)
val right = testkit.outletAsTap(streamlet.right)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val latestVersion = {
sys.env.get("CLOUDFLOW_VERSION").fold(
sys.env.get("CLOUDFLOW_VERSION").filter(_.nonEmpty).fold(
sbtdynver.DynVer(None, "-", "v")
.getGitDescribeOutput(new java.util.Date())
.fold(throw new Exception("Failed to retrieve version"))(_.version("-"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.4.9
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
package connectedcar.actors

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors

import connectedcar.data.{ConnectedCarAgg, ConnectedCarERecord}
import connectedcar.data.{ ConnectedCarAgg, ConnectedCarERecord }

case class ConnectedCarERecordWrapper(record: ConnectedCarERecord, sender:ActorRef[ConnectedCarAgg])
case class ConnectedCarERecordWrapper(record: ConnectedCarERecord, sender: ActorRef[ConnectedCarAgg])

object ConnectedCarActor {
def apply(carId:String): Behavior[ConnectedCarERecordWrapper] = {
def updated(numberOfRecords: Int, driverName: String, carId:String, averageSpeed: Double, currentSpeed: Double): Behavior[ConnectedCarERecordWrapper] = {
Behaviors.receive { (context, message) => {
context.log.info(s"Update Received- CarId: ${carId} MessageCarId: ${message.record.carId} From Actor: ${message.sender.path}")
def apply(carId: String): Behavior[ConnectedCarERecordWrapper] = {
def updated(numberOfRecords: Int,
driverName: String,
carId: String,
averageSpeed: Double,
currentSpeed: Double): Behavior[ConnectedCarERecordWrapper] =
Behaviors.receive { (context, message) =>
context.log.info(s"Update Received- CarId: ${carId} MessageCarId: ${message.record.carId} From Actor: ${message.sender.path}")

val newAverage = ((averageSpeed * numberOfRecords) + message.record.speed) / (numberOfRecords + 1)
val newNumberOfRecords = numberOfRecords+1
val newAverage = ((averageSpeed * numberOfRecords) + message.record.speed) / (numberOfRecords + 1)
val newNumberOfRecords = numberOfRecords + 1

val newAgg = ConnectedCarAgg(message.record.carId, message.record.driver, newAverage, newNumberOfRecords)
val newAgg = ConnectedCarAgg(message.record.carId, message.record.driver, newAverage, newNumberOfRecords)

message.sender ! newAgg
message.sender ! newAgg

updated(newNumberOfRecords, message.record.driver, carId, newAverage, message.record.speed)
}
updated(newNumberOfRecords, message.record.driver, carId, newAverage, message.record.speed)
}
}

updated(0, "", carId, 0, 0.0)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package connectedcar.streamlets

import akka.cluster.sharding.typed.scaladsl.{Entity, EntityTypeKey}
import akka.cluster.sharding.typed.scaladsl.{ Entity, EntityTypeKey }
import akka.util.Timeout
import akka.kafka.ConsumerMessage.CommittableOffset
import akka.stream.scaladsl.SourceWithContext
import cloudflow.akkastream.scaladsl.{FlowWithCommittableContext, RunnableGraphStreamletLogic}
import cloudflow.akkastream.scaladsl.{ FlowWithCommittableContext, RunnableGraphStreamletLogic }
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro.{AvroInlet, AvroOutlet}
import connectedcar.actors.{ConnectedCarActor, ConnectedCarERecordWrapper}
import cloudflow.streamlets.avro.{ AvroInlet, AvroOutlet }
import connectedcar.actors.{ ConnectedCarActor, ConnectedCarERecordWrapper }

import scala.concurrent.duration._
import cloudflow.akkastream.{AkkaStreamlet, Clustering}
import connectedcar.data.{ConnectedCarAgg, ConnectedCarERecord}
import cloudflow.akkastream.{ AkkaStreamlet, Clustering }
import connectedcar.data.{ ConnectedCarAgg, ConnectedCarERecord }

class ConnectedCarCluster extends AkkaStreamlet with Clustering {
val in = AvroInlet[ConnectedCarERecord]("in")
Expand All @@ -23,9 +23,7 @@ class ConnectedCarCluster extends AkkaStreamlet with Clustering {

val entity = Entity(typeKey)(createBehavior = entityContext => ConnectedCarActor(entityContext.entityId))

val source:SourceWithContext[
ConnectedCarERecord,
CommittableOffset, _] = shardedSourceWithCommittableContext(in, entity)
val source: SourceWithContext[ConnectedCarERecord, CommittableOffset, _] = shardedSourceWithCommittableContext(in, entity)

val sharding = clusterSharding()

Expand All @@ -34,9 +32,9 @@ class ConnectedCarCluster extends AkkaStreamlet with Clustering {
implicit val timeout: Timeout = 3.seconds
def flow =
FlowWithCommittableContext[ConnectedCarERecord]
.mapAsync(5)(msg {
val carActor = sharding.entityRefFor(typeKey, msg.carId.toString)
carActor.ask[ConnectedCarAgg](ref => ConnectedCarERecordWrapper(msg, ref))
})
.mapAsync(5) { msg
val carActor = sharding.entityRefFor(typeKey, msg.carId.toString)
carActor.ask[ConnectedCarAgg](ref => ConnectedCarERecordWrapper(msg, ref))
}
}
}
Original file line number Diff line number Diff line change
@@ -1,56 +1,45 @@
package connectedcar.streamlet

import akka.actor._
import akka.cluster.typed.{Cluster, Join}
import akka.stream._
import akka.cluster.typed.{ Cluster, Join }
import akka.stream.scaladsl._
import akka.testkit._
import org.scalatest._
import org.scalatest.wordspec._
import org.scalatest.matchers.must._
import org.scalatest.concurrent._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.akkastream.testkit._
import cloudflow.akkastream.testkit.scaladsl._
import connectedcar.data.{ConnectedCarAgg, ConnectedCarERecord}
import connectedcar.data._
import connectedcar.streamlets.ConnectedCarCluster
import connectedcar.streamlets.RawCarDataGenerator.generateCarERecord
import akka.actor.typed.scaladsl.adapter._

class ConnectedCarClusterTest extends AnyWordSpec with Matchers with BeforeAndAfterAll {

private implicit val system = ActorSystem("AkkaStreamletSpec")
private val cluster = Cluster(system.toTyped)
private val cluster = Cluster(system.toTyped)

override def beforeAll: Unit = {
override def beforeAll: Unit =
cluster.manager ! Join(cluster.selfMember.address)
}

override def afterAll: Unit = {
override def afterAll: Unit =
TestKit.shutdownActorSystem(system)
}

"A ConnectedCarCluster streamlet" should {

val testkit = AkkaStreamletTestKit(system)

"Allow for creating a 'flow processor'" in {
val record = generateCarERecord()
val data = Vector(record)
val data = Vector(record)

val agg = ConnectedCarAgg(record.carId, record.driver, record.speed, 1)
val agg = ConnectedCarAgg(record.carId, record.driver, record.speed, 1)
val expectedData = Vector(agg)
val source = Source(data)
val proc = new ConnectedCarCluster
val in = testkit.inletFromSource(proc.in, source)
val out = testkit.outletAsTap(proc.out)

testkit.run(proc, in, out, () {
out.probe.receiveN(1) mustBe expectedData.map(d proc.out.partitioner(d) -> d)
})
val source = Source(data)
val proc = new ConnectedCarCluster
val in = testkit.inletFromSource(proc.in, source)
val out = testkit.outletAsTap(proc.out)

testkit.run(proc, in, out, () out.probe.receiveN(1) mustBe expectedData.map(d proc.out.partitioner(d) -> d))

out.probe.expectMsg(Completed)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val latestVersion = {
sys.env.get("CLOUDFLOW_VERSION").fold(
sys.env.get("CLOUDFLOW_VERSION").filter(_.nonEmpty).fold(
sbtdynver.DynVer(None, "-", "v")
.getGitDescribeOutput(new java.util.Date())
.fold(throw new Exception("Failed to retrieve version"))(_.version("-"))
Expand Down
Loading

0 comments on commit 13c0cd6

Please sign in to comment.