Skip to content

Commit

Permalink
Feature/manual fot (#332)
Browse files Browse the repository at this point in the history
  • Loading branch information
henry54809 authored Jul 21, 2021
1 parent 2ffdfc2 commit 18fd432
Show file tree
Hide file tree
Showing 37 changed files with 1,559 additions and 1,608 deletions.
7 changes: 3 additions & 4 deletions .stickler.yml
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
---
linters:
flake8:
ignore: E701
python: 3
max-line-length: 100
pep8:
ignore: E701
python: 3
max-line-length: 100
py3k:
ignore:
- 'W1618'
- 'W1619'
- 'E1601'
ignore: W1618, W1619, E1601, E701
files:
ignore:
- 'forch/proto/*'
5 changes: 5 additions & 0 deletions bin/setup_stack
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ if [[ -n $dts ]]; then
DTS_IP=$(docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' $DTS_CID)
vxlan="vxlan $DTS_IP"
sudo cp -a controller/site_config inst/forch-dts/
if [ "$DEFAULT_CLEAN_MAC_PREFIX" != "$CLEAN_MAC_PREFIX" ]; then
for f in `find inst/forch-dts/ -name "$DEFAULT_CLEAN_MAC_PREFIX*"`; do
cp -r $f `echo $f | sed -e "s/$DEFAULT_CLEAN_MAC_PREFIX/$CLEAN_MAC_PREFIX/g"`
done
fi
else
vxlan="vxlan $dts"
fi
Expand Down
2 changes: 2 additions & 0 deletions controller/bin/daq_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ device_reporting:
external_subnets:
- subnet: 192.168.1.0/24

default_timeout_sec: 120

site_path: /site
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"modules": {
"acquire": {
"timeout_sec": 120
}
}
}
1 change: 1 addition & 0 deletions etc/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ setuptools
watchdog
wheel
yq
python-dateutil==2.8.2
4 changes: 2 additions & 2 deletions etc/test_fot.out
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ testing/test_fot
Starting Forchestrator
1
1
Device 04 log count 0
171
171
276
275
100
274
9a99571e8f01: []
9a99571e8f02: []
9a99571e8f03: ['9a99571e8f03:hold:DaqException']
9a99571e8f04: []
9a99571e8f05: ['9a99571e8f05:hold:DaqException']
5 packets transmitted, 5 received, 0% packet loss, XXX
5 packets transmitted, 0 received, XX errors, 100% packet loss, XXX
2 changes: 1 addition & 1 deletion forch/base_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ def process_port_learn(self, dp_name, port, mac, vlan):
"""Process faucet port learn events"""

@abc.abstractmethod
def process_port_assign(self, mac, vlan):
def process_port_assign(self, mac, device_vlan, assigned_vlan):
"""Process faucet port vlan assignment"""
5 changes: 3 additions & 2 deletions forch/device_report_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ def process_port_learn(self, dp_name, port, mac, vlan):
self._dp_mac_map[dp_key] = mac
self._process_session_ready(mac)

def process_port_assign(self, mac, vlan):
def process_port_assign(self, mac, device_vlan, assigned_vlan):
"""Process faucet port vlan assignment"""
with self._lock:
self._mac_assigned_vlan_map[mac] = vlan
self._mac_device_vlan_map[mac] = device_vlan
self._mac_assigned_vlan_map[mac] = assigned_vlan
self._process_session_ready(mac)
18 changes: 10 additions & 8 deletions forch/faucetizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ def _update_device_dva_state(self, mac, device_placement, device_behavior):

if dva_state:
self._update_vlan_state(device_placement.switch, device_placement.port, dva_state)
return dva_state

def _update_vlans_config(self, behavioral_faucet_config):
vlans_config = behavioral_faucet_config.setdefault('vlans', {})
Expand Down Expand Up @@ -362,7 +363,12 @@ def _update_ports_config(self, behavioral_faucet_config):
if assigned_vlan:
assigned_vlans.add(assigned_vlan)

self._update_device_dva_state(mac, device_placement, device_behavior)
dva_state = self._update_device_dva_state(mac, device_placement, device_behavior)
if all((device_vlan != old_device_vlan, dva_state == DVAState.sequestered,
self._orchestration_manager)):
self._orchestration_manager.update_device_testing_vlans(mac,
device_vlan,
assigned_vlan)

self._finalize_host_ports_config(
behavioral_faucet_config, new_testing_device_vlans, list(assigned_vlans))
Expand Down Expand Up @@ -437,13 +443,9 @@ def reload_segments_to_vlans(self, file_path):

self.flush_behavioral_config()

def clear_static_placements(self):
"""Remove all static placements in memory"""
self._static_devices.ClearField('device_mac_placements')

def clear_static_behaviors(self):
"""Remove all static behaviors in memory"""
self._static_devices.ClearField('device_mac_behaviors')
def clear_static_placement(self, mac):
"""Remove static placement for devices with mac if exists"""
self._static_devices.device_mac_placements.pop(mac.lower(), None)

def flush_behavioral_config(self, force=False):
"""Generate and write behavioral config to file"""
Expand Down
28 changes: 18 additions & 10 deletions forch/file_change_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@

import hashlib
import os
from dataclasses import dataclass
from typing import Callable

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from forch.utils import get_logger


@dataclass
class FileData:
"""Watched file data wrapper"""
content: str
hash: str
callback: Callable

class FileChangeWatcher:
"""Watch file changes in a directory"""
def __init__(self, dir_path):
Expand All @@ -29,9 +38,8 @@ def stop(self):

def register_file_callback(self, file_path, file_change_callback):
"""Register a file handler"""
file_data = self._watched_files.setdefault(file_path, {})
file_data['hash'] = self._get_file_hash(file_path)
file_data['callback'] = file_change_callback
content, _ = self._get_file_data(file_path)
self._watched_files[file_path] = FileData(content, hash, file_change_callback)

def unregister_file_callback(self, file_path):
"""Unregister the handler for a file"""
Expand All @@ -47,22 +55,22 @@ def _handle_file_change(self, file_path):
if not file_data:
return

new_hash = self._get_file_hash(file_path)
if new_hash == file_data['hash']:
new_content, new_hash = self._get_file_data(file_path)
if new_hash == file_data.hash:
return
file_data['hash'] = new_hash

self._logger.info('File "%s" changed. Executing callback', file_path)
file_data.callback(file_path, new_content, file_data.content)
file_data.hash = new_hash
file_data.content = new_content

file_data['callback'](file_path)

def _get_file_hash(self, file_path):
def _get_file_data(self, file_path):
if not os.path.exists(file_path):
return None

with open(file_path) as file:
content = file.read()
return hashlib.sha256(content.encode('utf-8')).hexdigest()
return content, hashlib.sha256(content.encode('utf-8')).hexdigest()


class FileChangeHandler(FileSystemEventHandler):
Expand Down
79 changes: 63 additions & 16 deletions forch/forchestrator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Orchestrator component for controlling a Faucet SDN"""

# pylint: disable=too-many-lines,too-many-public-methods
from datetime import datetime
import functools
import os
Expand All @@ -26,7 +27,7 @@
from forch.port_state_manager import PortStateManager
from forch.varz_state_collector import VarzStateCollector
from forch.utils import (
get_logger, proto_dict, yaml_proto, FaucetEventOrderError, MetricsFetchingError)
get_logger, proto_dict, yaml_content_proto, FaucetEventOrderError, MetricsFetchingError)

from forch.__version__ import __version__

Expand Down Expand Up @@ -273,17 +274,33 @@ def _process_static_device_placement(self):
if not placement_file_name:
return
placement_file_path = os.path.join(self._forch_config_dir, placement_file_name)
self._reload_static_device_placement(placement_file_path)
with open(placement_file_path, 'r') as fd:
content = fd.read()
self._reload_static_device_placement(placement_file_path, content)
self._config_file_watcher.register_file_callback(
placement_file_path, self._reload_static_device_placement)

def _reload_static_device_placement(self, file_path):
if self._faucetizer:
self._faucetizer.clear_static_placements()
devices_state = yaml_proto(file_path, DevicesState)
for eth_src, device_placement in devices_state.device_mac_placements.items():
def _reload_static_device_placement(self, file_path, new, current=None):
new_mac_placements = yaml_content_proto(new, DevicesState).device_mac_placements
current_macs = set()
if current:
current_mac_placements = yaml_content_proto(current, DevicesState).device_mac_placements
current_macs = set(current_mac_placements)
for eth_src, device_placement in new_mac_placements.items():
if eth_src in current_macs:
current_macs.remove(eth_src)
current_placement = current_mac_placements[current_macs]
if current_placement.SerializeToString() == device_placement.SerializeToString():
continue
if self._faucetizer:
self._faucetizer.clear_static_placement(eth_src)
self._process_device_placement(eth_src, device_placement, static=True)

# Remove macs that're deleted
if self._faucetizer:
for mac in current_macs:
self._faucetizer.clear_static_placement(mac)

def _process_static_device_behavior(self):
if self._should_ignore_static_behavior:
return
Expand All @@ -292,33 +309,52 @@ def _process_static_device_behavior(self):
if not behaviors_file_name:
return
behaviors_file_path = os.path.join(self._forch_config_dir, behaviors_file_name)
self._reload_static_device_behavior(behaviors_file_path)
with open(behaviors_file_path, 'r') as fd:
content = fd.read()
self._reload_static_device_behavior(behaviors_file_path, content)
self._config_file_watcher.register_file_callback(
behaviors_file_path, self._reload_static_device_behavior)

def _reload_static_device_behavior(self, file_path):
self._port_state_manager.clear_static_device_behaviors()

def _reload_static_device_behavior(self, file_path, new, current=None):
try:
self._logger.info('Reading static device behavior file: %s', file_path)
devices_state = yaml_proto(file_path, DevicesState)
mac_hehaviors = yaml_content_proto(new, DevicesState).device_mac_behaviors
except Exception as error:
msg = f'All auth was disabled: could not load static behavior file {file_path}'
self._logger.error('%s: %s', msg, error)
self._port_state_manager.clear_static_device_behaviors()
with self._states_lock:
self._forch_config_errors[STATIC_BEHAVIORAL_FILE] = msg
self._should_ignore_auth_result = True
return

current_macs = set()
if current:
try:
device_states = yaml_content_proto(current, DevicesState)
current_mac_hehaviors = device_states.device_mac_behaviors
current_macs = set(current_mac_hehaviors)
except Exception:
# Ignore any exceptions with the last content
pass

with self._states_lock:
self._forch_config_errors.pop(STATIC_BEHAVIORAL_FILE, None)
self._should_ignore_auth_result = False

self._logger.info('Authentication resumed')

for mac, device_behavior in devices_state.device_mac_behaviors.items():
for mac, device_behavior in mac_hehaviors.items():
if mac in current_macs:
current_macs.remove(mac)
current_behavior = current_mac_hehaviors[mac]
if current_behavior.SerializeToString() == device_behavior.SerializeToString():
continue
self._port_state_manager.handle_static_device_behavior(mac, device_behavior)

for mac in current_macs:
self._port_state_manager.clear_static_device_behavior(mac)

def _handle_device_result(self, device_result):
return self._port_state_manager.handle_testing_result(device_result)

Expand All @@ -330,6 +366,11 @@ def update_static_vlan_varz(self, mac, vlan):
if self._metrics:
self._metrics.update_var('static_mac_vlan', labels=[mac], value=vlan)

def update_device_testing_vlans(self, mac, device_vlan, assigned_vlan):
"""Updates device testing vlan in device report handler"""
if self._device_report_handler:
self._device_report_handler.process_port_assign(mac, device_vlan, assigned_vlan)

def _calculate_orchestration_config(self):
orch_config = self._config.orchestration

Expand Down Expand Up @@ -396,6 +437,9 @@ def _initialize_faucetizer(self, sequester_segment=None):
orch_config, self._structural_config_file, self._behavioral_config_file, self,
sequester_segment)

def callback_adapter(func):
return lambda file_path, new, current: func(file_path)

if orch_config.faucetize_interval_sec:
self._faucetize_scheduler = HeartbeatScheduler(orch_config.faucetize_interval_sec)

Expand All @@ -405,14 +449,17 @@ def _initialize_faucetizer(self, sequester_segment=None):
self._faucetize_scheduler.add_callback(update_write_faucet_config)
else:
self._config_file_watcher.register_file_callback(
self._structural_config_file, self._faucetizer.reload_structural_config)
self._structural_config_file,
callback_adapter(self._faucetizer.reload_structural_config))
if self._gauge_config_file:
self._config_file_watcher.register_file_callback(
self._gauge_config_file, self._faucetizer.reload_and_flush_gauge_config)
self._gauge_config_file,
callback_adapter(self._faucetizer.reload_and_flush_gauge_config))

if self._segments_vlans_file:
self._config_file_watcher.register_file_callback(
self._segments_vlans_file, self._faucetizer.reload_segments_to_vlans)
self._segments_vlans_file,
callback_adapter(self._faucetizer.reload_segments_to_vlans))

def _initialize_gauge_metrics_scheduler(self, interval_sec):
get_gauge_metrics = (
Expand Down
Loading

0 comments on commit 18fd432

Please sign in to comment.