Added an option to write data to CSV files and iterate over it #49

Open
wants to merge 3 commits into base: master
108 changes: 108 additions & 0 deletions azure/Kqlmagic/Kql_response_wrapper.py
@@ -0,0 +1,108 @@
from io import BytesIO, SEEK_SET, SEEK_END
import ijson
from .log import logger

class ResponseStream(object):
    """Wrap a chunk iterator as a file-like object so ijson can parse it as a stream."""
def __init__(self, request_iterator):
self._bytes = BytesIO()
self._iterator = request_iterator

def _load_all(self):
self._bytes.seek(0, SEEK_END)
for chunk in self._iterator:
self._bytes.write(chunk)

    def _load_until(self, goal_position):
        current_position = self._bytes.seek(0, SEEK_END)
        while current_position < goal_position:
            try:
                # write() returns the number of bytes written, so accumulate it
                # to track the absolute position rather than overwriting it
                current_position += self._bytes.write(next(self._iterator))
            except StopIteration:
                break

def tell(self):
return self._bytes.tell()

def read(self, size=None):
left_off_at = self._bytes.tell()
if size is None:
self._load_all()
else:
goal_position = left_off_at + size
self._load_until(goal_position)

self._bytes.seek(left_off_at)
return self._bytes.read(size)

    def seek(self, position, whence=SEEK_SET):
        if whence == SEEK_END:
            # seeking relative to the end requires the full payload to be buffered first
            self._load_all()
        self._bytes.seek(position, whence)
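
A minimal usage sketch, not part of this diff — the URL, chunk size, and ijson prefix are illustrative; the requests and ijson calls are standard ones:

    import ijson
    import requests

    # stream a large JSON payload without holding it all in memory
    response = requests.get("https://example.com/big.json", stream=True)
    stream = ResponseStream(response.iter_content(chunk_size=64 * 1024))
    # ijson pulls chunks through the file-like wrapper on demand
    for row in ijson.items(stream, "Tables.item.Rows.item"):
        print(row)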

import csv
import itertools

class CSV_table_reader(list):
    """A list-like wrapper that lazily reads rows from a folder of chunked CSV files."""

    def __iter__(self):
        self.i = 0
        return self

    def __next__(self):
        try:
            result = self.__getitem__(self.i)
            self.i += 1
            return result
        except IndexError:
            # only an exhausted index should end iteration; a bare except would
            # also swallow real errors such as a corrupt chunk file
            raise StopIteration

def __init__(self, foldername):
from .display import Display
from .constants import Constants

self.foldername = f"{Display.showfiles_base_path}/{Display.showfiles_folder_name}/{foldername}"
self.buffer_size = Constants.STREAM_BUFFER_SIZE
self.buff_start = 0
self.current_csv_number = 1
self.row_buffer = self.initialize_buffer()

    def initialize_buffer(self):
        import time
        start = time.time()
        with open(f"{self.foldername}/{self.current_csv_number}.csv", "r") as infile:
            reader = csv.reader(infile)
            tmp_buffer = list(itertools.islice(reader, self.buffer_size))
        logger().debug(f"CSV_table_reader:: loading a buffer of {self.buffer_size} rows took {time.time() - start}s")
        return tmp_buffer

    def __getitem__(self, i):
        start_edge = self.buff_start
        end_edge = self.buff_start + self.buffer_size
        k = i % self.buffer_size  # offset of row i inside its chunk

        if start_edge <= i < end_edge:
            return self.row_buffer[k]
        try:
            # row i lives in a different chunk file: load the one that contains it
            self.current_csv_number = (i // self.buffer_size) + 1
            self.row_buffer = self.initialize_buffer()
            self.buff_start = (self.current_csv_number - 1) * self.buffer_size
            return self.row_buffer[k]
        except FileNotFoundError:
            # past the last chunk file: translate into the IndexError list consumers expect
            raise IndexError(i)

    def __len__(self):
        self.len = self.get_len()
        return self.len

    def get_len(self):
        # no row count is persisted, so walk the chunks until __getitem__ runs out of rows
        i = 0
        try:
            while True:
                self.__getitem__(i)
                i += 1
        except IndexError:
            return i
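
A hypothetical usage sketch — the folder name is illustrative, and the layout (chunk files named 1.csv, 2.csv, ..., each holding STREAM_BUFFER_SIZE rows) is inferred from the reader's logic above:

    # rows come back lazily, one buffer-sized chunk file at a time
    rows = CSV_table_reader("my_query_folder")
    for row in rows:
        print(row)
    # caution: no row count is stored, so len() walks every chunk once
    print(len(rows))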
2 changes: 2 additions & 0 deletions azure/Kqlmagic/constants.py
@@ -31,6 +31,8 @@ class Constants(object):
SSO_ENV_VAR_NAME = f"{MAGIC_CLASS_NAME.upper()}_SSO_ENCRYPTION_KEYS"
SSO_DB_KEY_PREFIX = f"{MAGIC_CLASS_NAME.lower()}store/tokens/"

    STREAM_BUFFER_SIZE = 10000  # rows per streamed CSV chunk file (and per in-memory buffer)

class Schema(object):
APPLICATION_INSIGHTS = "applicationinsights"
LOG_ANALYTICS = "loganalytics"
6 changes: 6 additions & 0 deletions azure/Kqlmagic/kql_magic.py
@@ -390,6 +390,12 @@ class Kqlmagic(Magics, Configurable):
Abbreviation: usertag"""
)

    data_stream = Bool(
        False,
        config=True,
        help="""If set to True, data returned from Kusto is streamed to disk as chunked CSV files and read back in fixed-size buffers. Use this when consuming large amounts of data."""
    )
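
Assuming the standard traitlets %config mechanism that the other Kqlmagic options use, the flag would be toggled from a notebook like so (sketch, not part of the diff):

    %config Kqlmagic.data_stream=True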

logger().debug("Kqlmagic:: - define class code")


63 changes: 58 additions & 5 deletions azure/Kqlmagic/kql_proxy.py
@@ -9,6 +9,7 @@

import six
import pandas
from .log import logger


from .display import Display
@@ -104,7 +105,11 @@ def __init__(self, response, **kwargs):
self.completion_query_info = response.completion_query_info_results
self.completion_query_resource_consumption = response.completion_query_resource_consumption_results
self.dataSetCompletion = response.dataSetCompletion_results
self.tables = [KqlTableResponse(t, response.visualization_results.get(t.id, {})) for t in response.primary_results]
if kwargs.get("data_stream"):
from .Kql_response_wrapper import tables_gen
self.tables = tables_gen(response)
else:
self.tables = [KqlTableResponse(t, response.visualization_results.get(t.id, {})) for t in response.primary_results]


class KqlTableResponse(object):
@@ -169,16 +174,32 @@ def _map_columns_to_index(self, columns: list):
def returns_rows(self):
return self.data_table.rows_count > 0

    def create_frame_from_files(self, foldername, pandas_df):
        import glob
        import os
        import dask.dataframe as dd

        files = glob.glob(f"{foldername}/*.csv", recursive=False)
        # chunk files are named 1.csv, 2.csv, ... - sort numerically so 10.csv follows 9.csv
        files = sorted(files, key=lambda f: int(os.path.basename(f)[:-4]))
        if not pandas_df:
            # dask builds a lazy, partitioned frame over the chunk files
            return dd.read_csv(files, names=self.data_table.columns_name)
        li = []
        for filename in files:
            df = pandas.read_csv(filename, index_col=None, header=0, names=self.data_table.columns_name)
            li.append(df)
        return pandas.concat(li, axis=0, ignore_index=True)
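
For reference, a sketch of how the two paths differ for the caller — `table` and `folder` are assumed names, not part of the diff:

    # pandas path: all chunks are read and concatenated into memory at once
    pdf = table.create_frame_from_files(folder, pandas_df=True)
    # dask path: a lazy, partitioned frame over the same chunk files
    ddf = table.create_frame_from_files(folder, pandas_df=False)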

def to_dataframe(self, raise_errors=True):
"""Returns Pandas data frame."""

if self.data_table.columns_count == 0 or self.data_table.rows_count == 0:
# return pandas.DataFrame()
pass

frame = pandas.DataFrame(self.data_table.rows, columns=self.data_table.columns_name)

from .Kql_response_wrapper import CSV_table_reader
foldername = self.data_table.rows.foldername if isinstance(self.data_table.rows, CSV_table_reader) else None
if not foldername:
frame = pandas.DataFrame(self.data_table.rows, columns=self.data_table.columns_name)
else:
frame = self.create_frame_from_files(foldername, pandas_df=True)
for (idx, col_name) in enumerate(self.data_table.columns_name):
col_type = self.data_table.columns_type[idx].lower()
if col_type == "timespan":
@@ -203,7 +224,39 @@ def to_dataframe(self, raise_errors=True):
frame[col_name] = frame[col_name].astype(pandas_type, errors="raise" if raise_errors else "ignore")
return frame

    def to_dask(self, raise_errors=True):
        """Returns Dask data frame."""

        import dask.dataframe as dd
        from .Kql_response_wrapper import CSV_table_reader

        foldername = self.data_table.rows.foldername if isinstance(self.data_table.rows, CSV_table_reader) else None
        if not foldername:
            # nothing was streamed to disk: wrap the in-memory frame in a single partition
            return dd.from_pandas(self.to_dataframe(raise_errors), npartitions=1)
        frame = self.create_frame_from_files(foldername, pandas_df=False)

        for (idx, col_name) in enumerate(self.data_table.columns_name):
            col_type = self.data_table.columns_type[idx].lower()
            if col_type in self.KQL_TO_DATAFRAME_DATA_TYPES:
                pandas_type = self.KQL_TO_DATAFRAME_DATA_TYPES[col_type]
                # NA type promotion: int and bool columns cannot hold NaN, so fall back to object
                if pandas_type in ("int64", "int32", "bool"):
                    # these checks are lazy dask scalars, so evaluate them eagerly here
                    has_na = frame[col_name].isnull().any().compute() or frame[col_name].isin(["nan"]).any().compute()
                    if has_na:
                        pandas_type = "object"
                frame[col_name] = frame[col_name].astype(pandas_type)

        return frame
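
A hedged example of consuming a large streamed result lazily — the `table` variable and the "Level" column name are illustrative:

    # assuming `table` is a KqlTableResponse produced with data_stream=True
    ddf = table.to_dask()
    # dask evaluates lazily; only the aggregate is materialized
    print(ddf.groupby("Level").size().compute())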



@staticmethod
def _dynamic_to_object(value):
try:
25 changes: 15 additions & 10 deletions azure/Kqlmagic/kql_response.py
@@ -7,6 +7,10 @@
from datetime import timedelta, datetime
import re
import json
from .log import logger
import ijson

from .Kql_response_wrapper import CSV_table_reader


import adal
@@ -50,7 +54,7 @@ class KqlResponseTable(six.Iterator):

def __init__(self, id, response_table):
self.id = id
self.rows = response_table["Rows"]
        # when streaming, "Rows" holds the folder name of the chunked CSV files rather than the row data
        self.rows = CSV_table_reader(response_table["Rows"]) if isinstance(response_table["Rows"], str) else response_table["Rows"]
self.columns = response_table["Columns"]
self.index2column_mapping = []
self.index2type_mapping = []
@@ -59,7 +63,7 @@ def __init__(self, id, response_table):
ctype = c["ColumnType"] if "ColumnType" in c else c["DataType"]
self.index2type_mapping.append(ctype)
self.row_index = 0
self._rows_count = sum([1 for r in self.rows if isinstance(r,list)]) # len(self.rows)
        self._rows_count = len(self.rows)  # for a CSV_table_reader this walks every chunk once
# Here we keep converter functions for each type that we need to take special care (e.g. convert)

# index MUST be lowercase !!!
@@ -194,12 +198,13 @@ def __init__(self, json_response, endpoint_version="v1"):
self.dataSetCompletion = [f for f in json_response if f["FrameType"] == "DataSetCompletion"]
else:
self.all_tables = self.json_response["Tables"]
tables_num = self.json_response["Tables"].__len__()
last_table = self.json_response["Tables"][tables_num - 1]
            tables_num = len(self.all_tables)
last_table = self.all_tables[tables_num - 1]
if tables_num < 2:
self.tables = []
else:
self.tables = [self.json_response["Tables"][r[0]] for r in last_table["Rows"] if r[2] == "GenericResult" or r[2] == "PrimaryResult"]
rows_last_table = CSV_table_reader(last_table["Rows"]) if isinstance(last_table["Rows"], str) else last_table["Rows"]
                self.tables = [self.all_tables[r[0]] for r in rows_last_table if r[2] in ("GenericResult", "PrimaryResult")]
if len(self.tables) == 0:
self.tables = self.all_tables[:1]
self.primary_results = [KqlResponseTable(idx, t) for idx, t in enumerate(self.tables)]
@@ -240,7 +245,7 @@ def visualization_results(self):
value = row[value_idx]
self.visualization[row[id_idx]] = self._dynamic_to_object(value)
else:
tables_num = self.json_response["Tables"].__len__()
            tables_num = len(self.all_tables)
if tables_num > 1:
last_table = self.json_response["Tables"][tables_num - 1]
for row in last_table["Rows"]:
@@ -266,9 +271,9 @@ def completion_query_info_results(self):
value = row[payload_idx]
return self._dynamic_to_object(value)
else:
tables_num = self.json_response["Tables"].__len__()
            tables_num = len(self.all_tables)
if tables_num > 1:
last_table = self.json_response["Tables"][tables_num - 1]
last_table = self.all_tables[tables_num - 1]
for r in last_table["Rows"]:
if r[2] == "QueryStatus":
t = self.json_response["Tables"][r[0]]
@@ -294,9 +299,9 @@ def completion_query_resource_consumption_results(self):
value = row[payload_idx]
return self._dynamic_to_object(value)
else:
tables_num = self.json_response["Tables"].__len__()
            tables_num = len(self.all_tables)
if tables_num > 1:
last_table = self.json_response["Tables"][tables_num - 1]
last_table = self.all_tables[tables_num - 1]
for r in last_table["Rows"]:
if r[2] == "QueryStatus":
t = self.json_response["Tables"][r[0]]