added exclusion tags for pom.xml and removed contents field
valamuri2020 committed Aug 30, 2024
1 parent 3a56a0a commit 7ae1038
Showing 3 changed files with 52 additions and 20 deletions.
52 changes: 47 additions & 5 deletions pom.xml
@@ -536,10 +536,25 @@
        </exclusion>
      </exclusions>
    </dependency>
    <!-- Apache Parquet Dependencies -->
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-hadoop</artifactId>
      <version>1.12.3</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-yarn-common</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
@@ -572,21 +587,48 @@
      <version>1.12.3</version>
    </dependency>

    <!-- Additional dependencies for Hadoop -->
    <!-- Necessary Hadoop Modules for Parquet -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.3.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-yarn-common</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-yarn-server-common</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
        </exclusion>
        <!-- Add other exclusions as necessary -->
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <artifactId>hadoop-client-api</artifactId>
      <version>3.3.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <artifactId>hadoop-client-runtime</artifactId>
      <version>3.3.1</version>
    </dependency>
  </dependencies>
      <exclusions>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>
</project>
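
Taken together, these pom.xml changes keep only the client-facing Hadoop artifacts. Below is a minimal sketch of the read path they need to support, assuming a local file named vectors.parquet and the docid field used by the collection reader further down; the class name and file name are illustrative, not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

// Illustrative sketch, not code from this repository: reading a local
// Parquet file touches only client-side classes such as Configuration and
// Path, which hadoop-client-api and hadoop-client-runtime provide. Nothing
// here loads YARN, MapReduce, or HDFS classes, so excluding those
// artifacts does not break the read path.
public class LocalParquetReadSketch {
  public static void main(String[] args) throws Exception {
    ParquetReader<Group> reader = ParquetReader
        .builder(new GroupReadSupport(), new Path("vectors.parquet")) // hypothetical file
        .withConf(new Configuration())
        .build();
    for (Group record = reader.read(); record != null; record = reader.read()) {
      System.out.println(record.getString("docid", 0)); // docid column, as in the collection reader
    }
    reader.close();
  }
}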
14 changes: 1 addition & 13 deletions ParquetDenseVectorCollection.java
@@ -25,16 +25,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import java.util.ArrayList;

@@ -138,7 +132,6 @@ private void initializeParquetReader(java.nio.file.Path path) throws IOException
    // Initialize lists to store data read from the Parquet file
    vectors = new ArrayList<>();
    ids = new ArrayList<>();
    contents = new ArrayList<>();

    Group record;
    // Read each record from the Parquet file
@@ -147,10 +140,6 @@ private void initializeParquetReader(java.nio.file.Path path) throws IOException
      String docid = record.getString("docid", 0);
      ids.add(docid);

      // Extract the contents (String) from the record
      String content = record.getString("contents", 0);
      contents.add(content);

      // Extract the vector (double[]) from the record
      Group vectorGroup = record.getGroup("vector", 0); // Access the 'vector' field
      int vectorSize = vectorGroup.getFieldRepetitionCount(0); // Get the number of elements in the vector
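
The lines collapsed under this hunk are not visible on the page, so the loop that fills the vector is not quoted here. As a hedged sketch, unpacking a repeated group like this typically looks as follows, assuming field 0 of vectorGroup holds one double per repeated element (some schemas add a nested list level, which would need getGroup instead):

// Sketch only; the commit's actual loop is hidden behind the collapsed diff.
double[] vector = new double[vectorSize];
for (int i = 0; i < vectorSize; i++) {
  vector[i] = vectorGroup.getDouble(0, i); // i-th repeated value of field 0
}
vectors.add(vector);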
@@ -182,11 +171,10 @@ protected synchronized void readNext() throws IOException, NoSuchElementException

    // Get the current document's ID, contents, and vector
    String id = ids.get(currentIndex);
    String content = contents.get(currentIndex);
    double[] vector = vectors.get(currentIndex);

    // Create a new Document object with the retrieved data
    bufferedRecord = new ParquetDenseVectorCollection.Document(id, vector, content);
    bufferedRecord = new ParquetDenseVectorCollection.Document(id, vector, "");

    currentIndex++;
  }
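
With contents gone, readNext reduces to the buffered-record pattern below. This is a condensed sketch of the method after the commit; the end-of-data guard and its placement are assumptions, since those lines fall outside the hunk.

protected synchronized void readNext() throws IOException, NoSuchElementException {
  // Assumed end-of-data guard; the diff does not show this line.
  if (currentIndex >= ids.size()) {
    throw new NoSuchElementException();
  }
  String id = ids.get(currentIndex);
  double[] vector = vectors.get(currentIndex);
  // Passing "" keeps the Document constructor's (id, vector, contents)
  // shape stable while the placeholder contents field is dropped from the data.
  bufferedRecord = new ParquetDenseVectorCollection.Document(id, vector, "");
  currentIndex++;
}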
6 changes: 4 additions & 2 deletions src/main/python/parquet/json_to_parquet.py
@@ -42,14 +42,15 @@ def convert_file_to_parquet(input_file_path: str, output_file_path: str) -> int:
    try:
        data = read_jsonl_file(input_file_path)
        df = pd.DataFrame(data)
        # contents is a placeholder field, can be dropped
        df.drop(columns=["contents"], inplace=True)

        # Write to Parquet
        df.to_parquet(output_file_path, index=False)
        return len(df)
    except Exception as e:
        logging.error(f"Error converting {input_file_path} to Parquet: {e}")
        raise RuntimeError(
            f"Error converting {input_file_path} to Parquet: {e}")
        raise RuntimeError(f"Error converting {input_file_path} to Parquet: {e}")


def validate_parquet_conversion(input_file_path: str, parquet_file_path: str) -> bool:
@@ -60,6 +61,7 @@ def validate_parquet_conversion(input_file_path: str, parquet_file_path: str) -> bool:
    # Read original JSONL data
    jsonl_data = read_jsonl_file(input_file_path)
    jsonl_df = pd.DataFrame(jsonl_data)
    jsonl_df.drop(columns=["contents"], inplace=True)

    # Read Parquet data
    parquet_df = pd.read_parquet(parquet_file_path)