diff --git a/README.md b/README.md index 06d16489c..970b1419b 100644 --- a/README.md +++ b/README.md @@ -37,8 +37,12 @@ Submit your implementation by Jan 31 2024 and become part of the leaderboard! | # | Result (m:s:ms) | Implementation | Submitter | |---|-----------------|--------------------|---------------| -| 1.| 02:08.845| [CalculateAverage_royvanrijn.java](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java)| Roy van Rijn| -| 2.| 04:13.449| [CalculateAverage.java](https://github.com/gunnarmorling/onebrc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage.java) (baseline)| Gunnar Morling| +| 1.| 00:23.366| [link](https://github.com/gunnarmorling/1brc/pull/5/)| [Roy van Rijn](https://github.com/royvanrijn)| +| 2.| 00:38.510| [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_bjhara.java)| [Hampus Ram](https://github.com/bjhara)| +| 3.| 00:50.547| [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_padreati.java)| [Aurelian Tutuianu](https://github.com/padreati)| +| 4.| 02:08.315| [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_itaske.java)| [itaske](https://github.com/itaske)| +| 5.| 02:08.650| [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java)| [Kuduwa Keshavram](https://github.com/kuduwa_keshavram)| +| 6.| 04:13.449| [link](https://github.com/gunnarmorling/onebrc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage.java) (baseline)| [Gunnar Morling](https://github.com/gunnarmorling)| See [below](#entering-the-challenge) for instructions how to enter the challenge with your own implementation. 
@@ -86,7 +90,10 @@ Execute the following steps to run the challenge: The following rules and limits apply: -* Any Java distribution provided by [SDKMan](https://sdkman.io/jdks) as well as early access builds available on openjdk.net may be used (including EA builds for OpenJDK projects like Valhalla). +* Any of these Java distributions may be used: + * Any builds provided by [SDKMan](https://sdkman.io/jdks) + * Early access builds available on openjdk.net may be used (including EA builds for OpenJDK projects like Valhalla) + * Builds on [builds.shipilev.net](https://builds.shipilev.net/openjdk-jdk-lilliput/) If you want to use a build not available via these channels, reach out to discuss whether it can be considered. * No external library dependencies may be used * Implementations must be provided as a single source file @@ -105,7 +112,7 @@ To submit your own implementation to 1BRC, follow these steps: * (Optional) If you'd like to use native binaries (GraalVM), adjust the _pom.xml_ file so that it builds that binary. * Create a pull request against the upstream repository, clearly stating * The name of your implementation class. - * The JDK build to use (of not specified, the latest OpenJDK 21 upstream build will be used). + * The JDK build to use (if not specified, the latest OpenJDK 21 upstream build will be used). * The execution time of the program on your system and specs of the same (CPU, number of cores, RAM). This is for informative purposes only, the official runtime will be determined as described below. * I will run the program and determine its performance as described in the next section, and enter the result to the scoreboard. @@ -142,11 +149,21 @@ A: No, this challenge is focussed on Java only. Feel free to inofficially share _Q: Can I use non-JVM languages and/or tools?_\ A: No, this challenge is focussed on Java only. Feel free to inofficially share interesting implementations and results though. 
For instance it would be interesting to see how DuckDB fares with this task. -_Q: Why_ 1️⃣🐝🏎️ _?_\ -A: It's the abbreviation of the project name: **One** **B**illion **R**ow **C**hallenge. +_Q: Can I use JNI?_\ +A: Submissions must be implemented in Java; JNI requires glue code written in C/C++, so it cannot be used. You could use AOT compilation of Java code via GraalVM though, either by AOT-compiling the entire application, or by creating a native library which you then invoke using the new Java FFI ([JEP 442](https://openjdk.org/jeps/442)). + +_Q: What is the encoding of the measurements.txt file?_\ +A: The file is encoded with UTF-8. _Q: Can I make assumptions on the names of the weather stations showing up in the data set?_\ -A: No, while only a fixed set of station names is used by the data set generator, any solution should work with arbitrary UTF-8 station names. +A: No, while only a fixed set of station names is used by the data set generator, any solution should work with arbitrary UTF-8 station names +(for the sake of simplicity, names are guaranteed to contain no `;` character). + +_Q: Can I copy code from other submissions?_\ +A: Yes, you can. The primary focus of the challenge is learning something new, rather than "winning". When you do so, please give credit to the relevant source submissions. Please don't re-submit other entries with no or only trivial improvements. + +_Q: Why_ 1️⃣🐝🏎️ _?_\ +A: It's the abbreviation of the project name: **One** **B**illion **R**ow **C**hallenge. ## License diff --git a/calculate_average_artpar.sh b/calculate_average_artpar.sh new file mode 100755 index 000000000..d1876af11 --- /dev/null +++ b/calculate_average_artpar.sh @@ -0,0 +1,20 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +JAVA_OPTS="" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_artpar diff --git a/calculate_average_bjhara.sh b/calculate_average_bjhara.sh new file mode 100755 index 000000000..474ec22e1 --- /dev/null +++ b/calculate_average_bjhara.sh @@ -0,0 +1,20 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +JAVA_OPTS="" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_bjhara diff --git a/calculate_average_itaske.sh b/calculate_average_itaske.sh new file mode 100755 index 000000000..25f725f4e --- /dev/null +++ b/calculate_average_itaske.sh @@ -0,0 +1,20 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +JAVA_OPTS="" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_itaske diff --git a/calculate_average_kuduwa-keshavram.sh b/calculate_average_kuduwa-keshavram.sh new file mode 100755 index 000000000..369deebd7 --- /dev/null +++ b/calculate_average_kuduwa-keshavram.sh @@ -0,0 +1,20 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +JAVA_OPTS="--enable-preview --add-modules jdk.incubator.vector" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_padreati diff --git a/evaluate.sh b/evaluate.sh index 78b418385..35b417a1f 100755 --- a/evaluate.sh +++ b/evaluate.sh @@ -15,8 +15,15 @@ # limitations under the License. # +if [ -z "$1" ] + then + echo "Usage: evaluate.sh " + exit 1 +fi + +mvn clean verify for i in {1..5} do - time ./calculate_average.sh + ./calculate_average_$1.sh done diff --git a/pom.xml b/pom.xml index 401e8260c..f54d1f195 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,11 @@ 3.8.1 true + + --enable-preview + --add-modules + java.base,jdk.incubator.vector + diff --git a/prepare_evaluation.sh b/prepare_evaluation.sh new file mode 100755 index 000000000..250279ff8 --- /dev/null +++ b/prepare_evaluation.sh @@ -0,0 +1,28 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +if [ -z "$1" ] + then + echo "Usage: prepare_evaluation.sh " + exit 1 +fi + + +git checkout -b $1 + +git pull https://github.com/$1/1brc.git +# git pull git@github.com:$1/1brc.git diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_artpar.java b/src/main/java/dev/morling/onebrc/CalculateAverage_artpar.java new file mode 100644 index 000000000..bf9e6a2c8 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_artpar.java @@ -0,0 +1,380 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package dev.morling.onebrc; + +import jdk.incubator.vector.DoubleVector; +import jdk.incubator.vector.VectorOperators; +import jdk.incubator.vector.VectorSpecies; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.*; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +public class CalculateAverage_artpar { + + public static final int N_THREADS = 8; + private static final String FILE = "./measurements.txt"; + private static final Map<Integer, String> nameStringMap = new ConcurrentHashMap<>(1024 * 1024); + private static final Map<Integer, String> tempStringMap = new ConcurrentHashMap<>(1024 * 1024); + private static final Map<Integer, Double> hashToDouble = new ConcurrentHashMap<>(1024 * 1024); + private static final VectorSpecies<Double> SPECIES = DoubleVector.SPECIES_PREFERRED; + + public static void main(String[] args) throws IOException { + long start = Instant.now().toEpochMilli(); + Path measurementFile = Paths.get(FILE); + OpenOption openOptions = StandardOpenOption.READ; + + long fileSize = Files.size(measurementFile); + + long expectedChunkSize = fileSize / N_THREADS; + + ExecutorService threadPool = Executors.newFixedThreadPool(N_THREADS); + + long chunkStartPosition = 0; + FileInputStream fis = new FileInputStream(measurementFile.toFile()); + List<Future<Map<String, MeasurementAggregator>>> futures = new ArrayList<>(); + long bytesReadCurrent = 0; + + try (FileChannel fileChannel = FileChannel.open(measurementFile, StandardOpenOption.READ)) { + for (int i = 0; i < N_THREADS; i++) { + + long chunkSize = expectedChunkSize; + chunkSize = fis.skip(chunkSize); + + bytesReadCurrent += chunkSize; + while (((char) fis.read()) != '\n' && bytesReadCurrent < fileSize) { + chunkSize++; + bytesReadCurrent++; + } + + // System.out.println("[" + chunkStartPosition + "] - [" + (chunkStartPosition + chunkSize) + " 
bytes"); + if (chunkStartPosition + chunkSize >= fileSize) { + chunkSize = fileSize - chunkStartPosition; + } + + MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, chunkStartPosition, + chunkSize); + + ReaderRunnable readerRunnable = new ReaderRunnable(mappedByteBuffer, StandardOpenOption.READ, + chunkStartPosition, chunkSize); + Future<Map<String, MeasurementAggregator>> future = threadPool.submit(readerRunnable::run); + futures.add(future); + chunkStartPosition = chunkStartPosition + chunkSize + 1; + } + } + + Map<String, MeasurementAggregator> globalMap = futures.parallelStream() + .flatMap(future -> { + try { + return future.get().entrySet().stream(); + } + catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (existing, replacement) -> { + existing.combine(replacement); + return existing; + })); + + Map<String, ResultRow> results = globalMap.entrySet().stream() + .parallel().map(e -> Map.entry(e.getKey(), e.getValue().finish())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + threadPool.shutdown(); + Map<String, ResultRow> measurements = new TreeMap<>(results); + + System.out.println(measurements); + // long end = Instant.now().toEpochMilli(); + // System.out.println((end - start) / 1000); + + } + + public static double sum(double[] array) { + int i = 0; + double sum = 0.0; + + // Vectorized loop + for (; i < SPECIES.loopBound(array.length); i += SPECIES.length()) { + var v = DoubleVector.fromArray(SPECIES, array, i); + sum += v.reduceLanes(VectorOperators.ADD); + } + + // Scalar loop for remaining elements + for (; i < array.length; i++) { + sum += array[i]; + } + + return sum; + } + + private record Measurement(String station, double value) { + } + + private record ResultRow(double min, double mean, double max, long count) { + public String toString() { + return round(min) + "/" + round(mean) + "/" + round(max); + } + + private double round(double value) { + return Math.round(value * 
10.0) / 10.0; + } + } + + private static class MeasurementAggregator { + private double min = Double.POSITIVE_INFINITY; + private double max = Double.NEGATIVE_INFINITY; + private double sum; + private long count; + private String name; + + public MeasurementAggregator() { + } + + public MeasurementAggregator(double min, double max, double sum, long count) { + this.min = min; + this.max = max; + this.sum = sum; + this.count = count; + } + + public String getName() { + return name; + } + + void add(Measurement measurement) { + min = Math.min(min, measurement.value()); + max = Math.max(max, measurement.value()); + sum += measurement.value(); + name = measurement.station; + count++; + } + + MeasurementAggregator combine(MeasurementAggregator other) { + min = Math.min(min, other.min); + max = Math.max(max, other.max); + sum += other.sum; + count += other.count; + return this; + } + + ResultRow finish() { + double mean = (count > 0) ? sum / count : 0; + return new ResultRow(min, mean, max, count); + } + } + + private static class ReaderRunnable { + private final MappedByteBuffer mappedByteBuffer; + private final OpenOption openOptions; + private final long chunkStartPosition; + private final long chunkSize; + private final Map<String, MeasurementAggregator> results; + Map<String, double[]> stationValueMap = new HashMap<>(); + Map<String, Integer> stationIndexMap = new HashMap<>(); + + private ReaderRunnable(MappedByteBuffer mappedByteBuffer, OpenOption openOptions, long chunkStartPosition, long chunkSize) { + this.mappedByteBuffer = mappedByteBuffer; + this.openOptions = openOptions; + this.results = new HashMap<>(); + this.chunkStartPosition = chunkStartPosition; + this.chunkSize = chunkSize; + } + + public Map<String, MeasurementAggregator> run() { + long start = Date.from(Instant.now()).getTime(); + int totalBytesRead = 0; + Map<String, MeasurementAggregator> groupedMeasurements = new HashMap<>(); + + final int VECTOR_SIZE = 512; + final int VECTOR_SIZE_1 = VECTOR_SIZE - 1; + ByteBuffer nameBuffer = ByteBuffer.allocate(128); + String matchedStation = ""; + boolean readUntilSemiColon = true; + 
+ while (mappedByteBuffer.hasRemaining()) { + byte b = mappedByteBuffer.get(); + totalBytesRead++; + if (readUntilSemiColon) { + if ((byte) b != ';') { + nameBuffer.put(b); + continue; + } + else { + readUntilSemiColon = false; + matchedStation = getNameStringFromBufferUsingBuffer(nameBuffer); + continue; + } + } + + if (b != '\n') { + nameBuffer.put(b); + } + else { + readUntilSemiColon = true; + String tempValue = getTempStringFromBufferUsingBuffer(nameBuffer); + + int tempValueHashCode = tempValue.hashCode(); + if (!hashToDouble.containsKey(tempValueHashCode)) { + hashToDouble.put(tempValueHashCode, Double.parseDouble(tempValue)); + } + double doubleValue = hashToDouble.get(tempValueHashCode); + + // Measurement measurement = new Measurement(matchedStation, doubleValue); + double[] array = stationValueMap.computeIfAbsent(matchedStation, (k) -> { + stationIndexMap.put(k, 0); + return new double[VECTOR_SIZE]; + }); + Integer index = stationIndexMap.get(matchedStation); + array[index] = doubleValue; + if (index == VECTOR_SIZE_1) { + + int i = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + double sum = 0; + long count = 0; + for (; i < SPECIES.loopBound(array.length); i += SPECIES.length()) { + // Vector operations + DoubleVector vector = DoubleVector.fromArray(SPECIES, array, i); + min = Math.min(min, vector.reduceLanes(VectorOperators.MIN)); + max = Math.max(max, vector.reduceLanes(VectorOperators.MAX)); + sum += vector.reduceLanes(VectorOperators.ADD); + count += vector.length(); + } + + // MeasurementAggregator ma = new MeasurementAggregator(min, max, sum, VECTOR_SIZE); + // groupedMeasurements.computeIfAbsent(matchedStation, k -> new MeasurementAggregator()) + // .combine(ma); + + int remainingCount = array.length - i; + for (; i < array.length; i++) { + min = Math.min(min, array[i]); + max = Math.max(max, array[i]); + sum += array[i]; + count++; + } + MeasurementAggregator ma = new MeasurementAggregator(min, max, sum, 
count); + // System.out.println("Sum ma [" + ma + "]"); + groupedMeasurements.computeIfAbsent(matchedStation, k -> new MeasurementAggregator()) + .combine(ma); + + stationIndexMap.put(matchedStation, 0); + continue; + } + stationIndexMap.put(matchedStation, index + 1); + } + } + + VectorSpecies<Double> species = DoubleVector.SPECIES_PREFERRED; + for (String stationName : stationIndexMap.keySet()) { + + Integer count = stationIndexMap.get(stationName); + if (count < 1) { + continue; + } + else if (count == 1) { + double[] array = stationValueMap.get(stationName); + double val = array[0]; + MeasurementAggregator ma = new MeasurementAggregator(val, val, val, 1); + groupedMeasurements.computeIfAbsent(stationName, k -> new MeasurementAggregator()) + .combine(ma); + } + else { + double[] array = stationValueMap.get(stationName); + double[] subArray = new double[count]; + System.arraycopy(array, 0, subArray, 0, count); + // Creating a DoubleVector from the array + // System.out.println("Create vector from [" + count + "] -> " + subArray.length); + DoubleVector doubleVector = DoubleVector.fromArray(species, subArray, 0); + double min = doubleVector.reduceLanes(VectorOperators.MIN); + double max = doubleVector.reduceLanes(VectorOperators.MAX); + double sum = doubleVector.reduceLanes(VectorOperators.ADD); + MeasurementAggregator ma = new MeasurementAggregator(min, max, sum, count); + groupedMeasurements.computeIfAbsent(stationName, k -> new MeasurementAggregator()) + .combine(ma); + } + + stationIndexMap.put(matchedStation, 0); + } + + long end = Date.from(Instant.now()).getTime(); + // System.out.println("Took [" + ((end - start) / 1000) + "s for " + totalBytesRead / 1024 + " kb"); + + return groupedMeasurements; + } + + private String getNameStringFromBufferUsingBuffer(ByteBuffer nameBuffer) { + nameBuffer.flip(); + + byte[] array = nameBuffer.array(); + int length = nameBuffer.limit(); + int byteArrayHashCode = hashCode(array, 0, length); + nameBuffer.flip(); + 
nameBuffer.clear(); + + if (!nameStringMap.containsKey(byteArrayHashCode)) { + String value = new String(array, 0, length, StandardCharsets.UTF_8); + nameStringMap.put(byteArrayHashCode, value); + return value; + } + + return nameStringMap.get(byteArrayHashCode); + } + + private int hashCode(byte[] array, int start, int length) { + if (array == null) { + return 0; + } + + int result = 1; + for (int i = start; i < start + length; i++) { + result = 31 * result + array[i]; + } + + return result; + } + + private String getTempStringFromBufferUsingBuffer(ByteBuffer nameBuffer) { + nameBuffer.flip(); + + byte[] array = nameBuffer.array(); + int length = nameBuffer.limit(); + int byteArrayHashCode = hashCode(array, 0, length); + nameBuffer.flip(); + nameBuffer.clear(); + + if (!tempStringMap.containsKey(byteArrayHashCode)) { + String value = new String(array, 0, length, StandardCharsets.UTF_8); + tempStringMap.put(byteArrayHashCode, value); + return value; + } + + return tempStringMap.get(byteArrayHashCode); + } + } +} diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_bjhara.java b/src/main/java/dev/morling/onebrc/CalculateAverage_bjhara.java new file mode 100644 index 000000000..8296db73d --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_bjhara.java @@ -0,0 +1,166 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package dev.morling.onebrc; + +import java.io.*; +import java.nio.*; +import java.nio.channels.*; +import java.nio.file.*; +import java.util.*; +import java.util.stream.*; + +public class CalculateAverage_bjhara { + private static final String FILE = "./measurements.txt"; + + private static class Measurement { + private double min = Double.POSITIVE_INFINITY; + private double max = Double.NEGATIVE_INFINITY; + private double sum; + private long count; + + public String toString() { + return round(min) + "/" + round(sum / count) + "/" + round(max); + } + + private double round(double value) { + return Math.round(value * 10.0) / 10.0; + } + + public static Measurement combine(Measurement m1, Measurement m2) { + var mres = new Measurement(); + mres.min = m1.min < m2.min ? m1.min : m2.min; + mres.max = m1.max > m2.max ? m1.max : m2.max; + mres.sum = m1.sum + m2.sum; + mres.count = m1.count + m2.count; + return mres; + } + } + + public static void main(String[] args) throws IOException { + try (FileChannel fileChannel = (FileChannel) Files.newByteChannel(Path.of(FILE), + EnumSet.of(StandardOpenOption.READ))) { + + var cities = splitFileChannel(fileChannel) + .parallel() + .map(CalculateAverage_bjhara::parseBuffer) + .collect(Collectors.reducing(CalculateAverage_bjhara::combineMaps)); + + var sortedCities = new TreeMap<>(cities.orElseThrow()); + System.out.println(sortedCities); + } + } + + private static Map<String, Measurement> combineMaps(Map<String, Measurement> map1, + Map<String, Measurement> map2) { + for (var entry : map2.entrySet()) { + map1.merge(entry.getKey(), entry.getValue(), Measurement::combine); + } + + return map1; + } + + private static Stream<ByteBuffer> splitFileChannel(final FileChannel fileChannel) throws IOException { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<ByteBuffer>() { + private static final long CHUNK_SIZE = 1024 * 1024 * 10L; + + private final long size = fileChannel.size(); + private long start = 0; + + @Override + public boolean hasNext() { + return start < size; + } + + @Override + 
public ByteBuffer next() { + try { + MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, start, + Math.min(CHUNK_SIZE, size - start)); + + // don't split the data in the middle of lines + // find the closest previous newline + int realEnd = mappedByteBuffer.limit() - 1; + while (mappedByteBuffer.get(realEnd) != '\n') + realEnd--; + + realEnd++; + + mappedByteBuffer.limit(realEnd); + start += realEnd; + + return mappedByteBuffer; + } + catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + }, Spliterator.IMMUTABLE), false); + } + + private static Map<String, Measurement> parseBuffer(ByteBuffer bb) { + Map<String, Measurement> cities = new HashMap<>(); + + final int limit = bb.limit(); + final byte[] buffer = new byte[128]; + + while (bb.position() < limit) { + final int currentPosition = bb.position(); + + // find the ; separator + int separator = currentPosition; + while (separator != limit && bb.get(separator) != ';') + separator++; + + // find the end of the line + int end = separator + 1; + while (end != limit && !Character.isWhitespace((char) bb.get(end))) + end++; + + // get the name as a string + int nameLength = separator - currentPosition; + bb.get(buffer, 0, nameLength); + String city = new String(buffer, 0, nameLength); + + // get rid of the separator + bb.get(); + + // get the double value + int valueLength = end - separator - 1; + bb.get(buffer, 0, valueLength); + String valueStr = new String(buffer, 0, valueLength); + double value = Double.parseDouble(valueStr); + + // and get rid of the new line (handle both kinds) + byte newline = bb.get(); + if (newline == '\r') + bb.get(); + + // update the map with the new measurement + Measurement agg = cities.get(city); + if (agg == null) { + agg = new Measurement(); + cities.put(city, agg); + } + + agg.min = agg.min < value ? agg.min : value; + agg.max = agg.max > value ? 
agg.max : value; + agg.sum += value; + agg.count++; + } + + return cities; + } +} diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_itaske.java b/src/main/java/dev/morling/onebrc/CalculateAverage_itaske.java new file mode 100644 index 000000000..9e137012f --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_itaske.java @@ -0,0 +1,76 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.AbstractMap; +import java.util.Comparator; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.stream.Collectors; + +public class CalculateAverage_itaske { + + private static final String FILE = "./measurements.txt"; + + private record Measurement(long count, double sum, double min, double max) { + + Measurement(double value) { + this(1, value, value, value); + } + + + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append(round(min)); + builder.append("/"); + builder.append(round(sum/count)); + builder.append("/"); + builder.append(round(max)); + + return builder.toString(); + } + + private double round(double value) { + return Math.round(value * 10.0) / 10.0; + } + } + + public static void main(String[] args) throws IOException { + + Map<String, Measurement> resultMap = Files.lines(Path.of(FILE)).parallel() + 
.map(line -> { + int separatorIndex = line.indexOf(";"); + String key = line.substring(0, separatorIndex); + double value = Double.parseDouble(line.substring(separatorIndex + 1)); + return new AbstractMap.SimpleEntry<>(key, value); + }) + .collect(Collectors.toConcurrentMap( + entry -> entry.getKey(), + entry -> new Measurement(entry.getValue()), + ((measurement1, measurement2) -> new Measurement( + measurement1.count + measurement2.count, + measurement1.sum + measurement2.sum, + Math.min(measurement1.min, measurement2.min), + Math.max(measurement1.max, measurement2.max))))); + + System.out.print( + resultMap.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(Object::toString).collect(Collectors.joining(", ", "{", "}"))); + + } +} diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java b/src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java new file mode 100644 index 000000000..49c77c0a1 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java @@ -0,0 +1,79 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package dev.morling.onebrc; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +public class CalculateAverage_kuduwa_keshavram { + + private static final String FILE = "./measurements.txt"; + + private record Measurement(double min, double max, double sum, long count) { + + Measurement(double initialMeasurement) { + this(initialMeasurement, initialMeasurement, initialMeasurement, 1); + } + + public static Measurement combineWith(Measurement m1, Measurement m2) { + return new Measurement( + m1.min < m2.min ? m1.min : m2.min, + m1.max > m2.max ? m1.max : m2.max, + m1.sum + m2.sum, + m1.count + m2.count); + } + + public String toString() { + return round(min) + "/" + round(sum / count) + "/" + round(max); + } + + private double round(double value) { + return Math.round(value * 10.0) / 10.0; + } + } + + public static void main(String[] args) throws IOException { + // long before = System.currentTimeMillis(); + Map<String, Measurement> resultMap = new ConcurrentHashMap<>(); + Files.lines(Path.of(FILE)) + .parallel() + .forEach( + line -> { + int pivot = line.indexOf(";"); + String key = line.substring(0, pivot); + Measurement measured = new Measurement(Double.parseDouble(line.substring(pivot + 1))); + Measurement existingMeasurement = resultMap.get(key); + if (existingMeasurement != null) { + resultMap.put(key, Measurement.combineWith(existingMeasurement, measured)); + } + else { + resultMap.put(key, measured); + } + }); + System.out.print("{"); + System.out.print( + resultMap.entrySet().stream() + .map(Object::toString) + .sorted() + .collect(Collectors.joining(", "))); + System.out.println("}"); + // System.out.println("Took: " + (System.currentTimeMillis() - before)); + } +} diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_padreati.java b/src/main/java/dev/morling/onebrc/CalculateAverage_padreati.java new file mode 100644 
index 000000000..1a3273224
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_padreati.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.StructuredTaskScope;
+
+import jdk.incubator.vector.ByteVector;
+import jdk.incubator.vector.VectorOperators;
+import jdk.incubator.vector.VectorSpecies;
+
+public class CalculateAverage_padreati {
+
+    private static final VectorSpecies<Byte> species = ByteVector.SPECIES_PREFERRED;
+    private static final String FILE = "./measurements.txt";
+    private static final int CHUNK_SIZE = 1024 * 1024;
+
+    private record ResultRow(double min, double mean, double max) {
+        public String toString() {
+            return round(min) + "/" + round(mean) + "/" + round(max);
+        }
+
+        private double round(double value) {
+            return Math.round(value * 10.0) / 10.0;
+        }
+    }
+
+    private record MeasurementAggregator(double min, double max, double sum, long count) {
+
+        public MeasurementAggregator(double seed) {
+            this(seed, seed, seed, 1);
+        }
+
+        public MeasurementAggregator merge(MeasurementAggregator b) {
+            return new MeasurementAggregator(
+                    Math.min(min, b.min),
+                    Math.max(max, b.max),
+                    sum + b.sum,
+                    count + b.count
+            );
+        }
+
+        public ResultRow toResultRow() {
+            return new ResultRow(min, sum / count, max);
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        new CalculateAverage_padreati().run();
+    }
+
+    private void run() throws IOException {
+        File file = new File(FILE);
+        var splits = findFileSplits();
+        List<StructuredTaskScope.Subtask<Map<String, MeasurementAggregator>>> subtasks = new ArrayList<>();
+        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
+            for (int i = 0; i < splits.size(); i++) {
+                long splitStart = splits.get(i);
+                long splitEnd = i < splits.size() - 1 ? splits.get(i + 1) : file.length() + 1;
+                subtasks.add(scope.fork(() -> chunkProcessor(file, splitStart, splitEnd)));
+            }
+            scope.join();
+            scope.throwIfFailed();
+
+            var resultList = subtasks.stream().map(StructuredTaskScope.Subtask::get).toList();
+            TreeMap<String, ResultRow> measurements = collapseResults(resultList);
+            System.out.println(measurements);
+
+        }
+        catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private List<Long> findFileSplits() throws IOException {
+        var splits = new ArrayList<Long>();
+        splits.add(0L);
+
+        File file = new File(FILE);
+        long next = CHUNK_SIZE;
+        while (true) {
+            if (next >= file.length()) {
+                break;
+            }
+            try (FileInputStream fis = new FileInputStream(file)) {
+                long skip = fis.skip(next);
+                if (skip != next) {
+                    throw new RuntimeException();
+                }
+                // find first new line
+                while (true) {
+                    int ch = fis.read();
+                    if (ch != '\n') {
+                        next++;
+                        continue;
+                    }
+                    break;
+                }
+                // skip eventual \\r
+                if (fis.read() == '\r') {
+                    next++;
+                }
+                splits.add(next + 1);
+                next += CHUNK_SIZE;
+            }
+        }
+        return splits;
+    }
+
+    public Map<String, MeasurementAggregator> chunkProcessor(File source, long start, long end) throws IOException {
+        var map = new HashMap<String, MeasurementAggregator>();
+        byte[] buffer = new byte[(int) (end - start)];
+        int len;
+        try (FileInputStream bis = new FileInputStream(source)) {
+            bis.skip(start);
+            len = bis.read(buffer, 0, buffer.length);
+        }
+
+        List<Integer> nlIndexes = new ArrayList<>();
+        List<Integer> commaIndexes = new ArrayList<>();
+
+        int loopBound = species.loopBound(len);
+        int i = 0;
+
+        for (; i < loopBound; i += species.length()) {
+            ByteVector v = ByteVector.fromArray(species, buffer, i);
+            var mask = v.compare(VectorOperators.EQ, '\n');
+            for (int j = 0; j < species.length(); j++) {
+                if (mask.laneIsSet(j)) {
+                    nlIndexes.add(i + j);
+                }
+            }
+            mask = v.compare(VectorOperators.EQ, ';');
+            for (int j = 0; j < species.length(); j++) {
+                if (mask.laneIsSet(j)) {
+                    commaIndexes.add(i + j);
+                }
+            }
+        }
+        for (; i < len; i++) {
+            if (buffer[i] == '\n') {
+                nlIndexes.add(i);
+            }
+            if (buffer[i] == ';') {
+                commaIndexes.add(i);
+            }
+        }
+
+        int startLine = 0;
+        for (int j = 0; j < nlIndexes.size(); j++) {
+            int endLine = nlIndexes.get(j);
+            int commaIndex = commaIndexes.get(j);
+            String key = new String(buffer, startLine, commaIndex - startLine);
+            double value = Double.parseDouble(new String(buffer, commaIndex + 1, endLine - commaIndex - 1));
+            map.merge(key, new MeasurementAggregator(value), MeasurementAggregator::merge);
+            startLine = endLine + 1;
+        }
+        return map;
+    }
+
+    private TreeMap<String, ResultRow> collapseResults(List<Map<String, MeasurementAggregator>> resultList) {
+        HashMap<String, MeasurementAggregator> aggregate = new HashMap<>();
+        for (var map : resultList) {
+            for (var entry : map.entrySet()) {
+                aggregate.merge(entry.getKey(), entry.getValue(), MeasurementAggregator::merge);
+            }
+        }
+        TreeMap<String, ResultRow> measurements = new TreeMap<>();
+        for (var entry : aggregate.entrySet()) {
+            measurements.put(entry.getKey(), entry.getValue().toResultRow());
+        }
+        return measurements;
+    }
+
+}
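
Review note: `findFileSplits` in `CalculateAverage_padreati` moves each nominal chunk boundary forward to the next `'\n'`, so that no line straddles two chunks. The sketch below shows the same idea on an in-memory byte array, which makes the boundary arithmetic easy to check in isolation. The class name `SplitSketch`, the method `findSplits`, and the sample data are hypothetical and not part of the submission. Unlike the submission, this version does not emit a trailing empty chunk when the newline it finds is the last byte.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a simplified, in-memory variant of newline-aligned splitting.
public class SplitSketch {

    // Returns the start offset of every chunk. Each start other than 0 is the
    // index right after a '\n', so every chunk begins at the start of a line.
    static List<Integer> findSplits(byte[] data, int chunkSize) {
        List<Integer> splits = new ArrayList<>();
        splits.add(0);
        int next = chunkSize;
        while (next < data.length) {
            // Advance to the first newline at or after the nominal boundary.
            while (next < data.length && data[next] != '\n') {
                next++;
            }
            // Stop if no newline was found or the newline is the last byte.
            if (next + 1 >= data.length) {
                break;
            }
            splits.add(next + 1);
            next += chunkSize;
        }
        return splits;
    }

    public static void main(String[] args) {
        byte[] data = "Oslo;1.5\nRome;20.1\nLima;18.0\n".getBytes(StandardCharsets.UTF_8);
        // Nominal boundary 10 falls inside the second line, so the second
        // chunk starts at 19, right after that line's newline.
        System.out.println(findSplits(data, 10)); // prints [0, 19]
    }
}
```

Scanning a byte array avoids reopening the file once per boundary, but it assumes the data fits in memory. For the full measurements file, a seekable channel would be the closer equivalent.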