Skip to content

Commit

Permalink
[GH-75] Too many put xxx into map log
Browse files Browse the repository at this point in the history
The old implementation list all jobs in the opCenter which is
unnecessary.  We should only focus on the job submitted by the
plugin.

Remove the full poll in the queue status update.  Add the job
entry when we submit it.

Remove the code related to the full job poll and clean up the logs.
  • Loading branch information
jealous committed Aug 20, 2024
1 parent b723887 commit 195a341
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 266 deletions.
12 changes: 8 additions & 4 deletions plugins/nf-float/src/main/com/memverge/nextflow/FloatBin.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import java.util.regex.Pattern
@Slf4j
class FloatBin {
private static final binName = 'float'
private static logged = false

/**
* This function checks if the float binary is available in the plugin
Expand Down Expand Up @@ -38,11 +39,11 @@ class FloatBin {
}
ret = targetDir.resolve(binName)
try {
log.info "try downloading $src to $ret"
log.info "[FLOAT] try downloading $src to $ret"
Global.download(src, ret)
ret.setExecutable(true)
} catch (Exception ex) {
log.warn("download ${binName} failed: ${ex.message}")
log.warn("[FLOAT] download ${binName} failed: ${ex.message}")
return Paths.get(binName)
}
}
Expand All @@ -67,11 +68,14 @@ class FloatBin {
for (String path : paths) {
def floatPath = Paths.get(path).resolve(binName)
if (Files.exists(floatPath)) {
log.info "found float binary in $path"
if (!logged) {
log.info "[FLOAT] found float binary in $path"
logged = true
}
return floatPath
}
}
log.info "${binName} binary not found"
log.warn "[FLOAT] ${binName} binary not found"
return null
}
}
18 changes: 10 additions & 8 deletions plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.exception.AbortOperationException
import nextflow.io.BucketParser
import org.apache.commons.lang.StringUtils
import nextflow.processor.TaskId

/**
* @author Cedric Zhuang <[email protected]>
Expand Down Expand Up @@ -194,7 +194,7 @@ class FloatConf {
}

private static void warnDeprecated(String deprecated, String replacement) {
log.warn "[float] config option `$deprecated` " +
log.warn "[FLOAT] config option `$deprecated` " +
"is no longer supported, " +
"use `$replacement` instead"
}
Expand All @@ -213,11 +213,13 @@ class FloatConf {
}
}

List<String> getCliPrefix(String address = "") {
validate()
if (StringUtils.length(address) == 0) {
address = addresses[0]
List<String> getCliPrefix(TaskId id) {
if (id == null) {
id = new TaskId(0)
}
final address = addresses[id.intValue() % (addresses.size())]
validate()

def bin = FloatBin.get(address)
List<String> ret = [
bin.toString(),
Expand All @@ -232,8 +234,8 @@ class FloatConf {
return ret
}

String getCli(String address = "") {
return getCliPrefix(address).join(" ")
String getCli(TaskId id) {
return getCliPrefix(id).join(" ")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import nextflow.util.ServiceName

import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.util.stream.Collectors

/**
* Float Executor with a shared file system
Expand All @@ -51,7 +50,7 @@ class FloatGridExecutor extends AbstractGridExecutor {

synchronized FloatJobs getFloatJobs() {
if (_floatJobs == null) {
_floatJobs = new FloatJobs(floatConf.addresses)
_floatJobs = new FloatJobs(getRunName())
}
return _floatJobs
}
Expand All @@ -72,7 +71,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
}

protected String getHeaderScript(TaskRun task) {
log.info "[float] switch task ${task.id} to ${task.workDirStr}"
log.info "[FLOAT] switch task ${task.id} to ${task.workDirStr}"
floatJobs.setWorkDir(task.id, task.workDir)

final path = Escape.path(task.workDir)
Expand Down Expand Up @@ -115,11 +114,10 @@ class FloatGridExecutor extends AbstractGridExecutor {
}

private void syncFloatBin() {
final oc = floatJobs.getOc("")
def cmd = floatConf.getCliPrefix(oc)
def cmd = floatConf.getCliPrefix(null)
cmd << "release" << "sync"
def res = Global.execute(cmd)
log.info "[float] sync the float binary, $res"
log.info "[FLOAT] sync the float binary, $res"
}

private int getMemory(TaskRun task) {
Expand Down Expand Up @@ -169,19 +167,14 @@ class FloatGridExecutor extends AbstractGridExecutor {
return ret
}

private List<String> getCmdPrefixForJob(String floatJobID) {
final oc = floatJobs.getOc(floatJobID)
return floatConf.getCliPrefix(oc)
}

FloatJob getJob(TaskId taskId) {
def nfJobID = floatJobs.getNfJobID(taskId)
def job = floatJobs.nfJobID2job.get(nfJobID)
def job = floatJobs.getJob(taskId)
if (job == null) {
log.warn "[float] job not found for nextflow task $taskId"
log.warn "[FLOAT] job not found for nextflow task $taskId: $nfJobID"
return null
}
def cmd = getCmdPrefixForJob(job.floatJobID)
def cmd = floatConf.getCliPrefix(taskId)
cmd << 'show'
cmd << '-j'
cmd << job.floatJobID
Expand All @@ -193,28 +186,16 @@ class FloatGridExecutor extends AbstractGridExecutor {
job = FloatJob.parse(res.out)
job = floatJobs.updateJob(job)
} else {
log.warn "[float] failed to retrieve job status $nfJobID, " +
log.warn "[FLOAT] failed to retrieve job status $nfJobID, " +
"float: ${job.floatJobID}, details: ${res.out}"
}
} catch (Exception e) {
log.warn "[float] failed to retrieve job status $nfJobID, " +
log.warn "[FLOAT] failed to retrieve job status $nfJobID, " +
"float: ${job.floatJobID}", e
}
return job
}

/**
* Retrieve the prefix of the submit command.
* use round robin for multiple op-center
*
* @return
*/
private List<String> getSubmitCmdPrefix(Integer index) {
final addresses = floatConf.addresses
final address = addresses[index % (addresses.size())]
return floatConf.getCliPrefix(address)
}

String toLogStr(List<String> floatCmd) {
def ret = floatCmd.join(" ")
final toReplace = [
Expand All @@ -232,7 +213,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
}

private static def warnDeprecated(String deprecated, String replacement) {
log.warn1 "[float] process `$deprecated` " +
log.warn1 "[FLOAT] process `$deprecated` " +
"is no longer supported, " +
"use $replacement instead"
}
Expand Down Expand Up @@ -264,7 +245,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
volumes << floatConf.getInputVolume(src.uri)
}
def ret = volumes.unique() - ""
log.info "[float] volumes to mount for ${task.id}: ${toLogStr(ret)}"
log.info "[FLOAT] volumes to mount for ${task.id}: ${toLogStr(ret)}"
return ret
}

Expand All @@ -282,9 +263,17 @@ class FloatGridExecutor extends AbstractGridExecutor {
: [:]
}

String getRunName() {
// replace _ with - to avoid float cli error
return session.runName
.replaceAll("_", "-")
.toLowerCase()
}

private Map<String, String> getCustomTags(TaskRun task) {
final result = new LinkedHashMap<String, String>(10)
result[FloatConf.NF_JOB_ID] = floatJobs.getNfJobID(task.id)
def nfJobID = floatJobs.getNfJobID(task.id)
result[FloatConf.NF_JOB_ID] = nfJobID
result[FloatConf.NF_SESSION_ID] = "uuid-${session.uniqueId}".toString()
result[FloatConf.NF_TASK_NAME] = task.name
result[FloatConf.FLOAT_INPUT_SIZE] = getInputFileSize(task).toString()
Expand All @@ -300,8 +289,9 @@ class FloatGridExecutor extends AbstractGridExecutor {
result[FloatConf.NF_PROJECT_NAME] = projName
}
}
if (session.runName) {
result[FloatConf.NF_RUN_NAME] = session.runName
def runName = getRunName()
if (runName) {
result[FloatConf.NF_RUN_NAME] = runName
}
final resourceLabels = task.config.getResourceLabels()
if (resourceLabels)
Expand Down Expand Up @@ -351,7 +341,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
} else {
log.info("got container image of the task ${container}")
}
def cmd = getSubmitCmdPrefix(task.index)
def cmd = floatConf.getCliPrefix(task.id)
cmd << 'submit'
getMountVols(task).forEach { cmd << '--dataVolume' << it }
cmd << '--image' << container
Expand All @@ -372,7 +362,8 @@ class FloatGridExecutor extends AbstractGridExecutor {
cmd << '--extraContainerOpts'
cmd << '--privileged'
}
getCustomTags(task).each { key, val ->
def tags = getCustomTags(task)
tags.each { key, val ->
cmd << '--customTag' << "${toTag(key)}:${toTag(val)}".toString()
}
if (task.config.getMachineType()) {
Expand All @@ -397,7 +388,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
cmd << '--vmPolicy' << '[onDemand=true]'
}
cmd.addAll(getExtra(task))
log.info "[float] submit job: ${toLogStr(cmd)}"
log.info "[FLOAT] submit job: ${toLogStr(cmd)}"
return cmd
}

Expand Down Expand Up @@ -491,86 +482,57 @@ class FloatGridExecutor extends AbstractGridExecutor {
* @return The command line to be used to kill the specified job
*/
List<List<String>> killTaskCommands(def jobId) {
def jobIds
def floatJobIDs
if (jobId instanceof Collection) {
jobIds = jobId
floatJobIDs = jobId
} else {
jobIds = [jobId]
floatJobIDs = [jobId]
}
List<List<String>> ret = []
jobIds.forEach {
for (def it : floatJobIDs) {
def floatJobID = it.toString()
def cmd = getCmdPrefixForJob(floatJobID)
def taskID = floatJobs.getTaskID(floatJobID)
if (taskID == null) {
log.warn "[FLOAT] task id not found for float job id: $floatJobID"
return
}
def cmd = floatConf.getCliPrefix(taskID)
cmd << 'cancel'
cmd << '-j'
cmd << floatJobID
cmd << '-f'
log.info "[float] cancel job: ${toLogStr(cmd)}"
log.info "[FLOAT] cancel job: ${toLogStr(cmd)}"
ret.add(cmd)
}
return ret
}

private List<String> getCmdPrefix0() {
final addresses = floatConf.addresses
final address = addresses.first()
return floatConf.getCliPrefix(address)
}

@Override
protected List<String> getKillCommand() {
def cmd = getCmdPrefix0()
def cmd = floatConf.getCliPrefix(null)
cmd << 'cancel'
cmd << '-j'
log.info "[float] cancel job: ${toLogStr(cmd)}"
log.info "[FLOAT] cancel job: ${toLogStr(cmd)}"
return cmd
}

/**
* @return The status for all the scheduled and running jobs
*/
@Override
protected Map<String, QueueStatus> getQueueStatus0(queue) {
return queueStatus
}

Map<String, QueueStatus> getQueueStatus() {
final cmdMap = queueStatusCommands()
floatJobs.updateStatus(cmdMap)
log.debug "[float] collecting job status completes."
return nfJobID2Status
}

protected Map<String, List<String>> queueStatusCommands() {
return floatConf.addresses.stream().collect(
Collectors.toMap(
oc -> oc.toString(),
oc -> getQueueCmdOfOC(oc.toString())))
protected Map<String, QueueStatus> getQueueStatus0(_) {
log.debug "[FLOAT] collecting job status completes."
return toStatusMap(floatJobs.getNfJobID2Job())
}

@Override
protected List<String> queueStatusCommand(Object queue) {
return getQueueCmdOfOC()
return []
}

@Override
protected Map<String, QueueStatus> parseQueueStatus(String s) {
def stMap = floatJobs.parseQStatus(s)
return toStatusMap(stMap)
}

private List<String> getQueueCmdOfOC(String oc = "") {
def cmd = floatConf.getCliPrefix(oc)
cmd << 'list'
cmd << '--format'
cmd << 'json'
log.info "[float] query job status: ${toLogStr(cmd)}"
return cmd
}

private Map<String, QueueStatus> getNfJobID2Status() {
Map<String, FloatJob> stMap = floatJobs.getNfJobID2job()
return toStatusMap(stMap)
return getQueueStatus0(null)
}

private static Map<String, QueueStatus> toStatusMap(Map<String, FloatJob> stMap) {
Expand All @@ -585,10 +547,10 @@ class FloatGridExecutor extends AbstractGridExecutor {
FloatStatus getJobStatus(TaskRun task) {
def job = getJob(task.id)
if (!job) {
log.info "[float] task status unknown, job not found for ${task.id}"
log.info "[FLOAT] task status unknown, job not found for ${task.id}"
return FloatStatus.UNKNOWN
}
log.info "[float] task id: ${task.id}, nf-job-id: $job.nfJobID, " +
log.info "[FLOAT] task id: ${task.id}, nf-job-id: $job.nfJobID, " +
"float-job-id: $job.floatJobID, float status: $job.status"
return job.status
}
Expand Down
Loading

0 comments on commit 195a341

Please sign in to comment.