fix KubernetesClient download and upload (#749)
* support get obproxy parameter value

* support get obproxy parameter value

* fix the log more info

* update

* fix not env param

* fix not env param

* fix not env param

* fix the env param

* fix the env param

* check the inner_config.yml exists, not only check the dir.

* fix 'find' command && delete gather_log.py

* fix 'find' command && delete gather_obproxy_log.py

* fix check task tenant_min_resource && update paramiko==3.4.1

* fix KubernetesClient download and upload

* fix KubernetesClient download and upload

* fix task

* update for kubernetes
wayyoungboy authored Feb 17, 2025
1 parent edf58f7 commit 959ae2b
Showing 9 changed files with 204 additions and 82 deletions.
34 changes: 18 additions & 16 deletions plugins/check/tasks/observer/log/log_size.yaml
@@ -1,22 +1,24 @@

info: "check obcluster max_syslog_file_count is 0 or over 100"
task:
- type: sql
sql: "select * FROM oceanbase.GV$OB_PARAMETERS where name=\"max_syslog_file_count\" and SVR_IP=#{svr_ip};"
result:
set_value: ob_max_syslog_file_count
verify: '[ $ob_max_syslog_file_count -eq 0 ] || [ $ob_max_syslog_file_count -ge 100 ]'
err_msg: "node: #{svr_ip} max_syslog_file_count is not 0 or over 100, obcluster max_syslog_file_count: #{ob_max_syslog_file_count}"
- type: ssh
ssh: "df #{home_path}/log | awk 'NR==2{print $4*0.8}'"
result:
set_value: disk_free_space_KB
- type: ssh
ssh: "find #{home_path}/log/ -type f -name \"*.log*\" | wc -l | xargs -I {} echo {} | awk '{print (100-$1) * 256 * 1024}' "
result:
set_value: log_dir_need_space
verify: '[ $disk_free_space_KB -gt $log_dir_need_space ]'
err_msg: "disk_free_space_KB < log_dir_need_space. disk_free_space_KB:#{disk_free_space_KB}KB, log_dir_need_space:#{log_dir_need_space}KB"
- version: "[4.0.0.0,*]"
steps:
- type: sql
sql: "select * FROM oceanbase.GV$OB_PARAMETERS where name=\"max_syslog_file_count\" and SVR_IP=#{svr_ip};"
result:
set_value: ob_max_syslog_file_count
verify: '[ $ob_max_syslog_file_count -eq 0 ] || [ $ob_max_syslog_file_count -ge 100 ]'
err_msg: "node: #{svr_ip} max_syslog_file_count is not 0 or over 100, obcluster max_syslog_file_count: #{ob_max_syslog_file_count}"
- type: ssh
ssh: "df #{home_path}/log | awk 'NR==2{print $4*0.8}'"
result:
set_value: disk_free_space_KB
- type: ssh
ssh: "find #{home_path}/log/ -type f -name \"*.log*\" | wc -l | xargs -I {} echo {} | awk '{print (100-$1) * 256 * 1024}' "
result:
set_value: log_dir_need_space
verify: '[ $disk_free_space_KB -gt $log_dir_need_space ]'
err_msg: "disk_free_space_KB < log_dir_need_space. disk_free_space_KB:#{disk_free_space_KB}KB, log_dir_need_space:#{log_dir_need_space}KB"
52 changes: 27 additions & 25 deletions plugins/check/tasks/observer/log/log_size_with_ocp.yaml
@@ -1,31 +1,33 @@

info: "check log_dir free space is over the size of 100 file"
task:
- type: ssh
ssh: "echo \"/home/admin/ocp_agent/conf/config_properties/ob_logcleaner.yaml\" "
result:
set_value: ob_logcleaner_file_path
verify: '[ ! -e "${ob_logcleaner_file_path}" ]'
report_type: execution
err_msg: "[not warning] ob_logcleaner.yaml is exist . This node is not deployed by ocp."
- type: ssh
ssh: "echo \"/home/admin/ocp_agent/conf/config_properties/ob_logcleaner.yaml\" |grep \"ob.logcleaner.ob_log.disk.threshold\" -A1 | grep -oE '[0-9]+'"
result:
set_value: ob_logcleaner_file_nu
verify: 79
verify_type: min
report_type: warning
err_msg: "ocp ob.logcleaner.ob_log.disk.threshold is less 80%"
- type: ssh
ssh: "df #{home_path}/log/ | awk 'NR==2{print $4*#{ob_logcleaner_file_nu}/100}'"
result:
set_value: disk_free_space_KB
- type: ssh
ssh: "find #{home_path}/log/ -type f -name \"*.log*\" | wc -l | xargs -I {} echo {} | awk '{print (100-$1) * 256 * 1024}' "
result:
set_value: log_dir_need_space
verify: '[ $disk_free_space_KB -gt $log_dir_need_space ]'
err_msg: "disk_free_space_KB < log_dir_need_space. disk_free_space_KB:#{disk_free_space_KB}KB, log_dir_need_space:#{log_dir_need_space}KB"
- version: "[4.0.0.0,*]"
steps:
- type: ssh
ssh: "echo \"/home/admin/ocp_agent/conf/config_properties/ob_logcleaner.yaml\" "
result:
set_value: ob_logcleaner_file_path
verify: '[ ! -e "${ob_logcleaner_file_path}" ]'
report_type: execution
err_msg: "[not warning] ob_logcleaner.yaml is exist . This node is not deployed by ocp."
- type: ssh
ssh: "echo \"/home/admin/ocp_agent/conf/config_properties/ob_logcleaner.yaml\" |grep \"ob.logcleaner.ob_log.disk.threshold\" -A1 | grep -oE '[0-9]+'"
result:
set_value: ob_logcleaner_file_nu
verify: 79
verify_type: min
report_type: warning
err_msg: "ocp ob.logcleaner.ob_log.disk.threshold is less 80%"
- type: ssh
ssh: "df #{home_path}/log/ | awk 'NR==2{print $4*#{ob_logcleaner_file_nu}/100}'"
result:
set_value: disk_free_space_KB
- type: ssh
ssh: "find #{home_path}/log/ -type f -name \"*.log*\" | wc -l | xargs -I {} echo {} | awk '{print (100-$1) * 256 * 1024}' "
result:
set_value: log_dir_need_space
verify: '[ $disk_free_space_KB -gt $log_dir_need_space ]'
err_msg: "disk_free_space_KB < log_dir_need_space. disk_free_space_KB:#{disk_free_space_KB}KB, log_dir_need_space:#{log_dir_need_space}KB"
8 changes: 4 additions & 4 deletions plugins/check/tasks/observer/tenant/tenant_min_resource.yaml
@@ -4,10 +4,10 @@ task:
   steps:
     - type: sql
       sql: ' SELECT GROUP_CONCAT(DISTINCT T4.TENANT_ID)
-        FROM DBA_OB_RESOURCE_POOLS T1
-        JOIN DBA_OB_UNIT_CONFIGS T2 ON T1.UNIT_CONFIG_ID = T2.UNIT_CONFIG_ID
-        JOIN DBA_OB_UNITS T3 ON T1.RESOURCE_POOL_ID = T3.RESOURCE_POOL_ID
-        JOIN DBA_OB_TENANTS T4 ON T1.TENANT_ID = T4.TENANT_ID
+        FROM oceanbase.DBA_OB_RESOURCE_POOLS T1
+        JOIN oceanbase.DBA_OB_UNIT_CONFIGS T2 ON T1.UNIT_CONFIG_ID = T2.UNIT_CONFIG_ID
+        JOIN oceanbase.DBA_OB_UNITS T3 ON T1.RESOURCE_POOL_ID = T3.RESOURCE_POOL_ID
+        JOIN oceanbase.DBA_OB_TENANTS T4 ON T1.TENANT_ID = T4.TENANT_ID
         WHERE T4.TENANT_ID>1 AND T2.MAX_CPU < 2 OR ROUND(T2.MEMORY_SIZE/1024/1024/1024,2) < 4 ;'
       result:
         set_value: too_small_tenant
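The fix here schema-qualifies the DBA_OB_* dictionary views with oceanbase., so the check no longer depends on which database the session currently uses. A hedged sketch of running the same query directly (connection parameters are placeholders):

# Placeholder connection values; the SQL is the query from the task above.
import pymysql

SQL = """SELECT GROUP_CONCAT(DISTINCT T4.TENANT_ID)
FROM oceanbase.DBA_OB_RESOURCE_POOLS T1
JOIN oceanbase.DBA_OB_UNIT_CONFIGS T2 ON T1.UNIT_CONFIG_ID = T2.UNIT_CONFIG_ID
JOIN oceanbase.DBA_OB_UNITS T3 ON T1.RESOURCE_POOL_ID = T3.RESOURCE_POOL_ID
JOIN oceanbase.DBA_OB_TENANTS T4 ON T1.TENANT_ID = T4.TENANT_ID
WHERE T4.TENANT_ID>1 AND T2.MAX_CPU < 2 OR ROUND(T2.MEMORY_SIZE/1024/1024/1024,2) < 4"""

conn = pymysql.connect(host="127.0.0.1", port=2881, user="root@sys", password="")
with conn.cursor() as cur:
    cur.execute(SQL)
    (too_small_tenant,) = cur.fetchone()  # None when every tenant meets the minimum
print(too_small_tenant)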
2 changes: 1 addition & 1 deletion requirements3.txt
@@ -7,7 +7,7 @@ cryptography==44.0.1
 idna==3.7
 Jinja2==3.1.5
 MarkupSafe==2.0.1
-paramiko==3.4.0
+paramiko==3.4.1
 protobuf==4.21.0
 pyasn1==0.4.8
 pycparser==2.21
163 changes: 127 additions & 36 deletions src/common/ssh_client/kubernetes_client.py
@@ -15,10 +15,16 @@
 @file: kubernetes_client.py
 @desc:
 """

+import os
+import select
+import tarfile
+import tempfile
+from tempfile import TemporaryFile
 from src.common.ssh_client.base import SsherClient
 from kubernetes import client, config
 from kubernetes.stream import stream
+from kubernetes.stream.ws_client import STDERR_CHANNEL, STDOUT_CHANNEL
+from websocket import ABNF


 class KubernetesClient(SsherClient):
@@ -49,57 +55,98 @@ def exec_cmd(self, cmd):
return "KubernetesClient can't get the resp by {0}".format(cmd)
return resp
except Exception as e:
return f"KubernetesClient can't get the resp by {cmd}: {str(e)}"
self.stdio.error("KubernetesClient can't get the resp by {0}: {1}".format(cmd, e))
raise e

     def download(self, remote_path, local_path):
         return self.__download_file_from_pod(self.namespace, self.pod_name, self.container_name, remote_path, local_path)

     def __download_file_from_pod(self, namespace, pod_name, container_name, file_path, local_path):
-        exec_command = ['tar', 'cf', '-', '-C', '/', file_path]
-        resp = stream(self.client.connect_get_namespaced_pod_exec, pod_name, namespace, command=exec_command, stderr=True, stdin=False, stdout=True, tty=False, container=container_name, _preload_content=False)
-        with open(local_path, 'wb') as file:
-            while resp.is_open():
-                resp.update(timeout=1)
-                if resp.peek_stdout():
-                    out = resp.read_stdout()
-                    file.write(out.encode('utf-8'))
-                if resp.peek_stderr():
-                    err = resp.read_stderr()
-                    self.stdio.error("ERROR: ", err)
-                    break
-        resp.close()
+        dir = os.path.dirname(file_path)
+        bname = os.path.basename(file_path)
+        exec_command = ['/bin/sh', '-c', f'cd {dir}; tar cf - {bname}']
+
+        with TemporaryFile() as tar_buffer:
+            exec_stream = stream(self.client.connect_get_namespaced_pod_exec, pod_name, namespace, command=exec_command, stderr=True, stdin=True, stdout=True, tty=False, _preload_content=False)
+            # Copy file to stream
+            try:
+                reader = WSFileManager(exec_stream)
+                while True:
+                    out, err, closed = reader.read_bytes()
+                    if out:
+                        tar_buffer.write(out)
+                    elif err:
+                        self.stdio.error("Error copying file {0}".format(err.decode("utf-8", "replace")))
+                    if closed:
+                        break
+                exec_stream.close()
+                tar_buffer.flush()
+                tar_buffer.seek(0)
+                with tarfile.open(fileobj=tar_buffer, mode='r:') as tar:
+                    member = tar.getmember(bname)
+                    local_path = os.path.dirname(local_path)
+                    tar.extract(member, path=local_path)
+                return True
+            except Exception as e:
+                raise e

     def upload(self, remote_path, local_path):
         return self.__upload_file_to_pod(self.namespace, self.pod_name, self.container_name, local_path, remote_path)

     def __upload_file_to_pod(self, namespace, pod_name, container_name, local_path, remote_path):
-        config.load_kube_config()
-        v1 = client.CoreV1Api()
-        exec_command = ['tar', 'xvf', '-', '-C', '/', remote_path]
-        with open(local_path, 'rb') as file:
-            resp = stream(v1.connect_get_namespaced_pod_exec, pod_name, namespace, command=exec_command, stderr=True, stdin=True, stdout=True, tty=False, container=container_name, _preload_content=False)
-            # Support data flow for tar command
-            commands = []
-            commands.append(file.read())
-            while resp.is_open():
-                resp.update(timeout=1)
-                if resp.peek_stdout():
-                    self.stdio.verbose("STDOUT: %s" % resp.read_stdout())
-                if resp.peek_stderr():
-                    self.stdio.error("STDERR: %s" % resp.read_stderr())
-                if commands:
-                    c = commands.pop(0)
-                    resp.write_stdin(c)
-                else:
-                    break
-            resp.close()
+        self.stdio.verbose("upload file to pod")
+        self.stdio.verbose("local_path: {0}".format(local_path))
+        remote_path_file_name = os.path.basename(remote_path)
+        remote_path = "{0}/".format(os.path.dirname(remote_path))
+        self.stdio.verbose("remote_path: {0}".format(remote_path))
+        src_path = local_path
+        dest_dir = remote_path
+
+        with tempfile.NamedTemporaryFile(delete=False) as temp_tar:
+            with tarfile.open(fileobj=temp_tar, mode='w') as tar:
+                arcname = os.path.basename(src_path)
+                tar.add(src_path, arcname=arcname)
+            temp_tar_path = temp_tar.name
+            self.stdio.verbose(temp_tar_path)
+
+        try:
+            # read tar data from the local file
+            with open(temp_tar_path, 'rb') as f:
+                tar_data = f.read()
+
+            # execute tar command in pod
+            command = ["tar", "xvf", "-", "-C", dest_dir]
+            ws_client = stream(self.client.connect_get_namespaced_pod_exec, pod_name, namespace, command=command, stderr=True, stdin=True, stdout=True, tty=False, _preload_content=False)
+
+            # send tar_data to pod
+            chunk_size = 4096
+            for i in range(0, len(tar_data), chunk_size):
+                chunk = tar_data[i : i + chunk_size]
+                ws_client.write_stdin(chunk)
+
+            # close the input stream and wait for completion
+            # ws_client.write_stdin(None)  # send EOF
+            while ws_client.is_open():
+                ws_client.update(timeout=1)
+
+            # collect stderr output (read_channel returns a decoded string)
+            stderr = ws_client.read_channel(2)  # STDERR_CHANNEL=2
+            if stderr:
+                raise RuntimeError(f"Pod exec error: {stderr}")
+            ws_client.close()
+        finally:
+            os.remove(temp_tar_path)
+
+        if remote_path_file_name != os.path.basename(local_path):
+            self.stdio.verbose("move")
+            self.exec_cmd("mv {0}/{1} {0}/{2}".format(remote_path, os.path.basename(local_path), remote_path_file_name))

     def ssh_invoke_shell_switch_user(self, new_user, cmd, time_out):
         return self.__ssh_invoke_shell_switch_user(new_user, cmd, time_out)

     def __ssh_invoke_shell_switch_user(self, new_user, cmd, time_out):
         command = ['/bin/sh', '-c', cmd]
         # build the command string to execute after switching to the new user
         # exec comm
         exec_command = ['su', '-u', new_user, "&"] + command
         resp = stream(self.client.connect_get_namespaced_pod_exec, self.pod_name, self.namespace, command=exec_command, stderr=True, stdin=False, stdout=True, tty=False, container=self.container_name)
         parts = resp.split('\n', maxsplit=1)
@@ -115,3 +162,47 @@ def get_ip(self):
         if self.node.get("ip") is None:
             raise Exception("kubernetes need set the ip of observer")
         return self.node.get("ip")


+class WSFileManager:
+    """
+    WS wrapper to manage read and write bytes in K8s WSClient
+    """
+
+    def __init__(self, ws_client):
+        """
+        :param ws_client: Kubernetes WSClient
+        """
+        self.ws_client = ws_client
+
+    def read_bytes(self, timeout=0):
+        """
+        Read slice of bytes from stream
+        :param timeout: read timeout
+        :return: stdout, stderr and closed stream flag
+        """
+        stdout_bytes = None
+        stderr_bytes = None
+
+        if self.ws_client.is_open():
+            if not self.ws_client.sock.connected:
+                self.ws_client._connected = False
+            else:
+                r, _, _ = select.select((self.ws_client.sock.sock,), (), (), timeout)
+                if r:
+                    op_code, frame = self.ws_client.sock.recv_data_frame(True)
+                    if op_code == ABNF.OPCODE_CLOSE:
+                        self.ws_client._connected = False
+                    elif op_code == ABNF.OPCODE_BINARY or op_code == ABNF.OPCODE_TEXT:
+                        data = frame.data
+                        if len(data) > 1:
+                            channel = data[0]
+                            data = data[1:]
+                            if data:
+                                if channel == STDOUT_CHANNEL:
+                                    stdout_bytes = data
+                                elif channel == STDERR_CHANNEL:
+                                    stderr_bytes = data
+        return stdout_bytes, stderr_bytes, not self.ws_client._connected
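
Taken together, download now runs tar cf - inside the pod and uses WSFileManager to demultiplex the raw websocket frames (first byte is the channel, the rest is the payload) into binary stdout that is buffered and untarred locally, while upload tars the local file, streams it to tar xvf - in 4 KB stdin chunks, then renames the result if the remote file name differs from the local one. A hedged usage sketch (the constructor signature and node keys are assumptions, not confirmed by this diff):

# Hedged usage sketch; constructor arguments and node keys are assumptions.
from src.common.ssh_client.kubernetes_client import KubernetesClient

node = {"namespace": "oceanbase", "pod_name": "obcluster-1-zone1-0", "container_name": "observer", "ip": "10.0.0.1"}
k8s = KubernetesClient(context=None, node=node)

k8s.upload("/home/admin/oceanbase/etc/probe.yaml", "./probe.yaml")        # local -> pod
k8s.download("/home/admin/oceanbase/log/observer.log", "./observer.log")  # pod -> local
print(k8s.exec_cmd("ls -l /home/admin/oceanbase/log | head"))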
2 changes: 2 additions & 0 deletions src/handler/analyzer/analyze_log.py
@@ -397,6 +397,8 @@ def __parse_log_lines(self, file_full_path):
                 ret_code = self.__get_observer_ret_code(line)
                 if len(ret_code) > 1:
                     trace_id = self.__get_trace_id(line)
+                    if trace_id is None:
+                        continue
                     if error_dict.get(ret_code) is None:
                         error_dict[ret_code] = {"file_name": file_full_path, "count": 1, "first_found_time": line_time, "last_found_time": line_time, "trace_id_list": {trace_id} if len(trace_id) > 0 else {}}
                     else:
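
The two added lines skip log lines whose trace id cannot be parsed, so trace_id_list never ends up holding None. A simplified sketch of the surrounding aggregation (the else branch is collapsed in this diff, so its body below is an assumption):

# Simplified sketch; the update branch is reconstructed, not taken from the diff.
error_dict = {}

def record(ret_code, trace_id, line_time, file_full_path):
    if trace_id is None:  # the fix: ignore lines without a parsable trace id
        return
    entry = error_dict.get(ret_code)
    if entry is None:
        error_dict[ret_code] = {"file_name": file_full_path, "count": 1, "first_found_time": line_time, "last_found_time": line_time, "trace_id_list": {trace_id} if len(trace_id) > 0 else set()}
    else:
        entry["count"] += 1
        entry["last_found_time"] = line_time
        if len(trace_id) > 0:
            entry["trace_id_list"].add(trace_id)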
8 changes: 8 additions & 0 deletions src/handler/gather/gather_obstack2.py
@@ -96,8 +96,16 @@ def handle_from_node(node):
                 file_size = os.path.getsize(resp["gather_pack_path"])
                 gather_tuples.append((node.get("ip"), False, resp["error"], file_size, int(time.time() - st), resp["gather_pack_path"]))

+        exec_tag = False
         for node in self.nodes:
+            if node.get("ssh_type") == "docker" or node.get("ssh_type") == "kubernetes":
+                self.stdio.warn("Skip gather from node {0} because it is a docker or kubernetes node".format(node.get("ip")))
+                continue
             handle_from_node(node)
+            exec_tag = True
+        if not exec_tag:
+            self.stdio.verbose("No node to gather from, skip")
+            return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"store_dir": pack_dir_this_command})

         summary_tuples = self.__get_overall_summary(gather_tuples)
         self.stdio.print(summary_tuples)
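
The same guard is added to gather_perf.py and gather_sysstat.py below: container-backed nodes are skipped, and if nothing was gathered the handler returns early with an empty but successful result. A condensed sketch of the pattern (ObdiagResult is stubbed with a plain dict here):

# Condensed sketch of the shared guard; ObdiagResult is stubbed with a dict.
def gather(nodes, handle_from_node, stdio, pack_dir):
    exec_tag = False
    for node in nodes:
        if node.get("ssh_type") in ("docker", "kubernetes"):
            stdio.warn("Skip gather from node {0} because it is a docker or kubernetes node".format(node.get("ip")))
            continue
        handle_from_node(node)
        exec_tag = True
    if not exec_tag:
        stdio.verbose("No node to gather from, skip")
        return {"code": "success", "store_dir": pack_dir}  # stands in for ObdiagResult(SUCCESS_CODE, ...)
    # otherwise fall through and summarize gather_tuples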
8 changes: 8 additions & 0 deletions src/handler/gather/gather_perf.py
@@ -99,15 +99,23 @@ def handle_from_node(node):
                 file_size = os.path.getsize(resp["gather_pack_path"])
                 gather_tuples.append((node.get("ip"), False, resp["error"], file_size, int(time.time() - st), resp["gather_pack_path"]))

+        exec_tag = False
         if self.is_ssh:
             for node in self.nodes:
+                if node.get("ssh_type") == "docker" or node.get("ssh_type") == "kubernetes":
+                    self.stdio.warn("Skip gather from node {0} because it is a docker or kubernetes node".format(node.get("ip")))
+                    continue
                 handle_from_node(node)
+                exec_tag = True
         else:
             local_ip = NetUtils.get_inner_ip(self.stdio)
             node = self.nodes[0]
             node["ip"] = local_ip
             for node in self.nodes:
                 handle_from_node(node)
+        if not exec_tag:
+            self.stdio.verbose("No node to gather from, skip")
+            return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"store_dir": pack_dir_this_command})

         summary_tuples = self.__get_overall_summary(gather_tuples)
         self.stdio.print(summary_tuples)
9 changes: 9 additions & 0 deletions src/handler/gather/gather_sysstat.py
@@ -101,15 +101,24 @@ def handle_from_node(node):
                 file_size = os.path.getsize(resp["gather_pack_path"])
                 gather_tuples.append((node.get("ip"), False, resp["error"], file_size, int(time.time() - st), resp["gather_pack_path"]))

+        exec_tag = False
         if self.is_ssh:
             for node in self.nodes:
+                if node.get("ssh_type") == "docker" or node.get("ssh_type") == "kubernetes":
+                    self.stdio.warn("Skip gather from node {0} because it is a docker or kubernetes node".format(node.get("ip")))
+                    continue
                 handle_from_node(node)
+                exec_tag = True
+
         else:
             local_ip = NetUtils.get_inner_ip()
             node = self.nodes[0]
             node["ip"] = local_ip
             for node in self.nodes:
                 handle_from_node(node)
+        if not exec_tag:
+            self.stdio.verbose("No node to gather from, skip")
+            return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"store_dir": pack_dir_this_command})

         summary_tuples = self.__get_overall_summary(gather_tuples)
         self.stdio.print(summary_tuples)
