Skip to content

Commit

Permalink
Automatically pump os.proc streams when SystemStreams are redirec…
Browse files Browse the repository at this point in the history
…ted (#3275)

Depends on com-lihaoyi/os-lib#283

This moves the subprocess stream handling logic out of
`Jvm.spawnSubprocess` and makes it apply to all `os.proc` invocations,
greatly reducing the room for error. With this, `Jvm.spawnSubprocess`
becomes a very thin wrapper around `os.proc.spawn`. We also rely
directly on OS-Lib's own pumper threads to pump to our destination,
rather than having them pump into in-memory buffers and then spawning
our own pumper threads to pump from those buffers to the destination

I spent some time looking into how to do the stdout/err handling at the
process level, but couldn't find any reasonable mechanism to do so that
allows us to preserve the ordering of the stdout/stderr. This is the
original motivation to squishing it into one stream via
`ProxyOutputStream`/`ProxyStreamPumper` and is important because
otherwise you find e.g. stack traces out of order with printlns, which
makes debugging very difficult. Might be possible using some
socket/fifo/pipe cleverness, but not as part of this PR

Added an integration test to assert on the subtleties of stdout, stderr,
and their inherited alternatives. This PR is required for the test to
pass
  • Loading branch information
lihaoyi authored Jul 22, 2024
1 parent 38bfbc5 commit 7147d76
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 62 deletions.
3 changes: 2 additions & 1 deletion build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ object Deps {
val junitInterface = ivy"com.github.sbt:junit-interface:0.13.3"
val lambdaTest = ivy"de.tototec:de.tobiasroeser.lambdatest:0.8.0"
val log4j2Core = ivy"org.apache.logging.log4j:log4j-core:2.23.1"
val osLib = ivy"com.lihaoyi::os-lib:0.10.2"
val osLib = ivy"com.lihaoyi::os-lib:0.10.3"
val pprint = ivy"com.lihaoyi::pprint:0.9.0"
val mainargs = ivy"com.lihaoyi::mainargs:0.7.0"
val millModuledefsVersion = "0.10.9"
Expand Down Expand Up @@ -618,6 +618,7 @@ object main extends MillStableScalaModule with BuildInfo {
)

object api extends MillStableScalaModule with BuildInfo {
def moduleDeps = Seq(client)
def buildInfoPackageName = "mill.api"
def buildInfoMembers = Seq(
BuildInfo.Value("millVersion", millVersion(), "Mill version."),
Expand Down
18 changes: 18 additions & 0 deletions integration/feature/subprocess-stdout/repo/build.sc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import mill._


def inheritInterleaved = T {
for (i <- Range.inclusive(1, 9)) {
println("print stdout" + i)
os.proc("echo", "proc stdout" + i).call(stdout = os.Inherit)
System.err.println("print stderr" + i)
os.proc("bash", "-c", s"echo proc stderr${i} >&2").call(stderr = os.Inherit)
}
}

def inheritRaw = T{
println("print stdoutRaw")
os.proc("echo", "proc stdoutRaw").call(stdout = os.InheritRaw)
System.err.println("print stderrRaw")
os.proc("bash", "-c", "echo proc stderrRaw >&2").call(stderr = os.InheritRaw)
}
67 changes: 67 additions & 0 deletions integration/feature/subprocess-stdout/repo/mill
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/usr/bin/env sh

# This is a wrapper script, that automatically download mill from GitHub release pages
# You can give the required mill version with MILL_VERSION env variable
# If no version is given, it falls back to the value of DEFAULT_MILL_VERSION

set -e

if [ -z "${DEFAULT_MILL_VERSION}" ] ; then
DEFAULT_MILL_VERSION=0.11.6
fi

if [ -z "$MILL_VERSION" ] ; then
if [ -f ".mill-version" ] ; then
MILL_VERSION="$(head -n 1 .mill-version 2> /dev/null)"
elif [ -f ".config/mill-version" ] ; then
MILL_VERSION="$(head -n 1 .config/mill-version 2> /dev/null)"
elif [ -f "mill" ] && [ "$0" != "mill" ] ; then
MILL_VERSION=$(grep -F "DEFAULT_MILL_VERSION=" "mill" | head -n 1 | cut -d= -f2)
else
MILL_VERSION=$DEFAULT_MILL_VERSION
fi
fi

if [ "x${XDG_CACHE_HOME}" != "x" ] ; then
MILL_DOWNLOAD_PATH="${XDG_CACHE_HOME}/mill/download"
else
MILL_DOWNLOAD_PATH="${HOME}/.cache/mill/download"
fi
MILL_EXEC_PATH="${MILL_DOWNLOAD_PATH}/${MILL_VERSION}"

version_remainder="$MILL_VERSION"
MILL_MAJOR_VERSION="${version_remainder%%.*}"; version_remainder="${version_remainder#*.}"
MILL_MINOR_VERSION="${version_remainder%%.*}"; version_remainder="${version_remainder#*.}"

if [ ! -s "$MILL_EXEC_PATH" ] ; then
mkdir -p "$MILL_DOWNLOAD_PATH"
if [ "$MILL_MAJOR_VERSION" -gt 0 ] || [ "$MILL_MINOR_VERSION" -ge 5 ] ; then
ASSEMBLY="-assembly"
fi
DOWNLOAD_FILE=$MILL_EXEC_PATH-tmp-download
MILL_VERSION_TAG=$(echo $MILL_VERSION | sed -E 's/([^-]+)(-M[0-9]+)?(-.*)?/\1\2/')
MILL_DOWNLOAD_URL="https://repo1.maven.org/maven2/com/lihaoyi/mill-dist/$MILL_VERSION/mill-dist-$MILL_VERSION.jar"
curl --fail -L -o "$DOWNLOAD_FILE" "$MILL_DOWNLOAD_URL"
chmod +x "$DOWNLOAD_FILE"
mv "$DOWNLOAD_FILE" "$MILL_EXEC_PATH"
unset DOWNLOAD_FILE
unset MILL_DOWNLOAD_URL
fi

if [ -z "$MILL_MAIN_CLI" ] ; then
MILL_MAIN_CLI="${0}"
fi

MILL_FIRST_ARG=""

# first arg is a long flag for "--interactive" or starts with "-i"
if [ "$1" = "--bsp" ] || [ "${1#"-i"}" != "$1" ] || [ "$1" = "--interactive" ] || [ "$1" = "--no-server" ] || [ "$1" = "--repl" ] || [ "$1" = "--help" ] ; then
# Need to preserve the first position of those listed options
MILL_FIRST_ARG=$1
shift
fi

unset MILL_DOWNLOAD_PATH
unset MILL_VERSION

exec $MILL_EXEC_PATH $MILL_FIRST_ARG -D "mill.main.cli=${MILL_MAIN_CLI}" "$@"
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package mill.integration

import utest._

object SubprocessStdoutTests extends IntegrationTestSuite {
val tests: Tests = Tests {
initWorkspace()

test {
val res1 = evalStdCombined("inheritInterleaved").out
// Make sure that when a lot of printed/inherited stdout/stderr is printed
// in quick succession, the output ordering is preserved and it doesn't get
// jumbled up
assert(
res1.contains(
s"""print stdout1
|proc stdout1
|print stderr1
|proc stderr1
|print stdout2
|proc stdout2
|print stderr2
|proc stderr2
|print stdout3
|proc stdout3
|print stderr3
|proc stderr3
|print stdout4
|proc stdout4
|print stderr4
|proc stderr4
|print stdout5
|proc stdout5
|print stderr5
|proc stderr5
|print stdout6
|proc stdout6
|print stderr6
|proc stderr6
|print stdout7
|proc stdout7
|print stderr7
|proc stderr7
|print stdout8
|proc stdout8
|print stderr8
|proc stderr8
|print stdout9
|proc stdout9
|print stderr9
|proc stderr9""".stripMargin
)
)

// Make sure subprocess output that isn't captures by all of Mill's stdout/stderr/os.Inherit
// redirects still gets pikced up from the stdout/stderr log files and displayed. They may
// be out of order from the original Mill stdout/stderr, but they should still at least turn
// up in the console somewhere and not disappear
//
val res2 = evalStdCombined("inheritRaw").out
if (integrationTestMode == "fork") {
// For `fork` tests, which represent `-i`/`--interactive`/`--no-server`, the output should
// be properly ordered since it all comes directly from the stdout/stderr of the same process
assert(
res2.contains(
"""print stdoutRaw
|proc stdoutRaw
|print stderrRaw
|proc stderrRaw""".stripMargin
)
)
} else {
// Note that it should be out of order, because both `print`s will be captured and logged first,
// whereas the two `proc` outputs will get sent to their respective log files and only noticed
// a few milliseconds later as the files are polled for updates
assert(
res2.contains(
"""print stdoutRaw
|print stderrRaw
|proc stdoutRaw
|proc stderrRaw""".stripMargin
)
)
}
}
}
}
33 changes: 26 additions & 7 deletions integration/src/mill/integration/IntegrationTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import mill.resolve.SelectMode
import mill.runner.RunnerState
import os.{Path, Shellable}
import utest._

import collection.mutable
import scala.util.control.NonFatal

object IntegrationTestSuite {
Expand Down Expand Up @@ -38,20 +38,39 @@ abstract class IntegrationTestSuite extends TestSuite {
}

def evalTimeoutStdout(timeout: Long, s: Shellable*): IntegrationTestSuite.EvalResult = {
val output = mutable.Buffer.empty[String]
val error = mutable.Buffer.empty[String]

evalTimeoutStdout0(timeout, output, error, s)

}

val output = Seq.newBuilder[String]
val error = Seq.newBuilder[String]
val processOutput = os.ProcessOutput.Readlines(output += _)
val processError = os.ProcessOutput.Readlines(error += _)
def evalTimeoutStdout0(
timeout: Long,
output: mutable.Buffer[String],
error: mutable.Buffer[String],
s: Seq[Shellable]
): IntegrationTestSuite.EvalResult = {

val processOutput = os.ProcessOutput.Readlines(s => synchronized(output.append(s)))
val processError = os.ProcessOutput.Readlines(s => synchronized(error.append(s)))

val result = evalFork(processOutput, processError, s, timeout)

IntegrationTestSuite.EvalResult(
result,
output.result().mkString("\n"),
error.result().mkString("\n")
synchronized(output.mkString("\n")),
synchronized(error.mkString("\n"))
)
}

// Combines stdout and stderr into a single stream; useful for testing
// against the combined output and also asserting on ordering
def evalStdCombined(s: Shellable*): IntegrationTestSuite.EvalResult = {
val combined = mutable.Buffer.empty[String]
evalTimeoutStdout0(-1, combined, combined, s)
}

val millReleaseFileOpt: Option[Path] =
Option(System.getenv("MILL_TEST_LAUNCHER")).map(os.Path(_, os.pwd))

Expand Down
23 changes: 21 additions & 2 deletions main/api/src/mill/api/SystemStreams.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mill.api

import java.io.{InputStream, PrintStream}
import java.io.{InputStream, OutputStream, PrintStream}
import mill.main.client.InputPumper

/**
* Represents a set of streams that look similar to those provided by the
Expand Down Expand Up @@ -48,6 +49,18 @@ object SystemStreams {

def originalErr: PrintStream = original.err

private class PumpedProcessInput extends os.ProcessInput {
def redirectFrom = ProcessBuilder.Redirect.PIPE
def processInput(processIn: => os.SubProcess.InputStream): Some[InputPumper] = Some(
new InputPumper(() => System.in, () => processIn, true, () => true)
)
}

private class PumpedProcessOutput(dest: OutputStream) extends os.ProcessOutput {
def redirectTo = ProcessBuilder.Redirect.PIPE
def processOutput(processOut: => os.SubProcess.OutputStream): Some[InputPumper] =
Some(new InputPumper(() => processOut, () => dest, false, () => true))
}
def withStreams[T](systemStreams: SystemStreams)(t: => T): T = {
val in = System.in
val out = System.out
Expand All @@ -59,7 +72,13 @@ object SystemStreams {
Console.withIn(systemStreams.in) {
Console.withOut(systemStreams.out) {
Console.withErr(systemStreams.err) {
t
os.Inherit.in.withValue(new PumpedProcessInput) {
os.Inherit.out.withValue(new PumpedProcessOutput(System.out)) {
os.Inherit.err.withValue(new PumpedProcessOutput(System.err)) {
t
}
}
}
}
}
}
Expand Down
21 changes: 13 additions & 8 deletions main/client/src/mill/main/client/InputPumper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,34 @@

import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.Supplier;

public class InputPumper implements Runnable{
private InputStream src;
private OutputStream dest;
private Supplier<InputStream> src0;
private Supplier<OutputStream> dest0;

private Boolean checkAvailable;
private java.util.function.BooleanSupplier runningCheck;
public InputPumper(InputStream src,
OutputStream dest,
public InputPumper(Supplier<InputStream> src,
Supplier<OutputStream> dest,
Boolean checkAvailable){
this(src, dest, checkAvailable, () -> true);
}
public InputPumper(InputStream src,
OutputStream dest,
public InputPumper(Supplier<InputStream> src,
Supplier<OutputStream> dest,
Boolean checkAvailable,
java.util.function.BooleanSupplier runningCheck){
this.src = src;
this.dest = dest;
this.src0 = src;
this.dest0 = dest;
this.checkAvailable = checkAvailable;
this.runningCheck = runningCheck;
}

boolean running = true;
public void run() {
InputStream src = src0.get();
OutputStream dest = dest0.get();

byte[] buffer = new byte[1024];
try{
while(running){
Expand Down
2 changes: 1 addition & 1 deletion main/client/src/mill/main/client/MillClientMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public static int run(
InputStream outErr = ioSocket.getInputStream();
OutputStream in = ioSocket.getOutputStream();
ProxyStreamPumper outPump = new ProxyStreamPumper(outErr, stdout, stderr);
InputPumper inPump = new InputPumper(stdin, in, true);
InputPumper inPump = new InputPumper(() -> stdin, () -> in, true);
Thread outThread = new Thread(outPump, "outPump");
outThread.setDaemon(true);
Thread inThread = new Thread(inPump, "inPump");
Expand Down
Loading

0 comments on commit 7147d76

Please sign in to comment.