🐛 Fix: Add file format to the path of the mapping source if it does not exist
camemre49 authored and dogukan10 committed Apr 1, 2024
1 parent a845c78 commit dfd6c45
Showing 1 changed file with 11 additions and 7 deletions.
@@ -33,15 +33,19 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
    * @throws NotImplementedError If the specified source format is not implemented.
    */
   override def read(mappingSource: FileSystemSource, sourceSettings: FileSystemSourceSettings, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)], limit: Option[Int] = Option.empty, jobId: Option[String] = Option.empty): DataFrame = {
-
+    // get the format of the file
+    val sourceType = mappingSource.sourceType
+    // append the file format to the path if it does not already exist
+    val sourcePath =
+      if (sourceSettings.asStream || mappingSource.path.endsWith(s".${sourceType}"))
+        mappingSource.path
+      else mappingSource.path.concat(s".${sourceType}")
     // Do not add the context path if it is a Hadoop path
     val finalPath = if (sourceSettings.dataFolderPath.startsWith("hdfs://")) {
-      new URI(s"${sourceSettings.dataFolderPath.stripSuffix("/")}/${mappingSource.path.stripPrefix("/")}").toString
+      new URI(s"${sourceSettings.dataFolderPath.stripSuffix("/")}/${sourcePath.stripPrefix("/")}").toString
     } else {
-      FileUtils.getPath(sourceSettings.dataFolderPath, mappingSource.path).toAbsolutePath.toString
+      FileUtils.getPath(sourceSettings.dataFolderPath, sourcePath).toAbsolutePath.toString
     }
-
-
     // validate whether the provided path is a directory when streaming is enabled in the source settings
     if (sourceSettings.asStream && !new File(finalPath).isDirectory) {
       throw new IllegalArgumentException(s"$finalPath is not a directory. For streaming job, you should provide a directory.")
@@ -52,9 +56,9 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
     // keeps the names of the files processed by Spark
     val processedFiles: mutable.HashSet[String] = mutable.HashSet.empty
     // Based on the source type
-    val resultDf = mappingSource.sourceType match {
+    val resultDf = sourceType match {
       case SourceFileFormats.CSV | SourceFileFormats.TSV =>
-        val updatedOptions = mappingSource.sourceType match {
+        val updatedOptions = sourceType match {
           case SourceFileFormats.TSV =>
             // If the file format is tsv, use tab (\t) as the separator by default if it is not set explicitly
             mappingSource.options +
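For context, the new path-resolution behavior can be summarized with a minimal standalone sketch. This is an illustration, not the repository's code: the FileSystemSource and FileSystemSourceSettings fields are collapsed into plain parameters, and toFHIR's FileUtils.getPath is replaced with java.nio.file.Paths.get, which is assumed to resolve the path the same way for this purpose.

import java.net.URI
import java.nio.file.Paths

// Hypothetical standalone version of the logic in the first hunk:
// append the file format to a relative source path when it is missing,
// then resolve it against an HDFS base URI or a local data folder.
def resolveSourcePath(path: String, sourceType: String, dataFolderPath: String, asStream: Boolean): String = {
  // streaming sources point at directories, so their paths are left as-is
  val sourcePath =
    if (asStream || path.endsWith(s".$sourceType")) path
    else s"$path.$sourceType"
  if (dataFolderPath.startsWith("hdfs://"))
    // a Hadoop path is built as a URI rather than resolved against a local context path
    new URI(s"${dataFolderPath.stripSuffix("/")}/${sourcePath.stripPrefix("/")}").toString
  else
    Paths.get(dataFolderPath, sourcePath).toAbsolutePath.toString
}

// resolveSourcePath("patients", "csv", "hdfs://namenode:9000/data", asStream = false)
//   returns "hdfs://namenode:9000/data/patients.csv"
// resolveSourcePath("patients.csv", "csv", "hdfs://namenode:9000/data", asStream = false)
//   returns the same value, since the extension is already present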

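The second hunk is cut off right after mappingSource.options +, but the in-code comment states the intent: when the source format is TSV, default the separator to a tab unless it was set explicitly. Below is a hedged sketch of that kind of option defaulting, assuming the options map is ultimately handed to Spark's CSV reader (whose standard separator key is "sep"; the exact key and map operation used in the repository are not visible in this diff).

// Illustrative only: default the separator option for TSV sources,
// assuming options is a Map[String, String] passed to Spark's DataFrameReader.
def withTsvSeparator(sourceType: String, options: Map[String, String]): Map[String, String] =
  sourceType match {
    // if the file format is tsv, use tab (\t) as the separator by default
    case "tsv" if !options.contains("sep") => options + ("sep" -> "\t")
    case _ => options
  }

// usage sketch: spark.read.options(withTsvSeparator(sourceType, mappingSource.options)).csv(finalPath)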