Skip to content

Commit

Permalink
Merge pull request #404 from HSF/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
wguanicedew authored Feb 26, 2025
2 parents 6dd6335 + 8185fa1 commit c01a122
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 76 deletions.
26 changes: 25 additions & 1 deletion atlas/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,46 @@
import sysconfig
from setuptools import setup, find_packages
from setuptools.command.install import install
from wheel.bdist_wheel import bdist_wheel


# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


EXCLUDED_PACKAGE = ["idds"]


class CustomInstallCommand(install):
"""Custom install command to exclude top-level 'idds' during installation."""
def run(self):
# Remove 'idds' from the list of packages before installation
logger.info("idds-atlas installing")
logger.info(f"self.distribution.packages: {self.distribution.packages}")
self.distribution.packages = [
pkg for pkg in self.distribution.packages if pkg != 'idds'
pkg for pkg in self.distribution.packages if pkg not in EXCLUDED_PACKAGE
]
logger.info(f"self.distribution.packages: {self.distribution.packages}")
super().run()


class CustomBdistWheel(bdist_wheel):
"""Custom wheel builder to exclude the 'idds' package but keep subpackages."""
def finalize_options(self):
# Exclude only the top-level 'idds', not its subpackages
logger.info("idds-workflow wheel installing")
logger.info(f"self.distribution.packages: {self.distribution.packages}")
included_packages = [
pkg for pkg in find_packages('lib/')
if pkg not in EXCLUDED_PACKAGE
]
self.distribution.packages = included_packages
logger.info(f"self.distribution.packages: {self.distribution.packages}")
super().finalize_options()

def run(self):
logger.info("CustomBdistWheel is running!") # Debug print
super().run()


Expand Down Expand Up @@ -121,6 +144,7 @@ def parse_requirements(requirements_files):
scripts=scripts,
cmdclass={
'install': CustomInstallCommand, # Exclude 'idds' during installation
'bdist_wheel': CustomBdistWheel,
},
project_urls={
'Documentation': 'https://github.com/HSF/iDDS/wiki',
Expand Down
28 changes: 26 additions & 2 deletions client/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,46 @@
import sysconfig
from setuptools import setup, find_packages
from setuptools.command.install import install
from wheel.bdist_wheel import bdist_wheel


# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


EXCLUDED_PACKAGE = ["idds"]


class CustomInstallCommand(install):
"""Custom install command to exclude top-level 'idds' during installation."""
def run(self):
# Remove 'idds' from the list of packages before installation
logger.info("idds-client installing")
logger.info("idds-atlas installing")
logger.info(f"self.distribution.packages: {self.distribution.packages}")
self.distribution.packages = [
pkg for pkg in self.distribution.packages if pkg != 'idds'
pkg for pkg in self.distribution.packages if pkg not in EXCLUDED_PACKAGE
]
logger.info(f"self.distribution.packages: {self.distribution.packages}")
super().run()


class CustomBdistWheel(bdist_wheel):
"""Custom wheel builder to exclude the 'idds' package but keep subpackages."""
def finalize_options(self):
# Exclude only the top-level 'idds', not its subpackages
logger.info("idds-workflow wheel installing")
logger.info(f"self.distribution.packages: {self.distribution.packages}")
included_packages = [
pkg for pkg in find_packages('lib/')
if pkg not in EXCLUDED_PACKAGE
]
self.distribution.packages = included_packages
logger.info(f"self.distribution.packages: {self.distribution.packages}")
super().finalize_options()

def run(self):
logger.info("CustomBdistWheel is running!") # Debug print
super().run()


Expand Down Expand Up @@ -121,6 +144,7 @@ def parse_requirements(requirements_files):
scripts=scripts,
cmdclass={
'install': CustomInstallCommand, # Exclude 'idds' during installation
'bdist_wheel': CustomBdistWheel,
},
project_urls={
'Documentation': 'https://github.com/HSF/iDDS/wiki',
Expand Down
28 changes: 26 additions & 2 deletions doma/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,46 @@
import sysconfig
from setuptools import setup, find_packages
from setuptools.command.install import install
from wheel.bdist_wheel import bdist_wheel


# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


EXCLUDED_PACKAGE = ["idds"]


class CustomInstallCommand(install):
"""Custom install command to exclude top-level 'idds' during installation."""
def run(self):
# Remove 'idds' from the list of packages before installation
logger.info("idds-client installing")
logger.info("idds-atlas installing")
logger.info(f"self.distribution.packages: {self.distribution.packages}")
self.distribution.packages = [
pkg for pkg in self.distribution.packages if pkg != 'idds'
pkg for pkg in self.distribution.packages if pkg not in EXCLUDED_PACKAGE
]
logger.info(f"self.distribution.packages: {self.distribution.packages}")
super().run()


class CustomBdistWheel(bdist_wheel):
"""Custom wheel builder to exclude the 'idds' package but keep subpackages."""
def finalize_options(self):
# Exclude only the top-level 'idds', not its subpackages
logger.info("idds-workflow wheel installing")
logger.info(f"self.distribution.packages: {self.distribution.packages}")
included_packages = [
pkg for pkg in find_packages('lib/')
if pkg not in EXCLUDED_PACKAGE
]
self.distribution.packages = included_packages
logger.info(f"self.distribution.packages: {self.distribution.packages}")
super().finalize_options()

def run(self):
logger.info("CustomBdistWheel is running!") # Debug print
super().run()


Expand Down Expand Up @@ -121,6 +144,7 @@ def parse_requirements(requirements_files):
scripts=scripts,
cmdclass={
'install': CustomInstallCommand, # Exclude 'idds' during installation
'bdist_wheel': CustomBdistWheel,
},
project_urls={
'Documentation': 'https://github.com/HSF/iDDS/wiki',
Expand Down
148 changes: 90 additions & 58 deletions main/lib/idds/agents/coordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,83 +224,115 @@ def insert_event(self, event):

def send(self, event):
with self._lock:
self.insert_event(event)
try:
self.insert_event(event)
except Exception as ex:
self.logger.error(f"Failed to send event: {ex}")
self.logger.error(traceback.format_exc())

def send_bulk(self, events):
with self._lock:
for event in events:
self.insert_event(event)
try:
self.insert_event(event)
except Exception as ex:
self.logger.error(f"Failed to send event: {ex}")
self.logger.error(traceback.format_exc())

def get(self, event_type, num_events=1, wait=0):
with self._lock:
events = []
for i in range(num_events):
if event_type in self.events_index:
for scheduled_priority in [EventPriority.High, EventPriority.Medium, EventPriority.Low]:
if (scheduled_priority in self.events_index[event_type] and self.events_index[event_type][scheduled_priority]):
event_id = self.events_index[event_type][scheduled_priority][0]
event = self.events[event_id]
if event.scheduled_time <= time.time():
event_id = self.events_index[event_type][scheduled_priority].pop(0)
event = self.events[event_id]
del self.events[event_id]
self.events_ids[event.get_event_id()].remove(event_id)

if event._event_type in self.accounts:
self.accounts[event._event_type]['total_queued_events'] -= 1
self.accounts[event._event_type]['total_processed_events'] += 1

self.logger.debug("Get event %s" % (event.to_json(strip=True)))
events.append(event)
try:
if event_type in self.events_index:
for scheduled_priority in [EventPriority.High, EventPriority.Medium, EventPriority.Low]:
if (scheduled_priority in self.events_index[event_type] and self.events_index[event_type][scheduled_priority]):
event_id = self.events_index[event_type][scheduled_priority][0]
if event_id not in self.events:
self.events_index[event_type][scheduled_priority].pop(0)
else:
event = self.events[event_id]
if event.scheduled_time <= time.time():
event_id = self.events_index[event_type][scheduled_priority].pop(0)
event = self.events[event_id]
del self.events[event_id]
self.events_ids[event.get_event_id()].remove(event_id)

if event._event_type in self.accounts:
self.accounts[event._event_type]['total_queued_events'] -= 1
self.accounts[event._event_type]['total_processed_events'] += 1

self.logger.debug("Get event %s" % (event.to_json(strip=True)))
events.append(event)
except Exception as ex:
self.logger.error(f"Failed to send event: {ex}")
self.logger.error(traceback.format_exc())

if not events:
if event_type in self.accounts:
if self.accounts[event_type]['total_queued_events'] == 0:
self.accounts[event_type]['lack_events'] = True
try:
if event_type in self.accounts:
if self.accounts[event_type]['total_queued_events'] == 0:
self.accounts[event_type]['lack_events'] = True
except Exception as ex:
self.logger.error(f"Failed to send event: {ex}")
self.logger.error(traceback.format_exc())

return events

def send_report(self, event, status, start_time, end_time, source, result):
event_id = event.get_event_id()
event_ret_status = status
event_name = event._event_type.name
if not event_ret_status and result:
event_ret_status = result.get("status", None)
if event_id not in self.report:
self.report[event_id] = {"status": event_ret_status,
"total_files": None,
"processed_files": None,
"event_types": {}}
self.report[event_id]['status'] = event_ret_status
self.report[event_id]['event_types'][event_name] = {'start_time': start_time,
'end_time': end_time,
'source': source,
'status': event_ret_status,
'result': result}
try:
event_id = event.get_event_id()
event_ret_status = status
event_name = event._event_type.name
if not event_ret_status and result:
event_ret_status = result.get("status", None)
if event_id not in self.report:
self.report[event_id] = {"status": event_ret_status,
"total_files": None,
"processed_files": None,
"event_types": {}}
self.report[event_id]['status'] = event_ret_status
self.report[event_id]['event_types'][event_name] = {'start_time': start_time,
'end_time': end_time,
'source': source,
'status': event_ret_status,
'result': result}
except Exception as ex:
self.logger.error(f"Failed to send event: {ex}")
self.logger.error(traceback.format_exc())

def clean_cache_info(self):
with self._lock:
event_ids = list(self.events_ids.keys())
for event_id in event_ids:
if not self.events_ids[event_id]:
del self.events_ids[event_id]

event_ids = list(self.report.keys())
for event_id in event_ids:
event_types = list(self.report[event_id]['event_types'].keys())
for event_type in event_types:
end_time = self.report[event_id]['event_types'][event_type].get('end_time', None)
if not end_time or end_time < time.time() - 86400 * 10:
del self.report[event_id]['event_types'][event_type]
if not self.report[event_id]['event_types']:
del self.report[event_id]
try:
event_ids = list(self.events_ids.keys())
for event_id in event_ids:
if not self.events_ids[event_id]:
del self.events_ids[event_id]

event_ids = list(self.report.keys())
for event_id in event_ids:
event_types = list(self.report[event_id]['event_types'].keys())
for event_type in event_types:
end_time = self.report[event_id]['event_types'][event_type].get('end_time', None)
if not end_time or end_time < time.time() - 86400 * 10:
del self.report[event_id]['event_types'][event_type]
if not self.report[event_id]['event_types']:
del self.report[event_id]
except Exception as ex:
self.logger.error(f"Failed to send event: {ex}")
self.logger.error(traceback.format_exc())

def show_queued_events(self):
if self.show_queued_events_time is None or self.show_queued_events_time + self.show_queued_events_time_interval < time.time():
self.show_queued_events_time = time.time()
for event_type in self.events_index:
self.logger.info("Number of events has processed: %s: %s" % (event_type.name, self.accounts.get(event_type, {}).get('total_processed_events', None)))
for prio in self.events_index[event_type]:
self.logger.info("Number of queued events: %s %s: %s" % (event_type.name, prio.name, len(self.events_index[event_type][prio])))
try:
if self.show_queued_events_time is None or self.show_queued_events_time + self.show_queued_events_time_interval < time.time():
self.show_queued_events_time = time.time()
for event_type in self.events_index:
self.logger.info("Number of events has processed: %s: %s" % (event_type.name, self.accounts.get(event_type, {}).get('total_processed_events', None)))
for prio in self.events_index[event_type]:
self.logger.info("Number of queued events: %s %s: %s" % (event_type.name, prio.name, len(self.events_index[event_type][prio])))
except Exception as ex:
self.logger.error(f"Failed to send event: {ex}")
self.logger.error(traceback.format_exc())

def coordinate(self):
self.select_coordinator()
Expand Down
Loading

0 comments on commit c01a122

Please sign in to comment.