FV changes #1177

Draft · wants to merge 7 commits into base: main
12 changes: 10 additions & 2 deletions src/csharp/Microsoft.Spark.Worker/Microsoft.Spark.Worker.csproj
@@ -1,11 +1,19 @@
 <Project Sdk="Microsoft.NET.Sdk">
   <PropertyGroup>
     <OutputType>Exe</OutputType>
-    <TargetFrameworks>net461;net6.0</TargetFrameworks>
-    <TargetFrameworks Condition="'$(OS)' != 'Windows_NT'">net6.0</TargetFrameworks>
+    <TargetFramework>net6.0</TargetFramework>
+    <!--<TargetFrameworks Condition="'$(OS)' != 'Windows_NT'">net6.0</TargetFrameworks>-->
     <RootNamespace>Microsoft.Spark.Worker</RootNamespace>
     <GenerateDocumentationFile>true</GenerateDocumentationFile>
   </PropertyGroup>
+  <PropertyGroup>
+    <RuntimeIdentifier>ubuntu.18.04-x64</RuntimeIdentifier>
+  </PropertyGroup>
+
+  <Target Condition="'$(RuntimeIdentifier)' == 'ubuntu.18.04-x64'" Name="PostBuild" AfterTargets="PostBuildEvent">
+    <Message Importance="High" Text="---------- Publishing the project to $(OutDir)publish ----------" />
+    <Exec Command="dotnet publish --runtime ubuntu.18.04-x64 --configuration $(ConfigurationName) --self-contained --no-build --output $(OutDir)publish" />
+  </Target>
   <ItemGroup>
     <InternalsVisibleTo Include="Microsoft.Spark.Worker.UnitTest" />
   </ItemGroup>
25 changes: 25 additions & 0 deletions src/csharp/Microsoft.Spark.Worker/Microsoft.Spark.Worker.sln
@@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.002.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark.Worker", "Microsoft.Spark.Worker.csproj", "{DF036F25-C811-469E-B440-1DE35E394500}"
EndProject
Global
	GlobalSection(SolutionConfigurationPlatforms) = preSolution
		Debug|Any CPU = Debug|Any CPU
		Release|Any CPU = Release|Any CPU
	EndGlobalSection
	GlobalSection(ProjectConfigurationPlatforms) = postSolution
		{DF036F25-C811-469E-B440-1DE35E394500}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
		{DF036F25-C811-469E-B440-1DE35E394500}.Debug|Any CPU.Build.0 = Debug|Any CPU
		{DF036F25-C811-469E-B440-1DE35E394500}.Release|Any CPU.ActiveCfg = Release|Any CPU
		{DF036F25-C811-469E-B440-1DE35E394500}.Release|Any CPU.Build.0 = Release|Any CPU
	EndGlobalSection
	GlobalSection(SolutionProperties) = preSolution
		HideSolutionNode = FALSE
	EndGlobalSection
	GlobalSection(ExtensibilityGlobals) = postSolution
		SolutionGuid = {D2FF370B-5FE3-47CD-B87D-DA883158316B}
	EndGlobalSection
EndGlobal
61 changes: 41 additions & 20 deletions src/csharp/Microsoft.Spark.Worker/Processor/TaskContextProcessor.cs
@@ -22,29 +22,39 @@ internal TaskContext Process(Stream stream)
             return (_version.Major, _version.Minor) switch
             {
                 (2, 4) => TaskContextProcessorV2_4_X.Process(stream),
-                (3, _) => TaskContextProcessorV3_0_X.Process(stream),
+                (3, _) t when t.Minor < 4 => TaskContextProcessorV3_0_X.Process(stream),
+                (3, _) => TaskContextProcessorV3_5_X.Process(stream),
                 _ => throw new NotSupportedException($"Spark {_version} not supported.")
             };
         }
 
-        private static TaskContext ReadTaskContext(Stream stream)
-        {
-            return new TaskContext
-            {
-                StageId = SerDe.ReadInt32(stream),
-                PartitionId = SerDe.ReadInt32(stream),
-                AttemptNumber = SerDe.ReadInt32(stream),
-                AttemptId = SerDe.ReadInt64(stream)
-            };
-        }
+        private static TaskContext ReadTaskContext_2_x(Stream stream)
+            => new()
+            {
+                IsBarrier = SerDe.ReadBool(stream),
+                Port = SerDe.ReadInt32(stream),
+                Secret = SerDe.ReadString(stream),
+
+                StageId = SerDe.ReadInt32(stream),
+                PartitionId = SerDe.ReadInt32(stream),
+                AttemptNumber = SerDe.ReadInt32(stream),
+                AttemptId = SerDe.ReadInt64(stream),
+            };
 
-        private static void ReadBarrierInfo(Stream stream)
-        {
-            // Read barrier-related payload. Note that barrier is currently not supported.
-            SerDe.ReadBool(stream); // IsBarrier
-            SerDe.ReadInt32(stream); // BoundPort
-            SerDe.ReadString(stream); // Secret
-        }
+        // Needed for 3.3.4+, 3.4.x, 3.5.x
+        private static TaskContext ReadTaskContext_3_5(Stream stream)
+            => new()
+            {
+                IsBarrier = SerDe.ReadBool(stream),
+                Port = SerDe.ReadInt32(stream),
+                Secret = SerDe.ReadString(stream),
+
+                StageId = SerDe.ReadInt32(stream),
+                PartitionId = SerDe.ReadInt32(stream),
+                AttemptNumber = SerDe.ReadInt32(stream),
+                AttemptId = SerDe.ReadInt64(stream),
+                CPUs = SerDe.ReadInt32(stream)
+            };
 
         private static void ReadTaskContextProperties(Stream stream, TaskContext taskContext)
         {
@@ -77,8 +87,7 @@ private static class TaskContextProcessorV2_4_X
         {
             internal static TaskContext Process(Stream stream)
             {
-                ReadBarrierInfo(stream);
-                TaskContext taskContext = ReadTaskContext(stream);
+                TaskContext taskContext = ReadTaskContext_2_x(stream);
                 ReadTaskContextProperties(stream, taskContext);
 
                 return taskContext;
@@ -89,8 +98,20 @@ private static class TaskContextProcessorV3_0_X
         {
             internal static TaskContext Process(Stream stream)
             {
-                ReadBarrierInfo(stream);
-                TaskContext taskContext = ReadTaskContext(stream);
+                TaskContext taskContext = ReadTaskContext_2_x(stream);
                 ReadTaskContextResources(stream);
                 ReadTaskContextProperties(stream, taskContext);
 
                 return taskContext;
             }
         }
 
+        private static class TaskContextProcessorV3_5_X
+        {
+            internal static TaskContext Process(Stream stream)
+            {
+                TaskContext taskContext = ReadTaskContext_3_5(stream);
+                SerDe.ReadInt32(stream); // Read and discard; the value is not used by the worker.
+                ReadTaskContextResources(stream);
+                ReadTaskContextProperties(stream, taskContext);
 
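The dispatch above leans on two C# pattern-matching features that are easy to misread: tuple element names are inferred from `(_version.Major, _version.Minor)`, so the designation variable `t` exposes `t.Minor` inside the `when` guard of a positional pattern. A minimal, self-contained sketch of the same routing, with hypothetical handler strings standing in for the TaskContextProcessorV*_X classes:

using System;

static class VersionDispatchSketch
{
    static string Dispatch(Version version) =>
        (version.Major, version.Minor) switch
        {
            (2, 4) => "v2.4 handler",
            // Element names are inferred from version.Major/version.Minor,
            // so the designation `t` exposes t.Minor in the guard.
            (3, _) t when t.Minor < 4 => "v3.0 handler",
            (3, _) => "v3.5 handler",
            _ => throw new NotSupportedException($"Spark {version} not supported.")
        };

    static void Main()
    {
        Console.WriteLine(Dispatch(new Version(3, 1))); // v3.0 handler
        Console.WriteLine(Dispatch(new Version(3, 5))); // v3.5 handler
    }
}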
2 changes: 1 addition & 1 deletion src/csharp/Microsoft.Spark/Interop/SparkEnvironment.cs
@@ -19,7 +19,7 @@ public static class SparkEnvironment
         private static Version GetSparkVersion()
         {
             var sparkVersion = new Version((string)JvmBridge.CallStaticJavaMethod(
-                "org.apache.spark.deploy.dotnet.DotnetRunner",
+                "org.apache.spark.deploy.dotnet.FVRunner",
                 "SPARK_VERSION"));
 
             string sparkVersionOverride =
25 changes: 25 additions & 0 deletions src/csharp/Microsoft.Spark/Microsoft.Spark.sln
@@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.002.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark", "Microsoft.Spark.csproj", "{25BBA6DF-3095-4099-A2B3-935CAA014FED}"
EndProject
Global
	GlobalSection(SolutionConfigurationPlatforms) = preSolution
		Debug|Any CPU = Debug|Any CPU
		Release|Any CPU = Release|Any CPU
	EndGlobalSection
	GlobalSection(ProjectConfigurationPlatforms) = postSolution
		{25BBA6DF-3095-4099-A2B3-935CAA014FED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
		{25BBA6DF-3095-4099-A2B3-935CAA014FED}.Debug|Any CPU.Build.0 = Debug|Any CPU
		{25BBA6DF-3095-4099-A2B3-935CAA014FED}.Release|Any CPU.ActiveCfg = Release|Any CPU
		{25BBA6DF-3095-4099-A2B3-935CAA014FED}.Release|Any CPU.Build.0 = Release|Any CPU
	EndGlobalSection
	GlobalSection(SolutionProperties) = preSolution
		HideSolutionNode = FALSE
	EndGlobalSection
	GlobalSection(ExtensibilityGlobals) = postSolution
		SolutionGuid = {2E49B541-20ED-4535-A9F2-1C05FDB3044F}
	EndGlobalSection
EndGlobal
2 changes: 2 additions & 0 deletions src/csharp/Microsoft.Spark/TaskContext.cs
@@ -23,6 +23,8 @@ internal class TaskContext

         internal bool IsBarrier { get; set; }
 
+        internal int CPUs { get; set; }
+
         internal int Port { get; set; }
 
         internal string Secret { get; set; }
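To make the wire format concrete: a payload for the 3.5-era reader must supply the fields in exactly the sequence ReadTaskContext_3_5 consumes them, ending with the new CPUs value. Below is a minimal sketch of building such a payload in memory, assuming SerDe exposes Write overloads symmetric to the Read methods used in the diff; all values are placeholders.

using System.IO;
using Microsoft.Spark.Interop.Ipc;

static class TaskContextPayloadSketch
{
    // Writes fields in the order ReadTaskContext_3_5 reads them.
    static MemoryStream BuildPayload()
    {
        var stream = new MemoryStream();
        SerDe.Write(stream, false);    // IsBarrier
        SerDe.Write(stream, 12345);    // Port
        SerDe.Write(stream, "secret"); // Secret
        SerDe.Write(stream, 1);        // StageId
        SerDe.Write(stream, 0);        // PartitionId
        SerDe.Write(stream, 0);        // AttemptNumber
        SerDe.Write(stream, 100L);     // AttemptId
        SerDe.Write(stream, 2);        // CPUs
        stream.Position = 0;           // Rewind so the processor reads from the start.
        return stream;
    }
}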
83 changes: 83 additions & 0 deletions src/scala/microsoft-spark-3-4/pom.xml
@@ -0,0 +1,83 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.microsoft.scala</groupId>
    <artifactId>microsoft-spark</artifactId>
    <version>${microsoft-spark.version}</version>
  </parent>
  <artifactId>microsoft-spark-3-4_2.12</artifactId>
  <inceptionYear>2019</inceptionYear>
  <properties>
    <encoding>UTF-8</encoding>
    <scala.version>2.12.17</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
    <spark.version>3.4.3</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.8</arg>
            <arg>-deprecation</arg>
            <arg>-feature</arg>
          </args>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
@@ -0,0 +1,72 @@
/*
 * Licensed to the .NET Foundation under one or more agreements.
 * The .NET Foundation licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */

package org.apache.spark.api.dotnet

import java.io.DataOutputStream

import org.apache.spark.internal.Logging

import scala.collection.mutable.Queue

/**
 * CallbackClient is used to communicate with the Dotnet CallbackServer.
 * The client manages and maintains a pool of open CallbackConnections.
 * Each callback request is delegated to a new or currently unused
 * CallbackConnection.
 * @param serDe SerDe instance used to serialize callback payloads
 * @param address The address of the Dotnet CallbackServer
 * @param port The port of the Dotnet CallbackServer
 */
class CallbackClient(serDe: SerDe, address: String, port: Int) extends Logging {
  private[this] val connectionPool: Queue[CallbackConnection] = Queue[CallbackConnection]()

  private[this] var isShutdown: Boolean = false

  final def send(callbackId: Int, writeBody: (DataOutputStream, SerDe) => Unit): Unit =
    getOrCreateConnection() match {
      case Some(connection) =>
        try {
          connection.send(callbackId, writeBody)
          addConnection(connection)
        } catch {
          case e: Exception =>
            logError(s"Error calling callback [callback id = $callbackId].", e)
            connection.close()
            throw e
        }
      case None => throw new Exception("Unable to get or create connection.")
    }

  private def getOrCreateConnection(): Option[CallbackConnection] = synchronized {
    if (isShutdown) {
      logInfo("Cannot get or create connection while client is shutdown.")
      return None
    }

    if (connectionPool.nonEmpty) {
      return Some(connectionPool.dequeue())
    }

    Some(new CallbackConnection(serDe, address, port))
  }

  private def addConnection(connection: CallbackConnection): Unit = synchronized {
    assert(connection != null)
    connectionPool.enqueue(connection)
  }

  def shutdown(): Unit = synchronized {
    if (isShutdown) {
      logInfo("Shutdown called, but already shutdown.")
      return
    }

    logInfo("Shutting down.")
    connectionPool.foreach(_.close)
    connectionPool.clear
    isShutdown = true
  }
}
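CallbackClient's pooling discipline is simple: under a lock, hand out an idle connection or create a new one, and re-enqueue the connection only after a successful send, so a connection whose send threw is closed rather than returned to the pool. A transliteration of that discipline into C#, using a hypothetical generic connection type that is not part of this PR:

using System;
using System.Collections.Generic;

// Minimal sketch of CallbackClient's pooling discipline, transliterated to C#.
// TConnection is a hypothetical stand-in for the Scala CallbackConnection.
public sealed class ConnectionPool<TConnection> where TConnection : class
{
    private readonly Queue<TConnection> _pool = new();
    private readonly Func<TConnection> _factory;
    private readonly object _lock = new();
    private bool _isShutdown;

    public ConnectionPool(Func<TConnection> factory) => _factory = factory;

    // Hand out an idle connection if one exists, else create a new one.
    // Returns null once the pool is shut down (the Scala code returns None).
    public TConnection? Acquire()
    {
        lock (_lock)
        {
            if (_isShutdown) return null;
            return _pool.Count > 0 ? _pool.Dequeue() : _factory();
        }
    }

    // Re-enqueue a connection only after a successful send; a connection
    // that threw should be closed by the caller instead of released.
    public void Release(TConnection connection)
    {
        lock (_lock) _pool.Enqueue(connection);
    }

    public void Shutdown(Action<TConnection> close)
    {
        lock (_lock)
        {
            if (_isShutdown) return;
            foreach (TConnection connection in _pool) close(connection);
            _pool.Clear();
            _isShutdown = true;
        }
    }
}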