Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Launch dotnet backend jvm bridge process ondemand #562

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c599bee
[fix] Fix broken maven link
laneser Jun 21, 2020
6d6d61a
Merge pull request #1 from bridgewell/feature/FixUbuntuBuildLink
laneser Jun 21, 2020
394b11f
Launch jvm bridge if needed at SparkSession starting
laneser Jun 21, 2020
6513349
Merge pull request #2 from bridgewell/feature/AddJvmBridgeHelper
laneser Jun 21, 2020
c376c0e
[fix] Add more test and fix Listening TCP issues.
laneser Jun 21, 2020
a07e781
Fix broken maven link in window build doc
laneser Jun 22, 2020
b50ef33
Merge pull request #3 from bridgewell/feature/FixMavenBrokenLink
laneser Jun 22, 2020
3e98e1a
Merge pull request #4 from dotnet/master
laneser Jun 22, 2020
309ef00
Merge pull request #5 from dotnet/master
laneser Jun 22, 2020
2f04f5a
Merge pull request #6 from dotnet/master
laneser Jun 22, 2020
3580249
Merge pull request #7 from bridgewell/dotnetspark_master
laneser Jun 22, 2020
a246740
Merge branch 'dotnetspark_master' into feature/AddJvmBridgeHelper
laneser Jun 22, 2020
ffabc53
Remove integration test for unit test may not pass at env not ready.
laneser Jun 22, 2020
32d50aa
Add logger to show jvm bridge launch info.
laneser Jun 22, 2020
70add1d
Add logger to show jvm bridge launch info.
laneser Jun 22, 2020
790bb09
Revert "Add logger to show jvm bridge launch info."
laneser Jun 22, 2020
d3f5dad
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Jun 23, 2020
b996852
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Jun 23, 2020
9cf3f6a
Merge pull request #8 from dotnet/master
laneser Jun 24, 2020
9f29649
Merge pull request #9 from dotnet/master
laneser Jun 24, 2020
624e6e8
Merge pull request #10 from bridgewell/dotnetspark_master
laneser Jun 24, 2020
e9b95a9
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Jun 24, 2020
28ef78a
Add logs for spark fixture processing
laneser Jun 24, 2020
fc2442c
Move JVMBridgeHelper to Utils and try to locate SparkHome
laneser Jun 25, 2020
14b6db0
Fix spark home is null at unit-test
laneser Jun 25, 2020
9bd43f2
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Jun 29, 2020
056f8ce
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Jul 1, 2020
2fad72b
Fix the jvm bridge helper dispose issue
laneser Jul 7, 2020
e89505b
Merge branch 'feature/AddJvmBridgeHelper' of https://github.com/bridg…
laneser Jul 7, 2020
f7b1016
restore the Stop()
laneser Jul 7, 2020
a98004b
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Jul 8, 2020
cba370d
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Jul 12, 2020
d1b84bf
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Jul 31, 2020
c664cb9
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Aug 10, 2020
ac77f2a
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Aug 14, 2020
a476fd8
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Aug 29, 2020
18cb343
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Sep 2, 2020
eb64e67
Merge branch 'master' into feature/AddJvmBridgeHelper
laneser Sep 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/SparkFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Runtime.InteropServices;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Services;
using Microsoft.Spark.UnitTest.TestUtils;
using Xunit;

Expand All @@ -21,6 +22,10 @@ namespace Microsoft.Spark.E2ETest
/// </summary>
public sealed class SparkFixture : IDisposable
{

private readonly ILoggerService _logger =
LoggerServiceFactory.GetLogger(typeof(SparkFixture));

/// <summary>
/// The names of environment variables used by the SparkFixture.
/// </summary>
Expand Down Expand Up @@ -112,11 +117,14 @@ public SparkFixture()
Spark.SparkContext.SetLogLevel(DefaultLogLevel);

Jvm = ((IJvmObjectReferenceProvider)Spark).Reference.Jvm;

_logger.LogInfo("SparkSession created.");
}

public void Dispose()
{
Spark.Dispose();
_logger.LogInfo("SparkSession disposed.");

// CSparkRunner will exit upon receiving newline from
// the standard input stream.
Expand Down
86 changes: 86 additions & 0 deletions src/csharp/Microsoft.Spark.UnitTest/Utils/JVMBridgeHelperTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.NetworkInformation;
using System.Net;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Network;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using Microsoft.Spark.UnitTest.TestUtils;
using Microsoft.Spark.Utils;
using Moq;
using Xunit;

namespace Microsoft.Spark.UnitTest
{
public class JVMBridgeHelperTests
{
[Theory]
[InlineData("1,2,3", 0, false)]
[InlineData("5567,5568,5569", 0, true)]
[InlineData("1,2,3", 5567, false)]
[InlineData("5567,5568,5569", 5567, true)]
[InlineData("1234,5678", 1234, true)]
public void IsDotnetBackendPortUsingTest(string testListeningPorts, int backendport, bool expect)
{
var envkey = "DOTNETBACKEND_PORT";
var oldenvValue = Environment.GetEnvironmentVariable(envkey);
try
{
if (backendport == 0)
{
Environment.SetEnvironmentVariable(envkey, null);
}
else
{
Environment.SetEnvironmentVariable(envkey, backendport.ToString());
}

var listeningEndpoints = testListeningPorts
.Split(",").Select(x => new IPEndPoint(0, Convert.ToInt32(x))).ToArray();

var ipinfo = new Mock<IPGlobalProperties>();
ipinfo.Setup(m => m.GetActiveTcpListeners())
.Returns(listeningEndpoints);

var ret = JVMBridgeHelper.IsDotnetBackendPortUsing(ipinfo.Object);
Assert.Equal(ret, expect);
}
finally
{
Environment.SetEnvironmentVariable(envkey, oldenvValue);
}
}

/// <summary>
/// The main test case of JVM bridge helper,
/// is simply run it and close it.
/// </summary>
[Fact]
public void JVMBridgeHelperMainPathTest()
{
using (var helper = new JVMBridgeHelper())
{
// now we should be able to connect to JVM bridge
// or if system environment is not good, we should not failed.
}
}

[Fact]
public void JVMBridgeHelperWithoutSpark()
{
var oldhome = Environment.GetEnvironmentVariable("SPARK_HOME");
Environment.SetEnvironmentVariable("SPARK_HOME", null);
using (var helper = new JVMBridgeHelper())
{
}
Environment.SetEnvironmentVariable("SPARK_HOME", oldhome);
}
}
}
19 changes: 18 additions & 1 deletion src/csharp/Microsoft.Spark/Sql/SparkSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Sql.Streaming;
using Microsoft.Spark.Sql.Types;
using Microsoft.Spark.Utils;

namespace Microsoft.Spark.Sql
{
Expand All @@ -27,6 +28,12 @@ public sealed class SparkSession : IDisposable, IJvmObjectReferenceProvider
private static readonly string s_sparkSessionClassName =
"org.apache.spark.sql.SparkSession";

/// <summary>
/// The created jvm process
/// </summary>
/// <returns>The created jvm bridge process helper.</returns>
private static JVMBridgeHelper s_jvmbridge = null;

/// <summary>
/// Constructor for SparkSession.
/// </summary>
Expand Down Expand Up @@ -59,7 +66,17 @@ internal SparkSession(JvmObjectReference jvmObject)
/// Creates a Builder object for SparkSession.
/// </summary>
/// <returns>Builder object</returns>
public static Builder Builder() => new Builder();
public static Builder Builder()
{
// We could try to detect if we don't have jvm bridge,
// call the helper to launch jvm bridge process.
if ((s_jvmbridge == null) &&
(JVMBridgeHelper.IsDotnetBackendPortUsing() == false))
{
s_jvmbridge = new JVMBridgeHelper();
}
return new Builder();
}

/// Note that *ActiveSession() APIs are not exposed because these APIs work with a
/// thread-local variable, which stores the session variable. Since the Netty server
Expand Down
163 changes: 163 additions & 0 deletions src/csharp/Microsoft.Spark/Utils/JVMBridgeHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.IO;
using System.Linq;
using System.Net.NetworkInformation;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using Microsoft.Spark.Services;
using Microsoft.Spark.Interop;

namespace Microsoft.Spark.Utils
{
/// <summary>
/// An helper to launch dotnet jvm if needed
/// </summary>
internal class JVMBridgeHelper : IDisposable
{
/// <summary>
/// Customization for JVM Bridge jar file
/// If not exists, the helper will find out the jar in $DOTNET_WORKER_DIR folder.
/// </summary>
internal static string JVMBridgeJarEnvName = "DOTNET_BRIDGE_JAR";

/// <summary>
/// Generate spark settings for the running system
/// </summary>
internal static SparkSettings sparkSettings = new SparkSettings();

/// <summary>
/// DotnetRunner classname
/// </summary>
private static string RunnerClassname =
"org.apache.spark.deploy.dotnet.DotnetRunner";

private static string RunnerReadyMsg =
".NET Backend running debug mode. Press enter to exit";

private static string RunnerAddressInUseMsg =
"java.net.BindException: Address already in use";


private static int maxWaitTimeoutMS = 60000;

private readonly ILoggerService _logger =
LoggerServiceFactory.GetLogger(typeof(JVMBridgeHelper));

/// <summary>
/// The running jvm bridge process , null means no such process
/// </summary>
private Process jvmBridge;

/// <summary>
/// Detect if we already have the runner by checking backend port is using or not.
/// </summary>
/// <param name="customIPGlobalProperties">custom IPGlobalProperties, null for System.Net.NetworkInformation</param>
/// <returns> True means backend port is occupied by the runner.</returns>
internal static bool IsDotnetBackendPortUsing(
IPGlobalProperties customIPGlobalProperties = null)
{
var backendport = SparkEnvironment.ConfigurationService.GetBackendPortNumber();
var listeningEndpoints =
(customIPGlobalProperties ?? IPGlobalProperties.GetIPGlobalProperties())
.GetActiveTcpListeners();
return listeningEndpoints.Any(p => p.Port == backendport);
}

internal JVMBridgeHelper()
{
var jarpath = locateBridgeJar();
if (string.IsNullOrWhiteSpace(jarpath) ||
string.IsNullOrWhiteSpace(sparkSettings.SPARK_SUBMIT))
{
// Cannot find correct launch informations, give up.
return;
}
var arguments = $"--class {RunnerClassname} {jarpath} debug";
var startupinfo = new ProcessStartInfo
{
FileName = sparkSettings.SPARK_SUBMIT,
Arguments = arguments,
RedirectStandardOutput = true,
RedirectStandardInput = true,
UseShellExecute = false,
CreateNoWindow = true,
};

jvmBridge = new Process() { StartInfo = startupinfo };
_logger.LogInfo($"Launch JVM Bridge : {sparkSettings.SPARK_SUBMIT} {arguments}");
jvmBridge.Start();

// wait until we see .net backend started
Task<string> message;
while ((message = jvmBridge.StandardOutput.ReadLineAsync()) != null)
{
if (message.Wait(maxWaitTimeoutMS) == false)
{
// wait timeout , giveup
break;
}

if (message.Result.Contains(RunnerReadyMsg))
{
// launched successfully!
jvmBridge.StandardOutput.ReadToEndAsync();
_logger.LogInfo($"Launch JVM Bridge ready");
return;
}
if (message.Result.Contains(RunnerAddressInUseMsg))
{
// failed to start for port is using, give up.
jvmBridge.StandardOutput.ReadToEndAsync();
break;
}
}
// wait timeout , or failed to startup
// give up.
jvmBridge.Close();
jvmBridge = null;
}

/// <summary>
/// Locate
/// </summary>
private string locateBridgeJar()
{
var jarpath = Environment.GetEnvironmentVariable(JVMBridgeJarEnvName);
if (string.IsNullOrWhiteSpace(jarpath) == false)
{
return jarpath;
}

var workdir = Environment.GetEnvironmentVariable(
ConfigurationService.WorkerDirEnvVarName);
if ((workdir != null) && Directory.Exists(workdir))
{
// let's find the approicate jar in the work dirctory.
var jarfile = new DirectoryInfo(workdir)
.GetFiles("microsoft-spark-*.jar")
.FirstOrDefault();
if (jarfile != null)
{
return Path.Combine(jarfile.DirectoryName, jarfile.Name);
}
}

return string.Empty;
}

public void Dispose()
{
if (jvmBridge != null)
{
jvmBridge.StandardInput.WriteLine("\n");
jvmBridge.WaitForExit(maxWaitTimeoutMS);
_logger.LogInfo($"JVM Bridge disposed.");
}
}
}
}
Loading