diff --git a/doc/configuration.rst b/doc/configuration.rst index 909a4fe81d..ad085a3439 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -637,6 +637,18 @@ marker_table created if it doesn't already exist. Defaults to "table_updates". +[prometheus] +------------ + +use_task_family_in_labels + Should task family be used as a prometheus bucket label. + Default value is true. + +task_parameters_to_use_in_labels + List of task arguments' names used as additional prometheus bucket labels. + Passed in a form of a json list. + + [redshift] ---------- @@ -1045,6 +1057,7 @@ metric_namespace Optional prefix to add to the beginning of every metric sent to Datadog. Default value is "luigi". + Per Task Retry-Policy --------------------- diff --git a/luigi/contrib/prometheus_metric.py b/luigi/contrib/prometheus_metric.py index bcfc163f21..f2e6d7f29f 100644 --- a/luigi/contrib/prometheus_metric.py +++ b/luigi/contrib/prometheus_metric.py @@ -1,40 +1,61 @@ from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest, CONTENT_TYPE_LATEST +from luigi import parameter from luigi.metrics import MetricsCollector +from luigi.task import Config + + +class prometheus(Config): + use_task_family_in_labels = parameter.BoolParameter( + default=True, parsing=parameter.BoolParameter.EXPLICIT_PARSING + ) + task_parameters_to_use_in_labels = parameter.ListParameter(default=[]) class PrometheusMetricsCollector(MetricsCollector): - def __init__(self): + def _generate_task_labels(self, task): + return { + label: task.family if label == "family" else task.params.get(label) + for label in self.labels + } + + def __init__(self, *args, **kwargs): super(PrometheusMetricsCollector, self).__init__() self.registry = CollectorRegistry() + config = prometheus(**kwargs) + self.labels = list(config.task_parameters_to_use_in_labels) + if config.use_task_family_in_labels: + self.labels += ["family"] + if not self.labels: + raise ValueError("Prometheus labels cannot be empty (see prometheus configuration)") self.task_started_counter = Counter( 'luigi_task_started_total', 'number of started luigi tasks', - ['family'], + self.labels, registry=self.registry ) self.task_failed_counter = Counter( 'luigi_task_failed_total', 'number of failed luigi tasks', - ['family'], + self.labels, registry=self.registry ) self.task_disabled_counter = Counter( 'luigi_task_disabled_total', 'number of disabled luigi tasks', - ['family'], + self.labels, registry=self.registry ) self.task_done_counter = Counter( 'luigi_task_done_total', 'number of done luigi tasks', - ['family'], + self.labels, registry=self.registry ) self.task_execution_time = Gauge( 'luigi_task_execution_time_seconds', 'luigi task execution time in seconds', - ['family'], + self.labels, registry=self.registry ) @@ -42,22 +63,22 @@ def generate_latest(self): return generate_latest(self.registry) def handle_task_started(self, task): - self.task_started_counter.labels(family=task.family).inc() - self.task_execution_time.labels(family=task.family) + self.task_started_counter.labels(**self._generate_task_labels(task)).inc() + self.task_execution_time.labels(**self._generate_task_labels(task)) def handle_task_failed(self, task): - self.task_failed_counter.labels(family=task.family).inc() - self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running) + self.task_failed_counter.labels(**self._generate_task_labels(task)).inc() + self.task_execution_time.labels(**self._generate_task_labels(task)).set(task.updated - task.time_running) def handle_task_disabled(self, task, config): - self.task_disabled_counter.labels(family=task.family).inc() - self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running) + self.task_disabled_counter.labels(**self._generate_task_labels(task)).inc() + self.task_execution_time.labels(**self._generate_task_labels(task)).set(task.updated - task.time_running) def handle_task_done(self, task): - self.task_done_counter.labels(family=task.family).inc() + self.task_done_counter.labels(**self._generate_task_labels(task)).inc() # time_running can be `None` if task was already complete if task.time_running is not None: - self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running) + self.task_execution_time.labels(**self._generate_task_labels(task)).set(task.updated - task.time_running) def configure_http_handler(self, http_handler): http_handler.set_header('Content-Type', CONTENT_TYPE_LATEST) diff --git a/test/contrib/prometheus_metric_test.py b/test/contrib/prometheus_metric_test.py index 6ceb7994c2..67e0feb132 100644 --- a/test/contrib/prometheus_metric_test.py +++ b/test/contrib/prometheus_metric_test.py @@ -15,18 +15,28 @@ WORKER = 'myworker' TASK_ID = 'TaskID' TASK_FAMILY = 'TaskFamily' +A_PARAM_VALUE = "1" +B_PARAM_VALUE = "2" +C_PARAM_VALUE = "3" @pytest.mark.contrib -class PrometheusMetricTest(unittest.TestCase): +class PrometheusMetricBaseTest(unittest.TestCase): + COLLECTOR_KWARGS = {} + EXPECTED_LABELS = {"family": TASK_FAMILY} + def setUp(self): - self.collector = PrometheusMetricsCollector() + self.collector = PrometheusMetricsCollector(**self.COLLECTOR_KWARGS) self.s = Scheduler(metrics_collector=MetricsCollectors.prometheus) - self.gauge_name = 'luigi_task_execution_time_seconds' - self.labels = {'family': TASK_FAMILY} + self.gauge_name = "luigi_task_execution_time_seconds" def startTask(self): - self.s.add_task(worker=WORKER, task_id=TASK_ID, family=TASK_FAMILY) + self.s.add_task( + worker=WORKER, + task_id=TASK_ID, + family=TASK_FAMILY, + params={"a": A_PARAM_VALUE, "b": B_PARAM_VALUE, "c": C_PARAM_VALUE}, + ) task = self.s._state.get_task(TASK_ID) task.time_running = 0 task.updated = 5 @@ -38,9 +48,11 @@ def test_handle_task_started(self): counter_name = 'luigi_task_started_total' gauge_name = self.gauge_name - labels = self.labels + labels = self.EXPECTED_LABELS - assert self.collector.registry.get_sample_value(counter_name, labels=self.labels) == 1 + assert ( + self.collector.registry.get_sample_value(counter_name, labels=labels) == 1 + ) assert self.collector.registry.get_sample_value(gauge_name, labels=labels) == 0 def test_handle_task_failed(self): @@ -49,7 +61,7 @@ def test_handle_task_failed(self): counter_name = 'luigi_task_failed_total' gauge_name = self.gauge_name - labels = self.labels + labels = self.EXPECTED_LABELS assert self.collector.registry.get_sample_value(counter_name, labels=labels) == 1 assert self.collector.registry.get_sample_value(gauge_name, labels=labels) == task.updated - task.time_running @@ -60,7 +72,7 @@ def test_handle_task_disabled(self): counter_name = 'luigi_task_disabled_total' gauge_name = self.gauge_name - labels = self.labels + labels = self.EXPECTED_LABELS assert self.collector.registry.get_sample_value(counter_name, labels=labels) == 1 assert self.collector.registry.get_sample_value(gauge_name, labels=labels) == task.updated - task.time_running @@ -71,7 +83,7 @@ def test_handle_task_done(self): counter_name = 'luigi_task_done_total' gauge_name = self.gauge_name - labels = self.labels + labels = self.EXPECTED_LABELS assert self.collector.registry.get_sample_value(counter_name, labels=labels) == 1 assert self.collector.registry.get_sample_value(gauge_name, labels=labels) == task.updated - task.time_running @@ -80,3 +92,21 @@ def test_configure_http_handler(self): mock_http_handler = mock.MagicMock() self.collector.configure_http_handler(mock_http_handler) mock_http_handler.set_header.assert_called_once_with('Content-Type', CONTENT_TYPE_LATEST) + + +@pytest.mark.contrib +class PrometheusMetricTaskParamsOnlyTest(PrometheusMetricBaseTest): + COLLECTOR_KWARGS = { + "use_task_family_in_labels": False, + "task_parameters_to_use_in_labels": ["a", "c"], + } + EXPECTED_LABELS = {"a": A_PARAM_VALUE, "c": C_PARAM_VALUE} + + +@pytest.mark.contrib +class PrometheusMetricTaskFamilyAndTaskParamsTest(PrometheusMetricBaseTest): + COLLECTOR_KWARGS = { + "use_task_family_in_labels": True, + "task_parameters_to_use_in_labels": ["b"], + } + EXPECTED_LABELS = {"family": TASK_FAMILY, "b": B_PARAM_VALUE}