Skip to content

Commit

Permalink
Fixes issue #44 and #45
Browse files Browse the repository at this point in the history
  • Loading branch information
tk3369 committed Jul 4, 2018
1 parent 0750500 commit fa7dce0
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 66 deletions.
183 changes: 117 additions & 66 deletions src/SASLib.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,12 @@ include("Metadata.jl")
function _open(config::ReaderConfig)
# println("Opening $(config.filename)")
handler = Handler(config)
handler.compression = compression_method_none
handler.column_names_strings = []
handler.column_names = []
handler.column_symbols = []
# handler.columns = []
handler.column_formats = []
handler.current_page_data_subheader_pointers = []
handler.current_row_in_file_index = 0
handler.current_row_in_chunk_index = 0
handler.current_row_in_page_index = 0
handler.current_page = 0
_get_properties(handler)
_parse_metadata(handler)
_post_metadata_handler(handler)
init_handler(handler)
read_header(handler)
read_file_metadata(handler)
populate_column_names(handler)
check_user_column_types(handler)
read_first_page(handler)
return handler
end

Expand Down Expand Up @@ -217,7 +209,7 @@ end
# 1. First 288 bytes contain some important info e.g. header_length
# 2. Rest of the bytes in the header is just header_length - 288
#
function _get_properties(handler)
function read_header(handler)

# read header section
seekstart(handler.io)
Expand Down Expand Up @@ -385,11 +377,13 @@ function _get_properties(handler)
# println("os_name = $(handler.os_name)")
end

# Keep reading pages until a meta page is found
function _parse_metadata(handler)
# Read all pages to find metadata
# TODO however, this is inefficient since it reads a lot of data from disk
# TODO can we tell if metadata is complete and break out of loop early?
function read_file_metadata(handler)
# println3(handler, "IN: _parse_metadata")
done = false
while !done
i = 1
while true
# println3(handler, " filepos=$(position(handler.io)) page_length=$(handler.page_length)")
handler.cached_page = Base.read(handler.io, handler.page_length)
if length(handler.cached_page) <= 0
Expand All @@ -398,16 +392,16 @@ function _parse_metadata(handler)
if length(handler.cached_page) != handler.page_length
throw(FileFormatError("Failed to read a meta data page from the SAS file."))
end
done = _process_page_meta(handler)
# println("page $i = $(current_page_type_str(handler))")
_process_page_meta(handler)
i += 1
end
end

# Do this after finish reading metadata but before reading data
function _post_metadata_handler(handler)

# Check user's provided column types has keys that matches column symbols in the file
function check_user_column_types(handler)
# save a copy of column types in a case insensitive dict
handler.column_types_dict = CIDict{Symbol,Type}(handler.config.column_types)

# check column_types
for k in keys(handler.config.column_types)
if !case_insensitive_in(k, handler.column_symbols)
Expand All @@ -422,7 +416,7 @@ function _process_page_meta(handler)
pt = vcat([page_meta_type, page_amd_type], page_mix_types)
# println(" pt=$pt handler.current_page_type=$(handler.current_page_type)")
if handler.current_page_type in pt
# println3(handler, " current_page_type = $(pagetype(handler.current_page_type))")
# println3(handler, " current_page_type = $(current_page_type_str(handler))")
# println3(handler, " current_page = $(handler.current_page)")
# println3(handler, " $(concatenate(stringarray(currentpos(handler))))")
_process_page_metadata(handler)
Expand Down Expand Up @@ -742,25 +736,25 @@ function _process_columnname_subheader(handler, offset, length)
col_name_length, column_name_length_length)
# println(" i=$i col_len=$col_len")

name_str = handler.column_names_strings[idx+1]
# println(" i=$i name_str=$name_str")

name = transcode_metadata(name_str[col_offset+1:col_offset + col_len])

push!(handler.column_names, name)
push!(handler.column_symbols, Symbol(name))
println3(handler, " i=$i name=$name")
cnp = ColumnNamePointer(idx + 1, col_offset, col_len)
push!(handler.column_name_pointers, cnp)
end
end

function _process_columnattributes_subheader(handler, offset, length)
# println("IN: _process_columnattributes_subheader")
int_len = handler.int_length
column_attributes_vectors_count = fld(length - 2 * int_len - 12, int_len + 8)
handler.column_types = fill(column_type_none, column_attributes_vectors_count)
handler.column_data_lengths = fill(0::Int64, column_attributes_vectors_count)
handler.column_data_offsets = fill(0::Int64, column_attributes_vectors_count)
for i in 0:column_attributes_vectors_count-1
N = fld(length - 2 * int_len - 12, int_len + 8)
# println(" column_attributes_vectors_count = $column_attributes_vectors_count")

ty = fill(column_type_none, N)
len = fill(0::Int64, N)
off = fill(0::Int64, N)
# handler.column_types = fill(column_type_none, column_attributes_vectors_count)
# handler.column_data_lengths = fill(0::Int64, column_attributes_vectors_count)
# handler.column_data_offsets = fill(0::Int64, column_attributes_vectors_count)

for i in 0:N-1
col_data_offset = (offset + int_len +
column_data_offset_offset +
i * (int_len + 8))
Expand All @@ -769,20 +763,16 @@ function _process_columnattributes_subheader(handler, offset, length)
i * (int_len + 8))
col_types = (offset + 2 * int_len +
column_type_offset + i * (int_len + 8))

x = _read_int(handler, col_data_offset, int_len)
handler.column_data_offsets[i+1] = x

x = _read_int(handler, col_data_len, column_data_length_length)
handler.column_data_lengths[i+1] = x

j = i + 1
off[j] = _read_int(handler, col_data_offset, int_len)
len[j] = _read_int(handler, col_data_len, column_data_length_length)
x = _read_int(handler, col_types, column_type_length)
if x == 1
handler.column_types[i+1] = column_type_decimal
else
handler.column_types[i+1] = column_type_string
end
ty[j] = (x == 1) ? column_type_decimal : column_type_string
end

push!(handler.column_types, ty...)
push!(handler.column_data_lengths, len...)
push!(handler.column_data_offsets, off...)
end

function _process_columnlist_subheader(handler, offset, length)
Expand Down Expand Up @@ -879,7 +869,7 @@ function read_chunk(handler, nrows=0)
# ns > 0 && !handler.use_base_transcoder &&
# info("Note: encoding incompatible with UTF-8, reader will take more time")

_fill_column_indices(handler)
populate_column_indices(handler)

# allocate columns
handler.byte_chunk = Dict()
Expand All @@ -899,8 +889,13 @@ function read_chunk(handler, nrows=0)
# handler.current_page = 0
handler.current_row_in_chunk_index = 0

perf_read_data = @elapsed read_data(handler, nrows)
perf_chunk_to_data_frame = @elapsed rslt = _chunk_to_dataframe(handler, nrows)
tic()
read_data(handler, nrows)
perf_read_data = toq()

tic()
rslt = _chunk_to_dataframe(handler, nrows)
perf_chunk_to_data_frame = toq()

if handler.config.verbose_level > 1
println("Read data in ", perf_read_data, " msec")
Expand Down Expand Up @@ -962,25 +957,22 @@ function _read_next_page_content(handler)
_process_page_metadata(handler)
end

# println3(handler, " type=$(pagetype(handler.current_page_type))")
# println3(handler, " type=$(current_page_type_str(handler))")
if ! (handler.current_page_type in page_meta_data_mix_types)
# println3(handler, "page type not found $(handler.current_page_type)... reading next one")
return _read_next_page_content(handler)
end
return false
end

# function pagetype(value)
# if value == page_meta_type
# "META"
# elseif value == page_data_type
# "DATA"
# elseif value in page_mix_types
# "MIX"
# else
# "UNKNOWN"
# end
# end
# test -- copied from _read_next_page_content
function my_read_next_page(handler)
handler.current_page += 1
handler.current_page_data_subheader_pointers = []
handler.cached_page = Base.read(handler.io, handler.page_length)
_read_page_header(handler)
handler.current_row_in_page_index = 0
end

# convert Float64 value into Date object
function date_from_float(x::Vector{Float64})
Expand Down Expand Up @@ -1570,12 +1562,14 @@ case_insensitive_in(s::Symbol, ar::AbstractArray) =
lowercase(s) in [x isa Symbol ? lowercase(x) : x for x in ar]

# fill column indices as a dictionary (key = column index, value = column symbol)
function _fill_column_indices(handler)
function populate_column_indices(handler)
handler.column_indices = Vector{Tuple{Int64, Symbol, UInt8}}()
inflag = length(handler.config.include_columns) > 0
exflag = length(handler.config.exclude_columns) > 0
inflag && exflag && throw(ConfigError("You can specify either include_columns or exclude_columns but not both."))
processed = []
# println("handler.column_symbols = $(handler.column_symbols) len=$(length(handler.column_symbols))")
# println("handler.column_types = $(handler.column_types) len=$(length(handler.column_types))")
for j in 1:length(handler.column_symbols)
name = handler.column_symbols[j]
if inflag
Expand Down Expand Up @@ -1624,6 +1618,63 @@ function _determine_vendor(handler::Handler)
end
end

# Populate column names after all meta info is read
function populate_column_names(handler)
for cnp in handler.column_name_pointers
if cnp.index > length(handler.column_names_strings)
name = "unknown_$(cnp.index)_$(cnp.offset)"
else
name_str = handler.column_names_strings[cnp.index]
name = transcode_metadata(name_str[cnp.offset+1:cnp.offset + cnp.length])
end
push!(handler.column_names, name)
push!(handler.column_symbols, Symbol(name))
end
# println("column_names=", handler.column_names)
# println("column_symbols=", handler.column_symbols)
end

# Returns current page type as a string at the current state
function current_page_type_str(handler)
pt = _read_int(handler, handler.page_bit_offset, page_type_length)
return page_type_str(pt)
end

# Convert page type value to human readable string
function page_type_str(pt)
if pt == page_meta_type return "META"
elseif pt == page_amd_type return "AMD"
elseif pt == page_data_type return "DATA"
elseif pt in page_mix_types return "MIX"
else return "UNKNOWN $(pt)"
end
end

# Go back and read the first page again and be ready for read
# This is needed after all metadata is written and the system needs to rewind
function read_first_page(handler)
seek(handler.io, handler.header_length)
my_read_next_page(handler)
end

# Initialize handler object
function init_handler(handler)
handler.compression = compression_method_none
handler.column_names_strings = []
handler.column_names = []
handler.column_symbols = []
handler.column_name_pointers = []
handler.column_formats = []
handler.column_types = []
handler.column_data_lengths = []
handler.column_data_offsets = []
handler.current_page_data_subheader_pointers = []
handler.current_row_in_file_index = 0
handler.current_row_in_chunk_index = 0
handler.current_row_in_page_index = 0
handler.current_page = 0
end

Base.show(io::IO, h::Handler) = print(io, "SASLib.Handler[", h.config.filename, "]")

end # module
7 changes: 7 additions & 0 deletions src/Types.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ struct Column
length::Int64
end

struct ColumnNamePointer
index::Int
offset::Int
length::Int
end

# technically these fields may have lower precision (need casting?)
struct SubHeaderPointer
offset::Int64
Expand All @@ -51,6 +57,7 @@ mutable struct Handler
# column indices being read/returned
# tuple of column index, column symbol, column type
column_indices::Vector{Tuple{Int64, Symbol, UInt8}}
column_name_pointers::Vector{ColumnNamePointer}

current_page_data_subheader_pointers::Vector{SubHeaderPointer}
cached_page::Vector{UInt8}
Expand Down

0 comments on commit fa7dce0

Please sign in to comment.