diff --git a/.gitignore b/.gitignore index 7aa9446..edc67c6 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,5 @@ __pycache__ dist sperf.spec sperf.build + +.vagrant diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 1173689..919e294 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,3 +1,14 @@ +sperf 0.6.15 +------------ +* added detection for indexing backoff with auto soft commit +* added detection for tombstone scan warnings and errors +* added detection for drops and making recommendations around them +* added detection for zero copy bloom filter warnings +* added detection for tpc core imbalance +* added detection for network backpressure rejections +* added detection for tpc backpressure being enabled +* deprecated all commands except sperf command moving all functionality into the primary sperf command + sperf 0.6.14 ------------ * sperf was not parsing new byte eviction format as of in sperf filtercache as diff --git a/pysper/__init__.py b/pysper/__init__.py index fa14364..829b388 100644 --- a/pysper/__init__.py +++ b/pysper/__init__.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
"""top level module for sperf python port""" -VERSION = "0.6.14" +VERSION = "0.6.15" diff --git a/pysper/changelog.py b/pysper/changelog.py index 3101349..4aeeb7b 100644 --- a/pysper/changelog.py +++ b/pysper/changelog.py @@ -1,4 +1,15 @@ CHANGES = """ +sperf 0.6.15 +------------ +* added detection for indexing backoff with auto soft commit +* added detection for tombstone scan warnings and errors +* added detection for drops and making recommendations around them +* added detection for zero copy bloom filter warnings +* added detection for tpc core imbalance +* added detection for network backpressure rejections +* added detection for tpc backpressure being enabled +* deprecated all commands except sperf command moving all functionality into the primary sperf command + sperf 0.6.14 ------------ * sperf was not parsing new byte eviction format as of in sperf filtercache as diff --git a/pysper/core/statuslogger.py b/pysper/core/statuslogger.py index 14878ec..41a96df 100644 --- a/pysper/core/statuslogger.py +++ b/pysper/core/statuslogger.py @@ -297,8 +297,11 @@ def print_histogram(self): if not self.nodes: print("Nothing found!") return - + cassandra_versions = set() + dse_versions = set() for name, node in self.nodes.items(): + cassandra_versions.add(node.cassandra_version) + dse_versions.add(node.dse_versions) print(name) print("-" * 60) print("%s lines" % format_num(node.lines)) @@ -357,10 +360,20 @@ def print_histogram(self): for line in percentiles: print("".join(line)) print("") - self.__print_recs() + self.__print_recs(dse_versions, cassandra_versions) - def __print_recs(self): - engine = Engine() + def __print_recs(self, versions, cassandra_versions): + version = None + cassandra_version = None + if len(cassandra_versions) > 1: + print( + "WARNING more than one version present assuming no version with recommendations" + ) + elif len(cassandra_versions) == 1: + cassandra_version = cassandra_versions[0] + elif len(versions) == 1: + version = versions[0] + 
engine = Engine(version, cassandra_version) recs = set() for node in self.nodes.values(): rstage = OrderedDefaultDict(dict) @@ -460,7 +473,7 @@ def print_summary(self): pad_table(data, extra_pad=2) for line in data: print("".join(line)) - self.__print_recs() + self.__print_recs(summary.versions, summary.cassandra_versions) def __print_backpressure(self, stages, data): nodes = OrderedDict() diff --git a/pysper/parser/cases.py b/pysper/parser/cases.py index f5ab80f..96e8c96 100644 --- a/pysper/parser/cases.py +++ b/pysper/parser/cases.py @@ -164,6 +164,28 @@ def solr_rules(): event_type="query_logs", ), ), + case("AbstractSolrSecondaryIndex"), + rule( + capture( + r"\[(?P.+)\]: Increasing soft commit max time to (?P[0-9]+)" + ), + update( + event_product="solr", + event_category="indexing", + event_type="increase_soft_commit", + ), + ), + case("AbstractSolrSecondaryIndex"), + rule( + capture( + r"\[(?P.+)\]: Restoring soft commit max time back to (?P[0-9]+)" + ), + update( + event_product="solr", + event_category="indexing", + event_type="restore_soft_commit", + ), + ), ) @@ -615,3 +637,144 @@ def dd_rules(): ), ), ) + + +def tpc_rules(): + """rules to capture backpressure and core balance problems""" + return ( + case("NoSpamLogger"), + rule( + capture( + r"TPC backpressure is active on core (?P\d+) with global local/remote pending tasks at (?P\d+)/(?P\d+)" + ), + convert(int, "core_num", "global_pending", "remote_pending"), + update( + event_product="tpc", + event_category="backpressure", + event_type="core_backpressure", + ), + ), + rule( + capture( + r"Local TPC backpressure is active with count (?P\d+)" + ), + convert(int, "local_count"), + update( + event_product="tpc", + event_category="backpressure", + event_type="core_backpressure_local", + ), + ), + rule( + capture( + r"Rejecting droppable message on connection (?P.+) with id (?P\d+) from \/(?P.+) to \/(?P.+) via \((?P.+)\), total dropped: (?P.\d+), total pending: (?P.\d+), total completed: (?P.\d+)\." 
def _drop_rule(window):
    """return the rule matching a dropped-messages summary line for the given time window text

    window is the literal wording the log uses for the reporting period,
    e.g. "the last 5 s" or "last 5000 ms".
    """
    # NOTE(review): the group names were missing from the reviewed text (the
    # patterns read `(?P\S*)`, which is not valid regex). They are restored here
    # from the convert() calls below and from the event keys read when drops are
    # summed up (messageType, localCount, remoteCount) - confirm against upstream.
    # `+` quantifiers are used instead of `*` so an empty field cannot be captured
    # and then blow up in int()/float() conversion.
    return rule(
        capture(
            r"(?P<messageType>\S+) messages were dropped in "
            + window
            + r": (?P<localCount>\d+) internal and (?P<remoteCount>\d+) cross node\. "
            + r"Mean internal dropped latency: (?P<localLatency>\d+) ms "
            + r"and Mean cross-node dropped latency: (?P<remoteLatency>\d+) ms"
        ),
        convert(int, "localCount", "remoteCount"),
        convert(
            float,
            "localLatency",
            "remoteLatency",
        ),
        update(
            event_product="cassandra",
            event_category="pools",
            event_type="drops",
        ),
    )


def drop_rules():
    """rules that capture the periodic dropped-message summaries

    Two log formats are handled and both produce the same event
    (event_product="cassandra", event_category="pools", event_type="drops")
    with integer localCount/remoteCount and float localLatency/remoteLatency.
    """
    return (
        # tpc era
        case("DroppedMessages"),
        _drop_rule("the last 5 s"),
        # seda era
        case("MessagingService"),
        _drop_rule("last 5000 ms"),
    )
is 1024-8192)" - ) + if dse_version is None and cass_version is None: + self.ntr_queue_raise_rec = ( + "Undetected Cassandra/DSE version. For Cassandra versions 3.0.19 and up, 3.11.5 and up, and Cassandra 4.0 -" + + "raise native_transport_max_concurrent_requests_in_bytes_per_ip and native_transport_max_concurrent_requests_in_bytes (see CASSANDRA-15013 and https://cassandra.apache.org/blog/Improving-Apache-Cassandras-Front-Door-and-Backpressure.html)." + + "If is DSE or Cassandra versions 2.x, 3.0.18 or lower, 3.11.4 or lower then raise or set -Dcassandra.max_queued_native_transport_requests= " + + "(valid range is 1024-8192)" + ) + else: + is_new_dynamic_bp_model = False + if dse_version is None and cass_version is not None: + version_tokens = cass_version.split(".") + if len(version_tokens) == 1 and version_tokens[0] == "DSE Private Fork": + is_new_dynamic_bp_model = True + else: + major = int(version_tokens[0]) + minor = int(version_tokens[1]) + patch = int(version_tokens[2]) + is_new_dynamic_bp_model = ( + major == 4 + or (major == 3 and minor == 0 and patch > 18) + or (major == 3 and minor == 11 and patch > 4) + ) + if is_new_dynamic_bp_model: + self.ntr_queue_raise_rec = "double native_transport_max_concurrent_requests_in_bytes and native_transport_max_concurrent_requests_in_bytes_per_ip." + else: + self.ntr_queue_raise_rec = ( + "raise or set -Dcassandra.max_queued_native_transport_requests= " + + "(valid range is 1024-8192)" + ) self.tpc_cores_raise_rec = ( "raise or set tpc_concurrent_requests_limit in " + "cassandra.yaml (default is 128), if CPU is underutilized." 
def collect_gc_args(res):
    """collects the interesting gc related jvm arguments per node

    Args:
        res: mapping with an optional "original_configs" entry of
            {node: config}, where config may carry a "jvm_args" mapping of
            {jvm flag: iterable of values}.

    Returns:
        dict of {jvm flag: {value: [nodes using that value]}}. Only the flags
        listed in interesting_gc_keys are reported, and a flag only appears
        once at least one value was seen for it.

    Raises:
        ValueError: when a value cannot be used as a dictionary key
            (it is unhashable); the offending value is the exception argument.
    """
    gc_feature_node_conf = {}
    # these are things that we do not already look for but should be, this was a quick hack
    interesting_gc_keys = ("-XX:MaxGCPauseMillis", "-XX:MaxDirectMemorySize")
    # a missing (or None) "original_configs" simply means there is nothing to collect
    configs = res.get("original_configs") or {}
    for node, config in configs.items():
        if "jvm_args" not in config:
            continue
        for key, values in config["jvm_args"].items():
            if key not in interesting_gc_keys:
                continue
            for value in values:
                try:
                    # the flag entry is created lazily so a flag with no values
                    # stays absent from the result
                    gc_feature_node_conf.setdefault(key, {}).setdefault(
                        value, []
                    ).append(node)
                except TypeError as err:
                    # unhashable value, report it on every insertion path
                    raise ValueError(value) from err
    return gc_feature_node_conf
default of 500ms" + ) + gc_target = 500 + tombstone_errors = 0 + tombstone_warns = 0 + gc_over_target = 0 counter = StatusLoggerCounter() + solr_index_backoff = OrderedDict() + solr_index_restore = OrderedDict() + zero_copy_errors = 0 + drops_remote_only = 0 + rejected = 0 + drop_sums = 0 + drop_types = set() event_filter = diag.UniqEventPerNodeFilter() + bp = BackpressureStats(local_backpressure_active={}, per_core_bp={}) + core_balance = {} + tpc_event_types = ["6.8", "new"] + pool_name_pattern = re.compile(r"TPC\/(?P[0-9]+)$") for rec_log in parsed["rec_logs"]: node = util.extract_node_name(rec_log) event_filter.set_node(node) @@ -213,24 +302,313 @@ def generate_recommendations(parsed): ) else: statuslogger_fixer = diag.UnknownStatusLoggerWriter() + if env.DEBUG: + print("parsing", rec_log_file.filepath) events = parser.read_system_log(rec_log_file) for event in [e for e in events if not event_filter.is_duplicate(e)]: + # statuslogger_fixer.check(event) HAS to run first before any code below or statuslogger events will get tossed. 
statuslogger_fixer.check(event) - if event.get("event_type") == "unknown": + event_type = event.get("event_type") + event_category = event.get("event_category") + event_product = event.get("event_product") + rule_type = event.get("rule_type") + if event_type == "unknown": continue - if ( - event.get("event_type") == "pause" - and event.get("event_category") == "garbage_collection" + if event_type == "pause" and event_category == "garbage_collection": + if event.get("duration") > gc_target: + gc_over_target += 1 + elif event_product == "tombstone": + if event_type == "scan_error": + tombstone_errors += event.get("tombstones") + elif ( + event_type == "tpc_scan_warn" + or event_type == "seda_scan_warn" + ): + tombstone_warns += event.get("tombstones") + elif ( + event_type == "threadpool_status" + and rule_type in tpc_event_types + ): + pool_name = event.get("pool_name") + if env.DEBUG: + print("detected pool name is %s" % pool_name) + match = pool_name_pattern.match(pool_name) + if match: + core = int(match.group(1)) + if env.DEBUG: + print("detected core is %i" % core) + pending = event.get("pending") + if node in core_balance: + core_balance[node].append( + PendingCoreMeasurement(core, pending) + ) + else: + core_balance[node] = [ + PendingCoreMeasurement(core, pending) + ] + elif ( + event_category == "streaming" + and event_product == "zcs" + and event_type == "bloom_filter" ): - if event.get("duration") > 500: - gc_over_500 += 1 + zero_copy_errors += 1 + elif event_type == "core_backpressure": + if node in bp.per_core_bp.keys(): + bp.per_core_bp[node].cores.append(event.get("core_num")) + bp.per_core_bp[node].total_bp_events += 1 + else: + bp.per_core_bp[node] = CoreBackpressureStats( + cores=[event.get("core_num")], total_bp_events=1 + ) + elif event_type == "core_backpressure_local": + if node in bp.local_backpressure_active: + bp.local_backpressure_active[node] += 1 + else: + bp.local_backpressure_active[node] = 1 + elif event_category == "indexing": + 
core_name = event.get("core_name") + d = event.get("date") + if event.get("event_type") == "increase_soft_commit": + if core_name in solr_index_backoff: + solr_index_backoff[core_name]["count"] += 1 + solr_index_backoff[core_name]["dates"].append(d) + else: + solr_index_backoff[core_name] = { + "count": 1, + "dates": [event.get("date")], + } + elif event_type == "restore_soft_commit": + if core_name in solr_index_restore: + solr_index_restore[core_name]["count"] += 1 + solr_index_restore[core_name]["dates"].append(d) + else: + solr_index_restore[core_name] = { + "count": 1, + "dates": [event.get("date")], + } + elif event_type == "network_backpressure": + rejected += event.get("total_dropped") + elif event_type == "drops": + local = event.get("localCount") + remote = event.get("remoteCount") + drop_type = event.get("messageType") + drop_types.add(drop_type) + drop_sums += local + remote + if remote > 0 and local == 0: + drops_remote_only += 1 _status_logger_counter(event, counter) recommendations = [] - _recs_on_stages(recommendations, gc_over_500, counter) + _recs_on_stages(recommendations, gc_over_target, gc_target, counter) _recs_on_configs(recommendations, parsed["configs"]) + _recs_on_solr(recommendations, solr_index_backoff, solr_index_restore) + _recs_on_drops( + recommendations, + drops_remote_only, + sorted(list(drop_types), reverse=True), + drop_sums, + ) + _recs_on_zero_copy(recommendations, zero_copy_errors) + _recs_on_rejects(recommendations, rejected) + _recs_on_bp(recommendations, bp, gc_over_target) + _recs_on_core_balance(recommendations, core_balance) + _recs_on_tombstones(recommendations, tombstone_errors, tombstone_warns) return recommendations +def _recs_on_tombstones(recommendations, tombstone_errors, tombstone_warns): + """provides warnings on tombstone warnings and overflow""" + if tombstone_errors > 0: + recommendations.append( + { + "issue": "Tombstone overflowing exceptions found, there were %i total tombstones scanned" + % 
tombstone_errors, + "rec": "The data model has a problem", + } + ) + if tombstone_warns > 0: + recommendations.append( + { + "issue": "Tombstone warnings found, there were %i total tombstones scanned" + % tombstone_warns, + "rec": "The data model has a problem", + } + ) + + +def _recs_on_rejects(recommendations, rejected): + """reports on network backpressure which is a pretty extreme case of backpressure and seen when the nodes are really struggling""" + if rejected > 0: + recommendations.append( + { + "issue": "TPC network backpressure detected. This is a pretty severe form of backpressure and suggests very overwhelmed nodes", + "rec": "Run `sperf core statuslogger` for more analysis", + } + ) + + +def _recs_on_core_balance(recommendations, core_balance): + """shares that there may be a problems with data model where too many tasks are on one core""" + imbalanced_core_count = {} + for node, measurements in core_balance.items(): + pending_measures = [] + last_core = -1 + total_pending = 0 + for measurement in measurements: + pending_measures.append(measurement.pending) + total_pending += measurement.pending + core_diff = measurement.core - last_core + if core_diff != 1: # we are assuming we are done now + # add a minimum number for pending tasks to cut out noise + if total_pending > 99: + expected_balance = 0.0 + if total_pending != 0: + expected_balance = 1.0 / float(len(pending_measures)) + imbalanced_treshold = expected_balance * 2.0 + if env.DEBUG: + print("%.2f is the imbalance threshold" % imbalanced_treshold) + for pending in pending_measures: + perc_pending = 0.0 + if total_pending != 0: + perc_pending = float(pending) / float(total_pending) + if env.DEBUG: + print( + "%.2f is the percent pending for %i" + % (perc_pending, measurement.core) + ) + if perc_pending > imbalanced_treshold: + if node in imbalanced_core_count: + imbalanced_core_count[node] += 1 + else: + imbalanced_core_count[node] = 1 + else: + if env.DEBUG: + print( + "total pending tasks was 
only %i so we are skipping since it is below the threshold of 100 pending tasks to measure core balance" + % total_pending + ) + pending_measures = [] + total_pending = 0 + last_core = measurement.core + if len(imbalanced_core_count.keys()) > 0: + nodes_with_imbalance = [] + for node, value in imbalanced_core_count.items(): + nodes_with_imbalance.append("%s (%i times)" % (node, value)) + recommendations.append( + { + "issue": "TPC core imbalance detected on nodes: %s" + % ",".join(nodes_with_imbalance), + "rec": "The data model is likely broken. Look for time series with large buckets and little randomization of writes, large IN queries, and hot partitions on writes. On DSE versions before 6.8.5 consider upgrading first before changing the data model", + } + ) + + +def _recs_on_bp(recommendations, bp, gc_over_target): + """tracks how many times bp was active""" + rec = "" + if gc_over_target > 0: + rec = "GC was over target %s times however despire TPC backpressure being active it may be dangerous to raise TPC limits, run sperf core statuslogger for further analysis on the type of requests that are pending" + else: + rec = "there were no incidents of GC over target, so it is probably safe to raise tpc_concurrent_requests_limit. 
It may be prudent to run `sperf core statuslogger` for further analysis on the type of requests that are pending" + local_backpressure_active = bp.local_backpressure_active + if len(local_backpressure_active.keys()) > 0: + nodes_with_local = [] + for key, value in local_backpressure_active.items(): + nodes_with_local.append("%s (%i times)" % (key, value)) + recommendations.append( + { + "issue": "Global local backpressure was active on the following nodes: %s" + % ",".join(nodes_with_local), + "rec": rec, + } + ) + per_core_bp = bp.per_core_bp + if len(per_core_bp.keys()): + nodes_with_per_core_bp = [] + for key, value in per_core_bp.items(): + unique_cores = set() + for core in value.cores: + unique_cores.add(core) + nodes_with_per_core_bp.append( + "%s (%i times - %i different cores)" + % ( + key, + value.total_bp_events, + len(unique_cores), + ) + ) + recommendations.append( + { + "issue": "Local core backpressure was active on the following nodes: %s" + % ",".join(nodes_with_per_core_bp), + "rec": rec, + } + ) + + +def _recs_on_zero_copy(recommendations, zero_copy_errors): + """this is a tricky feature, when we see these in the logs we want to report on them, but sometimes they are because the file is too tiny, we will recommend a potential course of + action but caution that it may not help, later on the log message should change and these errors should become more useful""" + if zero_copy_errors > 0: + recommendations.append( + { + "issue": ( + "There were %i incidents of zero copy errors related to bloom filter generation. This COULD be an explaination for increased read latencies, however, this error " + % zero_copy_errors + ) + + "can be only due to the sstables being so small they are immediately deleted and therefore there is no bloom filter generated. In that case there is no issue with this warning", + "rec": "Keep an eye on this. 
If there are no other good explainations for increased latencies run `nodetool upgradesstables -s` on each node then restart to regenerate and load new bloom filters", + } + ) + + +def _recs_on_drops(recommendations, drops_remote_only, drop_types, drop_sums): + """sums up the drops""" + if drops_remote_only > 0: + recommendations.append( + { + "issue": "There were %i incidents of remote only drops indicating issues with other nodes, the network or TPC balance" + % drops_remote_only, + "rec": "If this is DSE 6.0.x-6.8.4, then upgrade to 6.8.latest - https://datastax.jira.com/browse/DB-4683. Otherwise check for network issues or offline nodes", + } + ) + if drop_sums > 0: + recommendations.append( + { + "issue": "There were drops of the following request types: %s for a total of %i drops" + % (", ".join(drop_types), drop_sums), + "rec": "Run sperf core statuslogger and look for high pending stages for those messages types", + } + ) + + +def _recs_on_solr(recommendations, solr_index_backoff, solr_index_restore): + """sees if we have auto soft commit increases and restores and makes recommendations accordingly""" + if len(solr_index_backoff) > 0: + for core in solr_index_backoff.keys(): + data = solr_index_backoff[core] + dates_str = ", ".join( + list(map(lambda a: a.strftime("%Y-%m-%d %H:%M:%S"), data["dates"])) + ) + if core in solr_index_restore: + recommendations.append( + { + "issue": "There were %i incidents of indexing not be able to keep up for core %s at the following times: %s" + % (data["count"], core, dates_str), + "rec": "Consider raising auto soft commit to 60000 for core '%s' to avoid dropped mutations and timeouts" + % core, + } + ) + else: + recommendations.append( + { + "issue": "There were %i incidents of indexing not be able to keep up for core %s at the following times: %s. There is nothing in the log indicating restore to the configured auto soft commit." 
+ % (data["count"], dates_str, core), + "rec": "Strongly consider raising auto soft commit for core '%s' to avoid dropped mutations and timeouts. This core never restores to the configured value, so there is no benefit to keeping it where it is at" + % core, + } + ) + + def generate_report(parsed, recommendations): """generates report from calculated data""" calculated = parsed.get("summary") diff --git a/setup.py b/setup.py index 95c6592..d16a928 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setup( name="sperf", - version="0.6.14", + version="0.6.15", description="Diagnostic utility for DSE and Cassandra", url="https://www.github.com/DataStax-Toolkit/sperf", scripts=["scripts/sperf"], diff --git a/tests/core/test_statuslogger.py b/tests/core/test_statuslogger.py index c2dded6..0c4d930 100644 --- a/tests/core/test_statuslogger.py +++ b/tests/core/test_statuslogger.py @@ -46,7 +46,7 @@ def test_skip_duplicate_events_diag(self): self.assertEqual(len(sl.nodes), 3) s = Summary(sl.nodes) self.assertEqual(s.lines, 22054) - self.assertEqual(s.skipped_lines, 444) + self.assertEqual(s.skipped_lines, 445) self.assertEqual( s.get_busiest_stages()[0], [ diff --git a/tests/test_recs.py b/tests/test_recs.py index 11a8b97..830d383 100644 --- a/tests/test_recs.py +++ b/tests/test_recs.py @@ -23,7 +23,7 @@ class TestRecs(unittest.TestCase): def test_high_pending_write_remote(self): """verify StageAnalyzer makes recs on high pending writes""" - analyzer = recs.Engine() + analyzer = recs.Engine(None, None) stage = recs.Stage( name="TPC/all/WRITE_REMOTE", pending=10001, @@ -43,7 +43,7 @@ def test_high_pending_write_remote(self): def test_high_pending_write_local(self): """verify StageAnalyzer makes recs on high pending local writes""" - analyzer = recs.Engine() + analyzer = recs.Engine(None, None) stage = recs.Stage( name="TPC/all/WRITE_LOCAL", pending=10001, @@ -63,7 +63,7 @@ def test_high_pending_write_local(self): def test_high_pending_mutations(self): """verify 
StageAnalyzer makes recs on high pending local writes""" - analyzer = recs.Engine() + analyzer = recs.Engine(None, None) stage = recs.Stage( name="MutationStage", pending=10001, @@ -83,7 +83,7 @@ def test_high_pending_mutations(self): def test_tpc_backpressure(self): """verify StageAnalyzer makes recs on any backpressure""" - analyzer = recs.Engine() + analyzer = recs.Engine(None, None) stage = recs.Stage( name="TPC/2", pending=1, @@ -108,7 +108,7 @@ def test_tpc_backpressure(self): def test_full_memtable(self): """verify StageAnalyzer""" - analyzer = recs.Engine() + analyzer = recs.Engine(None, None) stage = recs.Stage( name="TPC/all/WRITE_MEMTABLE_FULL", pending=0, @@ -128,7 +128,7 @@ def test_full_memtable(self): def test_full_memtable_completed(self): """verify full memtable historically completed""" - analyzer = recs.Engine() + analyzer = recs.Engine(None, None) stage = recs.Stage( name="TPC/all/WRITE_MEMTABLE_FULL", pending=0, @@ -150,7 +150,7 @@ def test_full_memtable_completed(self): def test_compactions_behind(self): """verify compactions analysis""" - analyzer = recs.Engine() + analyzer = recs.Engine(None, None) stage = recs.Stage( name="CompactionManger", pending=101, @@ -170,7 +170,7 @@ def test_compactions_behind(self): def test_memtable_flush_writer_pending(self): """verify flush writer pending""" - analyzer = recs.Engine() + analyzer = recs.Engine(None, None) stage = recs.Stage( name="MemtableFlushWriter", pending=6, @@ -190,7 +190,7 @@ def test_memtable_flush_writer_pending(self): def test_memtable_flush_writer_blocked(self): """verify flush writer blocked""" - analyzer = recs.Engine() + analyzer = recs.Engine(None, None) stage = recs.Stage( name="MemtableFlushWriter", pending=0, @@ -210,7 +210,7 @@ def test_memtable_flush_writer_blocked(self): def test_ntr_blocked(self): """verify ntr blocked""" - analyzer = recs.Engine() + analyzer = recs.Engine(None, "3.11.3") stage = recs.Stage( name="Native-Transport-Requests", pending=0, @@ -234,7 +234,7 @@ 
def test_ntr_blocked(self): def test_ntr_all_time_blocked(self): """verify ntr blocked""" - analyzer = recs.Engine() + analyzer = recs.Engine(None, "3.11.3") stage = recs.Stage( name="Native-Transport-Requests", pending=0, diff --git a/tests/test_sperf_default.py b/tests/test_sperf_default.py index 176937d..c314841 100644 --- a/tests/test_sperf_default.py +++ b/tests/test_sperf_default.py @@ -32,6 +32,7 @@ def run(): output = steal_output(run) # reads better with the extra newline + self.maxDiff = None self.assertEqual( "\n" + output, """ @@ -56,7 +57,9 @@ def run(): recommendations --------------- -* There were 16 incidents of GC over 500ms. Run `sperf core gc` for more analysis.""", +* There were 16 incidents of GC over 500ms. Run `sperf core gc` for more analysis. +* There were drops of the following request types: MUTATION for a total of 1096 drops. Run sperf core statuslogger and look for high pending stages for those messages types. +* Global local backpressure was active on the following nodes: 10.101.35.102 (1 times). GC was over target %s times however despire TPC backpressure being active it may be dangerous to raise TPC limits, run sperf core statuslogger for further analysis on the type of requests that are pending.""", ) def test_sperf_68(self): @@ -69,6 +72,7 @@ def test_sperf_68(self): args.node_info_prefix = "node_info.json" args.block_dev_prefix = "blockdev_report" args.cfstats_prefix = "cfstats" + self.maxDiff = None def run(): sperf_default.run(args) @@ -78,6 +82,8 @@ def run(): self.assertEqual( "\n" + output, """ +WARN cannot find -XX:MaxGCPauseMillis in the logs setting common default of 500ms + nodes 1 dse version(s) (startup logs) { 6.8.1 } cassandra version(s) (startup logs) { DSE private fork } @@ -100,11 +106,17 @@ def run(): recommendations --------------- -* There were 5 incidents of GC over 500ms. Run `sperf core gc` for more analysis.""", +* There were 5 incidents of GC over 500ms. Run `sperf core gc` for more analysis. 
+* There were drops of the following request types: RANGE_SLICE, LWT for a total of 8 drops. Run sperf core statuslogger and look for high pending stages for those messages types. +* There were 2 incidents of zero copy errors related to bloom filter generation. This COULD be an explaination for increased read latencies, however, this error can be only due to the sstables being so small they are immediately deleted and therefore there is no bloom filter generated. In that case there is no issue with this warning. Keep an eye on this. If there are no other good explainations for increased latencies run `nodetool upgradesstables -s` on each node then restart to regenerate and load new bloom filters. +* Local core backpressure was active on the following nodes: 172.17.0.2 (320 times - 3 different cores). GC was over target %s times however despire TPC backpressure being active it may be dangerous to raise TPC limits, run sperf core statuslogger for further analysis on the type of requests that are pending. +* TPC core imbalance detected on nodes: 172.17.0.2 (1 times). The data model is likely broken. Look for time series with large buckets and little randomization of writes, large IN queries, and hot partitions on writes. On DSE versions before 6.8.5 consider upgrading first before changing the data model. +* Tombstone warnings found, there were 1001 total tombstones scanned. 
The data model has a problem.""", ) def test_empty_recommendations(self): """pass cthrough recommendations engine""" + self.maxDiff = None parsed = {"warnings": [], "rec_logs": [], "configs": {}} recommendations = sperf_default.generate_recommendations(parsed) self.assertEqual(len(recommendations), 0) diff --git a/tests/test_statuslogger.py b/tests/test_statuslogger.py index 100481f..15b1fdb 100644 --- a/tests/test_statuslogger.py +++ b/tests/test_statuslogger.py @@ -36,6 +36,7 @@ def test_sperf(self): args.debug_log_prefix = "debug.log" args.reporter = "summary" args.system_log_prefix = "system.log" + self.maxDiff = None def run(): statuslogger.run(args) @@ -47,7 +48,7 @@ def run(): "sperf core statuslogger version: %s\n" % (VERSION) + """ Summary (22,054 lines) -Summary (444 skipped lines) +Summary (445 skipped lines) dse versions: {'6.7.7'} cassandra versions: {'DSE Private Fork'} @@ -79,7 +80,9 @@ def run(): busiest stages in PENDING ------------------------------ 10.101.35.102: - CompactionExecutor: 1""", + CompactionExecutor: 1 + +WARNING more than one version present assuming no version with recommendations""", ) def test_sperf_68(self): @@ -94,6 +97,7 @@ def test_sperf_68(self): args.reporter = "summary" args.debug_log_prefix = "debug.log" args.system_log_prefix = "system.log" + self.maxDiff = None def run(): statuslogger.run(args) @@ -104,8 +108,8 @@ def run(): output, "sperf core statuslogger version: %s\n" % (VERSION) + """ -Summary (20,240 lines) -Summary (2,196 skipped lines) +Summary (20,245 lines) +Summary (2,204 skipped lines) dse versions: {'6.8.1'} cassandra versions: {'DSE Private Fork'} @@ -130,19 +134,22 @@ def run(): busiest stages across all nodes ------------------------------ -* MemtablePostFlush pending: 6 (172.17.0.2) -* MemtableFlushWriter active: 3 (172.17.0.2) -* PerDiskMemtableFlushWriter_0 active: 2 (172.17.0.2) -* CompactionExecutor active: 1 (172.17.0.2) -* MemtablePostFlush active: 1 (172.17.0.2) -* TPC/all/EXECUTE_STATEMENT 
active: 1 (172.17.0.2) -* LwtStage active: 1 (172.17.0.2) -* TPC/other active: 1 (172.17.0.2) -* TPC/other/EXECUTE_STATEMENT active: 1 (172.17.0.2) -* TPC/0/TIMED_TIMEOUT active: 1 (172.17.0.2) +* TPC/1/READ_LOCAL pending: 100 (172.17.0.2) +* TPC/1/READ_LOCAL active: 32 (172.17.0.2) +* MemtablePostFlush pending: 6 (172.17.0.2) +* MemtableFlushWriter active: 3 (172.17.0.2) +* PerDiskMemtableFlushWriter_0 active: 2 (172.17.0.2) +* CompactionExecutor active: 1 (172.17.0.2) +* MemtablePostFlush active: 1 (172.17.0.2) +* TPC/all/EXECUTE_STATEMENT active: 1 (172.17.0.2) +* LwtStage active: 1 (172.17.0.2) +* TPC/other active: 1 (172.17.0.2) +* TPC/other/EXECUTE_STATEMENT active: 1 (172.17.0.2) +* TPC/0/TIMED_TIMEOUT active: 1 (172.17.0.2) busiest stages in PENDING ------------------------------ 172.17.0.2: + TPC/1/READ_LOCAL: 100 MemtablePostFlush: 6""", ) diff --git a/tests/testdata/dse68/nodes/172.17.0.2/logs/cassandra/debug.log b/tests/testdata/dse68/nodes/172.17.0.2/logs/cassandra/debug.log index a4fedd5..e8a0cc5 100644 --- a/tests/testdata/dse68/nodes/172.17.0.2/logs/cassandra/debug.log +++ b/tests/testdata/dse68/nodes/172.17.0.2/logs/cassandra/debug.log @@ -8949,7 +8949,7 @@ TPC/0/WRITE_MEMTABLE_FULL 0 N/A TPC/0/WRITE_REMOTE 0 0 N/A N/A 0 0 0 N/A 0 TPC/0/WRITE_SWITCH_FOR_MEMTABLE 0 N/A N/A N/A 0 0 2969 N/A N/A TPC/0/WRITE_SWITCH_FOR_RESPONSE 0 N/A N/A N/A 2252539 1685913 10856926 N/A N/A -TPC/1 0 0 0 0 2074412 2034732 23385552 N/A 0 +TPC/1 32 100 0 0 2074412 2034732 23385552 N/A 0 TPC/1/AUTHENTICATION 0 N/A N/A N/A 0 0 0 N/A N/A TPC/1/AUTHORIZATION 0 N/A N/A N/A 0 0 0 N/A N/A TPC/1/BACKPRESSURED_MESSAGE_DECODE 0 N/A N/A N/A 0 0 0 N/A N/A @@ -8982,7 +8982,7 @@ TPC/1/PARALLEL_MESSAGE_DECODE 0 N/A TPC/1/POPULATE_VIRTUAL_TABLE 0 N/A N/A N/A 0 0 0 N/A N/A TPC/1/READ_DISK_ASYNC 0 N/A N/A N/A 0 0 112 N/A N/A TPC/1/READ_INTERNAL 0 N/A N/A N/A 11 0 18878 N/A N/A -TPC/1/READ_LOCAL 0 0 N/A N/A 26 67 89 N/A 0 +TPC/1/READ_LOCAL 32 100 N/A N/A 26 67 89 N/A 0 
TPC/1/READ_RANGE_INTERNAL 0 N/A N/A N/A 12 14 32 N/A N/A TPC/1/READ_RANGE_LOCAL 0 0 N/A N/A 0 0 0 N/A 0 TPC/1/READ_RANGE_NODESYNC 0 0 N/A N/A 0 0 0 N/A 0 diff --git a/tests/testdata/dse68/nodes/172.17.0.2/logs/cassandra/system.log b/tests/testdata/dse68/nodes/172.17.0.2/logs/cassandra/system.log index d0a5048..84b0529 100644 --- a/tests/testdata/dse68/nodes/172.17.0.2/logs/cassandra/system.log +++ b/tests/testdata/dse68/nodes/172.17.0.2/logs/cassandra/system.log @@ -2666,3 +2666,8 @@ keyspace1.counter1 0,0 keyspace1.standard1 2390,26526384 WARN [ScheduledTasks:1] 2020-07-22 13:44:16,408 NoSpamLogger.java:98 - Some operations timed out, details available at debug level (debug.log) +WARN [SSTableBatchOpen:9] 2020-07-22 13:45:16,408 SSTableReader.java:775 - Could not recreate or deserialize existing bloom filter, continuing with a pass-through bloom filter but this will significantly impact reads performance +WARN [CoreThread-5] 2020-07-22 13:46:16,408 NoSpamLogger.java:98 - Rejecting droppable message on connection SMALL_MESSAGE with id 13 from /172.17.0.2 to /172.17.0.3 via (/10.1.1.99,/172.17.0.3:7000) +WARN [CoreThread-5] 2020-07-22 13:46:17,408 NoSpamLogger.java:98 - Scanned over 1001 tombstone rows for query SELECT * FROM keyspace1.standard1 WHERE id = 23JDQs-3241-323D-322DD-328DJAKJLD LIMIT 1001 - more than the warning threshold 1000 +WARN [Stream-Deserializer-/10.17.0.12:13331-1341113] 2020-07-22 13:46:18,408 SSTableReader.java:801 - Cannot recreate bloom filter, cannot estimate number of keys +WARN [Stream-Deserializer-/10.17.0.12:13331-1341113] 2020-07-22 13:46:18,409 SSTableReader.java:775 - Could not recreate or deserialize existing bloom filter, continuing with a pass-through bloom filter but this will significantly impact reads performance \ No newline at end of file