Capture VPC now supports all AZs in region (#152)
* Capture VPC now provisions nodes in each Regional AZ

Signed-off-by: Chris Helma <[email protected]>

* VPC Plans now use AZ Names, not number of AZs

Signed-off-by: Chris Helma <[email protected]>

* Bumped the versions to indicate a breaking change

* Using a specific list of AZs to provision into as part of the
  VPCPlan is incompatible with previous versions of the Arkime
  Cluster

Signed-off-by: Chris Helma <[email protected]>

* Fixed small bug in Capture ASG CDK config

Signed-off-by: Chris Helma <[email protected]>

* Linting and test fixes

Signed-off-by: Chris Helma <[email protected]>

* A few changes to address PR feedback

Signed-off-by: Chris Helma <[email protected]>

* Rejiggered Capture Node planning per PR discussion

Signed-off-by: Chris Helma <[email protected]>

---------

Signed-off-by: Chris Helma <[email protected]>
chelma authored Jan 17, 2024
1 parent a1aec22 commit 1c851b5
Showing 20 changed files with 192 additions and 111 deletions.
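
Note on the breaking change called out above: a VpcPlan previously recorded only how many AZs to use, while it now records exactly which ones. A minimal sketch of the serialized shape before and after (CIDR block, mask, and AZ names here are hypothetical):

    old_plan = {
        "cidr": {"block": "10.0.0.0/16"},
        "numAzs": 2,
        "publicSubnetMask": 28,
    }
    new_plan = {
        "cidr": {"block": "10.0.0.0/16"},
        "azs": ["us-east-1a", "us-east-1b", "us-east-1c"],
        "publicSubnetMask": 28,
    }

Plans stored by earlier versions cannot be read into the new shape, which is why AWS_AIO_VERSION and the package version are bumped in the diffs below.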
19 changes: 13 additions & 6 deletions cdk-lib/capture-stacks/capture-nodes-stack.ts
@@ -49,7 +49,7 @@ export class CaptureNodesStack extends cdk.Stack {
loadBalancerAttributes: [
{
key: 'load_balancing.cross_zone.enabled',
value: 'true', // IMO, resilience is more important than latency here
value: 'true', // IMO, resilience & efficient utilization is more important than latency/AZ independence
}
],
});
@@ -64,6 +64,12 @@ export class CaptureNodesStack extends cdk.Stack {
targetType: 'instance',
healthCheckProtocol: 'TCP',
healthCheckPort: healthCheckPort.toString(),
targetGroupAttributes: [
{
key: 'stickiness.enabled',
value: 'true', // On by default, but let's be explicit
}
]
});

const gwlbListener = new elbv2.CfnListener(this, 'GWLBListener', {
@@ -125,6 +131,7 @@ export class CaptureNodesStack extends cdk.Stack {

const capacityProvider = new ecs.AsgCapacityProvider(this, 'AsgCapacityProvider', {
autoScalingGroup: autoScalingGroup,
enableManagedScaling: true,
enableManagedTerminationProtection: false
});
cluster.addAsgCapacityProvider(capacityProvider);
@@ -192,15 +199,15 @@ export class CaptureNodesStack extends cdk.Stack {
const service = new ecs.Ec2Service(this, 'Service', {
cluster,
taskDefinition,
desiredCount: 1,
desiredCount: props.planCluster.captureNodes.desiredCount,
minHealthyPercent: 0, // TODO: Speeds up test deployments but need to change to something safer
enableExecuteCommand: true
});

// TODO: Fix autoscaling. We need our ECS Tasks to scale together with our EC2 fleet since we are only placing
// a single container on each instance due to using the HOST network mode.
// See: https://stackoverflow.com/questions/72839842/aws-ecs-auto-scaling-an-ec2-auto-scaling-group-with-single-container-hosts
const scaling = service.autoScaleTaskCount({ maxCapacity: 10 });
const scaling = service.autoScaleTaskCount({
minCapacity: props.planCluster.captureNodes.minCount,
maxCapacity: props.planCluster.captureNodes.maxCount,
});
scaling.scaleOnCpuUtilization('CpuScaling', {
targetUtilizationPercent: 60,
});
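
With this change the ECS service's desired count and its autoscaling bounds are all driven by the same CaptureNodesPlan, so the task count and the EC2 fleet stay in step. A small sketch of the invariant, using field names from the plan type (values hypothetical):

    plan = {"instanceType": "m5.xlarge", "desiredCount": 2, "maxCount": 3, "minCount": 2}
    desired = plan["desiredCount"]               # feeds ecs.Ec2Service desiredCount
    lo, hi = plan["minCount"], plan["maxCount"]  # feed autoScaleTaskCount bounds
    assert lo <= desired <= hi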
2 changes: 1 addition & 1 deletion cdk-lib/capture-stacks/capture-vpc-stack.ts
@@ -16,7 +16,7 @@ export class CaptureVpcStack extends Stack {

this.vpc = new ec2.Vpc(this, 'VPC', {
ipAddresses: ec2.IpAddresses.cidr(props.planCluster.captureVpc.cidr.block),
maxAzs: props.planCluster.captureVpc.numAzs,
availabilityZones: props.planCluster.captureVpc.azs,
subnetConfiguration: [
{
subnetType: ec2.SubnetType.PUBLIC,
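
Supplying availabilityZones rather than maxAzs pins the VPC's subnets to an explicit, stable list. Below is a hypothetical, self-contained equivalent using the CDK's Python bindings; note that explicit AZ names require a concrete account/region environment:

    from aws_cdk import App, Environment, Stack
    from aws_cdk import aws_ec2 as ec2

    class DemoVpcStack(Stack):
        def __init__(self, scope, construct_id, **kwargs):
            super().__init__(scope, construct_id, **kwargs)
            ec2.Vpc(
                self, "VPC",
                ip_addresses=ec2.IpAddresses.cidr("10.0.0.0/16"),
                # was: max_azs=<count>; now an explicit list from the plan
                availability_zones=["us-east-1a", "us-east-1b", "us-east-1c"],
            )

    app = App()
    DemoVpcStack(app, "DemoVpcStack",
                 env=Environment(account="111111111111", region="us-east-1"))
    app.synth()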
7 changes: 6 additions & 1 deletion cdk-lib/capture-stacks/opensearch-domain-stack.ts
@@ -17,6 +17,7 @@ export interface OpenSearchDomainStackProps extends StackProps {
}

export class OpenSearchDomainStack extends Stack {
public readonly azCount: number = 2;
public readonly domainKey: kms.Key;
public readonly domain: Domain;
public readonly osSg: ec2.SecurityGroup;
@@ -81,7 +82,8 @@ export class OpenSearchDomainStack extends Stack {
kmsKey: this.domainKey,
},
zoneAwareness: {
availabilityZoneCount: props.planCluster.captureVpc.numAzs,
enabled: true,
availabilityZoneCount: this.azCount,
},
logging: {
slowSearchLogEnabled: true,
@@ -90,6 +92,9 @@
},
vpc: props.captureVpc,
vpcSubnets: [{
// The AZ list should be stable as it's pulling from the beginning of a sorted list and AZs are rarely
// (never?) deprecated
availabilityZones: props.captureVpc.availabilityZones.slice(0, this.azCount),
subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS
}],
tlsSecurityPolicy: TLSSecurityPolicy.TLS_1_2,
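
The domain keeps a fixed two-AZ footprint even though the capture VPC can now span many more, and the slice of the sorted AZ list is what keeps its placement stable across deployments. The selection rule in isolation (hypothetical AZ names):

    DOMAIN_AZ_COUNT = 2  # mirrors OpenSearchDomainStack.azCount
    vpc_azs = sorted(["us-east-1c", "us-east-1a", "us-east-1b"])
    domain_azs = vpc_azs[:DOMAIN_AZ_COUNT]
    assert domain_azs == ["us-east-1a", "us-east-1b"]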
1 change: 1 addition & 0 deletions cdk-lib/cloud-demo.ts
@@ -118,6 +118,7 @@ case 'MirrorMgmtParams':
vpcSsmParamName: params.nameVpcSsmParam,
vpceServiceId: params.idVpceService,
mirrorVni: params.idVni,
env: env,
});
break;
case 'DeployDemoTrafficParams':
2 changes: 1 addition & 1 deletion cdk-lib/core/context-types.ts
@@ -71,7 +71,7 @@ export interface ViewerNodesPlan {
*/
export interface VpcPlan {
cidr: Cidr;
numAzs: number;
azs: string[];
publicSubnetMask: number;
}

4 changes: 2 additions & 2 deletions cdk-lib/traffic-gen-sample/traffic-gen-stack.ts
@@ -22,7 +22,7 @@ export class TrafficGenStack extends cdk.Stack {
// to the private subnet.
const vpc = new ec2.Vpc(this, 'VPC', {
ipAddresses: ec2.IpAddresses.cidr(props.cidr),
maxAzs: 1
maxAzs: 99, // 99 AZs means "use all AZs"
});

/**
@@ -77,7 +77,7 @@
machineImage: ecs.EcsOptimizedImage.amazonLinux2(),
desiredCapacity: 3,
minCapacity: 3,
maxCapacity: 10 // Arbitrarily chosen
maxCapacity: 20 // Arbitrarily chosen
});

const ecsCluster = new ecs.Cluster(this, 'EcsCluster', {
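
The maxAzs: 99 trick relies on the CDK clamping maxAzs to however many AZs it can resolve for the stack's environment (an environment-agnostic stack resolves only two), so an oversized value effectively means "all of them." Sketched as plain Python under that assumption:

    available_azs = ["us-east-1a", "us-east-1b", "us-east-1c"]  # hypothetical region
    max_azs = 99
    used = available_azs[:min(max_azs, len(available_azs))]
    assert used == available_azs  # the cap never bites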
2 changes: 1 addition & 1 deletion cdk-lib/viewer-stacks/viewer-vpc-stack.ts
@@ -19,7 +19,7 @@ export class ViewerVpcStack extends Stack {
// The VPC the Viewer Nodes will live in
this.vpc = new ec2.Vpc(this, 'VPC', {
ipAddresses: ec2.IpAddresses.cidr(props.viewerVpcPlan.cidr.block),
maxAzs: props.viewerVpcPlan.numAzs,
availabilityZones: props.viewerVpcPlan.azs,
subnetConfiguration: [
{
subnetType: ec2.SubnetType.PUBLIC,
11 changes: 10 additions & 1 deletion manage_arkime/aws_interactions/ec2_interactions.py
@@ -182,4 +182,13 @@ def get_vpc_details(vpc_id: str, aws_provider: AwsClientProvider) -> VpcDetails:
owner_id=vpc_details["OwnerId"],
cidr_blocks=cidr_blocks,
tenancy=vpc_details["InstanceTenancy"]
)
)

def get_azs_in_region(aws_provider: AwsClientProvider) -> List[str]:
ec2_client = aws_provider.get_ec2()

response = ec2_client.describe_availability_zones()
azs = [az["ZoneName"] for az in response["AvailabilityZones"]]
azs.sort() # Ensure stable ordering

return azs
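
For reference, get_azs_in_region() amounts to a single DescribeAvailabilityZones call. A standalone boto3 sketch of the same lookup (assumes credentials and a default region are configured):

    import boto3

    ec2_client = boto3.client("ec2")
    response = ec2_client.describe_availability_zones()
    azs = sorted(az["ZoneName"] for az in response["AvailabilityZones"])
    print(azs)  # e.g. ['us-east-1a', 'us-east-1b', ...]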
8 changes: 4 additions & 4 deletions manage_arkime/cdk_interactions/cdk_context.py
@@ -5,8 +5,8 @@

import core.constants as constants
from core.capacity_planning import (CaptureNodesPlan, ViewerNodesPlan, VpcPlan, ClusterPlan, DataNodesPlan, EcsSysResourcePlan,
MasterNodesPlan, OSDomainPlan, DEFAULT_NUM_AZS, S3Plan, DEFAULT_S3_STORAGE_CLASS,
DEFAULT_VPC_CIDR, DEFAULT_CAPTURE_PUBLIC_MASK)
MasterNodesPlan, OSDomainPlan, S3Plan, DEFAULT_S3_STORAGE_CLASS, DEFAULT_VPC_CIDR,
DEFAULT_CAPTURE_PUBLIC_MASK)
from core.user_config import UserConfig

@dataclass
@@ -59,12 +59,12 @@ def generate_cluster_destroy_context(name: str, stack_names: ClusterStackNames,
fake_arn = "N/A"
fake_cluster_plan = ClusterPlan(
CaptureNodesPlan("m5.xlarge", 1, 2, 1),
VpcPlan(DEFAULT_VPC_CIDR, DEFAULT_NUM_AZS, DEFAULT_CAPTURE_PUBLIC_MASK),
VpcPlan(DEFAULT_VPC_CIDR, ["us-fake-1"], DEFAULT_CAPTURE_PUBLIC_MASK),
EcsSysResourcePlan(1, 1),
OSDomainPlan(DataNodesPlan(2, "t3.small.search", 100), MasterNodesPlan(3, "m6g.large.search")),
S3Plan(DEFAULT_S3_STORAGE_CLASS, 1),
ViewerNodesPlan(4, 2),
VpcPlan(DEFAULT_VPC_CIDR, DEFAULT_NUM_AZS, DEFAULT_CAPTURE_PUBLIC_MASK) if has_viewer_vpc else None,
VpcPlan(DEFAULT_VPC_CIDR, ["us-fake-1"], DEFAULT_CAPTURE_PUBLIC_MASK) if has_viewer_vpc else None,
)
fake_user_config = UserConfig(1, 1, 1, 1, 1)
fake_bucket_name = ""
16 changes: 10 additions & 6 deletions manage_arkime/commands/cluster_create.py
@@ -10,6 +10,7 @@
from aws_interactions.aws_client_provider import AwsClientProvider
from aws_interactions.aws_environment import AwsEnvironment
import aws_interactions.events_interactions as events
import aws_interactions.ec2_interactions as ec2
import aws_interactions.s3_interactions as s3
import aws_interactions.ssm_operations as ssm_ops
from cdk_interactions.cdk_client import CdkClient
@@ -41,7 +42,7 @@ def cmd_cluster_create(profile: str, region: str, name: str, expected_traffic: f
previous_user_config = _get_previous_user_config(name, aws_provider)
next_user_config = _get_next_user_config(name, expected_traffic, spi_days, history_days, replicas, pcap_days, aws_provider)
previous_capacity_plan = _get_previous_capacity_plan(name, aws_provider)
next_capacity_plan = _get_next_capacity_plan(next_user_config, previous_capacity_plan, capture_cidr, viewer_cidr)
next_capacity_plan = _get_next_capacity_plan(next_user_config, previous_capacity_plan, capture_cidr, viewer_cidr, aws_provider)

is_initial_invocation = _is_initial_invocation(name, aws_provider)

@@ -201,14 +202,17 @@ def _get_previous_capacity_plan(cluster_name: str, aws_provider: AwsClientProvid
None
)

def _get_next_capacity_plan(user_config: UserConfig, previous_capacity_plan: ClusterPlan, next_capture_cidr: str, next_viewer_cidr: str) -> ClusterPlan:
capture_plan = get_capture_node_capacity_plan(user_config.expectedTraffic)
capture_vpc_plan = get_capture_vpc_plan(previous_capacity_plan.captureVpc, next_capture_cidr)
os_domain_plan = get_os_domain_plan(user_config.expectedTraffic, user_config.spiDays, user_config.replicas, capture_vpc_plan.numAzs)
def _get_next_capacity_plan(user_config: UserConfig, previous_capacity_plan: ClusterPlan, next_capture_cidr: str,
next_viewer_cidr: str, aws_provider: AwsClientProvider) -> ClusterPlan:
az_in_region = ec2.get_azs_in_region(aws_provider)

capture_plan = get_capture_node_capacity_plan(user_config.expectedTraffic, az_in_region)
capture_vpc_plan = get_capture_vpc_plan(previous_capacity_plan.captureVpc, next_capture_cidr, az_in_region)
os_domain_plan = get_os_domain_plan(user_config.expectedTraffic, user_config.spiDays, user_config.replicas, len(capture_vpc_plan.azs))
ecs_resource_plan = get_ecs_sys_resource_plan(capture_plan.instanceType)
s3_plan = S3Plan(DEFAULT_S3_STORAGE_CLASS, user_config.pcapDays)
viewer_plan = get_viewer_node_capacity_plan(user_config.expectedTraffic)
viewer_vpc_plan = get_viewer_vpc_plan(previous_capacity_plan.viewerVpc, next_viewer_cidr)
viewer_vpc_plan = get_viewer_vpc_plan(previous_capacity_plan.viewerVpc, next_viewer_cidr, az_in_region)

return ClusterPlan(capture_plan, capture_vpc_plan, ecs_resource_plan, os_domain_plan, s3_plan, viewer_plan, viewer_vpc_plan)

2 changes: 1 addition & 1 deletion manage_arkime/commands/vpc_add.py
@@ -35,7 +35,7 @@ def cmd_vpc_add(profile: str, region: str, cluster_name: str, vpc_id: str, user_
vpc_acct_provider = aws_provider
if not association:
cluster_acct_provider = vpc_acct_provider = aws_provider

vpc_aws_env = vpc_acct_provider.get_aws_env()
cdk_client = CdkClient(vpc_aws_env)
vni_provider = SsmVniProvider(cluster_name, cluster_acct_provider)
59 changes: 33 additions & 26 deletions manage_arkime/core/capacity_planning.py
@@ -3,13 +3,12 @@
import logging
import re
import sys
from typing import Dict, Type, TypeVar
from typing import Dict, List, Type, TypeVar


logger = logging.getLogger(__name__)

MAX_TRAFFIC = 100 # Gbps, scaling limit of a single User Subnet VPC Endpoint
MINIMUM_NODES = 1 # We'll always have at least one capture node
MINIMUM_TRAFFIC = 0.01 # Gbps; arbitrarily chosen, but will yield a minimal cluster
CAPACITY_BUFFER_FACTOR = 1.25 # Arbitrarily chosen
MASTER_NODE_COUNT = 3 # Recommended number in docs
@@ -21,18 +20,21 @@
@dataclass
class CaptureInstance:
instanceType: str
maxTraffic: float
trafficPer: float
trafficPer: float # The traffic in Gbps each instance of this type can handle
maxTraffic: float # The max traffic in Gbps a cluster of this type should handle
minNodes: int # The minimum number of nodes we should have of this type
ecsCPU: int
ecsMemory: int

# These are the possible instances types we assign for capture nodes based on maxTraffic
# These are the possible instances types we assign for capture nodes
T3_MEDIUM = CaptureInstance("t3.medium", 0.25, 2 * 0.25, 1, 1536, 3072)
M5_XLARGE = CaptureInstance("m5.xlarge", 2.0, MAX_TRAFFIC, 2, 3584, 15360)

CAPTURE_INSTANCES = [
CaptureInstance("t3.medium", 0.5, 0.25, 1536, 3072),
CaptureInstance("m5.xlarge", MAX_TRAFFIC, 2.0, 3584, 15360)
T3_MEDIUM,
M5_XLARGE
]


@dataclass
class MasterInstance:
instanceType: str
@@ -95,27 +97,32 @@ def to_dict(self) -> Dict[str, any]:
"minCount": self.minCount,
}

def get_capture_node_capacity_plan(expected_traffic: float) -> CaptureNodesPlan:
def get_capture_node_capacity_plan(expected_traffic: float, azs: List[str]) -> CaptureNodesPlan:
"""
Creates a capacity plan for the indicated traffic load.
expected_traffic: The expected traffic volume for the Arkime cluster, in Gigabits Per Second (Gbps)
azs: The AZs to deploy the cluster into
"""

if not expected_traffic or expected_traffic < MINIMUM_TRAFFIC:
expected_traffic = MINIMUM_TRAFFIC

if expected_traffic > MAX_TRAFFIC:
raise TooMuchTraffic(expected_traffic)

chosen_instance = next(instance for instance in CAPTURE_INSTANCES if expected_traffic <= instance.maxTraffic)

desired_instances = math.ceil(expected_traffic/chosen_instance.trafficPer)

desired_instances = max(
chosen_instance.minNodes,
math.ceil(expected_traffic/chosen_instance.trafficPer)
)

return CaptureNodesPlan(
chosen_instance.instanceType,
desired_instances,
math.ceil(desired_instances * CAPACITY_BUFFER_FACTOR),
MINIMUM_NODES
chosen_instance.minNodes
)
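
A worked example of the new sizing rule, using the m5.xlarge row defined above (2.0 Gbps per node, minimum of 2 nodes) and a hypothetical 1 Gbps load:

    import math

    expected_traffic = 1.0  # Gbps
    traffic_per = 2.0       # m5.xlarge trafficPer
    min_nodes = 2           # m5.xlarge minNodes
    buffer_factor = 1.25    # CAPACITY_BUFFER_FACTOR

    desired = max(min_nodes, math.ceil(expected_traffic / traffic_per))  # 2, not 1
    maximum = math.ceil(desired * buffer_factor)                         # 3
    print(desired, maximum, min_nodes)

For m5.xlarge the minNodes floor keeps at least two capture nodes running even at low traffic, where the old math would have allowed a single node.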

@dataclass
@@ -397,22 +404,22 @@ def to_dict(self) -> Dict[str, any]:
T_VpcPlan = TypeVar('T_VpcPlan', bound='VpcPlan')

@dataclass
class VpcPlan:
class VpcPlan:
cidr: Cidr
numAzs: int
azs: List[str]
publicSubnetMask: int

def __eq__(self, other) -> bool:
if other is None:
return True if self is None else False

return (self.cidr == other.cidr and self.numAzs == other.numAzs
return (self.cidr == other.cidr and self.azs == other.azs
and self.publicSubnetMask == other.publicSubnetMask)

def to_dict(self) -> Dict[str, any]:
return {
"cidr": self.cidr.to_dict(),
"numAzs": self.numAzs,
"azs": self.azs,
"publicSubnetMask": self.publicSubnetMask,
}

@@ -422,34 +429,34 @@ def from_dict(cls: Type[T_VpcPlan], input: Dict[str, any]) -> T_VpcPlan:
return None

cidr = Cidr(input['cidr']['block'])
numAzs = input["numAzs"]
azs = input["azs"]
publicSubnetMask = input["publicSubnetMask"]

return cls(cidr, numAzs, publicSubnetMask)
return cls(cidr, azs, publicSubnetMask)

def get_usable_ips(self) -> int:
total_ips = 2 ** (32 - int(self.cidr.mask))
public_ips = 2 ** (32 - int(self.publicSubnetMask)) * self.numAzs
public_ips = 2 ** (32 - int(self.publicSubnetMask)) * len(self.azs)
reserved_ips_per_subnet = 2 # The first (local gateway) and last (broadcast) IP are often reserved
reserved_private_ips = reserved_ips_per_subnet * self.numAzs
reserved_private_ips = reserved_ips_per_subnet * len(self.azs)

return total_ips - public_ips - reserved_private_ips
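
get_usable_ips() above, evaluated by hand for a hypothetical /16 VPC with three AZs and /28 public subnets:

    total_ips = 2 ** (32 - 16)       # 65536 addresses in the VPC
    public_ips = 2 ** (32 - 28) * 3  # 48: one /28 public subnet per AZ
    reserved_private = 2 * 3         # first/last IP in each private subnet
    print(total_ips - public_ips - reserved_private)  # 65482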

def get_capture_vpc_plan(previous_plan: VpcPlan, capture_cidr_block: str) -> VpcPlan:
def get_capture_vpc_plan(previous_plan: VpcPlan, capture_cidr_block: str, azs: List[str]) -> VpcPlan:
if previous_plan and all(value is not None for value in vars(previous_plan).values()):
return previous_plan
elif not capture_cidr_block:
return VpcPlan(DEFAULT_VPC_CIDR, DEFAULT_NUM_AZS, DEFAULT_CAPTURE_PUBLIC_MASK)
return VpcPlan(DEFAULT_VPC_CIDR, azs, DEFAULT_CAPTURE_PUBLIC_MASK)
else:
return VpcPlan(Cidr(capture_cidr_block), DEFAULT_NUM_AZS, DEFAULT_CAPTURE_PUBLIC_MASK)
return VpcPlan(Cidr(capture_cidr_block), azs, DEFAULT_CAPTURE_PUBLIC_MASK)

def get_viewer_vpc_plan(previous_plan: VpcPlan, viewer_cidr_block: str) -> VpcPlan:
def get_viewer_vpc_plan(previous_plan: VpcPlan, viewer_cidr_block: str, azs: List[str]) -> VpcPlan:
if previous_plan and all(value is not None for value in vars(previous_plan).values()):
return previous_plan
elif not viewer_cidr_block:
return None
else:
return VpcPlan(Cidr(viewer_cidr_block), DEFAULT_NUM_AZS, DEFAULT_VIEWER_PUBLIC_MASK)
return VpcPlan(Cidr(viewer_cidr_block), azs[0:DEFAULT_NUM_AZS], DEFAULT_VIEWER_PUBLIC_MASK)

DEFAULT_S3_STORAGE_CLASS = "STANDARD"
DEFAULT_S3_STORAGE_DAYS = 30
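
One asymmetry worth noting in the planners above: a fresh capture VPC plan takes the full regional AZ list, while a fresh viewer VPC plan takes only a leading slice of it. Sketch, assuming DEFAULT_NUM_AZS is 2 (the constant's value is defined elsewhere in this file) and hypothetical AZ names:

    DEFAULT_NUM_AZS = 2
    region_azs = ["us-fake-1a", "us-fake-1b", "us-fake-1c"]
    capture_azs = region_azs                    # get_capture_vpc_plan: all AZs
    viewer_azs = region_azs[0:DEFAULT_NUM_AZS]  # get_viewer_vpc_plan: first two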
2 changes: 1 addition & 1 deletion manage_arkime/core/versioning.py
@@ -9,7 +9,7 @@
"""
Manually updated/managed version number. Increment if/when a backwards incompatible change is made.
"""
AWS_AIO_VERSION=1
AWS_AIO_VERSION=2

class CouldntReadSourceVersion(Exception):
def __init__(self):
2 changes: 1 addition & 1 deletion manage_arkime/setup.py
@@ -2,7 +2,7 @@

setuptools.setup(
name="manage_arkime",
version="0.1.1",
version="1.0.0",
description=("Tooling and configuration to install/manage Arkime Clusters in an AWS account"),
author="Chris Helma",
package_dir={"": "."},