♻️ refactor: Improve the documentation of FileSystemSource
dogukan10 committed Jun 13, 2024
1 parent ad0ab7a commit ae35de6
Showing 2 changed files with 33 additions and 11 deletions.
@@ -33,14 +33,16 @@ class FileDataSourceReader(spark: SparkSession) extends BaseDataSourceReader[Fil
* @throws NotImplementedError If the specified source format is not implemented.
*/
override def read(mappingSource: FileSystemSource, sourceSettings:FileSystemSourceSettings, schema: Option[StructType], timeRange: Option[(LocalDateTime, LocalDateTime)], limit: Option[Int] = Option.empty,jobId: Option[String] = Option.empty): DataFrame = {
// get format of the file
val sourceType = mappingSource.sourceType
// get the format of the file
val sourceType = mappingSource.inferFileFormat
// append the file format to the path if not exists
// for streaming jobs, we expect a directory name, so simply use the given path
val sourcePath =
if (sourceSettings.asStream || mappingSource.path.endsWith(s".${sourceType}"))
mappingSource.path
else mappingSource.path.concat(s".${sourceType}")
// Do not add context path if it is a hadoop path
// determine the final path
// if it is a Hadoop path (starts with "hdfs://"), construct the URI directly without adding the context path
val finalPath = if (sourceSettings.dataFolderPath.startsWith("hdfs://")) {
new URI(s"${sourceSettings.dataFolderPath.stripSuffix("/")}/${sourcePath.stripPrefix("/")}").toString
} else {
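
Note on the hunk above: the path handling it shows can be exercised without Spark. The following is a minimal sketch, not code from the commit. `SourcePathSketch` and its two helper names are hypothetical, and the non-HDFS branch is left out because the diff is cut off before it.

```scala
import java.net.URI

// Hypothetical helpers that mirror the logic visible in the hunk; not part of the commit.
object SourcePathSketch {

  // Batch jobs: append the inferred format as an extension unless the path already ends with it.
  // Streaming jobs: the path names a directory, so it is returned unchanged.
  def sourcePath(path: String, format: String, asStream: Boolean): String =
    if (asStream || path.endsWith(s".$format")) path
    else path.concat(s".$format")

  // HDFS case only: join the data folder and the source path with exactly one slash,
  // without prepending any context path.
  def hdfsPath(dataFolderPath: String, sourcePath: String): String =
    new URI(s"${dataFolderPath.stripSuffix("/")}/${sourcePath.stripPrefix("/")}").toString
}
```

For instance, `SourcePathSketch.sourcePath("data/patients", "csv", asStream = false)` yields `"data/patients.csv"`, while the same call with `asStream = true` returns the path untouched.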
@@ -36,21 +36,41 @@ trait FhirMappingSourceContext extends Serializable {
}

/**
* Context/configuration for one of the source of the mapping that will read the source data from file system
* Context/configuration for one of the sources of the mapping that will read the source data from the file system.
*
* For batch jobs, you should either provide a file name in the "path" field with a file extension or provide the file name
* without an extension in the "path" field and indicate the extension in the "fileFormat" field.
*
* Examples:
* - With extension in path:
* FileSystemSource(path = "data/patients.csv") => Will read "data/patients.csv" file
* - Without extension in path (specifying file format separately):
* FileSystemSource(path = "data/patients", fileFormat = Some("csv")) => Will read "data/patients.csv" file
*
* For streaming jobs, you should provide a folder name in the "path" field and a file format in the "fileFormat" field so that
* it can read the files with specified file formats in the given folder.
*
* Examples:
* - Providing folder name and file format:
* FileSystemSource(path = "data/streaming/patients", fileFormat = Some("json")) => Will read "json" files in "data/streaming/patients" folder
* @param path File path to the source file or folder, e.g., "patients.csv" or "patients".
* @param fileFormat Format of the file (csv | json | parquet) if it can not be inferred from path e.g. csv
* @param options Further options for the format (Spark Data source options for the format e.g. For csv -> https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option)
* @param fileFormat Format of the file (csv | json | parquet) if it cannot be inferred from the path, e.g., csv.
* @param options Further options for the format (Spark Data source options for the format, e.g., for csv -> https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option).
*/
case class FileSystemSource(path: String, fileFormat:Option[String] = None, options:Map[String, String] = Map.empty[String, String], override val preprocessSql: Option[String] = None) extends FhirMappingSourceContext {
/**
* Determines the source type based on the file format and path.
* Infers the file format based on the provided path and file format.
*
* This method attempts to determine the file format using the following logic:
* 1. If `fileFormat` is provided, it is used as the determined format.
* 2. If `fileFormat` is not provided and the `path` contains an extension (determined by the presence of a period),
* the extension of the `path` is used as the format.
* 3. If neither `fileFormat` is provided nor an extension is found in the `path`, a `MissingFileFormatException` is thrown.
*
* @return The determined source type.
* @throws MissingFileFormatException If the file format is missing or invalid.
* @return The inferred file format.
* @throws MissingFileFormatException If the file format cannot be determined.
*/
def sourceType: String = {
// the determined format
def inferFileFormat: String = {
var format = fileFormat
// split the path into segments based on the period ('.')
val pathSegments = path.split('.')
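
Note on the hunk above: the body of `inferFileFormat` is truncated in this view, so the following is only a sketch of the three rules listed in its doc comment, not the committed implementation. `MissingFileFormatException` is redefined here as a hypothetical stand-in so the example compiles on its own, and the function takes `path` and `fileFormat` as parameters instead of reading case class fields.

```scala
// Hypothetical stand-in for the project's exception class, defined here only for self-containment.
final class MissingFileFormatException(msg: String) extends Exception(msg)

object InferFileFormatSketch {

  // 1. An explicit fileFormat wins.
  // 2. Otherwise the text after the last period in the path is used.
  // 3. Otherwise the format cannot be determined and an exception is thrown.
  def inferFileFormat(path: String, fileFormat: Option[String]): String =
    fileFormat.getOrElse {
      // split the path into segments based on the period ('.')
      val pathSegments = path.split('.')
      if (pathSegments.length > 1) pathSegments.last
      else throw new MissingFileFormatException(
        s"Cannot determine the file format for path '$path'")
    }
}
```

Because rule 2 only checks for the presence of a period, a path such as `"my.dir/patients"` would be treated as having an extension in this sketch; passing `fileFormat` explicitly avoids that ambiguity.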