Skip to content

Commit

Permalink
Fix parallel fileReader.lines bugs (#25983)
Browse files Browse the repository at this point in the history
Fix two bugs in the parallel overloads of `fileReader.lines`:

* for `stripNewline=false`, the final `\n` was missing (due to
calculating fileReader's region's upper-bound incorrectly)
* when the file has fewer lines than the number of targetLocales, some
of the lines could be read multiple times

(fixes the same bugs present in ParallelIO's iterators)

- [x] local paratest
- [x] gasnet paratest

[ reviewed by @mppf ] - thanks!
  • Loading branch information
jeremiah-corrado authored Sep 24, 2024
2 parents de79f7a + 64ac575 commit 9d3eeb8
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 48 deletions.
56 changes: 32 additions & 24 deletions modules/packages/ParallelIO.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ module ParallelIO {
param delim = b"\n";

const fMeta = try! open(filePath, ioMode.r),
fileBounds = 0..<(try! fMeta.size);
fileBounds = 0..(try! fMeta.size);

if tld.size == 0 || (tld.size == 1 && targetLocales.first == here) {
const byteOffsets = try! findDelimChunks(fMeta, delim, nTasks, fileBounds, header);
Expand All @@ -143,17 +143,21 @@ module ParallelIO {
} else {
const byteOffsets = try! findDelimChunks(fMeta, delim, tld.size, fileBounds, header);
coforall (loc, id) in zip(targetLocales, 0..) do on loc {
const locBounds = byteOffsets[id]..byteOffsets[id+1],
locFile = try! open(filePath, ioMode.r),
locByteOffsets = try! findDelimChunks(locFile, delim, nTasks, locBounds, headerPolicy.noHeader);
const locBounds = byteOffsets[id]..byteOffsets[id+1];

coforall tid in 0..<nTasks {
const taskBounds = locByteOffsets[tid]..<locByteOffsets[tid+1],
r = try! locFile.reader(locking=false, region=taskBounds);
// if byteOffsets looks like [0, 10, 10, 14, 21], then don't try to read 10..10 (locale 1)
if locBounds.size > 1 {
const locFile = try! open(filePath, ioMode.r),
locByteOffsets = try! findDelimChunks(locFile, delim, nTasks, locBounds, headerPolicy.noHeader);

var line: lineType;
while (try! r.readLine(line, stripNewline=true)) do
yield line;
coforall tid in 0..<nTasks {
const taskBounds = locByteOffsets[tid]..<locByteOffsets[tid+1],
r = try! locFile.reader(locking=false, region=taskBounds);

var line: lineType;
while (try! r.readLine(line, stripNewline=true)) do
yield line;
}
}
}
}
Expand Down Expand Up @@ -240,7 +244,7 @@ module ParallelIO {
where tag == iterKind.standalone && (dt == string || dt == bytes)
{
const fMeta = try! open(filePath, ioMode.r),
fileBounds = 0..<(try! fMeta.size);
fileBounds = 0..(try! fMeta.size);

if tld.size == 0 || (tld.size == 1 && targetLocales.first == here) {
const byteOffsets = try! findDelimChunks(fMeta, delim, nTasks, fileBounds, header);
Expand All @@ -258,19 +262,23 @@ module ParallelIO {
} else {
const byteOffsets = try! findDelimChunks(fMeta, delim, tld.size, fileBounds, header);
coforall (loc, id) in zip(targetLocales, 0..) do on loc {
const locBounds = byteOffsets[id]..<byteOffsets[id+1],
locFile = try! open(filePath, ioMode.r),
locByteOffsets = try! findDelimChunks(locFile, delim, nTasks, locBounds, headerPolicy.noHeader);

coforall tid in 0..<nTasks {
var des: deserializerType;
const taskBounds = locByteOffsets[tid]..locByteOffsets[tid+1],
r = try! locFile.reader(locking=false, region=taskBounds, deserializer=des);

var item: t;
while (try! r.read(item)) {
yield item;
try! r.advanceThrough(delim);
const locBounds = byteOffsets[id]..<byteOffsets[id+1];

// if byteOffsets looks like [0, 10, 10, 14, 21], then don't try to read 10..10 (locale 1)
if locBounds.size > 1 {
const locFile = try! open(filePath, ioMode.r),
locByteOffsets = try! findDelimChunks(locFile, delim, nTasks, locBounds, headerPolicy.noHeader);

coforall tid in 0..<nTasks {
var des: deserializerType;
const taskBounds = locByteOffsets[tid]..locByteOffsets[tid+1],
r = try! locFile.reader(locking=false, region=taskBounds, deserializer=des);

var item: t;
while (try! r.read(item)) {
yield item;
try! r.advanceThrough(delim);
}
}
}
}
Expand Down
46 changes: 24 additions & 22 deletions modules/standard/IO.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -7117,7 +7117,7 @@ iter fileReader.lines(
const f = chpl_fileFromReaderOrWriter(this),
myStart = qio_channel_start_offset_unlocked(this._channel_internal),
myEnd = qio_channel_end_offset_unlocked(this._channel_internal),
myBounds = myStart..<min(myEnd, try! f.size),
myBounds = myStart..min(myEnd, try! f.size),
nTasks = if dataParTasksPerLocale>0 then dataParTasksPerLocale
else here.maxTaskPar;
try {
Expand Down Expand Up @@ -7158,36 +7158,38 @@ iter fileReader.lines(
const f = chpl_fileFromReaderOrWriter(this),
myStart = qio_channel_start_offset_unlocked(this._channel_internal),
myEnd = qio_channel_end_offset_unlocked(this._channel_internal),
myBounds = myStart..<min(myEnd, try! f.size),
myBounds = myStart..min(myEnd, try! f.size),
fpath = try! f.path;

try {
// try to break the file into chunks and read the lines in parallel
// can fail if the file is too small to break into 'targetLocales.size' chunks
const byteOffsets = findFileChunks(f, targetLocales.size, myBounds);

coforall lid in 0..<targetLocales.size do on targetLocales[tld.orderToIndex(lid)] {
const locBounds = byteOffsets[lid]..byteOffsets[lid+1],
locFile = try! open(fpath, ioMode.r),
nTasks = if dataParTasksPerLocale>0 then dataParTasksPerLocale
else here.maxTaskPar;

try {
// try to break this locale's chunk into 'nTasks' chunks and read the lines in parallel
const locByteOffsets = findFileChunks(locFile, nTasks, locBounds);
coforall tid in 0..<nTasks {
const taskBounds = locByteOffsets[tid]..<locByteOffsets[tid+1],
r = try! locFile.reader(region=taskBounds);

if taskBounds.size > 0 {
var line: t;
while (try! r.readLine(line, stripNewline=stripNewline)) do
yield line;
const locBounds = byteOffsets[lid]..byteOffsets[lid+1];

// if byteOffsets looks like [0, 10, 10, 14, 21], then don't try to read 10..10 (locale 1)
if locBounds.size > 1 {
const locFile = try! open(fpath, ioMode.r),
nTasks = if dataParTasksPerLocale>0 then dataParTasksPerLocale
else here.maxTaskPar;
try {
// try to break this locale's chunk into 'nTasks' chunks and read the lines in parallel
const locByteOffsets = findFileChunks(locFile, nTasks, locBounds);
coforall tid in 0..<nTasks {
const taskBounds = locByteOffsets[tid]..<locByteOffsets[tid+1],
r = try! locFile.reader(region=taskBounds);

if taskBounds.size > 0 {
var line: t;
while (try! r.readLine(line, stripNewline=stripNewline)) do
yield line;
}
}
} catch {
// fall back to serial iteration for this locale if 'findFileChunks' fails
for line in locFile.reader(region=locBounds)._lines_serial(stripNewline, t) do yield line;
}
} catch {
// fall back to serial iteration for this locale if 'findFileChunks' fails
for line in locFile.reader(region=locBounds)._lines_serial(stripNewline, t) do yield line;
}
}
} catch {
Expand Down
30 changes: 29 additions & 1 deletion test/library/standard/IO/linesIterator/lines.chpl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use IO, FileSystem;

config const fileName = "lines.txt",
n = 10000;
n = 9999;

const nsum = (n * (n + 1)) / 2;

Expand Down Expand Up @@ -74,4 +74,32 @@ assert(dataFileSum(18.., false, bytes, [Locales.first,]) == nsum - 45);
assert(dataFileSum(18.., false, bytes, [Locales.last,]) == nsum - 45);
assert(dataFileSum(18.., false, bytes, none) == nsum - 45);


// skipping last 5 lines
// digit-len + \n + ('region' not inclusive)
const nBytes = 5 * ((n:string).size + 1) + 1,
fileLen = try! open(fileName, mode=ioMode.r).size,
toRead = fileLen - nBytes,
nsum2 = ((n - 5) * ((n - 5) + 1)) / 2;

assert(dataFileSum(..toRead, true, string, Locales) == nsum2);
assert(dataFileSum(..toRead, true, string, [Locales.first,]) == nsum2);
assert(dataFileSum(..toRead, true, string, [Locales.last,]) == nsum2);
assert(dataFileSum(..toRead, true, string, none) == nsum2);

assert(dataFileSum(..toRead, false, string, Locales) == nsum2);
assert(dataFileSum(..toRead, false, string, [Locales.first,]) == nsum2);
assert(dataFileSum(..toRead, false, string, [Locales.last,]) == nsum2);
assert(dataFileSum(..toRead, false, string, none) == nsum2);

assert(dataFileSum(..toRead, true, bytes, Locales) == nsum2);
assert(dataFileSum(..toRead, true, bytes, [Locales.first,]) == nsum2);
assert(dataFileSum(..toRead, true, bytes, [Locales.last,]) == nsum2);
assert(dataFileSum(..toRead, true, bytes, none) == nsum2);

assert(dataFileSum(..toRead, false, bytes, Locales) == nsum2);
assert(dataFileSum(..toRead, false, bytes, [Locales.first,]) == nsum2);
assert(dataFileSum(..toRead, false, bytes, [Locales.last,]) == nsum2);
assert(dataFileSum(..toRead, false, bytes, none) == nsum2);

remove(fileName);
2 changes: 1 addition & 1 deletion test/library/standard/IO/linesIterator/lines.execopts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
--n=15
--n=85
--n=10000
--n=9999
4 changes: 4 additions & 0 deletions test/library/standard/IO/linesIterator/small.chpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
use IO;

forall line in openReader("small.in").lines(targetLocales=Locales) do
write("Locale ", here.id, " read: ", line);
4 changes: 4 additions & 0 deletions test/library/standard/IO/linesIterator/small.comm-none.good
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Locale 0 read: 1
Locale 0 read: 2
Locale 0 read: 3
Locale 0 read: 4
4 changes: 4 additions & 0 deletions test/library/standard/IO/linesIterator/small.good
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Locale 0 read: 1
Locale 0 read: 2
Locale 1 read: 3
Locale 2 read: 4
4 changes: 4 additions & 0 deletions test/library/standard/IO/linesIterator/small.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
1
2
3
4
1 change: 1 addition & 0 deletions test/library/standard/IO/linesIterator/small.numlocales
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
4
1 change: 1 addition & 0 deletions test/library/standard/IO/linesIterator/small.prediff

0 comments on commit 9d3eeb8

Please sign in to comment.