diff --git a/saddle-io/src/main/scala/org/saddle/io/csv/DataBuffer.scala b/saddle-io/src/main/scala/org/saddle/io/csv/DataBuffer.scala index b0a1bba8..d776f2a5 100644 --- a/saddle-io/src/main/scala/org/saddle/io/csv/DataBuffer.scala +++ b/saddle-io/src/main/scala/org/saddle/io/csv/DataBuffer.scala @@ -1136,6 +1136,7 @@ private[csv] final class DataBuffer1( private var outputChars: Array[Char] = null // -2 means quote unclosed + // -1 means initial value private var outputLength = -1 private val outputFrom = Array.ofDim[Int](bufferSize + 2) @@ -1183,77 +1184,83 @@ private[csv] final class DataBuffer1( } else ((outputChars, outputFrom, outputTo, outputLength)) private def fillBuffer(): Boolean = { - if (!data.hasNext) false - else { - filledNewData = true + if (!data.hasNext) { + false + } else { val next = data.next() - - quoteMask.zero() - lfMask.zero() - fieldSeparatorMask.zero() - - if (commonCSV) - elementWiseEqual3CSV_LF( - a = next, - o1 = quoteMask, - o2 = lfMask, - o3 = fieldSeparatorMask - ) - else if (commonTSV) - elementWiseEqual3TSV( - a = next, - o1 = quoteMask, - o2 = lfMask, - o3 = fieldSeparatorMask - ) - else - elementWiseEqual3Variable( - a = next, - t1 = quoteChar, - o1 = quoteMask, - t2 = recordSeparator, - o2 = lfMask, - t3 = fieldSeparator, - o3 = fieldSeparatorMask + if (!next.hasRemaining()) { + outputLength = 0 + true + } else { + filledNewData = true + + quoteMask.zero() + lfMask.zero() + fieldSeparatorMask.zero() + + if (commonCSV) + elementWiseEqual3CSV_LF( + a = next, + o1 = quoteMask, + o2 = lfMask, + o3 = fieldSeparatorMask + ) + else if (commonTSV) + elementWiseEqual3TSV( + a = next, + o1 = quoteMask, + o2 = lfMask, + o3 = fieldSeparatorMask + ) + else + elementWiseEqual3Variable( + a = next, + t1 = quoteChar, + o1 = quoteMask, + t2 = recordSeparator, + o2 = lfMask, + t3 = fieldSeparator, + o3 = fieldSeparatorMask + ) + + val nInQuote: BitSet = prefixSumXor(quoteMask) + nInQuote.negateInplace() + + val (structuralMask: BitSet, recordSeparatorMask: BitSet) = { + + lfMask &= nInQuote + fieldSeparatorMask &= nInQuote + + val structuralMask = lfMask | fieldSeparatorMask + + (structuralMask, lfMask) + } + outputChars = next.array() + assert(next.arrayOffset() == 0) + outputLength = toIndices1( + chars = next, + structuralMask = structuralMask, + recordSeparatorMask = recordSeparatorMask, + quoteMask = quoteMask, + from = outputFrom, + to = outputTo ) - val nInQuote: BitSet = prefixSumXor(quoteMask) - nInQuote.negateInplace() - - val (structuralMask: BitSet, recordSeparatorMask: BitSet) = { - - lfMask &= nInQuote - fieldSeparatorMask &= nInQuote - - val structuralMask = lfMask | fieldSeparatorMask - - (structuralMask, lfMask) - } - outputChars = next.array() - assert(next.arrayOffset() == 0) - outputLength = toIndices1( - chars = next, - structuralMask = structuralMask, - recordSeparatorMask = recordSeparatorMask, - quoteMask = quoteMask, - from = outputFrom, - to = outputTo - ) + if (recordSeparatorMask.contains(next.limit() - 1)) { + lineClosed = true + } else { + lineClosed = false + } - if (recordSeparatorMask.contains(next.limit() - 1)) { - lineClosed = true - } else { - lineClosed = false - } + if (next.position() == 0 && lastPos == 0) { + bufferTooShort = true + false + } else { + lastPos = next.position() + true + } - if (next.position() == 0 && lastPos == 0) { - bufferTooShort = true - false - } else { - lastPos = next.position() - true } - } } @@ -1319,76 +1326,81 @@ private[csv] final class DataBuffer2( private def fillBuffer(): Boolean = { if (!data.hasNext) false else { - filledNewData = true val next = data.next() + if (!next.hasRemaining()) { + outputLength = 0 + true + } else { + filledNewData = true + + quoteMask.zero() + crMask.zero() + lfMask.zero() + fieldSeparatorMask.zero() + + if (rfcCompatible) + elementWiseEqual4RFC( + a = next, + o1 = quoteMask, + o2 = lfMask, + o3 = crMask, + o4 = fieldSeparatorMask + ) + else + elementWiseEqual4Variable( + a = next, + t1 = quoteChar, + o1 = quoteMask, + t2 = recordSeparator2, + o2 = lfMask, + t3 = recordSeparator1, + o3 = crMask, + t4 = fieldSeparator, + o4 = fieldSeparatorMask + ) + + val nInQuote: BitSet = prefixSumXor(quoteMask) + nInQuote.negateInplace() + + val (structuralMask: BitSet, recordSeparatorMask: BitSet) = { + + crMask &= nInQuote + lfMask &= nInQuote + fieldSeparatorMask &= nInQuote + + // overwrites crMask + makeMaskFromCrLfInPlace(crMask, lfMask) + val structuralMask = crMask | fieldSeparatorMask + + (structuralMask, crMask) - quoteMask.zero() - crMask.zero() - lfMask.zero() - fieldSeparatorMask.zero() - - if (rfcCompatible) - elementWiseEqual4RFC( - a = next, - o1 = quoteMask, - o2 = lfMask, - o3 = crMask, - o4 = fieldSeparatorMask - ) - else - elementWiseEqual4Variable( - a = next, - t1 = quoteChar, - o1 = quoteMask, - t2 = recordSeparator2, - o2 = lfMask, - t3 = recordSeparator1, - o3 = crMask, - t4 = fieldSeparator, - o4 = fieldSeparatorMask + } + outputChars = next.array() + assert(next.arrayOffset() == 0) + outputLength = toIndices2( + chars = next, + structuralMask = structuralMask, + recordSeparatorMask = recordSeparatorMask, + quoteMask = quoteMask, + from = outputFrom, + to = outputTo ) - val nInQuote: BitSet = prefixSumXor(quoteMask) - nInQuote.negateInplace() - - val (structuralMask: BitSet, recordSeparatorMask: BitSet) = { - - crMask &= nInQuote - lfMask &= nInQuote - fieldSeparatorMask &= nInQuote - - // overwrites crMask - makeMaskFromCrLfInPlace(crMask, lfMask) - val structuralMask = crMask | fieldSeparatorMask - - (structuralMask, crMask) - - } - outputChars = next.array() - assert(next.arrayOffset() == 0) - outputLength = toIndices2( - chars = next, - structuralMask = structuralMask, - recordSeparatorMask = recordSeparatorMask, - quoteMask = quoteMask, - from = outputFrom, - to = outputTo - ) + if (recordSeparatorMask.contains(next.limit() - 1)) { + lineClosed = true + } else { + lineClosed = false + } - if (recordSeparatorMask.contains(next.limit() - 1)) { - lineClosed = true - } else { - lineClosed = false - } + if (next.position() == 0 && lastPos == 0) { + bufferTooShort = true + false + } else { + lastPos = next.position() + true + } - if (next.position() == 0 && lastPos == 0) { - bufferTooShort = true - false - } else { - lastPos = next.position() - true } - } } diff --git a/saddle-io/src/main/scala/org/saddle/io/csv/package.scala b/saddle-io/src/main/scala/org/saddle/io/csv/package.scala index c2e80c69..1fbfd66c 100644 --- a/saddle-io/src/main/scala/org/saddle/io/csv/package.scala +++ b/saddle-io/src/main/scala/org/saddle/io/csv/package.scala @@ -151,7 +151,7 @@ package object csv { var eof = false var decFlushed = false charset.reset() - def fillBuffer() = if (!eof) { + def fillBuffer(): Unit = if (!eof) { var count = channel.read(buffer) while (count >= 0 && buffer.remaining > 0) { count = channel.read(buffer) @@ -165,7 +165,8 @@ package object csv { s"Decoder error $result, buffer=${format(buffer)}, charbuffer='${charBuffer}'" ) buffer.compact - if (count < 0) { + if (count < 0 && result == CoderResult.UNDERFLOW) { + eof = true val result = charset.flush(charBuffer) charBuffer.flip @@ -177,9 +178,13 @@ package object csv { } else { charBuffer.compact - charset.flush(charBuffer) + val result = charset.flush(charBuffer) charBuffer.flip - decFlushed = true + if (result == CoderResult.UNDERFLOW) { decFlushed = true } + else + throw new RuntimeException( + s"Decoder flush error or overflow for the second time ($result), buffer=${format(buffer)}, charbuffer='${charBuffer}. If this is an overflow then the caller must provide a larger buffer.'" + ) } new Iterator[CharBuffer] { def hasNext = !decFlushed diff --git a/saddle-io/src/test/scala/org/saddle/io/csv/csv.test.scala b/saddle-io/src/test/scala/org/saddle/io/csv/csv.test.scala index fd0ef747..71457a03 100644 --- a/saddle-io/src/test/scala/org/saddle/io/csv/csv.test.scala +++ b/saddle-io/src/test/scala/org/saddle/io/csv/csv.test.scala @@ -96,6 +96,143 @@ class CSVSuite extends AnyFunSuite { ) ) } + test("lf on the 64th") { + val lf = "]" + val data = + s"""key,pivot,value${lf}0,"a","v1"${lf}0,"b","v2"${lf}0,"b","v3"${lf}1,"c","v4 "${lf}2,"a","v5"${lf}0,"c","v6"${lf}3,"d","v7"${lf}""" + val src = ByteChannel(data) + val buffer = new BufferCallback + org.saddle.io.csv.parse( + src, + buffer, + recordSeparator = lf, + bufferSize = 128 + ) + assert( + buffer.toList == List( + "key", + "pivot", + "value", + "0", + "a", + "v1", + "0", + "b", + "v2", + "0", + "b", + "v3", + "1", + "c", + "v4 ", + "2", + "a", + "v5", + "0", + "c", + "v6", + "3", + "d", + "v7" + ) + ) + } + test("lf on buffer break") { + 0 until 128 foreach { j => + (7 + j) to 128 foreach { i => + val lf = "]" + val insert = 0 until j map (_ => ' ') mkString + + val data = + s"""key,pivot,value${lf}0,"a","v1"${lf}0,"b","v2"${lf}0,"b","v3"${lf}1,"c","v4$insert"${lf}2,"a","v5"${lf}0,"c","v6"${lf}3,"d","v7"${lf}""" + val src = ByteChannel(data) + val buffer = new BufferCallback + val r = org.saddle.io.csv.parse( + src, + buffer, + recordSeparator = lf, + bufferSize = i + ) + assert(r.isEmpty, r.toString) + + assert( + buffer.toList == List( + "key", + "pivot", + "value", + "0", + "a", + "v1", + "0", + "b", + "v2", + "0", + "b", + "v3", + "1", + "c", + s"v4$insert", + "2", + "a", + "v5", + "0", + "c", + "v6", + "3", + "d", + "v7" + ) + ) + } + } + } + test("crlf on buffer break") { + 0 until 128 foreach { j => + (7 + j) to 128 foreach { i => + val insert = 0 until j map (_ => ' ') mkString + val lf = "[]" + val data = + s"""key,pivot,value${lf}0,"a","v1"${lf}0,"b","v2"${lf}0,"b","v3"${lf}1,"c","v4$insert"${lf}2,"a","v5"${lf}0,"c","v6"${lf}3,"d","v7"${lf}""" + val src = ByteChannel(data) + val buffer = new BufferCallback + val r = org.saddle.io.csv.parse( + src, + buffer, + recordSeparator = lf, + bufferSize = i + ) + assert(r.isEmpty, r.toString) + assert( + buffer.toList == List( + "key", + "pivot", + "value", + "0", + "a", + "v1", + "0", + "b", + "v2", + "0", + "b", + "v3", + "1", + "c", + s"v4$insert", + "2", + "a", + "v5", + "0", + "c", + "v6", + "3", + "d", + "v7" + ) + ) + } + } + } test("single line") { val crlf = "[]"