diff --git a/src/ParquetMsg.chpl b/src/ParquetMsg.chpl index 366428e0be3..e9c267990ba 100644 --- a/src/ParquetMsg.chpl +++ b/src/ParquetMsg.chpl @@ -46,6 +46,8 @@ module ParquetMsg { // Undocumented for now, just for internal experiments private config const batchSize = getEnvInt("ARKOUDA_SERVER_PARQUET_BATCH_SIZE", 8192); + private config const parallelWriteThreshold = 512*1024*1024 / numBytes(int); + extern var ARROWINT64: c_int; extern var ARROWINT32: c_int; extern var ARROWUINT64: c_int; @@ -417,6 +419,7 @@ module ParquetMsg { dtype, compression, errMsg): int; var dtypeRep = toCDtype(dtype); + var doParallel = if A.size > parallelWriteThreshold then true else false; var prefix: string; var extension: string; @@ -454,10 +457,32 @@ module ParquetMsg { valPtr = c_ptrTo(locArr); } if mode == TRUNCATE || !filesExist { - if c_writeColumnToParquet(myFilename.localize().c_str(), valPtr, 0, - dsetname.localize().c_str(), locDom.size, rowGroupSize, - dtypeRep, compression, c_ptrTo(pqErr.errMsg)) == ARROWERROR { - pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); + if !doParallel { + if c_writeColumnToParquet(myFilename.localize().c_str(), valPtr, 0, + dsetname.localize().c_str(), locDom.size, rowGroupSize, + dtypeRep, compression, c_ptrTo(pqErr.errMsg)) == ARROWERROR { + pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); + } + } else { + var fileSizes: [0..#loc.maxTaskPar] int = locDom.size/loc.maxTaskPar; + // First file has the extra elements if it isn't evenly divisible by maxTaskPar + fileSizes[0] += locDom.size - ((locDom.size/loc.maxTaskPar)*loc.maxTaskPar); + + var offsets = + scan fileSizes; + + forall i in fileSizes.domain { + var suffix = '%04i'.format(idx): string; + var parSuffix = '%04i'.format(i): string; + const parFilename = filename + "_LOCALE" + suffix + "_CORE" + parSuffix + ".parquet"; + var oi = if i == 0 then i else offsets[i-1]; + var coreArr = locArr[oi..#(fileSizes[i])]; + var pqErr = new parquetErrorMsg(); + if c_writeColumnToParquet(parFilename.localize().c_str(), c_ptrTo(coreArr), 0, + dsetname.localize().c_str(), coreArr.size, rowGroupSize, + dtypeRep, compression, c_ptrTo(pqErr.errMsg)) == ARROWERROR { + pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); + } + } } } else { if c_appendColumnToParquet(myFilename.localize().c_str(), valPtr,