From 92c909c16dd7f09ff590ffef66060d596ecce739 Mon Sep 17 00:00:00 2001 From: Roman Khassraf Date: Sat, 25 Nov 2017 09:00:40 +0100 Subject: [PATCH] [#3] Implement system info analysis --- adapter/grgsm/info_extractor.py | 2 + adapter/grgsm/systeminfo_extractor.py | 26 ++++++++++ plugins/analysis_plugin.py | 72 +++++++++++++++++++++++++-- 3 files changed, 96 insertions(+), 4 deletions(-) create mode 100644 adapter/grgsm/systeminfo_extractor.py diff --git a/adapter/grgsm/info_extractor.py b/adapter/grgsm/info_extractor.py index 116e5d3..c0d01a5 100644 --- a/adapter/grgsm/info_extractor.py +++ b/adapter/grgsm/info_extractor.py @@ -18,9 +18,11 @@ def __init__(self, timeslot, burst_file, mode, show_gprs): self.gsm_control_channels_decoder = grgsm.control_channels_decoder() self.gsm_extract_cmc = grgsm.extract_cmc() self.gsm_extract_immediate_assignment = grgsm.extract_immediate_assignment(False, not show_gprs, True) + self.gsm_extract_system_info = grgsm.extract_system_info() self.msg_connect((self.gsm_burst_file_source, 'out'), (self.gsm_burst_timeslot_filter, 'in')) self.msg_connect((self.gsm_burst_timeslot_filter, 'out'), (self.demapper, 'bursts')) self.msg_connect((self.demapper, 'bursts'), (self.gsm_control_channels_decoder, 'bursts')) self.msg_connect((self.gsm_control_channels_decoder, 'msgs'), (self.gsm_extract_cmc, 'msgs')) self.msg_connect((self.gsm_control_channels_decoder, 'msgs'), (self.gsm_extract_immediate_assignment, 'msgs')) + self.msg_connect((self.gsm_control_channels_decoder, 'msgs'), (self.gsm_extract_system_info, 'msgs')) diff --git a/adapter/grgsm/systeminfo_extractor.py b/adapter/grgsm/systeminfo_extractor.py new file mode 100644 index 0000000..df6f579 --- /dev/null +++ b/adapter/grgsm/systeminfo_extractor.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +import grgsm +from gnuradio import gr + + +class SystemInfoExtractor(gr.top_block): + def __init__(self, timeslot, burst_file, mode, show_gprs): + gr.top_block.__init__(self, "Top Block") + + self.gsm_burst_file_source = grgsm.burst_file_source(burst_file) + self.gsm_burst_timeslot_filter = grgsm.burst_timeslot_filter(timeslot) + + if mode == 'BCCH': + self.demapper = grgsm.gsm_bcch_ccch_demapper(timeslot_nr=timeslot, ) + elif mode == 'BCCH_SDCCH4': + self.demapper = grgsm.gsm_bcch_ccch_sdcch4_demapper(timeslot_nr=timeslot, ) + else: + self.demapper = grgsm.gsm_sdcch8_demapper(timeslot_nr=timeslot, ) + + self.gsm_control_channels_decoder = grgsm.control_channels_decoder() + self.gsm_extract_system_info = grgsm.extract_system_info() + + self.msg_connect((self.gsm_burst_file_source, 'out'), (self.gsm_burst_timeslot_filter, 'in')) + self.msg_connect((self.gsm_burst_timeslot_filter, 'out'), (self.demapper, 'bursts')) + self.msg_connect((self.demapper, 'bursts'), (self.gsm_control_channels_decoder, 'bursts')) + self.msg_connect((self.gsm_control_channels_decoder, 'msgs'), (self.gsm_extract_system_info, 'msgs')) diff --git a/plugins/analysis_plugin.py b/plugins/analysis_plugin.py index 6e4e49e..a0680bd 100755 --- a/plugins/analysis_plugin.py +++ b/plugins/analysis_plugin.py @@ -2,11 +2,13 @@ import os from adapter.grgsm.info_extractor import InfoExtractor +from adapter.grgsm.systeminfo_extractor import SystemInfoExtractor from adapter.grgsm.tmsi import TmsiCapture from core.plugin.interface import plugin, PluginBase, arg, cmd, subcmd, PluginError from core.util.text_utils import columnize -channel_modes = ['BCCH_SDCCH4', 'SDCCH8'] +channel_modes_cch = ['BCCH_SDCCH4', 'SDCCH8'] +channel_modes = ['BCCH'] + channel_modes_cch @plugin(name='Analysis Plugin', @@ -16,7 +18,7 @@ class AnalysisPlugin(PluginBase): def analyze(self, args): pass - @arg("-m", action="store", dest="mode", choices=channel_modes, help="Channel mode.", default="SDCCH8") + @arg("-m", action="store", dest="mode", choices=channel_modes_cch, help="Channel mode.", default="SDCCH8") @arg("-t", action="store", dest="timeslot", type=int, help="Timeslot of the CCCH.", default=0) @arg("--bursts", action="store_path", dest="bursts", help="bursts.") @subcmd(name='cipher', help='Analyze Cipher Mode Command messages in a capture.', parent="analyze") @@ -34,7 +36,7 @@ def cipher_mode_commands(self, args): self.printmsg("Framenumber: %s A5/%s" % (cmc_fnrs[i], cmc_a5vs[i])) @arg("--gprs-assignments", action="store_true", dest="gprs", help="Show GPRS related immediate assignments.") - @arg("-m", action="store", dest="mode", choices=channel_modes, help="Channel mode.", default="SDCCH8") + @arg("-m", action="store", dest="mode", choices=channel_modes_cch, help="Channel mode.", default="BCCH_SDCCH4") @arg("-t", action="store", dest="timeslot", type=int, help="Timeslot of the CCCH.", default=0) @arg("--bursts", action="store_path", dest="bursts", help="bursts.") @subcmd(name="immediate", help="Analyze Immediate Assignment messages in the capture file.", parent="analyze") @@ -74,7 +76,7 @@ def immediate_assignments(self, args): @arg("-v", action="store_true", dest="verbose", help="If set, the captured TMSI / IMSI are printed.") @arg("-o", action="store", dest="dest_file", help="If set, the captured TMSI / IMSI are stored in the specified file.") - @arg("-m", action="store", dest="mode", choices=channel_modes, help="Channel mode.", default="BCCH") + @arg("-m", action="store", dest="mode", choices=channel_modes_cch, help="Channel mode.", default="BCCH") @arg("-t", action="store", dest="timeslot", type=int, help="Timeslot of the CCCH.", default=0) @arg("--bursts", action="store_path", dest="bursts", help="bursts.") @subcmd(name="tmsi", help="Output TMSIs in a capture.", parent="analyze") @@ -140,3 +142,65 @@ def tmsi(self, args): self.printmsg("{} ({} times)".format(key, imsis[key])) os.remove("tmsicount.txt") + + @arg("-m", action="store", dest="mode", choices=channel_modes, help="Channel mode.", default="SDCCH8") + @arg("-t", action="store", dest="timeslot", type=int, help="Timeslot of the CCCH.", default=0) + @arg("--bursts", action="store_path", dest="bursts", help="bursts.") + @subcmd(name="system_info", help="Print system information in the capture file.", parent="analyze") + def system_information(self, args): + extractor = SystemInfoExtractor(args.timeslot, args.bursts, 'BCCH', False) + extractor.start() + extractor.wait() + + chans = extractor.gsm_extract_system_info.get_chans() # arfcn + pwrs = extractor.gsm_extract_system_info.get_pwrs() + cell_id = extractor.gsm_extract_system_info.get_cell_id() + lac = extractor.gsm_extract_system_info.get_lac() + mcc = extractor.gsm_extract_system_info.get_mcc() + mnc = extractor.gsm_extract_system_info.get_mnc() + ccch_conf = extractor.gsm_extract_system_info.get_ccch_conf() # 0 = ccch, not combined with SDCCHs + + class CellInfo: + def __init__(self, arfcn, pwr, ci, lac, mcc, mnc, ccch_conf, cell_arfcns, neighbour_arfcns): + self.arfcn = arfcn + self.pwr = str(pwr) + " dbm" + self.cell_identity = ci + self.location_are_code = lac + self.mcc = mcc + self.mnc = mnc + self.ccch_conf = ccch_conf + self.cell_arfcns = cell_arfcns + self.neighbour_arfcns = neighbour_arfcns + + found_cellinfos = dict() + + for i in range(0, len(chans)): + current_arfcn = chans[i] + if current_arfcn not in found_cellinfos: + channel_config = 'BCCH' if ccch_conf[i] == 0 else 'BCCH_SDCCH' if ccch_conf[i] == 1 else '' + cell_arfcns = extractor.gsm_extract_system_info.get_cell_arfcns(current_arfcn) + neighbour_arfcns = extractor.gsm_extract_system_info.get_neighbours(current_arfcn) + + current_cell_info = CellInfo(current_arfcn, pwrs[i], cell_id[i], lac[i], + mcc[i], mnc[i], channel_config, + cell_arfcns, neighbour_arfcns + ) + found_cellinfos[current_arfcn] = current_cell_info + + if len(found_cellinfos) < 1: + self.printmsg("No system information found.") + else: + strings = ["ARFCN", "PWR", "CI", "LAC", "MCC", "MNC", "CONFIG", "CELL ARFCNs", "NEIGHBOUR ARFCNs"] + for arfcn in found_cellinfos: + cell_info = found_cellinfos[arfcn] + strings.append(str(cell_info.arfcn)) + strings.append(str(cell_info.pwr)) + strings.append(str(cell_info.cell_identity)) + strings.append(str(cell_info.location_are_code)) + strings.append(str(cell_info.mcc)) + strings.append(str(cell_info.mnc)) + strings.append(str(cell_info.ccch_conf)) + strings.append(", ".join(str(entry) for entry in cell_info.cell_arfcns)) + strings.append(", ".join(str(entry) for entry in cell_info.neighbour_arfcns)) + + self.printmsg(columnize(strings, 9))