build_model_input.py
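"""
Build the model input for training and evaluation.

Reads the train/test feature parquet files, composes a normalised feature vector for each
(time_window, src_ip) row, and persists the result as petastorm-compatible parquet.
"""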
import logging
import click
import pyspark
import numpy as np
from petastorm.codecs import CompressedNdarrayCodec, ScalarCodec
from petastorm.unischema import Unischema, UnischemaField
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
from utils import init_local_spark, feature_min_max, normalise, change_df_schema, save_parquet_for_petastorm_parquet


class FeatureComposer:
def __init__(self, spark: SparkSession, df: pyspark.sql.DataFrame):
self.spark = spark
self.df = df
self.feature_column = (
'mean_duration', 'mean_packet', 'mean_num_of_bytes', 'mean_packet_rate', 'mean_byte_rate', 'std_duration',
'std_packet', 'std_num_of_bytes', 'std_packet_rate', 'std_byte_rate', 'entropy_protocol', 'entropy_dst_ip',
'entropy_src_port', 'entropy_dst_port', 'entropy_flags', 'proportion_src_port', 'proportion_dst_port',
)
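        # Spark UDF wrapping feature_compose; returns the composed feature vector as array<double>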
        self.feature_compose_udf = udf(self.feature_compose, 'array<double>')

@staticmethod
def feature_compose(
mean_duration: float, mean_packet: float, mean_num_of_bytes: float, mean_packet_rate: float,
mean_byte_rate: float, std_duration: float, std_packet: float, std_num_of_bytes: float,
std_packet_rate: float, std_byte_rate: float, entropy_protocol: float, entropy_dst_ip: float,
entropy_src_port: float, entropy_dst_port: float, entropy_flags: float, proportion_src_port: list,
proportion_dst_port: list
) -> list:
"""
Compose the feature array
:param mean_duration: mean duration
:param mean_packet: mean packet
:param mean_num_of_bytes: mean number of bytes
:param mean_packet_rate: mean packet rate
:param mean_byte_rate: mean byte rate
:param std_duration: std duration
:param std_packet: std packet
:param std_num_of_bytes: std number of bytes
:param std_packet_rate: std packet rate
:param std_byte_rate: std byte rate
:param entropy_protocol: entropy of protocol
:param entropy_dst_ip: entropy of dest ip
        :param entropy_src_port: entropy of src port
        :param entropy_dst_port: entropy of dest port
        :param entropy_flags: entropy of flags
        :param proportion_src_port: proportion of src common ports
        :param proportion_dst_port: proportion of dest common ports
:type mean_duration: float
:type mean_packet: float
:type mean_num_of_bytes: float
:type mean_packet_rate: float
:type mean_byte_rate: float
:type std_duration: float
:type std_packet: float
:type std_num_of_bytes: float
:type std_packet_rate: float
:type std_byte_rate: float
:type entropy_protocol: float
:type entropy_dst_ip: float
:type entropy_src_port: float
:type entropy_dst_port: float
:type entropy_flags: float
:type proportion_src_port: list
:type proportion_dst_port: list
:return: feature array
        :rtype: list
"""
        # normalise each scalar feature using its (min, max) bounds from feature_min_max
mean_duration = normalise(mean_duration, *feature_min_max.get('mean_duration'))
mean_packet = normalise(mean_packet, *feature_min_max.get('mean_packet'))
mean_num_of_bytes = normalise(mean_num_of_bytes, *feature_min_max.get('mean_num_of_bytes'))
mean_packet_rate = normalise(mean_packet_rate, *feature_min_max.get('mean_packet_rate'))
mean_byte_rate = normalise(mean_byte_rate, *feature_min_max.get('mean_byte_rate'))
std_duration = normalise(std_duration, *feature_min_max.get('std_duration'))
std_packet = normalise(std_packet, *feature_min_max.get('std_packet'))
std_num_of_bytes = normalise(std_num_of_bytes, *feature_min_max.get('std_num_of_bytes'))
std_packet_rate = normalise(std_packet_rate, *feature_min_max.get('std_packet_rate'))
std_byte_rate = normalise(std_byte_rate, *feature_min_max.get('std_byte_rate'))
entropy_protocol = normalise(entropy_protocol, *feature_min_max.get('entropy_protocol'))
entropy_dst_ip = normalise(entropy_dst_ip, *feature_min_max.get('entropy_dst_ip'))
entropy_src_port = normalise(entropy_src_port, *feature_min_max.get('entropy_src_port'))
entropy_dst_port = normalise(entropy_dst_port, *feature_min_max.get('entropy_dst_port'))
entropy_flags = normalise(entropy_flags, *feature_min_max.get('entropy_flags'))
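        # 15 normalised scalars followed by the src/dst port-proportion lists
        # (69 values in total, matching the (1, 69) petastorm schema defined in main)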
feature_arr = [
mean_duration, mean_packet, mean_num_of_bytes, mean_packet_rate, mean_byte_rate, std_duration, std_packet,
std_num_of_bytes, std_packet_rate, std_byte_rate, entropy_protocol, entropy_dst_ip, entropy_src_port,
entropy_dst_port, entropy_flags,
]
feature_arr.extend(proportion_src_port)
feature_arr.extend(proportion_dst_port)
        return feature_arr

def transform(self, remove_malicious=True, remove_null_label=True) -> pyspark.sql.DataFrame:
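        """
        Compose the feature column and filter the rows.

        :param remove_malicious: if True, keep only benign flows labelled 'background'
        :param remove_null_label: if True, drop rows whose label is null
        :return: DataFrame with time_window, src_ip, features and label columns
        :rtype: pyspark.sql.DataFrame
        """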
df = (
self.df
            # compose the feature vector from the individual feature columns
.withColumn('features', self.feature_compose_udf(*self.feature_column))
)
if remove_null_label:
df = df.filter(col('label').isNotNull())
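        # when requested, keep only benign flows labelled 'background', dropping malicious traffic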
if remove_malicious:
df = df.filter(col('label') == 'background')
        # keep only the time_window, src_ip, features and label columns
df = df.select(
'time_window', 'src_ip', 'features', 'label',
)
        return df


@click.command()
@click.option('--train', help='path to the directory containing train feature parquet files', required=True)
@click.option('--test', help='path to the directory containing test feature parquet files', required=True)
@click.option('--target_train', help='path to the directory to persist train model input files', required=True)
@click.option('--target_test', help='path to the directory to persist test model input files', required=True)
def main(train: str, test: str, target_train: str, target_test: str):
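    """Build the train and test model inputs and persist them as petastorm-compatible parquet."""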
# initialise logger
logger = logging.getLogger(__file__)
logger.addHandler(logging.StreamHandler())
logger.setLevel('INFO')
logger.info('Initialising local spark')
spark = init_local_spark()
logger.info('Preparing schema')
    # petastorm schema: two string id fields, a (1, 69) float32 feature vector and an optional string label
schema = Unischema(
'data_schema', [
UnischemaField('time_window', np.str, (), ScalarCodec(StringType()), False),
UnischemaField('src_ip', np.str, (), ScalarCodec(StringType()), False),
UnischemaField('feature', np.float32, (1, 69), CompressedNdarrayCodec(), False),
UnischemaField('label', np.str, (), ScalarCodec(StringType()), True),
]
)
# processing train
logger.info('Processing train parquet files')
logger.info('Read parquet')
train_feature_df = spark.read.parquet(train)
logger.info('Composing features...')
train_input = FeatureComposer(spark, train_feature_df).transform(remove_malicious=True, remove_null_label=True)
logger.info('Changing schema...')
train_input = change_df_schema(spark, schema, train_input)
logger.info('Persisting...')
save_parquet_for_petastorm_parquet(spark, train_input, target_train, schema)
logger.info('Train input done')
# processing test
logger.info('Processing test parquet files')
logger.info('Read parquet')
test_feature_df = spark.read.parquet(test)
logger.info('Composing features...')
test_input = FeatureComposer(spark, test_feature_df).transform(remove_malicious=False, remove_null_label=True)
logger.info('Changing schema...')
test_input = change_df_schema(spark, schema, test_input)
logger.info('Persisting...')
save_parquet_for_petastorm_parquet(spark, test_input, target_test, schema)
    logger.info('Test input done')


if __name__ == '__main__':
main()
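

# Example invocation (paths are illustrative):
#   python build_model_input.py \
#       --train data/features/train \
#       --test data/features/test \
#       --target_train data/model_input/train \
#       --target_test data/model_input/test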