Skip to content

Commit

Permalink
Fix csv parser
Browse files Browse the repository at this point in the history
Ensure all characters are decoded
  • Loading branch information
pityka committed Jun 2, 2023
1 parent 0ef16f9 commit 31c1e4e
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 132 deletions.
268 changes: 140 additions & 128 deletions saddle-io/src/main/scala/org/saddle/io/csv/DataBuffer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,7 @@ private[csv] final class DataBuffer1(
private var outputChars: Array[Char] = null

// -2 means quote unclosed
// -1 means initial value
private var outputLength = -1

private val outputFrom = Array.ofDim[Int](bufferSize + 2)
Expand Down Expand Up @@ -1183,77 +1184,83 @@ private[csv] final class DataBuffer1(
} else ((outputChars, outputFrom, outputTo, outputLength))

private def fillBuffer(): Boolean = {
if (!data.hasNext) false
else {
filledNewData = true
if (!data.hasNext) {
false
} else {
val next = data.next()

quoteMask.zero()
lfMask.zero()
fieldSeparatorMask.zero()

if (commonCSV)
elementWiseEqual3CSV_LF(
a = next,
o1 = quoteMask,
o2 = lfMask,
o3 = fieldSeparatorMask
)
else if (commonTSV)
elementWiseEqual3TSV(
a = next,
o1 = quoteMask,
o2 = lfMask,
o3 = fieldSeparatorMask
)
else
elementWiseEqual3Variable(
a = next,
t1 = quoteChar,
o1 = quoteMask,
t2 = recordSeparator,
o2 = lfMask,
t3 = fieldSeparator,
o3 = fieldSeparatorMask
if (!next.hasRemaining()) {
outputLength = 0
true
} else {
filledNewData = true

quoteMask.zero()
lfMask.zero()
fieldSeparatorMask.zero()

if (commonCSV)
elementWiseEqual3CSV_LF(
a = next,
o1 = quoteMask,
o2 = lfMask,
o3 = fieldSeparatorMask
)
else if (commonTSV)
elementWiseEqual3TSV(
a = next,
o1 = quoteMask,
o2 = lfMask,
o3 = fieldSeparatorMask
)
else
elementWiseEqual3Variable(
a = next,
t1 = quoteChar,
o1 = quoteMask,
t2 = recordSeparator,
o2 = lfMask,
t3 = fieldSeparator,
o3 = fieldSeparatorMask
)

val nInQuote: BitSet = prefixSumXor(quoteMask)
nInQuote.negateInplace()

val (structuralMask: BitSet, recordSeparatorMask: BitSet) = {

lfMask &= nInQuote
fieldSeparatorMask &= nInQuote

val structuralMask = lfMask | fieldSeparatorMask

(structuralMask, lfMask)
}
outputChars = next.array()
assert(next.arrayOffset() == 0)
outputLength = toIndices1(
chars = next,
structuralMask = structuralMask,
recordSeparatorMask = recordSeparatorMask,
quoteMask = quoteMask,
from = outputFrom,
to = outputTo
)

val nInQuote: BitSet = prefixSumXor(quoteMask)
nInQuote.negateInplace()

val (structuralMask: BitSet, recordSeparatorMask: BitSet) = {

lfMask &= nInQuote
fieldSeparatorMask &= nInQuote

val structuralMask = lfMask | fieldSeparatorMask

(structuralMask, lfMask)
}
outputChars = next.array()
assert(next.arrayOffset() == 0)
outputLength = toIndices1(
chars = next,
structuralMask = structuralMask,
recordSeparatorMask = recordSeparatorMask,
quoteMask = quoteMask,
from = outputFrom,
to = outputTo
)
if (recordSeparatorMask.contains(next.limit() - 1)) {
lineClosed = true
} else {
lineClosed = false
}

if (recordSeparatorMask.contains(next.limit() - 1)) {
lineClosed = true
} else {
lineClosed = false
}
if (next.position() == 0 && lastPos == 0) {
bufferTooShort = true
false
} else {
lastPos = next.position()
true
}

if (next.position() == 0 && lastPos == 0) {
bufferTooShort = true
false
} else {
lastPos = next.position()
true
}

}
}

Expand Down Expand Up @@ -1319,76 +1326,81 @@ private[csv] final class DataBuffer2(
private def fillBuffer(): Boolean = {
if (!data.hasNext) false
else {
filledNewData = true
val next = data.next()
if (!next.hasRemaining()) {
outputLength = 0
true
} else {
filledNewData = true

quoteMask.zero()
crMask.zero()
lfMask.zero()
fieldSeparatorMask.zero()

if (rfcCompatible)
elementWiseEqual4RFC(
a = next,
o1 = quoteMask,
o2 = lfMask,
o3 = crMask,
o4 = fieldSeparatorMask
)
else
elementWiseEqual4Variable(
a = next,
t1 = quoteChar,
o1 = quoteMask,
t2 = recordSeparator2,
o2 = lfMask,
t3 = recordSeparator1,
o3 = crMask,
t4 = fieldSeparator,
o4 = fieldSeparatorMask
)

val nInQuote: BitSet = prefixSumXor(quoteMask)
nInQuote.negateInplace()

val (structuralMask: BitSet, recordSeparatorMask: BitSet) = {

crMask &= nInQuote
lfMask &= nInQuote
fieldSeparatorMask &= nInQuote

// overwrites crMask
makeMaskFromCrLfInPlace(crMask, lfMask)
val structuralMask = crMask | fieldSeparatorMask

(structuralMask, crMask)

quoteMask.zero()
crMask.zero()
lfMask.zero()
fieldSeparatorMask.zero()

if (rfcCompatible)
elementWiseEqual4RFC(
a = next,
o1 = quoteMask,
o2 = lfMask,
o3 = crMask,
o4 = fieldSeparatorMask
)
else
elementWiseEqual4Variable(
a = next,
t1 = quoteChar,
o1 = quoteMask,
t2 = recordSeparator2,
o2 = lfMask,
t3 = recordSeparator1,
o3 = crMask,
t4 = fieldSeparator,
o4 = fieldSeparatorMask
}
outputChars = next.array()
assert(next.arrayOffset() == 0)
outputLength = toIndices2(
chars = next,
structuralMask = structuralMask,
recordSeparatorMask = recordSeparatorMask,
quoteMask = quoteMask,
from = outputFrom,
to = outputTo
)

val nInQuote: BitSet = prefixSumXor(quoteMask)
nInQuote.negateInplace()

val (structuralMask: BitSet, recordSeparatorMask: BitSet) = {

crMask &= nInQuote
lfMask &= nInQuote
fieldSeparatorMask &= nInQuote

// overwrites crMask
makeMaskFromCrLfInPlace(crMask, lfMask)
val structuralMask = crMask | fieldSeparatorMask

(structuralMask, crMask)

}
outputChars = next.array()
assert(next.arrayOffset() == 0)
outputLength = toIndices2(
chars = next,
structuralMask = structuralMask,
recordSeparatorMask = recordSeparatorMask,
quoteMask = quoteMask,
from = outputFrom,
to = outputTo
)
if (recordSeparatorMask.contains(next.limit() - 1)) {
lineClosed = true
} else {
lineClosed = false
}

if (recordSeparatorMask.contains(next.limit() - 1)) {
lineClosed = true
} else {
lineClosed = false
}
if (next.position() == 0 && lastPos == 0) {
bufferTooShort = true
false
} else {
lastPos = next.position()
true
}

if (next.position() == 0 && lastPos == 0) {
bufferTooShort = true
false
} else {
lastPos = next.position()
true
}

}
}

Expand Down
13 changes: 9 additions & 4 deletions saddle-io/src/main/scala/org/saddle/io/csv/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ package object csv {
var eof = false
var decFlushed = false
charset.reset()
def fillBuffer() = if (!eof) {
def fillBuffer(): Unit = if (!eof) {
var count = channel.read(buffer)
while (count >= 0 && buffer.remaining > 0) {
count = channel.read(buffer)
Expand All @@ -165,7 +165,8 @@ package object csv {
s"Decoder error $result, buffer=${format(buffer)}, charbuffer='${charBuffer}'"
)
buffer.compact
if (count < 0) {
if (count < 0 && result == CoderResult.UNDERFLOW) {

eof = true
val result = charset.flush(charBuffer)
charBuffer.flip
Expand All @@ -177,9 +178,13 @@ package object csv {

} else {
charBuffer.compact
charset.flush(charBuffer)
val result = charset.flush(charBuffer)
charBuffer.flip
decFlushed = true
if (result == CoderResult.UNDERFLOW) { decFlushed = true }
else
throw new RuntimeException(
s"Decoder flush error or overflow for the second time ($result), buffer=${format(buffer)}, charbuffer='${charBuffer}. If this is an overflow then the caller must provide a larger buffer.'"
)
}
new Iterator[CharBuffer] {
def hasNext = !decFlushed
Expand Down
Loading

0 comments on commit 31c1e4e

Please sign in to comment.