Skip to content

Commit

Permalink
Improve error message readability.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 660517158
  • Loading branch information
tensorflower-gardener authored and copybara-github committed Aug 7, 2024
1 parent fe82771 commit 39c59b9
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 7 deletions.
3 changes: 3 additions & 0 deletions xla/tsl/distributed_runtime/coordination/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ package(

# Helpers for building coordination service error statuses and for trimming
# the duplicated gRPC text out of their messages.
cc_library(
name = "coordination_service_error_util",
srcs = ["coordination_service_error_util.cc"],
hdrs = ["coordination_service_error_util.h"],
deps = [
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:cord",
# RE2 is used by the .cc file to pull the RPC name out of the error message.
"@tsl//tsl/platform:regexp",
"@tsl//tsl/protobuf:coordination_service_proto_cc",
],
)
Expand All @@ -28,6 +30,7 @@ tsl_cc_test(
deps = [
":coordination_service_error_util",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
"@tsl//tsl/platform:status",
"@tsl//tsl/platform:test",
"@tsl//tsl/platform:test_main",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ absl::Status CoordinationServiceAgentImpl::ShutdownInternal() {
} else {
LOG(ERROR)
<< "Failed to disconnect from coordination service with status: "
<< status
<< TrimCoordinationErrorMessage(status)
<< "\nProceeding with agent shutdown anyway. This is usually caused "
"by an earlier error during execution. Check the logs (this task "
"or the leader) for an earlier error to debug further.";
Expand Down Expand Up @@ -887,11 +887,12 @@ void CoordinationServiceAgentImpl::SetError(const absl::Status& error) {
assert(!error.ok());
absl::MutexLock l(&state_mu_);
if (state_ == CoordinatedTaskState::TASKSTATE_ERROR) return;
absl::Status trimmed_error = TrimCoordinationErrorMessage(error);

LOG(ERROR) << "Coordination agent is set to ERROR: " << error;
LOG(ERROR) << "Coordination agent is set to ERROR: " << trimmed_error;
state_ = CoordinatedTaskState::TASKSTATE_ERROR;
status_ = error;
error_fn_(error);
status_ = trimmed_error;
error_fn_(trimmed_error);
}

absl::Status CoordinationServiceAgentImpl::ActivateWatch(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/* Copyright 2024 The OpenXLA Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "xla/tsl/distributed_runtime/coordination/coordination_service_error_util.h"

#include <optional>
#include <string>

#include "absl/status/status.h"
#include "absl/strings/cord.h"
#include "absl/strings/str_cat.h"
#include "tsl/platform/regexp.h"

namespace tsl {
// Returns a status equivalent to `s` whose message no longer carries the
// duplicated error text that follows the "Additional GRPC" marker.
//
// - OK statuses, and statuses whose message has no "Additional GRPC" marker,
//   are returned unchanged.
// - UNAVAILABLE statuses without a coordination error payload are treated as
//   RPC-layer failures: everything before the marker is replaced by a
//   troubleshooting hint and the gRPC details after the marker are kept.
// - For every other status the message is cut at the marker and, when a
//   coordination service RPC path can be found in the original message, a
//   trailing "RPC: <path>" line is appended.
//
// The status code and the coordination error payload (if any) are preserved.
absl::Status TrimCoordinationErrorMessage(const absl::Status& s) {
  if (s.ok()) {
    return s;
  }
  std::string status_message(s.message());
  const auto additional_info_index = status_message.find("Additional GRPC");
  // This error didn't come from gRPC, so we don't need to trim it.
  if (additional_info_index == std::string::npos) {
    return s;
  }

  std::optional<absl::Cord> payload =
      s.GetPayload(CoordinationErrorPayloadKey());
  if (!payload.has_value() && absl::IsUnavailable(s)) {
    // This error is not provided by us, so it's probably an RPC layer error.
    static constexpr char kRpcLayerErrorPrefix[] =
        "Failed to send RPC to coordination service. Either the leader task "
        "died/restarted unexpectedly or this task is experiencing network "
        "issues. Check earlier logs from this task and the "
        "leader (usually slice 0 process/task/worker 0) to debug further.\n";
    // Replace the duplicated error message at the start with the prefix and
    // keep the gRPC details that follow the marker.
    status_message = absl::StrCat(
        kRpcLayerErrorPrefix, status_message.substr(additional_info_index));
  } else {
    // Extract the RPC that was called. This must happen before truncating,
    // because the RPC path lives in the part of the message being erased.
    // Note: it is unfortunate that we have to keep the tensorflow prefix
    // because that's the RPC service proto namespace.
    std::string rpc_name;
    const bool found_rpc_name = RE2::PartialMatch(
        status_message, "(/tensorflow.CoordinationService/(\\w+))", &rpc_name);
    // Erase the duplicated error message (everything from the marker on).
    status_message.erase(additional_info_index);
    // Only mention the RPC when one was actually recognized; otherwise the
    // message would end with a dangling, empty "RPC: " line.
    if (found_rpc_name) {
      absl::StrAppend(&status_message, "\nRPC: ", rpc_name);
    }
  }

  absl::Status trimmed_status(s.code(), status_message);
  // Reattach payload.
  if (payload.has_value()) {
    trimmed_status.SetPayload(CoordinationErrorPayloadKey(), *payload);
  }
#if defined(PLATFORM_GOOGLE)
  // Reattach source locations.
  for (const auto& source_location : s.GetSourceLocations()) {
    trimmed_status.AddSourceLocation(source_location);
  }
#endif
  return trimmed_status;
}
} // namespace tsl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ inline absl::Status MakeCoordinationError(
absl::Cord(payload.SerializeAsString()));
return s;
}

// Trims the error message by replacing the `Additional GRPC error` part.
//
// Statuses that are OK, or whose message does not contain that part, are
// returned as-is. Otherwise a new status with the same code is returned: for
// UNAVAILABLE errors that carry no coordination error payload, the text before
// the `Additional GRPC error` part is replaced by a troubleshooting hint; for
// all other errors the duplicated tail is dropped and the name of the RPC that
// was called is kept. An existing coordination error payload is reattached to
// the returned status.
//
// Note: The duplicated error message is a quirk of the underlying gRPC code
// that we are using. Changing the shared code may hide important messages for
// other libraries, so we trim the error message for coordination service
// instead. See tsl/distributed_runtime/rpc/grpc_state.h for more details.
absl::Status TrimCoordinationErrorMessage(const absl::Status& s);

} // namespace tsl

#endif // XLA_TSL_DISTRIBUTED_RUNTIME_COORDINATION_COORDINATION_SERVICE_ERROR_UTIL_H_
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
#include <string>

#include "absl/status/status.h"
#include "absl/strings/match.h"
#include "tsl/platform/test.h"
#include "tsl/protobuf/coordination_service.pb.h"
namespace tsl {
Expand Down Expand Up @@ -99,5 +100,54 @@ TEST(CoordinationServiceErrorUtil, MakeCoordinationErrorWithPayload) {
EXPECT_EQ(actual_payload.is_reported_error(), payload.is_reported_error());
}

TEST(CoordinationServiceErrorUtil,
     TrimCoordinationErrorMessage_CoordinationError) {
  // A coordination error whose text is repeated inside the gRPC suffix.
  const absl::Status original = MakeCoordinationError(absl::InternalError(
      "Coordination service has stopped. RecordHeartbeat() from task: "
      "/job:jax_worker/replica:0/task:2 failed. Additional GRPC error "
      "information from remote target coordination_service while calling "
      "/tensorflow.CoordinationService/Heartbeat::UNKNOWN:Error received from "
      "peer "
      "{file:'third_party/grpc/src/core/lib/surface/filter_stack_call.cc', "
      "file_line:464, created_time:'2024-08-05T13:57:51.331198242-07:00', "
      "grpc_status:13, grpc_message:'Coordination service has stopped. "
      "RecordHeartbeat() from task: /job:jax_worker/replica:0/task:2 failed. "
      "'} "));

  const absl::Status trimmed = TrimCoordinationErrorMessage(original);

  // The status code survives trimming.
  EXPECT_EQ(trimmed.code(), original.code());
  // Only the leading copy of the message remains, followed by the RPC name.
  EXPECT_EQ(trimmed.message(),
            "Coordination service has stopped. RecordHeartbeat() from task: "
            "/job:jax_worker/replica:0/task:2 failed. \nRPC: "
            "/tensorflow.CoordinationService/Heartbeat");
  // The payload is carried over: it is present, but its value is empty.
  EXPECT_EQ(trimmed.GetPayload(CoordinationErrorPayloadKey()).value(), "");
}

TEST(CoordinationServiceErrorUtil, TrimCoordinationErrorMessage_NetworkError) {
  // An UNAVAILABLE error raised by the RPC layer itself, i.e. one that carries
  // no coordination error payload.
  const absl::Status original = absl::UnavailableError(
      "failed to connect to all addresses; last error: UNKNOWN: "
      "ipv4:127.0.0.1:10001: Failed to connect to remote host: Connection "
      "refused. Additional GRPC error information from remote target "
      "coordination_service while calling "
      "/tensorflow.CoordinationService/Heartbeat::UNKNOWN:Error received from "
      "peer "
      "{file:'third_party/grpc/src/core/lib/surface/filter_stack_call.cc', "
      "file_line:464, created_time:'2024-08-05T13:57:53.123562608-07:00', "
      "grpc_status:14, grpc_message:'failed to connect to all addresses; last "
      "error: UNKNOWN: ipv4:127.0.0.1:10001: Failed to connect to remote host: "
      "Connection refused'} ");

  const absl::Status trimmed = TrimCoordinationErrorMessage(original);
  const auto trimmed_text = trimmed.message();
  const char* const kConnectFailure = "failed to connect";

  EXPECT_EQ(trimmed.code(), original.code());
  // The troubleshooting hint has been prepended.
  EXPECT_TRUE(absl::StrContains(trimmed_text, "Check earlier logs"));
  // First and last occurrence coincide, so the message is not duplicated.
  EXPECT_EQ(trimmed_text.find(kConnectFailure),
            trimmed_text.rfind(kConnectFailure))
      << trimmed;
}

} // namespace
} // namespace tsl
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,7 @@ CoordinationClientCache* NewGrpcCoordinationClientCache(

CoordinationClient* NewGrpcCoordinationClient(
std::shared_ptr<::grpc::Channel> channel) {
// TODO(hanyangtay): Pass in the logical task name for better logging.
return new GrpcCoordinationClient(
channel, /*target=*/"unknown_target_for_coordination_leader");
return new GrpcCoordinationClient(channel, /*target=*/"coordination_service");
}

} // namespace tsl

0 comments on commit 39c59b9

Please sign in to comment.