Skip to content

Commit

Permalink
Changed ConnectionStrategy.kt for usage of coroutines #451
Browse files Browse the repository at this point in the history
  • Loading branch information
mprzypasniak99 committed Jan 3, 2022
1 parent 3914c08 commit b6276a3
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 30 deletions.
1 change: 1 addition & 0 deletions cogboard-app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation("$v-auth-jwt")
implementation("$v-web-client")
implementation("$v-rx-java2")
implementation("$v-lang-kotlin-coroutines")
}
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.10.0")
implementation(kotlin("stdlib-jdk8"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import com.mongodb.client.model.Filters.or
import com.mongodb.client.model.Indexes
import com.mongodb.client.model.ReplaceOptions
import com.mongodb.client.model.Sorts.ascending
import io.vertx.core.AbstractVerticle
import io.vertx.core.json.JsonArray
import io.vertx.core.json.JsonObject
import io.vertx.core.logging.Logger
Expand All @@ -26,35 +25,40 @@ import com.cognifide.cogboard.logStorage.model.LogStorageConfiguration
import com.cognifide.cogboard.logStorage.model.LogVariableData
import com.cognifide.cogboard.logStorage.model.QuarantineRule
import com.cognifide.cogboard.logStorage.model.LogCollectionConfiguration
import io.vertx.kotlin.coroutines.CoroutineVerticle
import kotlinx.coroutines.runBlocking
import java.net.URI
import java.time.Instant

class LogStorage(
private val config: LogStorageConfiguration,
private val storageConfig: LogStorageConfiguration,
private val connection: ConnectionStrategy,
private val parserStrategy: LogParserStrategy,
var rules: List<QuarantineRule> = emptyList()
) : AbstractVerticle() {
) : CoroutineVerticle() {

/** Returns the list of regexes of enabled rules. */
private val enabledRegexes: List<Regex>
get() = rules.filter { it.enabled }.map { it.regex }

override fun start() {
val deploymentId: String
get() = deploymentID

override suspend fun start() {
super.start()
logsCollection?.createIndex(Indexes.descending(Log.SEQ))
}

/** Returns a logs collection associated with this widget. */
private val logsCollection: MongoCollection<Document>?
get() = database?.getCollection(config.id)
get() = database?.getCollection(storageConfig.id)

// Storage configuration

/** Returns a logs collection configuration associated with this widget (if present). */
private val collectionConfiguration: LogCollectionConfiguration?
get() = configCollection
?.find(eq(Log.ID, config.id))
?.find(eq(Log.ID, storageConfig.id))
?.first()
?.let { LogCollectionConfiguration.from(it) }

Expand All @@ -64,12 +68,12 @@ class LogStorage(
val options = ReplaceOptions().upsert(true)
configCollection
?.replaceOne(
eq(LogCollectionConfiguration.ID, config.id),
eq(LogCollectionConfiguration.ID, storageConfig.id),
configuration.toDocument(),
options
)
} else {
configCollection?.deleteOne((eq(LogCollectionConfiguration.ID, config.id)))
configCollection?.deleteOne((eq(LogCollectionConfiguration.ID, storageConfig.id)))
}
}

Expand Down Expand Up @@ -102,7 +106,7 @@ class LogStorage(
private fun deleteOldLogs() {
val collection = logsCollection ?: return
val now = Instant.now().epochSecond
val beforeTimestamp = now - (config.expirationDays * DAY_TO_TIMESTAMP)
val beforeTimestamp = now - (storageConfig.expirationDays * DAY_TO_TIMESTAMP)
try {
val result = collection.deleteMany(lt(Log.INSERTED_ON, beforeTimestamp))
LOGGER.debug("Deleted ${result.deletedCount} old logs")
Expand All @@ -117,9 +121,9 @@ class LogStorage(
val collection = logsCollection ?: return

val size = database
.runCommand(Document(STATS_COMMAND, config.id))
.runCommand(Document(STATS_COMMAND, storageConfig.id))
.getInteger(STATS_SIZE) ?: 0
val maxSize = config.fileSizeMB * MB_TO_BYTES
val maxSize = storageConfig.fileSizeMB * MB_TO_BYTES
if (size > 0 && size > maxSize) {
val deleteFactor = ((size - maxSize).toDouble() / size)
val logCount = collection.countDocuments()
Expand Down Expand Up @@ -156,14 +160,16 @@ class LogStorage(

/** Downloads new logs and filters them by quarantine rules. */
private fun downloadFilterLogs(skipFirstLines: Long? = null): List<Log> {
val logs = connection
.getLogs(skipFirstLines)
.mapNotNull { parserStrategy.parseLine(it) }
.toMutableList()

// Filter the logs by quarantine rules
filter(logs)

val logs = mutableListOf<Log>()
runBlocking {
logs.addAll(connection
.getLogs(skipFirstLines)
.mapNotNull { parserStrategy.parseLine(it) }
)

// Filter the logs by quarantine rules
filter(logs)
}
return logs
}

Expand All @@ -190,7 +196,10 @@ class LogStorage(
var newLogs: List<Log> = emptyList()

// Get the number of lines in the file
val fileLineCount = connection.getNumberOfLines() ?: 0
var fileLineCount: Long = 0
runBlocking {
fileLineCount = connection.getNumberOfLines() ?: 0
}

if (fileLineCount > 0 && fileLineCount > lastLine) {
// Download new logs and append them
Expand All @@ -208,7 +217,7 @@ class LogStorage(
seq += newLogs.size

// Save the new configuration
saveConfiguration(LogCollectionConfiguration(config.id, lastLine, seq))
saveConfiguration(LogCollectionConfiguration(this.storageConfig.id, lastLine, seq))

return newLogs
}
Expand Down Expand Up @@ -236,7 +245,7 @@ class LogStorage(

// Fetch the logs from the database and send them back
val response = prepareResponse(insertedLogs)
vertx?.eventBus()?.send(config.eventBusAddress, response)
vertx?.eventBus()?.send(storageConfig.eventBusAddress, response)
}

/** Deletes all data associated with the widget. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ abstract class ConnectionStrategy {
return setOf(AuthenticationType.BASIC)
}

abstract fun getNumberOfLines(): Long?
abstract fun getLogs(skipFirstLines: Long?): Collection<String>
abstract suspend fun getNumberOfLines(): Long?
abstract suspend fun getLogs(skipFirstLines: Long?): Collection<String>
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class SSHConnectionStrategy(val config: JsonObject) : ConnectionStrategy() {
return setOf(AuthenticationType.BASIC, AuthenticationType.SSH_KEY)
}

override fun getNumberOfLines(): Long? {
override suspend fun getNumberOfLines(): Long? {
val logFilePath = config.getString(Props.PATH) ?: return null

return SSHClient(prepareConfig(config))
Expand All @@ -21,7 +21,7 @@ class SSHConnectionStrategy(val config: JsonObject) : ConnectionStrategy() {
?.toLongOrNull()
}

override fun getLogs(skipFirstLines: Long?): Collection<String> {
override suspend fun getLogs(skipFirstLines: Long?): Collection<String> {
val logFilePath = config.getString(Props.PATH) ?: return emptyList()
val command = skipFirstLines?.let { "tail -n +${it + 1} $logFilePath" } ?: "cat $logFilePath"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ class HttpConnectionStrategy(
private val token: String
get() = config.endpointProp(Props.TOKEN)

override fun getNumberOfLines(): Long? {
TODO("Not yet implemented")
override suspend fun getNumberOfLines(): Long? {
TODO()
}

override fun getLogs(skipFirstLines: Long?): Collection<String> {
override suspend fun getLogs(skipFirstLines: Long?): Collection<String> {
TODO("Not yet implemented")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class LogViewerWidget(

override fun stop(): Widget {
logStorage?.delete()
logStorage?.deploymentID()?.let { vertx.undeploy(it) }
logStorage?.deploymentId?.let { vertx.undeploy(it) }
consumer?.unregister()
return super.stop()
}
Expand Down

0 comments on commit b6276a3

Please sign in to comment.