Commit 06a20bc

exportAllIslands

derrickoswald committed Oct 11, 2017
1 parent 63cc2be commit 06a20bc
Showing 3 changed files with 51 additions and 12 deletions.
4 changes: 2 additions & 2 deletions pom.xml
@@ -4,7 +4,7 @@
<groupId>ch.ninecode.cim</groupId>
<artifactId>CIMReader</artifactId>
<!-- version>${version.dependency.scala}-${version.dependency.spark}-CIMREADER_VERSION</version -->
- <version>2.11-2.2.0-2.3.0</version>
+ <version>2.11-2.1.1-2.3.0</version>
<name>${project.artifactId}</name>
<description>Expose CIM data files as Spark RDD</description>
<inceptionYear>2015</inceptionYear>
@@ -76,7 +76,7 @@
<version.dependency.scalalibrary>2.11.8</version.dependency.scalalibrary>
<version.dependency.scalatest>3.0.3</version.dependency.scalatest>
<version.dependency.scopt>3.6.0</version.dependency.scopt>
- <version.dependency.spark>2.2.0</version.dependency.spark>
+ <version.dependency.spark>2.1.1</version.dependency.spark>
</properties>
<dependencies>
<dependency>
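The artifact version encodes the Scala version, Spark version and CIMReader version in that order, so this change pins the build to Spark 2.1.1. A minimal runtime guard (a sketch only, assuming a live SparkSession named spark) can catch a mismatch between the cluster and the jar early:

    // sketch: fail fast when the cluster's Spark differs from the Spark this jar was built against
    require (spark.version.startsWith ("2.1.1"),
        s"CIMReader was built for Spark 2.1.1 but is running on Spark ${spark.version}")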
41 changes: 32 additions & 9 deletions src/main/scala/ch/ninecode/cim/CIMExport.scala
@@ -66,29 +66,29 @@ import ch.ninecode.model._
* val terminals = sc.getPersistentRDDs.filter(_._2.name == "Terminal").head._2.asInstanceOf[RDD[Terminal]]
* val NSpin = terminals.filter (x => x.ConductingEquipment == "TRA7872" && x.ACDCTerminal.sequenceNumber == 2)
* println (NSpin.first.TopologicalNode)
- * MUI452395_topo
+ * MUI452395_island
*
* // get the name of the trafokreis TopologicalIsland
* val nodes = sc.getPersistentRDDs.filter(_._2.name == "TopologicalNode").head._2.asInstanceOf[RDD[TopologicalNode]]
- * val NSnode = nodes.filter (_.id == "MUI452395_topo")
+ * val NSnode = nodes.filter (_.id == "MUI452395_island")
* println (NSnode.first.TopologicalIsland)
- * TRA7872_terminal_2_topo
+ * TRA7872_terminal_2_island
*
* // export the reduced CIM file
* val export = new CIMExport (spark)
- * export.exportIsland ("TRA7872_terminal_2_topo", "TRA7872.rdf")
+ * export.exportIsland ("TRA7872_terminal_2_island", "TRA7872.rdf")
* }}}
*
*/
class CIMExport (spark: SparkSession) extends CIMRDD with Serializable
{
implicit val session: SparkSession = spark
implicit val log: Logger = LoggerFactory.getLogger (getClass)
val configuration: Configuration = spark.sparkContext.hadoopConfiguration
val hdfs: FileSystem = FileSystem.get (configuration)

def merge (source: String, destination: String): Unit =
{
val configuration: Configuration = spark.sparkContext.hadoopConfiguration
val hdfs: FileSystem = FileSystem.get (configuration)
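// concatenate every file under 'source' (e.g. Spark's part-* outputs) into the single file 'destination'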
FileUtil.copyMerge (hdfs, new Path (source), hdfs, new Path (destination), false, configuration, null)
}

@@ -113,19 +113,28 @@ class CIMExport (spark: SparkSession) extends CIMRDD with Serializable
<md:Model.profile>https://github.com/derrickoswald/CIMReader</md:Model.profile>
</md:FullModel>"""
val tailer = """</rdf:RDF>"""

// setup
val configuration: Configuration = spark.sparkContext.hadoopConfiguration
val hdfs = FileSystem.get (URI.create (configuration.get ("fs.defaultFS")), configuration)
val directory: Path = new Path (hdfs.getWorkingDirectory, temp)
hdfs.delete (directory, true)
val file = new Path (filename)
hdfs.delete (file, false)
// write the file
val txt = directory.toUri.toString
val head = spark.sparkContext.makeRDD (List[String] (header))
val tail = spark.sparkContext.makeRDD (List[String] (tailer))
val guts = elements.map (_.export)
val ss = head.union (guts).union (tail)
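// union preserves partition order, so the header should end up first in the merged file and the tailer last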
ss.saveAsTextFile (txt)
merge (txt, file.toUri.toString)
// clean up temporary directory
hdfs.delete (directory, true)
// delete the stupid .crc file
val index = filename.lastIndexOf ("/")
val crc = if (-1 != index) filename.substring (0, index + 1) + "." + filename.substring (index + 1) + ".crc" else "." + filename + ".crc"
hdfs.delete (new Path (crc), false)
}

/**
@@ -135,7 +144,8 @@ class CIMExport (spark: SparkSession) extends CIMRDD with Serializable
*/
def exportIsland (island: String, filename: String): Unit =
{
- val someislands = get[TopologicalIsland].filter (_.id == island)
+ val allislands = get[TopologicalIsland]
+ val someislands = allislands.filter (_.id == island)
if (someislands.isEmpty())
log.error (island + " not found")
else
@@ -254,7 +264,7 @@ class CIMExport (spark: SparkSession) extends CIMRDD with Serializable

// get the elements
val elements = get[Element]("Elements").keyBy (_.id).join (ids).map (_._2._1)
- export (elements, filename, island)
+ export (elements, filename, island, "/tmp/" + island + ".rdf")
}
}

@@ -272,4 +282,17 @@ class CIMExport (spark: SparkSession) extends CIMRDD with Serializable
val elements = get[Element]("Elements")
export (elements, filename, about)
}
}

/**
* Export every topological island.
* @param directory The name of the directory to write the CIM files.
* @return the number of islands processed
*/
def exportAllIslands (directory: String = "simulation/"): Int =
{
val dir = if (directory.endsWith ("/")) directory else directory + "/"
val allislands = get[TopologicalIsland].map (_.id).collect
val islands = allislands.map (island => { exportIsland (island, dir + island + ".rdf"); 1 })
islands.sum
}
}
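The new exportAllIslands maps exportIsland over every TopologicalIsland id and writes one RDF file per island. A minimal end-to-end sketch (assuming the CIMReader data source is registered as "ch.ninecode.cim" and using a hypothetical input path):

    // sketch: load a CIM file, build the topology, then export one RDF file per island
    val elements = spark.read.format ("ch.ninecode.cim").load ("hdfs://sandbox/data/network.rdf")
    println (elements.count + " elements")
    val ntp = new CIMNetworkTopologyProcessor (spark, org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
    ntp.process (true)
    val export = new CIMExport (spark)
    val count = export.exportAllIslands ("target/simulation/")
    println (count + " islands exported")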
18 changes: 17 additions & 1 deletion src/test/scala/ch/ninecode/cim/CIMExportSuite.scala
@@ -107,6 +107,22 @@ voltage +
val elements2 = ntp.process (true)
println (elements2.count + " elements")
val export = new CIMExport (spark)
export.exportIsland ("TRA5200_terminal_2_topo", "target/" + "TRA5200" + ".rdf")
export.exportIsland ("TRA5200_terminal_2_island", "target/" + "TRA5200" + ".rdf")
}

test ("ExportAllIslands")
{
implicit spark: SparkSession =>

val filename =
PRIVATE_FILE_DEPOT + "bkw_cim_export_haelig" + ".rdf"
val options = new util.HashMap[String, String] ().asInstanceOf[util.Map[String,String]]
val elements = readFile (filename, options)
println (elements.count + " elements")
val ntp = new CIMNetworkTopologyProcessor (spark, org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
val elements2 = ntp.process (true)
println (elements2.count + " elements")
val export = new CIMExport (spark)
export.exportAllIslands ("target/simulation")
}
}
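The test leans on the suite's PRIVATE_FILE_DEPOT constant and readFile helper, neither of which appears in this diff; readFile presumably delegates to the CIMReader data source, roughly like this sketch (signature assumed):

    // assumed shape of the suite's readFile helper: load a CIM file via the CIMReader data source
    def readFile (filename: String, options: java.util.Map[String, String]): org.apache.spark.sql.DataFrame =
        spark.read.format ("ch.ninecode.cim").options (options).load (filename)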
