
Commit

add storage level to persistence
derrickoswald committed Aug 31, 2016
1 parent 16004ae commit b83f3de
Showing 4 changed files with 37 additions and 25 deletions.
17 changes: 17 additions & 0 deletions README.md
@@ -39,6 +39,23 @@ cluster manager (8088), node manager (8042), JDBC ThriftServer2 (10000), standal

docker run -it -p 8032:8032 -p 8088:8088 -p 8042:8042 -p 4040:4040 -p 9000:9000 -p 10000:10000 -p 10001:10001 -p 50010:50010 -p 7077:7077 -p 8081:8080 -v /home/derrick/code/CIMScala/target:/opt/code -v /home/derrick/Documents/9code/nis/cim/cim_export:/opt/data --rm -h sandbox sequenceiq/spark:1.6.0 bash

There are a few settings I always apply that are not part of the standard docker image, such as enabling UTF-8 characters in filenames and adding an alias for ll. When executing from a remote client, the user needs to be a valid user on the cluster and a member of *supergroup*. Any data files that are needed should be copied into HDFS. If using Spark standalone, you will also need to start a master and a slave process. The usual setup commands are:

export LANG=en_US.utf8
alias ll="ls -al"

sudo groupadd supergroup
sudo useradd derrick
sudo usermod --append --groups supergroup derrick

hdfs dfsadmin -safemode leave
hdfs dfs -mkdir /data
hdfs dfs -put /opt/data/NIS_CIM_Export_sias_current_20160816_V7_bruegg.rdf /data
hdfs dfs -put /opt/data/KS_Leistungen.csv /data

/usr/local/spark-1.6.0-bin-hadoop2.6/sbin/start-master.sh
/usr/local/spark-1.6.0-bin-hadoop2.6/sbin/start-slave.sh -m 4g -c 2 spark://sandbox:7077

The Spark shell (Scala interpreter) allows interactive commands:

spark-shell --master yarn --deploy-mode client --driver-memory 1g --executor-memory 4g --executor-cores 2 --jars /opt/code/CIMScala-1.0-SNAPSHOT.jar
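
For example, to load a CIM file from the shell using the storage level added by this commit (a sketch only: it assumes the data source resolves under the package name ch.ninecode.cim and that the RDF file was copied to /data as above; sqlContext is provided by the shell):

val elements = sqlContext.read.
    format ("ch.ninecode.cim").
    option ("StorageLevel", "MEMORY_AND_DISK_SER").
    load ("hdfs://sandbox:9000/data/NIS_CIM_Export_sias_current_20160816_V7_bruegg.rdf")
elements.count ()

If the option is omitted, the default of MEMORY_ONLY is used, which matches the previous cache () behaviour.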
5 changes: 3 additions & 2 deletions src/main/scala/ch/ninecode/cim/CIMEdges.scala
@@ -4,14 +4,15 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.SQLUserDefinedType
import org.apache.spark.storage.StorageLevel

import ch.ninecode.model._

case class PreEdge (var id_seq_1: String, var cn_1: String, var id_seq_2: String, var cn_2: String, var id_equ: String, var clazz: String, var name: String, var aliasName: String, var container: String, var length: Double, var voltage: String, var normalOpen: Boolean, var ratedCurrent: Double, var location: String, val power: Double, val commissioned: String, val status: String) extends Serializable
class Extremum (val id_loc: String, var min_index: Int, var x1 : String, var y1 : String, var max_index: Int, var x2 : String, var y2 : String) extends Serializable
case class Edge (id_seq_1: String, id_seq_2: String, id_equ: String, clazz: String, name: String, aliasName: String, container: String, length: Double, voltage: String, normalOpen: Boolean, ratedCurrent: Double, power: Double, commissioned: String, val status: String, x1: String, y1: String, x2: String, y2: String)

class CIMEdges (val sqlContext: SQLContext) extends Serializable
class CIMEdges (val sqlContext: SQLContext, val storage: StorageLevel) extends Serializable
{
def get (name: String): RDD[Element] =
{
@@ -395,7 +396,7 @@ class CIMEdges (val sqlContext: SQLContext) extends Serializable

// persist it so the sample can get at it
edges.setName ("Edges")
edges.cache ()
edges.persist (storage)

// expose it
sqlContext.createDataFrame (edges).registerTempTable ("edges")
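
For context, RDD.cache () is simply persist (StorageLevel.MEMORY_ONLY), so the default option value reproduces the old behaviour, while a serialized or disk-backed level trades some CPU for memory on large CIM exports. A minimal sketch of the idea (persistNamed is a hypothetical helper, not part of this commit):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// setName labels the RDD in the Spark UI; persist pins it at the requested level
// (the default "MEMORY_ONLY" is exactly what cache () would have done)
def persistNamed[T] (rdd: RDD[T], name: String, storage: StorageLevel): RDD[T] =
{
    rdd.setName (name)
    rdd.persist (storage)
}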
14 changes: 7 additions & 7 deletions src/main/scala/ch/ninecode/cim/CIMRelation.scala
@@ -19,6 +19,7 @@ import org.apache.spark.sql.sources.OutputWriterFactory
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel

import scala.reflect._
import scala.reflect.runtime.universe._
@@ -33,16 +34,15 @@ class CIMRelation(
private val parameters: Map[String, String])
(@transient val sqlContext: SQLContext) extends HadoopFsRelation with Logging
{

// private val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"
// private val recordName = parameters.getOrElse("recordName", "topLevelRecord")
// private val recordNamespace = parameters.getOrElse("recordNamespace", "")
// check for a storage level option
var _StorageLevel: StorageLevel = StorageLevel.fromString (parameters.getOrElse ("StorageLevel", "MEMORY_ONLY"))

logInfo ("paths: " + paths.mkString (","))
logInfo ("maybeDataSchema: " + maybeDataSchema.toString ())
logInfo ("userDefinedPartitionColumns: " + userDefinedPartitionColumns.toString ())
logInfo ("parameters: " + parameters.toString ())
logInfo ("sqlContext: " + sqlContext.toString ())
logInfo ("storage: " + _StorageLevel.description)

/**
* Specifies schema of actual data files. For partitioned relations, if one or more partitioned
@@ -126,7 +126,7 @@ class CIMRelation(

ret = rdd.asInstanceOf[RDD[Row]]
ret.setName ("Elements") // persist it
ret.cache ()
ret.persist (_StorageLevel)

// as a side effect, define all the other temporary tables
logInfo ("creating temporary tables")
@@ -143,15 +143,15 @@
// and described in http://docs.scala-lang.org/overviews/reflection/thread-safety.html
// p.s. Scala's type system is a shit show of kludgy code
logInfo ("building " + subsetter.cls)
subsetter.make (sqlContext, rdd)
subsetter.make (sqlContext, rdd, _StorageLevel)
}
)

// set up edge graph if it's not an ISU file
if (!filename.contains ("ISU"))
{
logInfo ("making Edges RDD")
val cimedges = new CIMEdges (sqlContext)
val cimedges = new CIMEdges (sqlContext, _StorageLevel)
cimedges.make_edges (rdd)
}
}
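
The option string is parsed with StorageLevel.fromString, so any of the standard Spark level names can be supplied. A quick sketch of the parsing (the listed names are the ones Spark 1.6 recognizes):

import org.apache.spark.storage.StorageLevel

// recognized names: NONE, DISK_ONLY, DISK_ONLY_2, MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER,
// MEMORY_ONLY_SER_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER, MEMORY_AND_DISK_SER_2, OFF_HEAP
val level = StorageLevel.fromString ("MEMORY_AND_DISK_SER")
println (level.description)
// an unrecognized name throws IllegalArgumentException, so a misspelled option value fails fast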
26 changes: 10 additions & 16 deletions src/main/scala/ch/ninecode/cim/CIMSubsetter.scala
@@ -8,11 +8,10 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel

import ch.ninecode.model.Element

case class SerializableObject (name: String) extends Serializable

/**
* Subclass extractor
* Extracts the given type of object from the full Element Resilient Distributes Dataset (RDD),
@@ -23,10 +22,6 @@ case class SerializableObject (name: String) extends Serializable
*/
class CIMSubsetter[A <: Product : ClassTag] (schema: StructType) extends Serializable
{
// try to avoid deadlock due to https://issues.scala-lang.org/browse/SI-6240
// and described in http://docs.scala-lang.org/overviews/reflection/thread-safety.html
val lock: AnyRef = SerializableObject ("dickhead")

def runtime_class = classTag[A].runtimeClass
def classname = runtime_class.getName
def cls: String = { classname.substring (classname.lastIndexOf (".") + 1) }
@@ -44,19 +39,18 @@ class CIMSubsetter[A <: Product : ClassTag] (schema: StructType) extends Serializable
case x: Element if (null != subclass (x)) =>
subclass (x)
}
def subset (rdd: RDD[Element]): RDD[Row] =
def subset (rdd: RDD[Element], storage: StorageLevel): RDD[Row] =
{
val subrdd = rdd.collect[A] (pf)
subrdd.name = cls
subrdd.cache ()
subrdd.persist (storage)
subrdd.asInstanceOf[RDD[Row]]
}
def make (sqlContext: SQLContext, rdd: RDD[Element]) =
lock.synchronized
{
val sub = subset (rdd)
// use the (Row, schema) form of createDataFrame, because all others rely on a TypeTag which is erased
val df = sqlContext.createDataFrame (sub, schema)
df.registerTempTable (cls)
}
def make (sqlContext: SQLContext, rdd: RDD[Element], storage: StorageLevel) =
{
val sub = subset (rdd, storage)
// use the (Row, schema) form of createDataFrame, because all others rely on a TypeTag which is erased
val df = sqlContext.createDataFrame (sub, schema)
df.registerTempTable (cls)
}
}
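
With the shared lock gone and the storage level threaded through from CIMRelation, each subsetter persists its per-class RDD and registers it as a temporary table, so the tables can be queried directly from the shell. A hedged example (ACLineSegment stands in for any generated CIM class; sqlContext comes from the shell):

// each CIM class becomes a temporary table backed by an RDD persisted at the configured storage level
val lines = sqlContext.sql ("select * from ACLineSegment")
println (lines.count ())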
