Skip to content

Commit

Permalink
[#3] Implement system info analysis
Browse files Browse the repository at this point in the history
  • Loading branch information
romankh committed Nov 25, 2017
1 parent 9e0d2cc commit 92c909c
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 4 deletions.
2 changes: 2 additions & 0 deletions adapter/grgsm/info_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ def __init__(self, timeslot, burst_file, mode, show_gprs):
self.gsm_control_channels_decoder = grgsm.control_channels_decoder()
self.gsm_extract_cmc = grgsm.extract_cmc()
self.gsm_extract_immediate_assignment = grgsm.extract_immediate_assignment(False, not show_gprs, True)
self.gsm_extract_system_info = grgsm.extract_system_info()

self.msg_connect((self.gsm_burst_file_source, 'out'), (self.gsm_burst_timeslot_filter, 'in'))
self.msg_connect((self.gsm_burst_timeslot_filter, 'out'), (self.demapper, 'bursts'))
self.msg_connect((self.demapper, 'bursts'), (self.gsm_control_channels_decoder, 'bursts'))
self.msg_connect((self.gsm_control_channels_decoder, 'msgs'), (self.gsm_extract_cmc, 'msgs'))
self.msg_connect((self.gsm_control_channels_decoder, 'msgs'), (self.gsm_extract_immediate_assignment, 'msgs'))
self.msg_connect((self.gsm_control_channels_decoder, 'msgs'), (self.gsm_extract_system_info, 'msgs'))
26 changes: 26 additions & 0 deletions adapter/grgsm/systeminfo_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
import grgsm
from gnuradio import gr


class SystemInfoExtractor(gr.top_block):
def __init__(self, timeslot, burst_file, mode, show_gprs):
gr.top_block.__init__(self, "Top Block")

self.gsm_burst_file_source = grgsm.burst_file_source(burst_file)
self.gsm_burst_timeslot_filter = grgsm.burst_timeslot_filter(timeslot)

if mode == 'BCCH':
self.demapper = grgsm.gsm_bcch_ccch_demapper(timeslot_nr=timeslot, )
elif mode == 'BCCH_SDCCH4':
self.demapper = grgsm.gsm_bcch_ccch_sdcch4_demapper(timeslot_nr=timeslot, )
else:
self.demapper = grgsm.gsm_sdcch8_demapper(timeslot_nr=timeslot, )

self.gsm_control_channels_decoder = grgsm.control_channels_decoder()
self.gsm_extract_system_info = grgsm.extract_system_info()

self.msg_connect((self.gsm_burst_file_source, 'out'), (self.gsm_burst_timeslot_filter, 'in'))
self.msg_connect((self.gsm_burst_timeslot_filter, 'out'), (self.demapper, 'bursts'))
self.msg_connect((self.demapper, 'bursts'), (self.gsm_control_channels_decoder, 'bursts'))
self.msg_connect((self.gsm_control_channels_decoder, 'msgs'), (self.gsm_extract_system_info, 'msgs'))
72 changes: 68 additions & 4 deletions plugins/analysis_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
import os

from adapter.grgsm.info_extractor import InfoExtractor
from adapter.grgsm.systeminfo_extractor import SystemInfoExtractor
from adapter.grgsm.tmsi import TmsiCapture
from core.plugin.interface import plugin, PluginBase, arg, cmd, subcmd, PluginError
from core.util.text_utils import columnize

channel_modes = ['BCCH_SDCCH4', 'SDCCH8']
channel_modes_cch = ['BCCH_SDCCH4', 'SDCCH8']
channel_modes = ['BCCH'] + channel_modes_cch


@plugin(name='Analysis Plugin',
Expand All @@ -16,7 +18,7 @@ class AnalysisPlugin(PluginBase):
def analyze(self, args):
pass

@arg("-m", action="store", dest="mode", choices=channel_modes, help="Channel mode.", default="SDCCH8")
@arg("-m", action="store", dest="mode", choices=channel_modes_cch, help="Channel mode.", default="SDCCH8")
@arg("-t", action="store", dest="timeslot", type=int, help="Timeslot of the CCCH.", default=0)
@arg("--bursts", action="store_path", dest="bursts", help="bursts.")
@subcmd(name='cipher', help='Analyze Cipher Mode Command messages in a capture.', parent="analyze")
Expand All @@ -34,7 +36,7 @@ def cipher_mode_commands(self, args):
self.printmsg("Framenumber: %s A5/%s" % (cmc_fnrs[i], cmc_a5vs[i]))

@arg("--gprs-assignments", action="store_true", dest="gprs", help="Show GPRS related immediate assignments.")
@arg("-m", action="store", dest="mode", choices=channel_modes, help="Channel mode.", default="SDCCH8")
@arg("-m", action="store", dest="mode", choices=channel_modes_cch, help="Channel mode.", default="BCCH_SDCCH4")
@arg("-t", action="store", dest="timeslot", type=int, help="Timeslot of the CCCH.", default=0)
@arg("--bursts", action="store_path", dest="bursts", help="bursts.")
@subcmd(name="immediate", help="Analyze Immediate Assignment messages in the capture file.", parent="analyze")
Expand Down Expand Up @@ -74,7 +76,7 @@ def immediate_assignments(self, args):
@arg("-v", action="store_true", dest="verbose", help="If set, the captured TMSI / IMSI are printed.")
@arg("-o", action="store", dest="dest_file",
help="If set, the captured TMSI / IMSI are stored in the specified file.")
@arg("-m", action="store", dest="mode", choices=channel_modes, help="Channel mode.", default="BCCH")
@arg("-m", action="store", dest="mode", choices=channel_modes_cch, help="Channel mode.", default="BCCH")
@arg("-t", action="store", dest="timeslot", type=int, help="Timeslot of the CCCH.", default=0)
@arg("--bursts", action="store_path", dest="bursts", help="bursts.")
@subcmd(name="tmsi", help="Output TMSIs in a capture.", parent="analyze")
Expand Down Expand Up @@ -140,3 +142,65 @@ def tmsi(self, args):
self.printmsg("{} ({} times)".format(key, imsis[key]))

os.remove("tmsicount.txt")

@arg("-m", action="store", dest="mode", choices=channel_modes, help="Channel mode.", default="SDCCH8")
@arg("-t", action="store", dest="timeslot", type=int, help="Timeslot of the CCCH.", default=0)
@arg("--bursts", action="store_path", dest="bursts", help="bursts.")
@subcmd(name="system_info", help="Print system information in the capture file.", parent="analyze")
def system_information(self, args):
extractor = SystemInfoExtractor(args.timeslot, args.bursts, 'BCCH', False)
extractor.start()
extractor.wait()

chans = extractor.gsm_extract_system_info.get_chans() # arfcn
pwrs = extractor.gsm_extract_system_info.get_pwrs()
cell_id = extractor.gsm_extract_system_info.get_cell_id()
lac = extractor.gsm_extract_system_info.get_lac()
mcc = extractor.gsm_extract_system_info.get_mcc()
mnc = extractor.gsm_extract_system_info.get_mnc()
ccch_conf = extractor.gsm_extract_system_info.get_ccch_conf() # 0 = ccch, not combined with SDCCHs

class CellInfo:
def __init__(self, arfcn, pwr, ci, lac, mcc, mnc, ccch_conf, cell_arfcns, neighbour_arfcns):
self.arfcn = arfcn
self.pwr = str(pwr) + " dbm"
self.cell_identity = ci
self.location_are_code = lac
self.mcc = mcc
self.mnc = mnc
self.ccch_conf = ccch_conf
self.cell_arfcns = cell_arfcns
self.neighbour_arfcns = neighbour_arfcns

found_cellinfos = dict()

for i in range(0, len(chans)):
current_arfcn = chans[i]
if current_arfcn not in found_cellinfos:
channel_config = 'BCCH' if ccch_conf[i] == 0 else 'BCCH_SDCCH' if ccch_conf[i] == 1 else ''
cell_arfcns = extractor.gsm_extract_system_info.get_cell_arfcns(current_arfcn)
neighbour_arfcns = extractor.gsm_extract_system_info.get_neighbours(current_arfcn)

current_cell_info = CellInfo(current_arfcn, pwrs[i], cell_id[i], lac[i],
mcc[i], mnc[i], channel_config,
cell_arfcns, neighbour_arfcns
)
found_cellinfos[current_arfcn] = current_cell_info

if len(found_cellinfos) < 1:
self.printmsg("No system information found.")
else:
strings = ["ARFCN", "PWR", "CI", "LAC", "MCC", "MNC", "CONFIG", "CELL ARFCNs", "NEIGHBOUR ARFCNs"]
for arfcn in found_cellinfos:
cell_info = found_cellinfos[arfcn]
strings.append(str(cell_info.arfcn))
strings.append(str(cell_info.pwr))
strings.append(str(cell_info.cell_identity))
strings.append(str(cell_info.location_are_code))
strings.append(str(cell_info.mcc))
strings.append(str(cell_info.mnc))
strings.append(str(cell_info.ccch_conf))
strings.append(", ".join(str(entry) for entry in cell_info.cell_arfcns))
strings.append(", ".join(str(entry) for entry in cell_info.neighbour_arfcns))

self.printmsg(columnize(strings, 9))

0 comments on commit 92c909c

Please sign in to comment.