Skip to content

Commit

Permalink
multi vxlan (#337)
Browse files Browse the repository at this point in the history
  • Loading branch information
henry54809 authored Aug 27, 2021
1 parent a0dc6f8 commit 7d236c2
Show file tree
Hide file tree
Showing 17 changed files with 499 additions and 62 deletions.
1 change: 1 addition & 0 deletions bin/build_proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ done
$proto_files2
sed -i'' -e 's/id="\./id="/' -e 's/href="#\./href="#/' protos.html
sed -i'' -e 's/&lt;/</g' -e 's/&gt;/>/g' protos.html
$PYTHON -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. forch/proto/endpoint_server.proto
)

mkdir -p forch/proto
Expand Down
1 change: 0 additions & 1 deletion bin/setup_stack
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ add_faux corp 1 0 dnsmasq

if [ -n "$vxlan" ]; then
echo Configuring for vxlan and endpoint handler server
add_vxlan t1sw1 4
if [[ -z $skip_conn_check ]]; then
echo Connection check incompatible with vxlan: will wait forever.
false
Expand Down
7 changes: 0 additions & 7 deletions bin/stack_functions
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,6 @@ function add_iface {
sudo ifconfig $iface up
}

# Add a port named "vxlan" to an Open vSwitch bridge.
#   $1 - name of the bridge the port is added to
#   $2 - port number requested for the new interface (ofport_request)
# NOTE: br, pt and if are assigned without `local`, so they remain set in the
# calling shell after the function returns.
function add_vxlan {
br=$1
pt=$2
# "if" is used here as an ordinary variable holding the interface name.
if=vxlan
# Create the port and request its port number in a single ovs-vsctl call.
sudo ovs-vsctl add-port $br $if -- set interface $if ofport_request=$pt
}

function add_oeth {
br_a=$1
pt_a=$2
Expand Down
4 changes: 3 additions & 1 deletion controller/bin/controller_go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ FORCH_CONFIG=$FORCH_CONFIG_DIR/forch.yaml
export PYTHONPATH=$FORCH_ROOT:$DAQ_ROOT:$FAUCET_ROOT:$DAQ_ROOT/mininet

DAQ_BASE=bin/daq_config.yaml
DAQ_OVERRIDE=
DAQ_CONFIG=startup.yaml
echo site_description: generated > $DAQ_CONFIG

Expand Down Expand Up @@ -76,6 +77,7 @@ if [[ -n $VXLAN_IP ]]; then
yq -i -y .orchestration.sequester_config.service_address=\"$DTS_IP\" $FORCH_CONFIG
yq -i -y .switch_setup.endpoint.ip=\"$docker_ip\" $DAQ_CONFIG
else
DAQ_OVERRIDE=../bin/daq_config_override.yaml
echo Waiting for data0...
while ! ip link show data0; do
sleep 2
Expand All @@ -99,7 +101,7 @@ if [[ -z $DTS_IP || $DTS_IP == localhost ]]; then
PYTHONPATH=$PYTHONPATH \
PATH=$DAQ_ROOT/mininet:$PATH \
FAUCET_EVENT_SOCK=$DAQ_EVENT_SOCK \
daq/cmd/start ../$DAQ_BASE ../$DAQ_CONFIG -k $no_test > daq/inst/cmdrun.log 2>&1 &
daq/cmd/start ../$DAQ_BASE $DAQ_OVERRIDE ../$DAQ_CONFIG -k $no_test > daq/inst/cmdrun.log 2>&1 &
fi

echo Blocking forever...
Expand Down
2 changes: 0 additions & 2 deletions controller/bin/daq_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ include: ${DAQ_LIB}/config/system/default.yaml
base_conf: ${DAQ_LIB}/resources/setups/orchestration/base_config.json

switch_setup:
data_intf: data0
lo_port: 3984
varz_port: 5678
model: EXT_STACK
native: True
of_dpid: '0'

run_trigger:
vlan_start: 272
Expand Down
3 changes: 3 additions & 0 deletions controller/bin/daq_config_override.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
switch_setup:
data_intf: data0
of_dpid: '0'
2 changes: 1 addition & 1 deletion etc/DAQ_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.10.26
1.10.28
2 changes: 1 addition & 1 deletion etc/start_faux
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ else
fi

if [ -n "${options[ping]}" ]; then
ping -c 10 $GATEWAY &
ping -c 30 $GATEWAY &
fi

ip addr
Expand Down
12 changes: 6 additions & 6 deletions forch/device_report_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import threading
import grpc

import forch.endpoint_handler as endpoint_handler

from forch.proto.shared_constants_pb2 import PortBehavior
from forch.proto.devices_state_pb2 import DevicesState
from forch.base_classes import DeviceStateReporter
Expand All @@ -28,11 +26,11 @@
DEFAULT_SERVER_ADDRESS = '127.0.0.1'
CONNECT_TIMEOUT_SEC = 60


# pylint: disable=too-many-arguments
class DeviceReportClient(DeviceStateReporter):
"""gRPC client to send device result"""

def __init__(self, result_handler, target, unauth_vlan, tunnel_ip):
def __init__(self, result_handler, target, unauth_vlan, tunnel_ip, endpoint_handler=None):
self._logger = get_logger('devreport')
self._logger.info('Initializing with unauthenticated vlan %s', unauth_vlan)
self._logger.info('Using target %s, proto %s', target, bool(PORT_BEHAVIOR_SESSION_RESULT))
Expand All @@ -46,7 +44,7 @@ def __init__(self, result_handler, target, unauth_vlan, tunnel_ip):
self._lock = threading.Lock()
self._result_handler = result_handler
self._tunnel_ip = tunnel_ip
self._endpoint_handler = endpoint_handler.EndpointHandler(tunnel_ip) if tunnel_ip else None
self._endpoint_handler = endpoint_handler

def start(self):
"""Start the client handler"""
Expand Down Expand Up @@ -74,6 +72,8 @@ def disconnect(self, mac):
if session:
session.cancel()
self._mac_sessions.pop(mac)
if self._endpoint_handler:
self._endpoint_handler.free_endpoint(mac)
self._logger.info('Device %s disconnected', mac)
else:
self._logger.warning('Attempt to disconnect unconnected device %s', mac)
Expand All @@ -96,7 +96,7 @@ def _convert_and_handle(self, mac, progress):
self._logger.info('Device report %s endpoint %s (handler=%s)',
mac, endpoint_ip, bool(self._endpoint_handler))
if self._endpoint_handler:
self._endpoint_handler.process_endpoint(progress.endpoint)
self._endpoint_handler.process_endpoint(progress.endpoint, mac)
return False

def _process_progress(self, mac, session):
Expand Down
145 changes: 106 additions & 39 deletions forch/endpoint_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,54 +13,109 @@
import logging
import subprocess
import time
import threading
import yaml
import grpc

from forch.utils import get_logger

try:
import daq.proto.session_server_pb2_grpc as server_grpc
from daq.proto.session_server_pb2 import SessionParams, SessionProgress
SessionServerServicer = server_grpc.SessionServerServicer
PROTO_LOADED = True
except ImportError:
PROTO_LOADED = False

class SessionServerServicer:
"""Dummy class for weak import"""
import forch.proto.endpoint_server_pb2_grpc as server_grpc
from forch.proto.endpoint_server_pb2 import Endpoint
from forch.proto.shared_constants_pb2 import Empty

DEFAULT_SERVER_PORT = 50111
DEFAULT_BIND_ADDRESS = '0.0.0.0'
DEFAULT_VXLAN_PORT = 4789
DEFAULT_VXLAN_VNI = 0

BASE_T1SW_PORT = 29
VXLAN_CMD_FMT = 'ip link add %s type vxlan id %s remote %s dstport %s srcport %s %s nolearning'

# TODO: This switch value should be configurable
T1_SW1 = 'nz-kiwi-t1sw1'
CONNECT_TIMEOUT_SEC = 60

# 'TAP' description is needed by forch to append test vlans.
TAP_PORT_CONFIG = {
'description': 'TAP',
'tagged_vlans': [171]
}

class EndpointHandler:
"""Class to handle endpoint updates"""

def __init__(self, target_ip):
def __init__(self, target_ip, structural_config_file):
self._logger = get_logger('endpproxy')
self._lock = threading.RLock()
self._mac_tap_port = {}
self._freed_tap_ports = set()
self._next_tap_port = BASE_T1SW_PORT
self._structural_config_file = structural_config_file
server_port = DEFAULT_SERVER_PORT
address = f'{target_ip}:{server_port}'
self._logger.info('Proxy requests to %s, proto %s', address, PROTO_LOADED)
self._logger.info('Proxy requests to %s', address)
channel = grpc.insecure_channel(address)
self._stub = server_grpc.SessionServerStub(channel)
self._stub = server_grpc.EndpointServerStub(channel)
grpc.channel_ready_future(channel).result(timeout=CONNECT_TIMEOUT_SEC)

def process_endpoint(self, endpoint):
def _allocate_tap_port(self, mac):
    """Return the tap port assigned to mac, allocating one if necessary.

    A mac that already holds a port gets that same port back and nothing is
    written. Otherwise the highest previously freed port is reused or, when
    no freed port exists, the next never-used port number is taken. The
    chosen port is recorded for the mac and added with TAP_PORT_CONFIG to
    the T1_SW1 interfaces in the structural config file.
    """
    with self._lock:
        try:
            return self._mac_tap_port[mac]
        except KeyError:
            pass

        recycled = self._freed_tap_ports
        if recycled:
            port = max(recycled)
            recycled.remove(port)
        else:
            port = self._next_tap_port
            self._next_tap_port = port + 1
        self._mac_tap_port[mac] = port

        # Read-modify-write of the structural config, done under the lock.
        config_path = self._structural_config_file
        with open(config_path, 'r') as stream:
            config = yaml.safe_load(stream)
        config['dps'][T1_SW1]['interfaces'][port] = TAP_PORT_CONFIG
        with open(config_path, 'w') as stream:
            yaml.dump(config, stream)
        return port

def process_endpoint(self, endpoint, mac):
"""Handle an endpoint request"""
self._logger.info('Process request for %s', endpoint.ip)
session_params = SessionParams()
session_params.endpoint.CopyFrom(endpoint)
for progress in self._stub.StartSession(session_params):
self._logger.info('Recevied progress %s', progress)
t1sw_port = self._allocate_tap_port(mac)
session_endpoint = Endpoint()
session_endpoint.ip = endpoint.ip
session_endpoint.port = endpoint.port
session_endpoint.vni = endpoint.vni
session_endpoint.tap_port = t1sw_port

self._stub.ConfigureInterface(session_endpoint)
self._logger.info('Done with proxy request')


class EndpointServicer(SessionServerServicer):
def _deallocate_tap_port(self, freed_port):
    """Mark freed_port as reusable and drop it from the structural config.

    The port is added to the set of freed ports, then the structural config
    file is loaded, logged, stripped of the port's entry under the T1_SW1
    interfaces (a missing entry is ignored) and written back.
    """
    with self._lock:
        self._freed_tap_ports.add(freed_port)

        config_path = self._structural_config_file
        with open(config_path, 'r') as stream:
            config = yaml.safe_load(stream)
        self._logger.info(config)
        config['dps'][T1_SW1]['interfaces'].pop(freed_port, None)
        with open(config_path, 'w') as stream:
            yaml.dump(config, stream)

def free_endpoint(self, mac: str):
    """Release the tap port held by mac and clean up its remote interface.

    When mac holds no port, only the request is logged. Otherwise the port
    is deallocated, the allocation counter is rewound if possible, and the
    endpoint server is asked to clean up the interface for that port. The
    whole operation runs while holding the handler lock.
    """
    with self._lock:
        self._logger.info('Process request to free endpoint for %s', mac)
        port = self._mac_tap_port.pop(mac, None)
        if not port:
            return
        self._deallocate_tap_port(port)

        # Rewind the counter when the freed port and every port above it
        # (up to the counter) are unused; those ports then leave the freed
        # set and are handed out sequentially again.
        tail = set(range(port, self._next_tap_port))
        if tail <= self._freed_tap_ports:
            self._next_tap_port = port
            self._freed_tap_ports -= tail

        cleanup_request = Endpoint()
        cleanup_request.tap_port = port
        self._stub.CleanupInterface(cleanup_request)
        self._logger.info('Done with proxy request')


class EndpointServicer(server_grpc.EndpointServerServicer):
"""gRPC servicer to receive devices state"""

def __init__(self):
Expand All @@ -69,6 +124,12 @@ def __init__(self):
result = self._exec('sudo ovs-vsctl show')
self._logger.info('result: %s', result)

def _exec_no_raise(self, cmd):
    """Run cmd through _exec, logging any failure instead of raising it."""
    try:
        self._exec(cmd)
    except Exception as err:
        # Best-effort execution: the failure is only recorded in the log.
        self._logger.info('Ignoring exception: %s', str(err))

def _exec(self, cmd):
self._logger.info('executing: %s', cmd)
cmd_args = cmd.split(' ')
Expand All @@ -79,26 +140,32 @@ def _exec(self, cmd):
raise Exception('Failed subshell execution')
return process.stdout.decode('utf-8')

def _session_stream(self, request):
yield SessionProgress()
def _remove_interface(self, interface):
    """Take down and delete interface, then detach it from bridge t1sw1.

    Each step goes through _exec_no_raise, so a failing command is logged
    and the remaining steps still run.
    """
    self._logger.info('Removing vxlan interface %s', interface)
    teardown_templates = (
        'sudo ip link set %s down',
        'sudo ip link del %s',
        'sudo ovs-vsctl del-port t1sw1 %s',
    )
    for template in teardown_templates:
        self._exec_no_raise(template % interface)

# pylint: disable=invalid-name
def StartSession(self, request, context):
def ConfigureInterface(self, request, context):
"""Start a session servicer"""
endpoint = request.endpoint
self._logger.info('Redirect tunnel to %s', endpoint.ip)
try:
self._exec('sudo ip link set vxlan down')
self._exec('sudo ip link del vxlan')
except Exception as e:
self._logger.info('Ignoring exception: %s', str(e))

cmd = VXLAN_CMD_FMT % ('vxlan', DEFAULT_VXLAN_VNI, endpoint.ip,
interface = "vxlan%s" % request.tap_port
self._remove_interface(interface)
self._logger.info('Adding vxlan tunnel to %s', request.ip)
self._exec('sudo ovs-vsctl add-port t1sw1 %s -- set interface %s ofport_request=%s' % (
interface, interface, request.tap_port
))
cmd = VXLAN_CMD_FMT % (interface, request.vni, request.ip,
DEFAULT_VXLAN_PORT, DEFAULT_VXLAN_PORT, DEFAULT_VXLAN_PORT)
self._exec('sudo ' + cmd)
self._exec('sudo ip link set vxlan up')
return self._session_stream(request)
self._exec('sudo ip link set %s up' % interface)
return Empty()

def CleanupInterface(self, request, context):
    """Remove the vxlan interface named after request.tap_port.

    The interface name is "vxlan" followed by the requested tap port.
    Returns an Empty message.
    """
    self._remove_interface("vxlan%s" % request.tap_port)
    return Empty()

class EndpointServer:
"""Endpoint configuration server"""
Expand All @@ -108,7 +175,7 @@ def __init__(self, server_address=DEFAULT_BIND_ADDRESS, server_port=DEFAULT_SERV
self._server = grpc.server(futures.ThreadPoolExecutor())
self._servicer = EndpointServicer()

server_grpc.add_SessionServerServicer_to_server(self._servicer, self._server)
server_grpc.add_EndpointServerServicer_to_server(self._servicer, self._server)

self._address = f'{server_address}:{server_port}'
self._logger.info('Listening on %s', self._address)
Expand Down
8 changes: 5 additions & 3 deletions forch/faucetizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,11 @@ def reload_structural_config(self, structural_config_file=None):
"""Reload structural config from file"""
structural_config_file = structural_config_file or self._structural_config_file
self._logger.info('Reading structural config file: %s', structural_config_file)
with open(structural_config_file) as file:
structural_config = yaml.safe_load(file)
self._process_structural_config(structural_config)
with self._lock:
with open(structural_config_file) as file:
structural_config = yaml.safe_load(file)
if structural_config:
self._process_structural_config(structural_config)

def reload_and_flush_gauge_config(self, gauge_config_file):
"""Reload gauge config file and rewrite to faucet config directory"""
Expand Down
6 changes: 5 additions & 1 deletion forch/forchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from forch.authenticator import Authenticator
from forch.cpn_state_collector import CPNStateCollector
from forch.device_report_client import DeviceReportClient
from forch.endpoint_handler import EndpointHandler
from forch.file_change_watcher import FileChangeWatcher
from forch.faucet_state_collector import FaucetStateCollector
from forch.forch_metrics import ForchMetrics, VarzUpdater
Expand Down Expand Up @@ -257,8 +258,11 @@ def _create_device_report_handler(self):
tunnel_ip = self._config.orchestration.sequester_config.tunnel_ip
self._logger.info('Connecting report client to %s, local %s, vlan %s',
service_target, tunnel_ip, unauth_vlan)
endpoint_handler = None
if tunnel_ip:
endpoint_handler = EndpointHandler(tunnel_ip, self._structural_config_file)
return DeviceReportClient(self._handle_device_result, service_target,
unauth_vlan, tunnel_ip)
unauth_vlan, tunnel_ip, endpoint_handler=endpoint_handler)

def _attempt_authenticator_initialise(self):
orch_config = self._config.orchestration
Expand Down
Loading

0 comments on commit 7d236c2

Please sign in to comment.