rsc: Recover from missing blobs (#1629)
* rsc: Recover from missing blobs

* cleanup

* Update share/wake/lib/system/http.wake

Co-authored-by: Colin Schmidt <[email protected]>

---------

Co-authored-by: Colin Schmidt <[email protected]>
V-FEXrt and colinschmidt authored Aug 13, 2024
1 parent 14c93d0 commit 8d33ed4
Showing 2 changed files with 176 additions and 170 deletions.
18 changes: 10 additions & 8 deletions share/wake/lib/system/http.wake
@@ -303,15 +303,17 @@ export def makeBinaryRequest (request: HttpRequest): Result Path Error =
# 2. change the permissions of that new path to write-protected
# which is then reflected in the permissions of the path generated by this function, preventing
# it from being overwritten. To work around this we need to force remove the file before
# downloading over the path.
# downloading over the path. After removing the file, it is touched so that the job always
# outputs a file even if curl fails to download the target. Without this, wake-hash fails to
# hash a non-existent file in the curl failure case.
def cleanupScript =
"""
rm -f %{destination}
touch %{destination}
"""

def cleanupJob =
makeExecPlan
(
"rm",
"-f",
destination,
)
Nil
makeShellPlan cleanupScript Nil
| setPlanLabel "http: rm {destination}"
| setPlanStdout logNever
| setPlanStderr logNever
328 changes: 166 additions & 162 deletions share/wake/lib/system/remote_cache_runner.wake
@@ -59,6 +59,167 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: Result RunnerIn

Pass output

def rehydrateJob response label input job =
require (Match details) = response
else unreachable "two-constructor tuple must have one value"

def _ =
require True = shouldDebugRemoteCache Unit

def _ = breadcrumb "{label}: Found a match in the cache"
def _ = writeTempFile "remote.cache.lookup.hit" "label: {input.getRunnerInputLabel}"

True

def (
CacheSearchResponseMatch
outputSymlinks
outputDirs
outputFiles
stdoutBlob
stderrBlob
status
runtime
cputime
mem
ibytes
obytes
) = details

# Start these downloads now since they aren't written to disk
def stdoutDownload = rscApiGetStringBlob stdoutBlob
def stderrDownload = rscApiGetStringBlob stderrBlob
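# Their results are not needed until the end of this function, so the
# downloads overlap with the directory and file work below.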

def doMakeDirectory (CacheSearchOutputDirectory path mode) =
# wake-format off
def cmd =
"mkdir",
"-m", mode.strOctal,
"-p", path,

require True =
makeExecPlan cmd Nil
| setPlanLabel "rsc: mkdir output dir {path}"
| setPlanPersistence Once
| runJobWith localRunner
| setJobInspectVisibilityHidden
| isJobOk
else failWithError "rsc: Failed to mkdir output dir: {path}"

Pass Unit

# We need to create directories from shallowest to deepest: each directory along the
# chain may have a different permission set, and by creating short dirs first we
# ensure they don't incorrectly inherit the permissions of a subdir. This required
# ordering significantly decreases parallelism; however, this is mitigated by the fact
# that most outputs are files, not dirs.
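# (The sequencing comes from the require chain in dirLoop below: each
# mkdir must succeed before the next one starts.)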
def dirLoop (dirs: List CacheSearchOutputDirectory) = match dirs
Nil -> Pass Nil
h, t ->
require Pass dir = doMakeDirectory h
require Pass rest = dirLoop t

Pass (dir, rest)

def lenOrder lhs rhs =
def lhsLen = lhs.getCacheSearchOutputDirectoryPath.strlen
def rhsLen = rhs.getCacheSearchOutputDirectoryPath.strlen

if lhsLen == rhsLen then
EQ
else if lhsLen < rhsLen then
LT
else
GT
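# A parent directory's path is a strict prefix of its children's, hence
# strictly shorter, so sorting by length puts parents before children,
# e.g. "out" before "out/bin" before "out/bin/debug".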

def orderedDirs =
outputDirs
| sortBy lenOrder

# We don't actually care about the result here but we need to ensure that all
# directories are created before potentially downloading files into them.
require Pass _ = dirLoop orderedDirs
else failWithError "rsc: Failed to make output directory"

# The path is downloaded directly, as is, because it is relative to the workspace.
# Everything besides Command is stored on the server as workspace-relative.
def doDownload (CacheSearchOutputFile path mode blob) = rscApiGetFileBlob blob path mode

# Link must point to path. We do the reverse here of what is done for posting a job.
def doMakeSymlink (CacheSearchOutputSymlink path link) =
require True =
makeExecPlan
(
"ln",
# create symbolic link
"-s",
# overwrite <link> on disk if a file already exists at that path.
# Ideally, we could just fail, but it's very common to delete wake.db
# without cleaning all the outputs. This causes problems since the link
# would already exist on disk.
"-f",
path,
link,
)
Nil
| setPlanLabel "rsc: symlink {link} to {path}"
| setPlanPersistence Once
| runJobWith localRunner
| setJobInspectVisibilityHidden
| isJobOk
else failWithError "rsc: Failed to link {link} to {path}"

Pass Unit

def outputFilesDownload =
outputFiles
| map doDownload

# Symlinks don't need to wait for files; the symlinks will just momentarily be invalid if created first.
def outputSymlinksDownload =
outputSymlinks
| map doMakeSymlink

def resolvedOutputs =
outputFiles
| map getCacheSearchOutputFilePath

def resolvedSymlinks =
outputSymlinks
| map getCacheSearchOutputSymlinkLink

def resolvedDirectories =
outputDirs
| map getCacheSearchOutputDirectoryPath

def outputs = resolvedOutputs ++ resolvedDirectories ++ resolvedSymlinks
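# Report the resource statistics recorded for the original execution as
# this job's predicted usage.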
def predict = Usage status runtime cputime mem ibytes obytes
def inputs = map getPathName (input.getRunnerInputVisible)

require Pass stdout =
stdoutDownload
| addErrorContext "rsc: Failed to download stdout for '{label}'"

require Pass stderr =
stderrDownload
| addErrorContext "rsc: Failed to download stderr for '{label}'"

# We don't actually care about the result here but we need to ensure that all
# downloads have completed before returning.
require Pass _ =
outputFilesDownload
| findFail
| addErrorContext "rsc: Failed to download a blob"

require Pass _ =
outputSymlinksDownload
| findFail
| addErrorContext "rsc: Failed to create a symlink"

def _ = virtual job stdout stderr status runtime cputime mem ibytes obytes

Pass (RunnerOutput inputs outputs predict)
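
# A minimal sketch (illustration only; runCachedOrLocal is a hypothetical
# name) of how this helper is consumed, as the second hunk below wires it:
# def runCachedOrLocal job input response label =
#     match (rehydrateJob response label input job)
#         # Rehydration succeeded; the cached outputs are now on disk.
#         Pass output -> Pass output
#         # Any failure (e.g. a blob missing on the server) falls back to
#         # running the job with the wrapped runner.
#         Fail _ -> baseDoIt job (Pass input)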

def doit job runnerInput = match runnerInput
Fail e ->
def _ = badlaunch job e
@@ -106,168 +267,11 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: Result RunnerIn
# If a match was found, use it
require NoMatch = response
else
require (Match details) = response
else unreachable "two-constructor tuple must have one value"

def _ =
require True = shouldDebugRemoteCache Unit

def _ = breadcrumb "{label}: Found a match in the cache"

def _ =
writeTempFile "remote.cache.lookup.hit" "label: {input.getRunnerInputLabel}"

True

def (
CacheSearchResponseMatch
outputSymlinks
outputDirs
outputFiles
stdoutBlob
stderrBlob
status
runtime
cputime
mem
ibytes
obytes
) = details

# Start these downloads now since they aren't written to disk
def stdoutDownload = rscApiGetStringBlob stdoutBlob
def stderrDownload = rscApiGetStringBlob stderrBlob

def doMakeDirectory (CacheSearchOutputDirectory path mode) =
# wake-format off
def cmd =
"mkdir",
"-m", mode.strOctal,
"-p", path,

require True =
makeExecPlan cmd Nil
| setPlanLabel "rsc: mkdir output dir {path}"
| setPlanPersistence Once
| runJobWith localRunner
| setJobInspectVisibilityHidden
| isJobOk
else failWithError "rsc: Failed to mkdir output dir: {path}"

Pass Unit

# We need to create directories from shallowest to deepest: each directory along the
# chain may have a different permission set, and by creating short dirs first we
# ensure they don't incorrectly inherit the permissions of a subdir. This required
# ordering significantly decreases parallelism; however, this is mitigated by the fact
# that most outputs are files, not dirs.
def dirLoop (dirs: List CacheSearchOutputDirectory) = match dirs
Nil -> Pass Nil
h, t ->
require Pass dir = doMakeDirectory h
require Pass rest = dirLoop t

Pass (dir, rest)

def lenOrder lhs rhs =
def lhsLen = lhs.getCacheSearchOutputDirectoryPath.strlen
def rhsLen = rhs.getCacheSearchOutputDirectoryPath.strlen

if lhsLen == rhsLen then
EQ
else if lhsLen < rhsLen then
LT
else
GT

def orderedDirs =
outputDirs
| sortBy lenOrder

# We don't actually care about the result here but we need to ensure that all
# directories are created before potentially downloading files into them.
require Pass _ = dirLoop orderedDirs
else failWithError "rsc: Failed to make output directory"

# The path is downloaded directly, as is, because it is relative to the workspace.
# Everything besides Command is stored on the server as workspace-relative.
def doDownload (CacheSearchOutputFile path mode blob) =
rscApiGetFileBlob blob path mode

# Link must point to path. We do the reverse here of what is done for posting a job.
def doMakeSymlink (CacheSearchOutputSymlink path link) =
require True =
makeExecPlan
(
"ln",
# create symbolic link
"-s",
# overwrite <link> on disk if a file already exists at that path.
# Ideally, we could just fail, but it's very common to delete wake.db
# without cleaning all the outputs. This causes problems since the link
# would already exist on disk.
"-f",
path,
link,
)
Nil
| setPlanLabel "rsc: symlink {link} to {path}"
| setPlanPersistence Once
| runJobWith localRunner
| setJobInspectVisibilityHidden
| isJobOk
else failWithError "rsc: Failed to link {link} to {path}"

Pass Unit

def outputFilesDownload =
outputFiles
| map doDownload

# Symlinks don't need to wait for files; the symlinks will just momentarily be invalid if created first.
def outputSymlinksDownload =
outputSymlinks
| map doMakeSymlink

def resolvedOutputs =
outputFiles
| map getCacheSearchOutputFilePath

def resolvedSymlinks =
outputSymlinks
| map getCacheSearchOutputSymlinkLink

def resolvedDirectories =
outputDirs
| map getCacheSearchOutputDirectoryPath

def outputs = resolvedOutputs ++ resolvedDirectories ++ resolvedSymlinks
def predict = Usage status runtime cputime mem ibytes obytes

require Pass stdout =
stdoutDownload
| addErrorContext "rsc: Failed to download stdout for '{label}'"

require Pass stderr =
stderrDownload
| addErrorContext "rsc: Failed to download stderr for '{label}'"

def _ = virtual job stdout stderr status runtime cputime mem ibytes obytes
def inputs = map getPathName (input.getRunnerInputVisible)

# We don't actually care about the result here but we need to ensure that all
# downloads have completed before returning.
require Pass _ =
outputFilesDownload
| findFail
| addErrorContext "rsc: Failed to download a blob"

require Pass _ =
outputSymlinksDownload
| findFail
| addErrorContext "rsc: Failed to create a symlink"

Pass (RunnerOutput inputs outputs predict)
match (rehydrateJob response label input job)
Pass x -> Pass x
# If the job rehydration fails for any reason, just run the job as normal.
# There is no point in attempting to push since the server just said it's cached.
Fail _ -> baseDoIt job (Pass input)

def _ =
require True = shouldDebugRemoteCache Unit
