
Commit
Additional tweaks to the Elasticsearch model, moving search nodes into a dedicated sub-model
gndcshv committed Oct 26, 2023
1 parent f819b9d commit 395a602
Showing 3 changed files with 90 additions and 79 deletions.
2 changes: 2 additions & 0 deletions service_capacity_modeling/models/org/netflix/__init__.py
@@ -6,6 +6,7 @@
from .elasticsearch import nflx_elasticsearch_capacity_model
from .elasticsearch import nflx_elasticsearch_data_capacity_model
from .elasticsearch import nflx_elasticsearch_master_capacity_model
from .elasticsearch import nflx_elasticsearch_search_capacity_model
from .entity import nflx_entity_capacity_model
from .evcache import nflx_evcache_capacity_model
from .kafka import nflx_kafka_capacity_model
@@ -30,6 +31,7 @@ def models():
"org.netflix.elasticsearch": nflx_elasticsearch_capacity_model,
"org.netflix.elasticsearch.node": nflx_elasticsearch_data_capacity_model,
"org.netflix.elasticsearch.master": nflx_elasticsearch_master_capacity_model,
"org.netflix.elasticsearch.search": nflx_elasticsearch_search_capacity_model,
"org.netflix.entity": nflx_entity_capacity_model,
"org.netflix.cockroachdb": nflx_cockroachdb_capacity_model,
"org.netflix.aurora": nflx_aurora_capacity_model,
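For orientation, the new mapping can be exercised directly; a minimal sketch (the registry lookup mirrors the dict above, and the printed class name comes from elasticsearch.py below):

from service_capacity_modeling.models.org.netflix import models

# models() returns the name -> capacity-model mapping shown in this diff;
# the search sub-model is now registered beside the data and master models.
registry = models()
search_model = registry["org.netflix.elasticsearch.search"]
print(type(search_model).__name__)  # NflxElasticsearchSearchCapacityModel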
150 changes: 72 additions & 78 deletions service_capacity_modeling/models/org/netflix/elasticsearch.py
@@ -154,7 +154,7 @@ def _upsert_params(cluster, params):
class NflxElasticsearchArguments(BaseModel):
copies_per_region: int = Field(
default=3,
description="How many copies of the data will exist e.g. RF=3. If unsupplied"
description="How many copies of the data will exist e.g. RF=3. If not supplied"
" this will be deduced from durability and consistency desires",
)
max_regional_size: int = Field(
@@ -163,8 +163,8 @@ class NflxElasticsearchArguments(BaseModel):
description="What is the maximum size of a cluster in this region",
)
max_local_disk_gib: int = Field(
# Nodes larger than 4 TiB are painful to recover
default=4096,
# Nodes larger than 8 TiB are painful to recover
default=8192,
description="The maximum amount of data we store per machine",
)
max_rps_to_disk: int = Field(
@@ -183,15 +183,13 @@ def capacity_plan(
extra_model_arguments: Dict[str, Any],
) -> Optional[CapacityPlan]:

# (FIXME): Need elasticsearch input
# TODO: Use durability requirements to compute RF.
copies_per_region: int = _target_rf(
desires, extra_model_arguments.get("copies_per_region", None)
)
max_regional_size: int = extra_model_arguments.get("max_regional_size", 240)
max_regional_size: int = extra_model_arguments.get("max_regional_size", 120)
max_rps_to_disk: int = extra_model_arguments.get("max_rps_to_disk", 1000)
# Very large nodes are hard to recover
max_local_disk_gib: int = extra_model_arguments.get("max_local_disk_gib", 5000)
max_local_disk_gib: int = extra_model_arguments.get("max_local_disk_gib", 8192)

# the ratio of traffic that should be handled by search nodes.
# 0.0 = no search nodes, all searches handled by data nodes
@@ -213,7 +211,6 @@ def capacity_plan(
_rps = desires.query_pattern.estimated_read_per_second.mid // zones_in_region

data_rps = _rps / (search_to_data_rps_ratio + 1)
search_rps = search_to_data_rps_ratio * data_rps

# Based on the disk latency and the read latency SLOs we adjust our
# working set to keep more or less data in RAM. Faster drives need
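As a worked example of the traffic split above (the numbers are hypothetical, and the search_rps line is the one this commit removes from the data model):

search_to_data_rps_ratio = 0.5                      # assumed ratio for illustration
_rps = 1500                                         # assumed zonal reads per second
data_rps = _rps / (search_to_data_rps_ratio + 1)    # 1000.0 rps stays on data nodes
search_rps = search_to_data_rps_ratio * data_rps    # 500.0 rps would go to search nodes
assert data_rps + search_rps == _rps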
@@ -278,12 +275,11 @@
# merging can make progress as long as there is some headroom
required_disk_space=lambda x: x * 1.33,
max_local_disk_gib=max_local_disk_gib,
# elasticsearch clusters can autobalance via shard placement
# Elasticsearch clusters can auto-balance via shard placement
cluster_size=lambda x: x,
min_count=1,
# Sidecars/System takes away memory from Elasticsearch
# Elasticsearch uses half of available system max of 32 for compressed
# oops
# which uses half of available system max of 32 for compressed oops
reserve_memory=lambda x: base_mem + max(32, x / 2),
core_reference_ghz=data_requirement.core_reference_ghz,
)
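Two of the sizing lambdas above are easy to sanity-check with hypothetical numbers (base_mem is defined outside this hunk, so the 4 GiB here is an assumption):

base_mem = 4                                    # assumed GiB held for system/sidecars
required_disk_space = lambda x: x * 1.33        # 33% headroom so merges can progress
reserve_memory = lambda x: base_mem + max(32, x / 2)

print(required_disk_space(3000))   # ~3990 GiB provisioned for 3000 GiB of data
print(reserve_memory(64))          # 36.0 GiB withheld on a 64 GiB instance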
@@ -293,74 +289,28 @@ def capacity_plan(
params = {"elasticsearch.copies": copies_per_region}
_upsert_params(data_cluster, params)

# elasticsearch clusters generally should try to stay under some total number
# Elasticsearch clusters generally should try to stay under some total number
# of nodes. Orgs do this for all kinds of reasons such as
# * Security group limits. Since you must have < 500 rules if you're
# ingressing public ips
# * Maintenance. If your restart script does one node at a time you want
# smaller clusters so your restarts don't take months.
# * NxN network issues. Sometimes smaller clusters of bigger nodes
# are better for network propagation
# * Maintenance. If your restart script does one node at a time you want
# smaller clusters so your restarts don't take months.
# * NxN network issues. Sometimes smaller clusters of bigger nodes
# are better for network propagation
if data_cluster.count > (max_regional_size // zones_in_region):
return None
data_requirements = [data_requirement] * zones_in_region
data_clusters = [data_cluster] * zones_in_region

ec2_costs = {
"elasticsearch-data.zonal-clusters": zones_in_region
* data_cluster.annual_cost
}

if search_rps > 0:
search_requirement = _estimate_elasticsearch_requirement(
node_type="search",
instance=instance,
desires=desires,
working_set=0.0,
reads_per_second=search_rps,
zones_in_region=zones_in_region,
copies_per_region=0,
max_rps_to_disk=1, # to avoid divide by zero
)
search_cluster = compute_stateful_zone(
instance=instance,
drive=drive,
needed_cores=int(search_requirement.cpu_cores.mid),
needed_disk_gib=int(search_requirement.disk_gib.mid),
needed_memory_gib=int(search_requirement.mem_gib.mid),
needed_network_mbps=search_requirement.network_mbps.mid,
max_local_disk_gib=0,
# Elasticsearch clusters can autobalance via shard placement
cluster_size=lambda x: x,
min_count=1,
# Sidecars/System takes away memory from Elasticsearch
# Elasticsearch uses half of available system max of 32 for compressed
# oops
reserve_memory=lambda x: base_mem + max(32, x / 2),
core_reference_ghz=search_requirement.core_reference_ghz,
)
search_cluster.cluster_type = "elasticsearch-search"

if search_cluster.count > (max_regional_size // zones_in_region):
return None
ec2_costs["elasticsearch-search.zonal-clusters"] = (
zones_in_region * search_cluster.annual_cost
)

search_requirements = [search_requirement] * zones_in_region
search_clusters = [search_cluster] * zones_in_region
else:
search_requirements = []
search_clusters = []

clusters = Clusters(
annual_costs=ec2_costs,
zonal=data_clusters + search_clusters,
zonal=[data_cluster] * zones_in_region,
regional=[],
)

plan = CapacityPlan(
requirements=Requirements(zonal=data_requirements + search_requirements),
requirements=Requirements(zonal=[data_requirement] * zones_in_region),
candidate_clusters=clusters,
)
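One note on the [x] * zones_in_region idiom now used for both the requirements and the clusters: it repeats references to a single object, which is safe here because every zone gets an identical, read-only entry. A quick illustration:

data_cluster = object()            # stands in for the ZoneClusterCapacity built above
zones_in_region = 3
zonal = [data_cluster] * zones_in_region
assert len(zonal) == 3
assert all(entry is zonal[0] for entry in zonal)   # one object, referenced three times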

@@ -376,7 +326,7 @@ def capacity_plan(
desires: CapacityDesires,
extra_model_arguments: Dict[str, Any],
) -> Optional[CapacityPlan]:
# only accept running on instances with a lot of RAM and a few CPUs
# Only accept running on instances with a lot of RAM and a few CPUs
if instance.ram_gib <= 24:
return None
if instance.cpu <= 2:
@@ -413,6 +363,50 @@
)


class NflxElasticsearchSearchCapacityModel(CapacityModel):
@staticmethod
def capacity_plan(
instance: Instance,
drive: Drive,
context: RegionContext,
desires: CapacityDesires,
extra_model_arguments: Dict[str, Any],
) -> Optional[CapacityPlan]:
# Only accept running on instances with a lot of RAM and a few CPUs
if instance.ram_gib <= 24:
return None
if instance.cpu <= 2:
return None

zones_in_region = context.zones_in_region
requirement = CapacityRequirement(
requirement_type="elasticsearch-search-zonal",
core_reference_ghz=desires.core_reference_ghz,
cpu_cores=certain_int(2),
mem_gib=certain_int(24),
context={},
)

cluster = ZoneClusterCapacity(
cluster_type="elasticsearch-search",
count=1,
instance=instance,
attached_drives=[],
annual_cost=instance.annual_cost,
)

ec2_cost = zones_in_region * cluster.annual_cost
clusters = Clusters(
annual_costs={"elasticsearch-search.zonal-clusters": ec2_cost},
zonal=[cluster] * zones_in_region,
)

return CapacityPlan(
requirements=Requirements(zonal=[requirement] * zones_in_region),
candidate_clusters=clusters,
)
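A usage sketch for the new sub-model (instance, drive, context, and desires are assumed fixtures; the result shape follows from the code above):

# With zones_in_region == 3, the search sub-model always yields one small
# cluster per zone: its capacity is fixed rather than load-derived.
plan = nflx_elasticsearch_search_capacity_model.capacity_plan(
    instance=instance,        # assumed fixture with ram_gib > 24 and cpu > 2
    drive=drive,              # present for the signature; unused by this model
    context=context,          # assumed RegionContext with zones_in_region == 3
    desires=desires,          # largely ignored, per compose_with below
    extra_model_arguments={},
)
assert len(plan.candidate_clusters.zonal) == 3
assert all(c.count == 1 for c in plan.candidate_clusters.zonal)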


class NflxElasticsearchCapacityModel(CapacityModel):
@staticmethod
def capacity_plan(
@@ -432,21 +426,22 @@ def description():
def compose_with(
user_desires: CapacityDesires, extra_model_arguments: Dict[str, Any]
) -> Tuple[Tuple[str, Callable[[CapacityDesires], CapacityDesires]], ...]:
def _modify_data_desires(
user_desires: CapacityDesires,
) -> CapacityDesires:
def _modify_data_desires(desires: CapacityDesires) -> CapacityDesires:
# data node's model uses the full desires
return user_desires
return desires

def _modify_master_desires(
user_desires: CapacityDesires,
) -> CapacityDesires:
# master node's model doesn't use anything from the desires.
return user_desires
def _modify_master_desires(desires: CapacityDesires) -> CapacityDesires:
# master node's model doesn't use anything from the desires
return desires

def _modify_search_desires(desires: CapacityDesires) -> CapacityDesires:
# search node's model doesn't use anything from the desires
return desires

return (
("org.netflix.elasticsearch.node", _modify_data_desires),
("org.netflix.elasticsearch.master", _modify_master_desires),
("org.netflix.elasticsearch.search", _modify_search_desires),
)
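How a planner might consume this tuple (the fan-out loop is an assumption about planner internals; the names come from the return value above):

pairs = NflxElasticsearchCapacityModel.compose_with(user_desires, {})
for model_name, modify in pairs:
    sub_desires = modify(user_desires)
    # the sub-model registered under model_name is then planned with
    # sub_desires; today all three transforms pass the desires through
    print(model_name, sub_desires is user_desires)  # ... True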

@staticmethod
@@ -508,14 +503,12 @@ def default_desires(user_desires, extra_model_arguments: Dict[str, Any]):
confidence=0.98,
),
),
# Most latency sensitive elasticsearch clusters are in the
# < 100GiB range
# Most latency sensitive Elasticsearch clusters are in the < 100GiB range
data_shape=DataShape(
estimated_state_size_gib=Interval(
low=10, mid=100, high=1000, confidence=0.98
),
# Netflix Elasticsearch compresses with Deflate (gzip)
# by default
# Netflix Elasticsearch compresses with Deflate (gzip) by default
estimated_compression_ratio=Interval(
minimum_value=1.4,
maximum_value=8,
@@ -595,3 +588,4 @@ def default_desires(user_desires, extra_model_arguments: Dict[str, Any]):
nflx_elasticsearch_capacity_model = NflxElasticsearchCapacityModel()
nflx_elasticsearch_data_capacity_model = NflxElasticsearchDataCapacityModel()
nflx_elasticsearch_master_capacity_model = NflxElasticsearchMasterCapacityModel()
nflx_elasticsearch_search_capacity_model = NflxElasticsearchSearchCapacityModel()
17 changes: 16 additions & 1 deletion tests/netflix/test_elasticsearch.py
@@ -1,3 +1,4 @@
from collections import Counter
from collections import defaultdict

from service_capacity_modeling.capacity_planner import planner
@@ -123,20 +124,34 @@ def test_es_simple_certain():
)

assert len(cap_plan) > 0, "Resulting cap_plan is empty"
assert all(plan for plan in cap_plan), "One or more plans is empty"

for plan in cap_plan:
assert plan, "One or more plans is empty"
assert plan.candidate_clusters, "candidate_clusters is empty"
assert plan.candidate_clusters.zonal, "candidate_clusters.zonal is empty"
assert len(plan.candidate_clusters.zonal) == 9, "len(candidate_clusters.zonal) != 9"

cluster_type_counts = Counter(zone.cluster_type for zone in plan.candidate_clusters.zonal)

assert len(cluster_type_counts) == 3, "Expecting 3 cluster types"
assert cluster_type_counts["elasticsearch-search"] == 3, "Expecting exactly 3 search nodes"
assert cluster_type_counts["elasticsearch-master"] == 3, "Expecting exactly 3 master nodes"
assert cluster_type_counts["elasticsearch-data"] >= 3, "Expecting at least 3 data nodes"


def zonal_summary(zlr):
zlr_cpu = zlr.count * zlr.instance.cpu
zlr_cost = zlr.annual_cost
zlr_family = zlr.instance.family
zlr_instance_name = zlr.instance.name
zlr_drive_gib = sum(dr.size_gib for dr in zlr.attached_drives)
if zlr.instance.drive is not None:
zlr_drive_gib += zlr.instance.drive.size_gib
zlr_drive_gib *= zlr.count

return (
zlr_family,
zlr_instance_name,
zlr.count,
zlr_cpu,
zlr_cost,
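For completeness, a sketch of how the summary helper is typically consumed (assumed usage; the return tuple above is truncated by the diff view, hence the *rest):

for zlr in cap_plan[0].candidate_clusters.zonal:
    family, name, count, cpu, cost, *rest = zonal_summary(zlr)
    print(f"{family}/{name} x{count}: {cpu} cores, ${cost:,.0f}/yr")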
