This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Recommendations 0.6.15 (#75)
* detection of dropped requests and recommendations for next steps
* added TPC core balance checks and recommendations to resolve
* added support for tombstone scan counting and recommendations
* detection of back pressure and recommendations to resolve
* detection of remote only drops and recommendations to resolve
* detection of zero copy streaming problems
Ryan SVIHLA authored Oct 22, 2021
1 parent 116fd7b commit 8a58565
Showing 16 changed files with 692 additions and 55 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -32,3 +32,5 @@ __pycache__
dist
sperf.spec
sperf.build

.vagrant
11 changes: 11 additions & 0 deletions CHANGELOG.txt
@@ -1,3 +1,14 @@
sperf 0.6.15
------------
* added detection for indexing backoff with auto soft commit
* added detection for tombstone scan warnings and errors
* added detection for drops and making recommendations around them
* added detection for zero copy bloom filter warnings
* added detection for tpc core imbalance
* added detection for network backpressure rejections
* added detection for tpc backpressure being enabled
* deprecated all commands except the sperf command, moving all functionality into the primary sperf command

sperf 0.6.14
------------
* sperf was not parsing new byte eviction format as of in sperf filtercache as
2 changes: 1 addition & 1 deletion pysper/__init__.py
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""top level module for sperf python port"""
VERSION = "0.6.14"
VERSION = "0.6.15"
11 changes: 11 additions & 0 deletions pysper/changelog.py
@@ -1,4 +1,15 @@
CHANGES = """
sperf 0.6.15
------------
* added detection for indexing backoff with auto soft commit
* added detection for tombstone scan warnings and errors
* added detection for drops and making recommendations around them
* added detection for zero copy bloom filter warnings
* added detection for tpc core imbalance
* added detection for network backpressure rejections
* added detection for tpc backpressure being enabled
* deprecated all commands except the sperf command, moving all functionality into the primary sperf command
sperf 0.6.14
------------
* sperf was not parsing new byte eviction format as of in sperf filtercache as
23 changes: 18 additions & 5 deletions pysper/core/statuslogger.py
@@ -297,8 +297,11 @@ def print_histogram(self):
if not self.nodes:
print("Nothing found!")
return

cassandra_versions = set()
dse_versions = set()
for name, node in self.nodes.items():
cassandra_versions.add(node.cassandra_version)
dse_versions.add(node.dse_version)
print(name)
print("-" * 60)
print("%s lines" % format_num(node.lines))
@@ -357,10 +360,20 @@ def print_histogram(self):
for line in percentiles:
print("".join(line))
print("")
self.__print_recs()
self.__print_recs(dse_versions, cassandra_versions)

def __print_recs(self):
engine = Engine()
def __print_recs(self, versions, cassandra_versions):
version = None
cassandra_version = None
if len(cassandra_versions) > 1:
print(
"WARNING more than one Cassandra version present; assuming no version when making recommendations"
)
elif len(cassandra_versions) == 1:
cassandra_version = next(iter(cassandra_versions))
elif len(versions) == 1:
version = next(iter(versions))
engine = Engine(version, cassandra_version)
recs = set()
for node in self.nodes.values():
rstage = OrderedDefaultDict(dict)
@@ -460,7 +473,7 @@ def print_summary(self):
pad_table(data, extra_pad=2)
for line in data:
print("".join(line))
self.__print_recs()
self.__print_recs(summary.versions, summary.cassandra_versions)

def __print_backpressure(self, stages, data):
nodes = OrderedDict()
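
A minimal sketch of how the collected version sets feed the recommendation engine (the sample values are invented; Engine's two-argument signature comes from the pysper/recs.py change in this commit):

from pysper.recs import Engine

cassandra_versions = {"3.11.5"}  # invented sample: one Cassandra version seen across nodes
dse_versions = set()             # invented sample: no DSE version detected

# Mirrors __print_recs: only pass a version through when exactly one was seen.
cassandra_version = next(iter(cassandra_versions)) if len(cassandra_versions) == 1 else None
dse_version = next(iter(dse_versions)) if len(dse_versions) == 1 else None
engine = Engine(dse_version, cassandra_version)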
163 changes: 163 additions & 0 deletions pysper/parser/cases.py
@@ -164,6 +164,28 @@ def solr_rules():
event_type="query_logs",
),
),
case("AbstractSolrSecondaryIndex"),
rule(
capture(
r"\[(?P<core_name>.+)\]: Increasing soft commit max time to (?P<commit_time>[0-9]+)"
),
update(
event_product="solr",
event_category="indexing",
event_type="increase_soft_commit",
),
),
case("AbstractSolrSecondaryIndex"),
rule(
capture(
r"\[(?P<core_name>.+)\]: Restoring soft commit max time back to (?P<commit_time>[0-9]+)"
),
update(
event_product="solr",
event_category="indexing",
event_type="restore_soft_commit",
),
),
)
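
As a sanity check, here is an invented AbstractSolrSecondaryIndex message body that the first capture above would match (the core name and commit time are made up):

import re

line = "[ks.tbl]: Increasing soft commit max time to 60000"  # invented sample message
m = re.search(
    r"\[(?P<core_name>.+)\]: Increasing soft commit max time to (?P<commit_time>[0-9]+)",
    line,
)
assert m is not None
assert m.group("core_name") == "ks.tbl"
assert m.group("commit_time") == "60000"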


@@ -615,3 +637,144 @@ def dd_rules():
),
),
)


def tpc_rules():
"""rules to capture backpressure and core balance problems"""
return (
case("NoSpamLogger"),
rule(
capture(
r"TPC backpressure is active on core (?P<core_num>\d+) with global local/remote pending tasks at (?P<global_pending>\d+)/(?P<remote_pending>\d+)"
),
convert(int, "core_num", "global_pending", "remote_pending"),
update(
event_product="tpc",
event_category="backpressure",
event_type="core_backpressure",
),
),
rule(
capture(
r"Local TPC backpressure is active with count (?P<local_count>\d+)"
),
convert(int, "local_count"),
update(
event_product="tpc",
event_category="backpressure",
event_type="core_backpressure_local",
),
),
rule(
capture(
r"Rejecting droppable message on connection (?P<message_type>.+) with id (?P<id>\d+) from \/(?P<source_ip>.+) to \/(?P<dest_ip>.+) via \((?P<via_ips>.+)\), total dropped: (?P<total_dropped>.\d+), total pending: (?P<total_pending>.\d+), total completed: (?P<total_completed>.\d+)\."
),
convert(int, "total_dropped"),
update(
event_product="tpc",
event_category="backpressure",
event_type="network_backpressure",
),
),
)
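
An illustrative check of the first capture above against an invented NoSpamLogger message (the core number and counts are made up):

import re

pattern = (
    r"TPC backpressure is active on core (?P<core_num>\d+) "
    r"with global local/remote pending tasks at (?P<global_pending>\d+)/(?P<remote_pending>\d+)"
)
line = "TPC backpressure is active on core 3 with global local/remote pending tasks at 1024/512"
m = re.search(pattern, line)
assert m is not None
assert int(m.group("core_num")) == 3
assert (int(m.group("global_pending")), int(m.group("remote_pending"))) == (1024, 512)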


def zc_rules():
"""catch issues with zero copy streaming"""
return (
case("SSTableReader"),
rule(
capture(
r"Could not recreate or deserialize existing bloom filter, continuing with a pass-through bloom filter but this will significantly impact reads performance"
),
update(
event_product="zcs",
event_category="streaming",
event_type="bloom_filter",
),
),
)


def tombstone_rules():
"""catch tombstone problems"""
return (
case("MessageDeliveryTask"),
rule(
capture(
r"Scanned over (?P<tombstones>[0-9]*) tombstones during query '(?P<query>.*)' \(last scanned row partion key was \((?P<pk>.*)\)\); query aborted"
),
convert(int, "tombstones"),
update(
event_product="tombstone",
event_category="reading",
event_type="scan_error",
),
),
case("NoSpamLogger"),
rule(
capture(
r"Scanned over (?P<tombstones>[0-9]*) tombstone rows for query (?P<query>.*) - more than the warning threshold [\d+]+"
),
convert(int, "tombstones"),
update(
event_product="tombstone",
event_category="reading",
event_type="tpc_scan_warn",
),
),
case("MessageDeliveryTask"),
rule(
capture(
r"Read (?P<live>[0-9]*) live rows and (?P<tombstones>[0-9]*) tombstone cells for query (?P<query>.*) \(see tombstone_warn_threshold\)"
),
convert(int, "tombstones"),
update(
event_product="tombstone",
event_category="reading",
event_type="seda_scan_warn",
),
),
)
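
For illustration, an invented warning line that the NoSpamLogger tombstone rule above would capture (counts and query are made up):

import re

pattern = (
    r"Scanned over (?P<tombstones>[0-9]*) tombstone rows for query "
    r"(?P<query>.*) - more than the warning threshold [\d+]+"
)
line = (
    "Scanned over 5000 tombstone rows for query SELECT * FROM ks.tbl "
    "- more than the warning threshold 1000"
)
m = re.search(pattern, line)
assert m is not None
assert int(m.group("tombstones")) == 5000
assert m.group("query") == "SELECT * FROM ks.tbl"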


def drop_rules():
"""drop rules"""
return (
# tpc era
case("DroppedMessages"),
rule(
capture(
r"(?P<messageType>\S*) messages were dropped in the last 5 s: (?P<localCount>\d*) internal and (?P<remoteCount>\d*) cross node\. Mean internal dropped latency: (?P<localLatency>\d*) ms and Mean cross-node dropped latency: (?P<remoteLatency>\d*) ms"
),
convert(int, "localCount", "remoteCount"),
convert(
float,
"localLatency",
"remoteLatency",
),
update(
event_product="cassandra",
event_category="pools",
event_type="drops",
),
),
# seda era
case("MessagingService"),
rule(
capture(
r"(?P<messageType>\S*) messages were dropped in last 5000 ms: (?P<localCount>\d*) internal and (?P<remoteCount>\d*) cross node\. Mean internal dropped latency: (?P<localLatency>\d*) ms and Mean cross-node dropped latency: (?P<remoteLatency>\d*) ms"
),
convert(int, "localCount", "remoteCount"),
convert(
float,
"localLatency",
"remoteLatency",
),
update(
event_product="cassandra",
event_category="pools",
event_type="drops",
),
),
)
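
And an invented TPC-era drop message matching the first rule above (the message type, counts, and latencies are made up):

import re

pattern = (
    r"(?P<messageType>\S*) messages were dropped in the last 5 s: "
    r"(?P<localCount>\d*) internal and (?P<remoteCount>\d*) cross node\. "
    r"Mean internal dropped latency: (?P<localLatency>\d*) ms and "
    r"Mean cross-node dropped latency: (?P<remoteLatency>\d*) ms"
)
line = (
    "READ messages were dropped in the last 5 s: 12 internal and 3 cross node. "
    "Mean internal dropped latency: 2500 ms and Mean cross-node dropped latency: 3100 ms"
)
m = re.search(pattern, line)
assert m is not None
assert m.group("messageType") == "READ"
assert (int(m.group("localCount")), int(m.group("remoteCount"))) == (12, 3)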
8 changes: 8 additions & 0 deletions pysper/parser/systemlog.py
@@ -20,18 +20,26 @@
solr_rules,
cfs_rules,
daemon_rules,
drop_rules,
status_rules,
tpc_rules,
tombstone_rules,
zc_rules,
)
from pysper.parser.captures import system_capture_rule

capture_message = switch(
(
*daemon_rules(),
*drop_rules(),
*status_rules(),
*gc_rules(),
*memtable_rules(),
*cfs_rules(),
*solr_rules(),
*tpc_rules(),
*tombstone_rules(),
*zc_rules(),
)
)

37 changes: 32 additions & 5 deletions pysper/recs.py
@@ -44,16 +44,41 @@ class Engine:
"""stage analyzer will make recommendations on stages based on
the type of stage and its stats"""

def __init__(self):
def __init__(self, dse_version, cass_version):
self.memtable_lower_rec = "lower memtable_cleanup_threshold in cassandra.yaml"
self.flush_writer_raise_rec = "raise memtable_flush_writers in cassandra.yaml"
self.compaction_throughput_raise_rec = (
"raise compaction_throughput_in_mb in cassandra.yaml"
)
self.ntr_queue_raise_rec = (
"raise or set -Dcassandra.max_queued_native_transport_requests= "
+ "(valid range is 1024-8192)"
)
if dse_version is None and cass_version is None:
self.ntr_queue_raise_rec = (
"Undetected Cassandra/DSE version. For Cassandra versions 3.0.19 and up, 3.11.5 and up, and Cassandra 4.0, "
+ "raise native_transport_max_concurrent_requests_in_bytes_per_ip and native_transport_max_concurrent_requests_in_bytes (see CASSANDRA-15013 and https://cassandra.apache.org/blog/Improving-Apache-Cassandras-Front-Door-and-Backpressure.html). "
+ "For DSE, or Cassandra versions 2.x, 3.0.18 or lower, or 3.11.4 or lower, raise or set -Dcassandra.max_queued_native_transport_requests= "
+ "(valid range is 1024-8192)"
)
else:
is_new_dynamic_bp_model = False
if dse_version is None and cass_version is not None:
version_tokens = cass_version.split(".")
if len(version_tokens) == 1 and version_tokens[0] == "DSE Private Fork":
is_new_dynamic_bp_model = True
else:
major = int(version_tokens[0])
minor = int(version_tokens[1])
patch = int(version_tokens[2])
is_new_dynamic_bp_model = (
major == 4
or (major == 3 and minor == 0 and patch > 18)
or (major == 3 and minor == 11 and patch > 4)
)
if is_new_dynamic_bp_model:
self.ntr_queue_raise_rec = "double native_transport_max_concurrent_requests_in_bytes and native_transport_max_concurrent_requests_in_bytes_per_ip."
else:
self.ntr_queue_raise_rec = (
"raise or set -Dcassandra.max_queued_native_transport_requests= "
+ "(valid range is 1024-8192)"
)
self.tpc_cores_raise_rec = (
"raise or set tpc_concurrent_requests_limit in "
+ "cassandra.yaml (default is 128), if CPU is underutilized."
@@ -92,6 +117,8 @@ def _compaction(self, stage):
return None, None

def _ntr(self, stage):
if stage.pending > 1000:
return "more than 1000 pending NTR", self.ntr_queue_raise_rec
if stage.blocked > 10:
return "blocked NTR over 10", self.ntr_queue_raise_rec
if stage.all_time_blocked > 100:
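
A worked check of the version gate above, as a standalone sketch (the version strings are invented samples):

def is_new_dynamic_bp_model(cass_version):
    # Mirrors the gate in Engine.__init__: Cassandra 3.0.19+, 3.11.5+, and 4.x
    # use the dynamic native_transport_max_concurrent_requests_* backpressure model.
    major, minor, patch = (int(t) for t in cass_version.split("."))
    return (
        major == 4
        or (major == 3 and minor == 0 and patch > 18)
        or (major == 3 and minor == 11 and patch > 4)
    )

assert is_new_dynamic_bp_model("3.0.19")
assert is_new_dynamic_bp_model("3.11.5")
assert is_new_dynamic_bp_model("4.0.0")
assert not is_new_dynamic_bp_model("3.11.4")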