Add stop for Engine #309

Open · wants to merge 13 commits into main
lazyllm/common/common.py · 2 changes: 1 addition & 1 deletion
@@ -335,7 +335,7 @@ def __repr__(self): return repr(self._func)
     def __doc__(self, value): self._func.__doc__ = value

     @property
-    def flag(self):
+    def flag(self) -> once_flag:
         return getattr(self._instance, f'_lazyllm_{self._func.__name__}_once_flag')

     def __init__(self, func):
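The `-> once_flag` annotation documents the behavior the rest of this PR relies on: the flag of a `once_wrapper`-decorated method can be inspected and reset, which is how the new `stop()` implementations below allow redeployment. A minimal sketch of that run-once semantics (the `Server` class and call pattern are illustrative, not from this diff):

```python
from lazyllm import once_wrapper

class Server:
    @once_wrapper
    def deploy(self):
        print('deploying...')

s = Server()
s.deploy()             # first call runs the body and sets the once_flag
s.deploy()             # second call is skipped: the flag is already set
s.deploy.flag.reset()  # the typed flag exposes reset(), used by stop() in this PR
s.deploy()             # runs again after the reset
```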
lazyllm/engine/lightengine.py · 50 changes: 37 additions & 13 deletions
@@ -1,6 +1,6 @@
 from .engine import Engine, Node
-from lazyllm import ActionModule, once_wrapper
-from typing import List, Dict, Optional
+from lazyllm import once_wrapper
+from typing import List, Dict, Optional, overload
 import uuid


@@ -32,12 +32,37 @@ def update_node(self, node):
         self._nodes[node.id] = super(__class__, self).build_node(node)
         return self._nodes[node.id]

+    @overload
+    def start(self, nodes: str) -> None:
+        ...
+
+    @overload
     def start(self, nodes: List[Dict] = [], edges: List[Dict] = [], resources: List[Dict] = [],
-              gid: Optional[str] = None, name: Optional[str] = None):
-        node = Node(id=gid or str(uuid.uuid4().hex), kind='Graph',
-                    name=name or str(uuid.uuid4().hex), args=dict(nodes=nodes, edges=edges, resources=resources))
-        self.graph = self.build_node(node).func
-        self.graph.start()
+              gid: Optional[str] = None, name: Optional[str] = None) -> str:
+        ...
+
+    def start(self, nodes=[], edges=[], resources=[], gid=None, name=None):
+        if isinstance(nodes, str):
+            self.build_node(nodes).func.start()
+        else:
+            gid, name = gid or str(uuid.uuid4().hex), name or str(uuid.uuid4().hex)
+            node = Node(id=gid, kind='Graph', name=name, args=dict(nodes=nodes, edges=edges, resources=resources))
+            self.build_node(node).func.start()
+        return gid
+
+    def status(self, node_id: str, task_name: Optional[str] = None):
+        node = self.build_node(node_id)
+        assert node.kind in ('LocalLLM',)  # tuple, not a bare string, so `in` is a membership test
+        return node.func.status(task_name=task_name)
+
+    def stop(self, node_id: str, task_name: Optional[str] = None):
+        node = self.build_node(node_id)
+        if task_name:
+            assert node.kind in ('LocalLLM',)
+            node.func.stop(task_name=task_name)
+        else:
+            assert node.kind in ('Graph', 'LocalLLM', 'LocalEmbedding', 'SD', 'TTS', 'STT')
+            node.func.stop()

     def update(self, nodes: List[Dict] = [], changed_nodes: List[Dict] = [],
                edges: List[Dict] = [], changed_resources: List[Dict] = [],
@@ -47,10 +72,9 @@ def update(self, nodes: List[Dict] = [], changed_nodes: List[Dict] = [],
                 raise NotImplementedError('Web and Api server are not allowed now')
             self.update_node(r)
         for n in changed_nodes: self.update_node(n)
-        node = Node(id=gid or str(uuid.uuid4().hex), kind='Graph',
-                    name=name or str(uuid.uuid4().hex), args=dict(nodes=nodes, edges=edges))
-        self.graph = self.update_node(node).func
-        ActionModule(self.graph).start()
+        gid, name = gid or str(uuid.uuid4().hex), name or str(uuid.uuid4().hex)
+        node = Node(id=gid, kind='Graph', name=name, args=dict(nodes=nodes, edges=edges))
+        self.update_node(node).func.start()

-    def run(self, *args, **kw):
-        return self.graph(*args, **kw)
+    def run(self, id: str, *args, **kw):
+        return self.build_node(id).func(*args, **kw)
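Taken together, these changes turn the engine into a multi-graph registry: `start` returns the graph id, and `run`/`status`/`stop` address graphs or nodes by id. A usage sketch (the node kind and its args are assumptions for illustration; see the test change at the bottom of this diff for a real call):

```python
from lazyllm.engine import LightEngine

nodes = [dict(id='1', kind='LocalLLM', name='m1',
              args=dict(base_model='internlm2-chat-7b'))]  # hypothetical node spec
edges = [dict(iid='__start__', oid='1'), dict(iid='1', oid='__end__')]

engine = LightEngine()
gid = engine.start(nodes, edges)   # builds and starts the graph, returns its id
print(engine.status('1'))          # launcher status of the LocalLLM node
ret = engine.run(gid, 'hello')     # run() now takes the graph id explicitly
engine.stop('1')                   # stop a single deployable node by id...
engine.stop(gid)                   # ...or the whole graph
```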
lazyllm/engine/node.py · 12 changes: 1 addition & 11 deletions
@@ -36,17 +36,7 @@ class NodeArgs(object):
     builder_argument=dict(
         trainset=NodeArgs(str),
         prompt=NodeArgs(str),
-        finetune_method=NodeArgs(str, getattr_f=partial(getattr, lazyllm.finetune)),
-        deploy_method=NodeArgs(str, 'vllm', getattr_f=partial(getattr, lazyllm.deploy))),
-    other_arguments=dict(
-        finetune_method=dict(
-            batch_size=NodeArgs(int, 16),
-            micro_batch_size=NodeArgs(int, 2),
-            num_epochs=NodeArgs(int, 3),
-            learning_rate=NodeArgs(float, 5e-4),
-            lora_r=NodeArgs(int, 8),
-            lora_alpha=NodeArgs(int, 32),
-            lora_dropout=NodeArgs(float, 0.05)))
+        deploy_method=NodeArgs(str, 'vllm', getattr_f=partial(getattr, lazyllm.deploy)))
 )

 all_nodes['OnlineLLM'] = dict(
lazyllm/launcher.py · 53 changes: 46 additions & 7 deletions
@@ -29,8 +29,9 @@ class Status(Enum):


 class LazyLLMLaunchersBase(object, metaclass=LazyLLMRegisterMetaClass):
+    Status = Status

     def __init__(self) -> None:
-        self.status = Status.TBSubmitted
         self._id = str(uuid.uuid4().hex)

     def makejob(self, cmd):
@@ -44,6 +45,12 @@ def cleanup(self):
             v.stop()
             LOG.info(f"killed job:{k}")
         self.all_processes.pop(self._id)
+        self.wait()
+
+    @property
+    def status(self):
+        assert len(self.all_processes[self._id]) == 1
+        return self.all_processes[self._id][0].status

     def wait(self):
         for _, v in self.all_processes[self._id]:
@@ -257,7 +264,7 @@ def _get_idle_gpus(self):
                 encoding='utf-8'
             )
         except Exception as e:
-            LOG.error(f"An error occurred: {e}")
+            LOG.warning(f"Failed to get idle GPUs: {e}; if no GPU driver is installed, ignore this.")
             return []
         lines = order_list.strip().split('\n')

@@ -316,6 +323,8 @@ def stop(self):
cmd = f"scancel --quiet {self.jobid}"
subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
encoding='utf-8', executable='/bin/bash')
self.jobid = None

if self.ps:
self.ps.terminate()
self.queue = Queue()
@@ -521,6 +530,30 @@ def get_jobip(self):
         else:
             raise RuntimeError("Cannot get IP.", f"JobID: {self.jobid}")

+    def _scancel_job(self, cmd, max_retries=3):
+        retries = 0
+        while retries < max_retries:
+            ps = subprocess.Popen(
+                cmd, shell=True, stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                encoding='utf-8', executable='/bin/bash')
+            try:
+                stdout, stderr = ps.communicate(timeout=3)
+                if stdout:
+                    LOG.info(stdout)
+                    if 'success scancel' in stdout:
+                        break
+                if stderr:
+                    LOG.error(stderr)
+            except subprocess.TimeoutExpired:
+                ps.kill()
+                LOG.warning(f"Command timed out, retrying... (Attempt {retries + 1}/{max_retries})")
+            except Exception as e:
+                LOG.error(f"scancel attempt failed: {e}")
+            retries += 1
+        if retries == max_retries:
+            LOG.error(f"Command failed after {max_retries} attempts.")
+
     def stop(self):
         if self.jobid:
             cmd = f"scancel --workspace-id={self.workspace_name} {self.jobid}"
@@ -531,17 +564,21 @@ def stop(self):
f"To delete by terminal, you can execute: `{cmd}`"
)
else:
subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding='utf-8', executable='/bin/bash')
self._scancel_job(cmd)
time.sleep(0.5) # Avoid the execution of scancel and scontrol too close together.

with lazyllm.timeout(25):
while self.status not in (Status.Done, Status.Cancelled, Status.Failed):
time.sleep(1)

if self.ps:
self.ps.terminate()
self.queue = Queue()
self.output_thread_event.set()
self.output_thread.join()

self.jobid = None

def wait(self):
if self.ps:
self.ps.wait()
@@ -556,11 +593,13 @@ def status(self):
             job_state = id_json['status_phase'].strip().lower()
             if job_state == 'running':
                 return Status.Running
-            elif job_state in ['tbsubmitted', 'suspending', 'suspended']:
+            elif job_state in ['tbsubmitted', 'suspending']:
                 return Status.TBSubmitted
             elif job_state in ['waiting', 'init', 'queueing', 'creating',
                                'restarting', 'recovering', 'starting']:
                 return Status.InQueue
+            elif job_state in ['suspended']:
+                return Status.Cancelled
             elif job_state == 'succeeded':
                 return Status.Done
         except Exception:
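The new `_scancel_job` wraps `scancel` in a bounded retry loop: each attempt gets a 3-second `communicate` timeout, success is detected by scanning stdout, and failures are logged rather than raised. A standalone sketch of the same pattern, with `echo` standing in for `scancel` so the snippet runs anywhere:

```python
import subprocess

def run_with_retries(cmd: str, max_retries: int = 3, timeout: int = 3) -> bool:
    for attempt in range(1, max_retries + 1):
        ps = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT, encoding='utf-8')
        try:
            stdout, _ = ps.communicate(timeout=timeout)
            if stdout and 'success' in stdout:
                return True   # the command reported success; stop retrying
        except subprocess.TimeoutExpired:
            ps.kill()         # abandon this attempt and try again
    return False

print(run_with_retries('echo success scancel'))  # True on the first attempt
```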
lazyllm/module/module.py · 69 changes: 46 additions & 23 deletions
@@ -10,14 +10,15 @@
 import functools
 from datetime import datetime
 from lazyllm import ThreadPoolExecutor, FileSystemQueue
-from typing import Dict, List, Any, Union
+from typing import Dict, List, Any, Union, Optional

 import lazyllm
 from lazyllm import FlatList, Option, launchers, LOG, package, kwargs, encode_request, globals
 from ..components.prompter import PrompterBase, ChatPrompter, EmptyPrompter
 from ..components.formatter import FormatterBase, EmptyFormatter
 from ..components.utils import ModelManager
 from ..flow import FlowBase, Pipeline, Parallel
+from ..launcher import LazyLLMLaunchersBase as Launcher
 import uuid
 from ..client import get_redis, redis_client

@@ -179,7 +180,9 @@ def start(self): return self._update(mode=['server'], recursive=True)
     def restart(self): return self.start()
     def wait(self): pass

-    def stop(self): raise NotImplementedError(f'Function stop is not implemented in {self.__class__}!')
+    def stop(self):
+        for m in self._submodules:
+            m.stop()

     @property
     def options(self):
@@ -434,6 +437,7 @@ def _get_deploy_tasks(self):

     def stop(self):
         self._launcher.cleanup()
+        self._get_deploy_tasks.flag.reset()

     def __del__(self):
         self.stop()
@@ -477,42 +481,53 @@ def __init__(self, base_model='', target_path='', stream=False, train=None, fine
         self._train, self._finetune, self._deploy = train, finetune, deploy
         self._stream = stream
         self._father = []
-        self._launchers = []
+        self._launchers: Dict[str, Dict[str, Launcher]] = dict(default=dict(), manual=dict())
         self._deployer = None
         self._specific_target_path = None

     def _add_father(self, father):
         if father not in self._father: self._father.append(father)

-    def _get_args(self, arg_cls, disable=[]):
-        args = getattr(self, f'_{arg_cls}_args', dict())
+    def _get_train_or_deploy_args(self, arg_cls: str, disable: List[str] = []):
+        args = getattr(self, f'_{arg_cls}_args', dict()).copy()
         if len(set(args.keys()).intersection(set(disable))) > 0:
             raise ValueError(f'Key `{", ".join(disable)}` can not be set in '
                              f'{arg_cls}_args, please pass them from Module.__init__()')
-        if 'launcher' in args:
-            args['launcher'] = args['launcher'].clone() if args['launcher'] else launchers.remote(sync=False)
-            self._launchers.append(args['launcher'])
+        if 'url' not in args:
+            args['launcher'] = args['launcher'].clone() if args.get('launcher') else launchers.remote(sync=False)
+            self._launchers['default'][arg_cls] = args['launcher']
         return args

-    def _get_train_tasks(self):
-        trainset_getf = lambda: lazyllm.package(self._trainset, None) \
-            if isinstance(self._trainset, str) else self._trainset  # noqa E731
+    def _get_train_tasks_impl(self, mode: Optional[str] = None, **kw):
+        mode = mode or self._mode
+        assert mode in ('train', 'finetune'), 'mode must be train or finetune'
+
+        trainset_getf = (lambda: lazyllm.package(self._trainset, None)) if isinstance(
+            self._trainset, str) else self._trainset
         target_path = self._generate_target_path()
         if not os.path.exists(target_path):
             os.system(f'mkdir -p {target_path}')
-        if self._mode == 'train':
-            args = self._get_args('train', disable=['base_model', 'target_path'])
-            train = self._train(base_model=self._base_model, target_path=target_path, **args)
-        elif self._mode == 'finetune':
-            args = self._get_args('finetune', disable=['base_model', 'target_path'])
-            train = self._finetune(base_model=self._base_model, target_path=target_path, **args)
-        else:
-            raise RuntimeError('mode must be train or finetune')
+
+        kw = kw or self._get_train_or_deploy_args(mode, disable=['base_model', 'target_path'])
+        task = getattr(self, f'_{mode}')(base_model=self._base_model, target_path=target_path, **kw)
+        return [trainset_getf, task]

+    def _get_train_tasks(self):
         def after_train(real_target_path):
             self._finetuned_model_path = real_target_path
             return real_target_path
-        return Pipeline(trainset_getf, train, after_train)
+        return Pipeline(*self._get_train_tasks_impl(), after_train)
+
+    def _trian(self, name: str, ngpus: int = 1, mode: str = None, batch_size: int = 16,
+               micro_batch_size: int = 2, num_epochs: int = 3, learning_rate: float = 5e-4,
+               lora_r: int = 8, lora_alpha: int = 32, lora_dropout: float = 0.05, **kw):
+        assert name and isinstance(name, str), f'Invalid name: {name}, expect a valid string'
+        assert name not in self._launchers['manual'], f'Duplicate name: {name}'
+        self._launchers['manual'][name] = kw['launcher'] = launchers.remote(sync=False, ngpus=ngpus)
+
+        Pipeline(*self._get_train_tasks_impl(
+            mode=mode, batch_size=batch_size, micro_batch_size=micro_batch_size, num_epochs=num_epochs,
+            learning_rate=learning_rate, lora_r=lora_r, lora_alpha=lora_alpha, lora_dropout=lora_dropout, **kw))()

     def _get_all_finetuned_models(self):
         valid_paths = []
@@ -578,7 +593,7 @@ def _set_template(self, deployer):
             f._stream_url_suffix = deployer.stream_url_suffix()

     def _deploy_setter_hook(self):
-        self._deploy_args = self._get_args('deploy', disable=['target_path'])
+        self._deploy_args = self._get_train_or_deploy_args('deploy', disable=['target_path'])
         if self._deploy is not lazyllm.deploy.AutoDeploy:
             self._set_template(self._deploy)
         if url := self._deploy_args.get('url', None):
@@ -588,8 +603,7 @@ def _deploy_setter_hook(self):
         self._get_deploy_tasks.flag.set()

     def __del__(self):
-        for launcher in self._launchers:
-            launcher.cleanup()
+        [[launcher.cleanup() for launcher in group.values()] for group in self._launchers.values()]

     def _generate_target_path(self):
         base_model_name = os.path.basename(self._base_model)
@@ -650,6 +664,15 @@ def wait(self):
         for launcher in self._impl._launchers:
             launcher.wait()

+    def stop(self, task_name: Optional[str] = None):
+        launcher = self._impl._launchers['manual' if task_name else 'default'][task_name or 'deploy']
+        if not task_name: self._impl._get_deploy_tasks.flag.reset()
+        launcher.cleanup()
+
+    def status(self, task_name: Optional[str] = None):
+        launcher = self._impl._launchers['manual' if task_name else 'default'][task_name or 'deploy']
+        return launcher.status
+
     # modify default value to ''
     def prompt(self, prompt=''):
         if self.base_model != '' and prompt == '' and ModelManager.get_model_type(self.base_model) != 'llm':
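With launchers now grouped into 'default' (train/finetune/deploy) and 'manual' (named tasks started via `_trian`), `stop`/`status` can address either group by task name. A hedged sketch of the resulting API (the model name is a placeholder, and `_trian` is spelled as in this diff):

```python
import lazyllm

m = lazyllm.TrainableModule('some-base-model')  # placeholder model name
m.start()                  # deploys via the 'default' deploy launcher
print(m.status())          # -> status of that deploy launcher
m.stop()                   # cleanup + reset the deploy once-flag so the
                           # module can be deployed again later

m._trian('exp1', ngpus=1, mode='finetune')  # named task on its own launcher
print(m.status(task_name='exp1'))           # status of the manual launcher
m.stop(task_name='exp1')                    # stop just that task
```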
lazyllm/tools/rag/utils.py · 2 changes: 1 addition & 1 deletion
@@ -173,7 +173,7 @@ def get_status_cond_and_params(status: Union[str, List[str]],
     prefix = f'{prefix}.' if prefix else ''
     if isinstance(status, str):
         if status != DocListManager.Status.all:
-            conds.append('{prefix}status = ?')
+            conds.append(f'{prefix}status = ?')
             params.append(status)
     elif isinstance(status, (tuple, list)):
         conds.append(f'{prefix}status IN ({",".join("?" * len(status))})')
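The one-character fix above matters because without the `f` prefix the placeholder is kept literally, producing a condition that never matches a real column:

```python
prefix = 'd.'
print('{prefix}status = ?')   # -> {prefix}status = ?   (broken SQL fragment)
print(f'{prefix}status = ?')  # -> d.status = ?         (intended condition)
```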
tests/advanced_tests/standard_test/test_engine.py · 4 changes: 2 additions & 2 deletions
@@ -48,6 +48,6 @@ def test_http(self):
         ]
         edges = [dict(iid="__start__", oid="1"), dict(iid="1", oid="__end__")]
         engine = LightEngine()
-        engine.start(nodes, edges)
-        ret = engine.run()
+        gid = engine.start(nodes, edges)
+        ret = engine.run(gid)
         assert '商汤科技' in ret['content']