Skip to content

Commit

Permalink
throttle k8s (#699)
Browse files Browse the repository at this point in the history
* throttle max bytes per socket/cycle to 512k, max msgs for critical k8s entities to 100

* ifdef k8s caching

* adjust some commented (TBD) code

* fix the message limit logic
  • Loading branch information
aleks-f authored and luca3m committed Dec 20, 2016
1 parent 2d09f02 commit 8dce069
Show file tree
Hide file tree
Showing 18 changed files with 142 additions and 75 deletions.
2 changes: 2 additions & 0 deletions userspace/libsinsp/filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ sinsp_filter_check_list::sinsp_filter_check_list()
add_filter_check(new sinsp_filter_check_container());
add_filter_check(new sinsp_filter_check_utils());
add_filter_check(new sinsp_filter_check_fdlist());
#ifndef HAS_ANALYZER
add_filter_check(new sinsp_filter_check_k8s());
#endif // HAS_ANALYZER
add_filter_check(new sinsp_filter_check_mesos());
add_filter_check(new sinsp_filter_check_tracer());
add_filter_check(new sinsp_filter_check_evtin());
Expand Down
4 changes: 4 additions & 0 deletions userspace/libsinsp/filterchecks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6301,6 +6301,8 @@ uint8_t* sinsp_filter_check_fdlist::extract(sinsp_evt *evt, OUT uint32_t* len, b
}
}

#ifndef HAS_ANALYZER

///////////////////////////////////////////////////////////////////////////////
// sinsp_filter_check_k8s implementation
///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -6868,6 +6870,8 @@ uint8_t* sinsp_filter_check_k8s::extract(sinsp_evt *evt, OUT uint32_t* len, bool
return NULL;
}

#endif // HAS_ANALYZER

///////////////////////////////////////////////////////////////////////////////
// sinsp_filter_check_mesos implementation
///////////////////////////////////////////////////////////////////////////////
Expand Down
4 changes: 4 additions & 0 deletions userspace/libsinsp/filterchecks.h
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,8 @@ class sinsp_filter_check_fdlist : public sinsp_filter_check
char m_addrbuff[100];
};

#ifndef HAS_ANALYZER

class sinsp_filter_check_k8s : public sinsp_filter_check
{
public:
Expand Down Expand Up @@ -896,6 +898,8 @@ class sinsp_filter_check_k8s : public sinsp_filter_check
string m_tstr;
};

#endif // HAS_ANALYZER

class sinsp_filter_check_mesos : public sinsp_filter_check
{
public:
Expand Down
3 changes: 2 additions & 1 deletion userspace/libsinsp/k8s_api_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ k8s_api_handler::k8s_api_handler(collector_ptr_t collector,
k8s_handler("k8s_api_handler", false,
#ifdef HAS_CAPTURE
url, path, filter, ".", collector, http_version, 1000L, ssl, bt,
false, true, std::make_shared<k8s_dummy_handler>(), blocking_socket, ~0,
#endif // HAS_CAPTURE
false, true, std::make_shared<k8s_dummy_handler>(), blocking_socket, nullptr)
nullptr)
{
}

Expand Down
1 change: 1 addition & 0 deletions userspace/libsinsp/k8s_daemonset_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ k8s_daemonset_handler::k8s_daemonset_handler(k8s_state_t& state
STATE_FILTER, EVENT_FILTER, collector,
http_version, 1000L, ssl, bt, true,
connect, dependency_handler, blocking_socket,
100, // max msgs
#endif // HAS_CAPTURE
&state)
{
Expand Down
1 change: 1 addition & 0 deletions userspace/libsinsp/k8s_deployment_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ k8s_deployment_handler::k8s_deployment_handler(k8s_state_t& state
STATE_FILTER, EVENT_FILTER, collector,
http_version, 1000L, ssl, bt, true,
connect, dependency_handler, blocking_socket,
100, // max msgs
#endif // HAS_CAPTURE
&state)
{
Expand Down
2 changes: 1 addition & 1 deletion userspace/libsinsp/k8s_event_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ k8s_event_handler::k8s_event_handler(k8s_state_t& state
url, "/api/v1/events",
STATE_FILTER, EVENT_FILTER, collector,
http_version, 1000L, ssl, bt, true,
connect, dependency_handler, blocking_socket,
connect, dependency_handler, blocking_socket, ~0,
#endif // HAS_CAPTURE
&state),
m_event_filter(event_filter)
Expand Down
117 changes: 61 additions & 56 deletions userspace/libsinsp/k8s_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ k8s_handler::k8s_handler(const std::string& id,
bool connect,
ptr_t dependency_handler,
bool blocking_socket,
unsigned max_messages,
#endif // HAS_CAPTURE
k8s_state_t* state): m_state(state),
m_id(id + "_state"),
Expand All @@ -61,6 +62,7 @@ k8s_handler::k8s_handler(const std::string& id,
m_connect(connect),
m_dependency_handler(dependency_handler),
m_blocking_socket(blocking_socket),
m_max_messages(max_messages),
#endif // HAS_CAPTURE
m_is_captured(is_captured)
{
Expand All @@ -72,13 +74,13 @@ k8s_handler::k8s_handler(const std::string& id,
{
g_logger.log(std::string("K8s (" + m_id + ") creating handler for " +
uri(m_url).to_string(false) + m_path), sinsp_logger::SEV_DEBUG);
m_http = std::make_shared<handler_t>(*this, m_id, m_url, m_path, m_http_version,
m_handler = std::make_shared<handler_t>(*this, m_id, m_url, m_path, m_http_version,
m_timeout_ms, m_ssl, m_bt, !m_blocking_socket, m_blocking_socket);
m_http->set_json_callback(&k8s_handler::set_event_json);
m_http->add_json_filter(*m_filter);
m_http->add_json_filter(ERROR_FILTER);
m_http->close_on_chunked_end(false);
m_http->set_check_chunked(false);
m_handler->set_json_callback(&k8s_handler::set_event_json);
m_handler->add_json_filter(*m_filter);
m_handler->add_json_filter(ERROR_FILTER);
m_handler->close_on_chunked_end(false);
m_handler->set_check_chunked(false);
this->connect();
}
#endif // HAS_CAPTURE
Expand All @@ -93,50 +95,50 @@ void k8s_handler::make_http()
#ifdef HAS_CAPTURE
if(m_connect && m_collector)
{
if(!m_http)
if(!m_handler)
{
g_logger.log(std::string("K8s (" + m_id + ") creating handler for " +
uri(m_url).to_string(false) + m_path), sinsp_logger::SEV_INFO);
m_http = std::make_shared<handler_t>(*this, m_id, m_url, m_path, m_http_version,
m_handler = std::make_shared<handler_t>(*this, m_id, m_url, m_path, m_http_version,
m_timeout_ms, m_ssl, m_bt, true, m_blocking_socket);
m_http->set_json_callback(&k8s_handler::set_event_json);
m_handler->set_json_callback(&k8s_handler::set_event_json);
}
else if(m_collector->has(m_http))
else if(m_collector->has(m_handler))
{
m_collector->remove(m_http);
m_collector->remove(m_handler);
}
m_http->remove_json_filter(m_state_filter);
m_handler->remove_json_filter(m_state_filter);
m_filter = &m_event_filter;
if(!m_http->has_json_filter(ERROR_FILTER))
if(!m_handler->has_json_filter(ERROR_FILTER))
{
m_http->add_json_filter(ERROR_FILTER);
m_handler->add_json_filter(ERROR_FILTER);
}
// good event filter must always be before error event filter
m_http->add_json_filter(*m_filter, ERROR_FILTER);
m_http->set_path(m_path);
m_http->set_id(m_id);
m_handler->add_json_filter(*m_filter, ERROR_FILTER);
m_handler->set_path(m_path);
m_handler->set_id(m_id);
m_collector->set_steady_state(true);
m_watching = true;
m_blocking_socket = false;
m_http->close_on_chunked_end(false);
m_http->set_check_chunked(true);
m_handler->close_on_chunked_end(false);
m_handler->set_check_chunked(true);

m_req_sent = false;
m_resp_recvd = false;
connect();
m_http->set_socket_option(SOCK_NONBLOCK);
m_handler->set_socket_option(SOCK_NONBLOCK);
}
#endif // HAS_CAPTURE
}

void k8s_handler::check_enabled()
{
#ifdef HAS_CAPTURE
if(!m_http->is_enabled())
if(!m_handler->is_enabled())
{
g_logger.log("k8s_handler (" + m_id +
") check_enabled() enabling socket in collector", sinsp_logger::SEV_TRACE);
m_http->enable();
m_handler->enable();
}
else
{
Expand All @@ -151,22 +153,22 @@ void k8s_handler::check_enabled()
bool k8s_handler::connect()
{
#ifdef HAS_CAPTURE
if(m_collector && m_http)
if(m_collector && m_handler)
{
if(!m_collector->has(m_http))
if(!m_collector->has(m_handler))
{
g_logger.log(std::string("k8s_handler (" + m_id +
") k8s_handler::connect() adding handler to collector"), sinsp_logger::SEV_TRACE);
m_collector->add(m_http);
m_collector->add(m_handler);
return false;
}
if(m_http->is_connecting())
if(m_handler->is_connecting())
{
g_logger.log(std::string("k8s_handler (" + m_id +
"), k8s_handler::connect() connecting to " + m_http->get_url().to_string(false)), sinsp_logger::SEV_TRACE);
"), k8s_handler::connect() connecting to " + m_handler->get_url().to_string(false)), sinsp_logger::SEV_TRACE);
return false;
}
if(m_http->is_connected())
if(m_handler->is_connected())
{
g_logger.log("k8s_handler (" + m_id +
") k8s_handler::connect() socket is connected.", sinsp_logger::SEV_TRACE);
Expand All @@ -188,22 +190,22 @@ bool k8s_handler::connect()
void k8s_handler::send_data_request()
{
#ifdef HAS_CAPTURE
if(m_http)
if(m_handler)
{
if(!m_req_sent)
{
if(m_http->is_connected())
if(m_handler->is_connected())
{
g_logger.log("k8s_handler (" + m_id + ") sending request to " +
m_http->get_url().to_string(false) + m_path,
m_handler->get_url().to_string(false) + m_path,
sinsp_logger::SEV_DEBUG);
m_http->send_request();
m_handler->send_request();
m_req_sent = true;
}
else if(m_http->is_connecting())
else if(m_handler->is_connecting())
{
g_logger.log("k8s_handler (" + m_id + ") is connecting to " +
m_http->get_url().to_string(false),
m_handler->get_url().to_string(false),
sinsp_logger::SEV_DEBUG);
}
}
Expand All @@ -218,13 +220,13 @@ void k8s_handler::send_data_request()
void k8s_handler::receive_response()
{
#ifdef HAS_CAPTURE
if(m_http)
if(m_handler)
{
if(m_req_sent)
{
if(!m_watching)
{
if(m_http->get_all_data())
if(m_handler->get_all_data())
{
m_data_received = true;
}
Expand Down Expand Up @@ -253,9 +255,9 @@ void k8s_handler::receive_response()
bool k8s_handler::is_alive() const
{
#ifdef HAS_CAPTURE
if(m_http && !m_http->is_connecting() && !m_http->is_connected())
if(m_handler && !m_handler->is_connecting() && !m_handler->is_connected())
{
g_logger.log("k8s_handler (" + m_id + ") connection (" + m_http->get_url().to_string(false) + ") loss.",
g_logger.log("k8s_handler (" + m_id + ") connection (" + m_handler->get_url().to_string(false) + ") loss.",
sinsp_logger::SEV_WARNING);
return false;
}
Expand All @@ -268,9 +270,9 @@ void k8s_handler::check_collector_status()
#ifdef HAS_CAPTURE
if(m_collector)
{
if(!m_collector->has(m_http))
if(!m_collector->has(m_handler))
{
m_http.reset();
m_handler.reset();
make_http();
}
}
Expand All @@ -284,7 +286,7 @@ void k8s_handler::check_collector_status()
void k8s_handler::check_state()
{
#ifdef HAS_CAPTURE
if(m_collector && m_http)
if(m_collector && m_handler)
{
if(m_resp_recvd && m_watch && !m_watching)
{
Expand All @@ -309,10 +311,10 @@ void k8s_handler::check_state()
throw sinsp_exception("k8s_handler (" + m_id + "), invalid URL path: " + m_path);
}
}
m_http->set_socket_option(SOCK_NONBLOCK);
m_handler->set_socket_option(SOCK_NONBLOCK);
make_http();
}
if(m_watching && m_id.find("_state") == std::string::npos && m_http->wants_send())
if(m_watching && m_id.find("_state") == std::string::npos && m_handler->wants_send())
{
m_req_sent = false;
m_resp_recvd = false;
Expand All @@ -324,9 +326,9 @@ void k8s_handler::check_state()
bool k8s_handler::connection_error() const
{
#ifdef HAS_CAPTURE
if(m_http)
if(m_handler)
{
return m_http->connection_error();
return m_handler->connection_error();
}
#endif // HAS_CAPTURE
return false;
Expand All @@ -335,17 +337,17 @@ bool k8s_handler::connection_error() const
void k8s_handler::collect_data()
{
#ifdef HAS_CAPTURE
if(m_collector && m_http)
if(m_collector && m_handler)
{
process_events(); // there may be leftovers from state connection closed by collector
check_state(); // switch to events, if needed
g_logger.log("k8s_handler (" + m_id + ")::collect_data(), checking connection to " + uri(m_url).to_string(false), sinsp_logger::SEV_DEBUG);
if(m_http->is_connecting())
if(m_handler->is_connecting())
{
g_logger.log("k8s_handler (" + m_id + ")::collect_data(), connecting to " + uri(m_url).to_string(false), sinsp_logger::SEV_DEBUG);
return;
}
else if(m_http->is_connected())
else if(m_handler->is_connected())
{
if(!m_connect_logged)
{
Expand Down Expand Up @@ -632,35 +634,38 @@ void k8s_handler::process_events()
{
if(dependency_ready())
{
for(auto evt : m_events)
unsigned counter = 0;
for(auto evt = m_events.begin(); evt != m_events.end();)
{
if(evt && !evt->isNull())
m_state_processing_started = true;
if(++counter >= get_max_messages()) { break; }
if(*evt && !(*evt)->isNull())
{
if(g_logger.get_severity() >= sinsp_logger::SEV_TRACE)
{
g_logger.log("k8s_handler (" + m_id + ") processing event data:\n" + json_as_string(*evt),
g_logger.log("k8s_handler (" + m_id + ") processing event data:\n" + json_as_string(*(*evt)),
sinsp_logger::SEV_TRACE);
}
#ifdef HAS_CAPTURE
if(m_is_captured)
{
m_state->enqueue_capture_event(*evt);
m_state->enqueue_capture_event(**evt);
}
#endif // HAS_CAPTURE
handle_json(std::move(*evt));
handle_json(std::move(**evt));
}
else
{
g_logger.log("k8s_handler (" + m_id + ") error " +
#ifdef HAS_CAPTURE
"(" + uri(m_url).to_string(false) + ") " +
#endif // HAS_CAPTURE
(!evt ? "data is null." : (evt->isNull() ? "JSON is null." : "Unknown")),
(!(*evt) ? "data is null." : ((*evt)->isNull() ? "JSON is null." : "Unknown")),
sinsp_logger::SEV_ERROR);
}
evt = m_events.erase(evt);
}
if(!m_state_built && m_events.size()) { m_state_built = true; }
m_events.clear();
if(!m_state_built && m_state_processing_started && !m_events.size()) { m_state_built = true; }
}
}

Expand Down
Loading

0 comments on commit 8dce069

Please sign in to comment.