Skip to content

Commit

Permalink
Fix Timeouts (#239)
Browse files Browse the repository at this point in the history
* TigerGraph HTTP client timeouts are now handled

* Added some best practices for the Neptune cluster

* Removed print from static call linker

* Submitted changelog

* Added printlns

* Early stopping on empty changes

* Fixed potential issue with key pools

* Upgraded Slf4j

* Boosted keypool ranges

* Surrounded Neptune reset with more logging

* Fixed logging and build process

* Fixed order in DF test

* Changed logging level during tests
  • Loading branch information
DavidBakerEffendi authored Mar 1, 2022
1 parent eabd97b commit 451e04a
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 112 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Added

- More logging to the `clear` method in `NeptuneDriver`.

### Fixed

- Issue in `TigerGraphDriver` where the HTTP client would time out.
- Removed the `slf4j-simple` dependency from the build that goes into the JAR.

### Changed

- Using an explicit `GRAPHBINARY_V1D0` serializer for `NeptuneDriver` queries.
- Catching HTTP request failure when checking for node status in `NeptuneDriver`.

## [1.0.15] - 2022-02-24

### Added
Expand Down
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ val sttpVersion = "3.4.1"
val jacksonVersion = "2.13.1"
val scalajHttpVersion = "2.4.2"
val lz4Version = "1.8.0"
val slf4jVersion = "1.7.35"
val slf4jVersion = "1.7.36"
val logbackVersion = "1.2.10"
val scalatestVersion = "3.2.9"
val circeVersion = "0.14.1"

Expand Down Expand Up @@ -53,8 +54,8 @@ libraryDependencies ++= Seq(
"org.scalaj" % "scalaj-http_2.13" % scalajHttpVersion,
"org.lz4" % "lz4-java" % lz4Version,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-simple" % slf4jVersion,
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"ch.qos.logback" % "logback-classic" % logbackVersion % Test,
"org.scalatest" %% "scalatest" % scalatestVersion % Test
) ++ Seq(
"io.circe" %% "circe-core",
Expand Down
104 changes: 55 additions & 49 deletions src/main/scala/com/github/plume/oss/Jimple2Cpg.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ class Jimple2Cpg {
val cpg = newEmptyCpg(outputPath)

val metaDataKeyPool = new IncrementalKeyPool(1, 100, driver.idInterval(1, 100))
val typesKeyPool = new IncrementalKeyPool(101, 1000100, driver.idInterval(101, 1000100))
val typesKeyPool = new IncrementalKeyPool(101, 2_000_100, driver.idInterval(101, 2_000_100))
val methodKeyPool =
new IncrementalKeyPool(
20001001,
30_001_001,
Long.MaxValue,
driver.idInterval(20001001, Long.MaxValue)
driver.idInterval(30_001_001, Long.MaxValue)
)

val sourceFileExtensions = Set(".class", ".jimple")
Expand All @@ -112,53 +112,59 @@ class Jimple2Cpg {

// Load classes into Soot
loadClassesIntoSoot(sourceFileNames)
if (!sootOnlyBuild) {
val codeToProcess = new PlumeDiffPass(sourceFileNames, driver).createAndApply()
if (sootOnlyBuild) return cpg
val codeToProcess = new PlumeDiffPass(sourceFileNames, driver).createAndApply()

if (codeToProcess.isEmpty) {
logger.info("No files have changed since last update. Exiting...")
return cpg
} else {
logger.info(s"Processing ${codeToProcess.size} new or changed program files")
logger.debug(s"Files to process are: $sourceFileNames")
}

// After the diff pass any changed types are removed. Remaining types should be black listed to avoid duplicates
val unchangedTypes = driver
.propertyFromNodes(NodeTypes.TYPE_DECL, PropertyNames.FULL_NAME)
.flatMap(_.get(PropertyNames.FULL_NAME))
.map(_.toString)
.toSet[String]
val unchangedNamespaces = driver
.propertyFromNodes(NodeTypes.NAMESPACE_BLOCK, PropertyNames.NAME)
.flatMap(_.get(PropertyNames.NAME))
.map(_.toString)
.toSet[String]

new PlumeMetaDataPass(cpg, language, Some(metaDataKeyPool), unchangedTypes)
.createAndApply(driver)

// Project Soot classes
val astCreator = new AstCreationPass(codeToProcess.toList, cpg, methodKeyPool)
astCreator.createAndApply(driver)
// Clear classes from Soot
G.reset()
new PlumeTypeNodePass(
astCreator.global.usedTypes.asScala.toList,
cpg,
Some(typesKeyPool),
unchangedTypes
).createAndApply(driver)

basePasses(cpg, driver, unchangedTypes, unchangedNamespaces).foreach(
_.createAndApply(driver)
)
controlFlowPasses(cpg).foreach(_.createAndApply(driver))
new PlumeReachingDefPass(cpg, unchangedTypes = unchangedTypes).createAndApply(driver)
new PlumeHashPass(cpg).createAndApply(driver)
driver match {
case x: OverflowDbDriver => x.removeExpiredPathsFromCache(unchangedTypes)
case _ =>
}

driver.buildInterproceduralEdges()
// After the diff pass any changed types are removed. Remaining types should be black listed to avoid duplicates
val unchangedTypes = driver
.propertyFromNodes(NodeTypes.TYPE_DECL, PropertyNames.FULL_NAME)
.flatMap(_.get(PropertyNames.FULL_NAME))
.map(_.toString)
.toSet[String]
val unchangedNamespaces = driver
.propertyFromNodes(NodeTypes.NAMESPACE_BLOCK, PropertyNames.NAME)
.flatMap(_.get(PropertyNames.NAME))
.map(_.toString)
.toSet[String]

new PlumeMetaDataPass(cpg, language, Some(metaDataKeyPool), unchangedTypes)
.createAndApply(driver)

// Project Soot classes
val astCreator = new AstCreationPass(codeToProcess.toList, cpg, methodKeyPool)
astCreator.createAndApply(driver)
// Clear classes from Soot
G.reset()
new PlumeTypeNodePass(
astCreator.global.usedTypes.asScala.toList,
cpg,
Some(typesKeyPool),
unchangedTypes
).createAndApply(driver)

basePasses(cpg, driver, unchangedTypes, unchangedNamespaces).foreach(
_.createAndApply(driver)
)
controlFlowPasses(cpg).foreach(_.createAndApply(driver))
new PlumeReachingDefPass(cpg, unchangedTypes = unchangedTypes).createAndApply(driver)
new PlumeHashPass(cpg).createAndApply(driver)
driver match {
case x: OverflowDbDriver => x.removeExpiredPathsFromCache(unchangedTypes)
case _ =>
}

driver.buildInterproceduralEdges()
cpg
} catch {
case e: Exception => e.printStackTrace(); throw e;
} finally {
clean()
}
Expand Down Expand Up @@ -197,15 +203,15 @@ class Jimple2Cpg {
nBlacklist: Set[String]
): Seq[PlumeCpgPassBase] = {
val namespaceKeyPool =
new IncrementalKeyPool(1000101, 2000200, driver.idInterval(1000101, 2000200))
new IncrementalKeyPool(1_000_101, 2_000_200, driver.idInterval(1_000_101, 2_000_200))
val filesKeyPool =
new IncrementalKeyPool(2000201, 3000200, driver.idInterval(2000201, 3000200))
new IncrementalKeyPool(2_000_201, 3_000_200, driver.idInterval(2_000_201, 3_000_200))
val typeDeclKeyPool =
new IncrementalKeyPool(3000201, 4000200, driver.idInterval(3000201, 4000200))
new IncrementalKeyPool(3_000_201, 4_000_200, driver.idInterval(3_000_201, 4_000_200))
val methodStubKeyPool =
new IncrementalKeyPool(4000101, 10001000, driver.idInterval(4000101, 10001000))
new IncrementalKeyPool(4_000_101, 10_001_000, driver.idInterval(4_000_101, 10_001_000))
val methodDecoratorKeyPool =
new IncrementalKeyPool(10001001, 20001000, driver.idInterval(10001001, 20001000))
new IncrementalKeyPool(10_001_001, 30_001_000, driver.idInterval(10_001_001, 30_001_000))
Seq(
new PlumeFileCreationPass(cpg, Some(filesKeyPool)),
new PlumeNamespaceCreator(cpg, Some(namespaceKeyPool), nBlacklist),
Expand Down
41 changes: 29 additions & 12 deletions src/main/scala/com/github/plume/oss/drivers/NeptuneDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import io.circe.generic.semiauto.deriveDecoder
import io.circe.{Decoder, jawn}
import org.apache.tinkerpop.gremlin.driver.Cluster
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection
import org.apache.tinkerpop.gremlin.driver.ser.Serializers
import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource
import org.slf4j.{Logger, LoggerFactory}
import scalaj.http.{Http, HttpOptions}
import sttp.model.Uri

import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.util.{Failure, Success, Try}

final class NeptuneDriver(
hostname: String,
Expand Down Expand Up @@ -44,6 +46,9 @@ final class NeptuneDriver(
.addContactPoints(hostname)
.port(port)
.enableSsl(true)
.maxInProcessPerConnection(32)
.maxSimultaneousUsagePerConnection(32)
.serializer(Serializers.GRAPHBINARY_V1D0)
.keyCertChainFile(keyCertChainFile)
.create()
}
Expand Down Expand Up @@ -77,42 +82,54 @@ final class NeptuneDriver(
val systemUri =
Uri("https", hostname, port)
.addPath(Seq("system"))
logger.info("Initiating database reset...")
val initResetResponse = Http(systemUri.toString())
.postForm(Seq("action" -> "initiateDatabaseReset"))
.option(HttpOptions.readTimeout(10000))
.option(HttpOptions.readTimeout(80000))
.asString
val token: String = jawn.decode[InitiateResetResponse](initResetResponse.body) match {
case Left(e) =>
e.printStackTrace()
throw new RuntimeException(s"Unable to initiate database reset! $e")
case Right(resetResponse: InitiateResetResponse) => resetResponse.payload.token
}
logger.info("Reset token acquired, performing database reset...")
val performResetResponse = Http(systemUri.toString())
.postForm(Seq("action" -> "performDatabaseReset", "token" -> token))
.option(HttpOptions.readTimeout(10000))
.option(HttpOptions.readTimeout(80000))
.asString
jawn.decode[PerformResetResponse](performResetResponse.body) match {
case Left(e) =>
logger.error("Unable to perform database reset!")
logger.error("Unable to perform database reset!", e)
throw e
case Right(resetResponse) =>
if (!resetResponse.status.contains("200"))
throw new RuntimeException("Unable to perform database reset!")
if (!resetResponse.status.contains("200")) {
throw new RuntimeException(s"Unable to perform database reset! $resetResponse")
}

val statusUri = Uri("https", hostname, port).addPath(Seq("status"))
Iterator
.continually(
jawn.decode[InstanceStatusResponse](
Http(statusUri.toString())
.option(HttpOptions.readTimeout(10000))
.asString
.body
)
Try(
jawn.decode[InstanceStatusResponse](
Http(statusUri.toString())
.option(HttpOptions.readTimeout(80000))
.asString
.body
)
) match {
case Failure(exception) => Left(exception)
case Success(value) => value
}
)
.takeWhile {
case Left(e) => logger.warn("Unable to obtain instance status", e); true
case Left(e) =>
e.printStackTrace(); logger.warn("Unable to obtain instance status", e); true
case Right(response) => response.status != "healthy"
}
.foreach(_ => Thread.sleep(5000))
}
logger.info("Database reset complete, re-connecting to cluster.")
cluster = connectToCluster
}
}
Expand Down
45 changes: 29 additions & 16 deletions src/main/scala/com/github/plume/oss/drivers/TigerGraphDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import sttp.model.{MediaType, Uri}
import java.io.{ByteArrayOutputStream, IOException, PrintStream}
import java.security.Permission
import scala.collection.mutable
import scala.concurrent.duration.{Duration, DurationInt}
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.util.Try
import scala.util.{Failure, Success, Try}

/** The driver used to communicate to a remote TigerGraph instance. One must build a schema on the first use of the database.
*/
Expand All @@ -30,16 +31,17 @@ final class TigerGraphDriver(
username: String = DEFAULT_USERNAME,
password: String = DEFAULT_PASSWORD,
timeout: Int = DEFAULT_TIMEOUT,
secure: Boolean = false,
scheme: String = "http",
txMax: Int = DEFAULT_TX_MAX,
authKey: String = ""
) extends IDriver
with ISchemaSafeDriver {

private val logger = LoggerFactory.getLogger(classOf[TigerGraphDriver])
private val scheme = if (secure) "https" else "http"
private val api = Uri(scheme, hostname, restPpPort)
private val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend()
private val logger = LoggerFactory.getLogger(classOf[TigerGraphDriver])
private val api = Uri(scheme, hostname, restPpPort)
private val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend(
SttpBackendOptions.connectionTimeout(timeout.milliseconds)
)

implicit val payloadEncoder: Encoder[PayloadBody] =
Encoder.forProduct2("vertices", "edges")(u => (u.vertices, u.edges))
Expand Down Expand Up @@ -358,6 +360,7 @@ final class TigerGraphDriver(
}

private def unpackUnboxingException(e: ResponseException[String, circe.Error]): Exception = {
e.printStackTrace()
e match {
case HttpError(body, statusCode) =>
logger.error(s"HTTP Error $statusCode: $body")
Expand All @@ -373,23 +376,33 @@ final class TigerGraphDriver(
params: Seq[(String, String)]
): Seq[Json] = {
val uri = buildUri(endpoint).addParams(params: _*)
val response = request()
.get(uri)
.response(asJson[TigerGraphResponse])
.send(backend)
unboxResponse(response)
Try(
request()
.get(uri)
.readTimeout(Duration.Inf)
.response(asJson[TigerGraphResponse])
.send(backend)
) match {
case Failure(e) => logger.error(s"HTTP GET Request error.", e); throw e
case Success(response) => unboxResponse(response)
}
}

private def get(
endpoint: String,
params: Map[String, Any] = Map.empty[String, Any]
): Seq[Json] = {
val uri = buildUri(endpoint, params)
val response = request()
.get(uri)
.response(asJson[TigerGraphResponse])
.send(backend)
unboxResponse(response)
Try(
request()
.get(uri)
.readTimeout(Duration.Inf)
.response(asJson[TigerGraphResponse])
.send(backend)
) match {
case Failure(e) => logger.error(s"HTTP GET Request error.", e); throw e
case Success(response) => unboxResponse(response)
}
}

private def post(endpoint: String, payload: PayloadBody): Seq[Json] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class IncrementalKeyPool(val first: Long, val last: Long, private val usedIds: S
throw new IllegalStateException("Call to `next` on invalidated IncrementalKeyPool.")
}
var n = cur.incrementAndGet()
while (n < last) {
while (n <= last) {
if (!usedIds.contains(n)) return n
else n = cur.incrementAndGet()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class PlumeDiffPass(filenames: List[String], driver: IDriver) {
.map(_.getAbsolutePath)
.toSeq
if (changedFiles.nonEmpty) {
logger.info(s"Detected changes in the following files: ${changedFiles.mkString(", ")}")
logger.debug(s"Detected changes in the following files: ${changedFiles.mkString(", ")}")
driver.removeSourceFiles(changedFiles: _*)
}
val newFiles = partIterator
Expand Down
Loading

0 comments on commit 451e04a

Please sign in to comment.