ci: Remember failed workflows/td files and run them first in CI #31263

Merged: 1 commit, Feb 6, 2025
1 change: 0 additions & 1 deletion misc/python/materialize/ci_util/__init__.py
@@ -48,7 +48,6 @@ def upload_junit_report(suite: str, junit_report: Path) -> None:
"""
if not buildkite.is_in_buildkite():
return
ui.section(f"Uploading report for suite {suite!r} to Buildkite Test Analytics")
suite = suite.upper().replace("-", "_")
token = os.getenv(f"BUILDKITE_TEST_ANALYTICS_API_KEY_{suite}")
if not token:

51 changes: 49 additions & 2 deletions misc/python/materialize/mzcompose/composition.py
@@ -36,14 +36,14 @@
from contextlib import contextmanager
from inspect import Traceback, getframeinfo, getmembers, isfunction, stack
from tempfile import TemporaryFile
from typing import Any, TextIO, cast
from typing import Any, TextIO, TypeVar, cast

import psycopg
import sqlparse
import yaml
from psycopg import Connection, Cursor

from materialize import MZ_ROOT, mzbuild, spawn, ui
from materialize import MZ_ROOT, buildkite, mzbuild, spawn, ui
from materialize.mzcompose import cluster_replica_size_map, loader
from materialize.mzcompose.service import Service
from materialize.mzcompose.services.materialized import (
@@ -1564,3 +1564,50 @@ def cloud_hostname(self, quiet: bool = False) -> str:
# It is necessary to append the 'https://' protocol; otherwise, urllib can't parse it correctly.
cloud_hostname = urllib.parse.urlparse("https://" + cloud_url).hostname
return str(cloud_hostname)

T = TypeVar("T")

def test_parts(self, parts: list[T], process_func: Callable[[T], Any]) -> None:
    from materialize.test_analytics.config.test_analytics_db_config import (
        create_test_analytics_config,
    )
    from materialize.test_analytics.test_analytics_db import TestAnalyticsDb

    priority: dict[str, int] = {}
    test_analytics: TestAnalyticsDb | None = None

    if buildkite.is_in_buildkite():
        print("~~~ Fetching part priorities")
        test_analytics_config = create_test_analytics_config(self)
        test_analytics = TestAnalyticsDb(test_analytics_config)
        try:
            priority = test_analytics.builds.get_part_priorities(timeout=15)
            print(f"Priorities: {priority}")
        except Exception as e:
            print(f"Failed to fetch part priorities, using default order: {e}")

    sorted_parts = sorted(
        parts, key=lambda part: priority.get(str(part), 0), reverse=True
    )
    exceptions: list[Exception] = []

    try:
        for part in sorted_parts:
            try:
                process_func(part)
            except Exception as e:
                if buildkite.is_in_buildkite():
                    assert test_analytics
                    test_analytics.builds.add_build_job_failure(str(part))
                # raise
                # We could also keep running, but then runtime is still
                # slow when a test fails, and the annotation only shows up
                # after the test finished:
                exceptions.append(e)
Comment on lines +1602 to +1606
def- (Contributor, Author):

This part is what I'm unsure about in this PR. We only collect test failures at the end of the test run, and only then mark the test as failed and write the annotation. Changing that would be a major change.

So we currently have two options:

  • Stop the run after the first part (workflow, td/slt file) fails, and get immediate feedback on that one, but none on the remaining parts.
  • Keep running all parts, and get feedback on all of them later.

I'm currently going with the second approach, but that means you only notice an early failure if you check the logs manually. So I don't see a way around changing the annotation logic so we can annotate while the run is still in progress, if we want faster feedback and still run all parts after a failure. Opinions?

Contributor:

I think it's worth running all tests, to avoid death by a thousand cuts.

If someone cancels a workflow, can we still collect any failures that happened so far?

def- (Contributor, Author), Feb 5, 2025:

Yes.

Contributor:

Okay, perfect. I already tend to watch CI for early failures and can cancel as necessary, so this seems like the best option. Nice!

    finally:
        if buildkite.is_in_buildkite():
            assert test_analytics
            test_analytics.database_connector.submit_update_statements()
    if exceptions:
        print(f"Further exceptions were raised:\n{exceptions[1:]}")
        raise exceptions[0]
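For orientation, the call pattern that the test files below adopt looks roughly like this; a minimal sketch assuming a composition whose workflows should each run as one prioritized part (the `process` helper simply mirrors the per-test changes further down):

def workflow_default(c: Composition) -> None:
    def process(name: str) -> None:
        # Skip the entry point itself; every other workflow is one "part".
        if name == "default":
            return
        with c.test_case(name):
            c.workflow(name)

    # Previously failed parts run first; new failures are recorded in the
    # test-analytics database so future CI runs can prioritize them.
    c.test_parts(list(c.workflows.keys()), process)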

@@ -173,3 +173,56 @@ def update_build_job_success(
)

self.database_connector.add_update_statements(sql_statements)

def add_build_job_failure(
    self,
    part: str,
) -> None:
    job_id = buildkite.get_var(BuildkiteEnvVar.BUILDKITE_JOB_ID)

    sql_statements = []
    sql_statements.append(
        f"""
        INSERT INTO build_job_failure
        (
            build_job_id,
            part
        )
        VALUES
        (
            {as_sanitized_literal(job_id)},
            {as_sanitized_literal(part)}
        )
        """
    )

    self.database_connector.add_update_statements(sql_statements)

def get_part_priorities(self, timeout: int) -> dict[str, int]:
    branch = buildkite.get_var(BuildkiteEnvVar.BUILDKITE_BRANCH)
    build_step_key = buildkite.get_var(BuildkiteEnvVar.BUILDKITE_STEP_KEY)
    with self.database_connector.create_cursor() as cur:
        cur.execute('SET cluster = "test_analytics"')
        cur.execute(f"SET statement_timeout = '{timeout}s'".encode("utf-8"))
        # 2 for failures in this PR
        # 1 for failed recently in CI
        cur.execute(
            f"""
            SELECT part, MAX(prio)
            FROM (
                SELECT part, 2 AS prio
                FROM mv_build_job_failed_on_branch
                WHERE branch = {as_sanitized_literal(branch)}
                AND build_step_key = {as_sanitized_literal(build_step_key)}
                UNION
                SELECT part, 1 AS prio
                FROM mv_build_job_failed
                WHERE build_step_key = {as_sanitized_literal(build_step_key)}
            )
            GROUP BY part;
            """.encode(
                "utf-8"
            )
        )
        results = cur.fetchall()
        return {part: prio for part, prio in results}
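As a rough illustration of how these priorities feed back into `test_parts` (the part names and values below are made up): parts that failed on the current branch map to priority 2, parts that failed recently on any build of the same step map to 1, unknown parts default to 0, and execution order is descending priority:

# Hypothetical result of get_part_priorities(): "workflow-kafka" failed on this
# PR's branch, "smoke.td" failed recently on other builds of the same step.
priority = {"workflow-kafka": 2, "smoke.td": 1}

parts = ["setup.td", "smoke.td", "workflow-kafka"]
sorted_parts = sorted(parts, key=lambda part: priority.get(str(part), 0), reverse=True)
print(sorted_parts)  # ['workflow-kafka', 'smoke.td', 'setup.td'] -- failed parts run first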

@@ -0,0 +1,17 @@
-- Copyright Materialize, Inc. and contributors. All rights reserved.
--
-- Use of this software is governed by the Business Source License
-- included in the LICENSE file at the root of this repository.
--
-- As of the Change Date specified in that file, in accordance with
-- the Business Source License, use of this software will be governed
-- by the Apache License, Version 2.0.

-- Part of a build job that failed; can be a testdrive file or a workflow.
CREATE TABLE build_job_failure (
    build_job_id TEXT NOT NULL,
    part TEXT NOT NULL
);

ALTER TABLE build_job_failure OWNER TO qa;
GRANT SELECT, INSERT, UPDATE ON TABLE build_job_failure TO "hetzner-ci";

@@ -0,0 +1,27 @@
-- Copyright Materialize, Inc. and contributors. All rights reserved.
--
-- Use of this software is governed by the Business Source License
-- included in the LICENSE file at the root of this repository.
--
-- As of the Change Date specified in that file, in accordance with
-- the Business Source License, use of this software will be governed
-- by the Apache License, Version 2.0.

CREATE OR REPLACE MATERIALIZED VIEW mv_build_job_failed_on_branch AS
SELECT build_step_key, branch, part
FROM build
JOIN build_job ON build.build_id = build_job.build_id
JOIN build_job_failure ON build_job.build_job_id = build_job_failure.build_job_id;

CREATE OR REPLACE MATERIALIZED VIEW mv_build_job_failed AS
SELECT build_step_key, part
FROM build_job
JOIN build_job_failure ON build_job.build_job_id = build_job_failure.build_job_id;

CREATE DEFAULT INDEX ON mv_build_job_failed_on_branch;
CREATE DEFAULT INDEX ON mv_build_job_failed;

ALTER MATERIALIZED VIEW mv_build_job_failed_on_branch OWNER TO qa;
GRANT SELECT ON TABLE mv_build_job_failed_on_branch TO "hetzner-ci";
ALTER MATERIALIZED VIEW mv_build_job_failed OWNER TO qa;
GRANT SELECT ON TABLE mv_build_job_failed TO "hetzner-ci";

13 changes: 9 additions & 4 deletions test/0dt/mzcompose.py
@@ -29,6 +29,7 @@
Materialized,
)
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import (
CockroachOrPostgresMetadata,
Postgres,
@@ -49,6 +50,7 @@
Kafka(),
SchemaRegistry(),
CockroachOrPostgresMetadata(),
Mz(app_password=""),
Materialized(
name="mz_old",
sanity_restart=False,
@@ -77,14 +79,17 @@


def workflow_default(c: Composition) -> None:
for name in buildkite.shard_list(
list(c.workflows.keys()), lambda workflow: workflow
):
def process(name: str) -> None:
if name == "default":
continue
return
with c.test_case(name):
c.workflow(name)

workflows = buildkite.shard_list(
list(c.workflows.keys()), lambda workflow: workflow
)
c.test_parts(workflows, process)


def workflow_read_only(c: Composition) -> None:
"""Verify read-only mode."""

7 changes: 6 additions & 1 deletion test/aws-localstack/mzcompose.py
@@ -25,6 +25,7 @@
)
from materialize.mzcompose.services.localstack import Localstack
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.testdrive import Testdrive

ENVIRONMENT_NAME = f"environment-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"
@@ -43,6 +44,7 @@

SERVICES = [
Localstack(),
Mz(app_password=""),
Materialized(
depends_on=["localstack"],
environment_extra=[
@@ -66,10 +68,13 @@


def workflow_default(c: Composition) -> None:
for name in ["secrets-manager", "aws-connection", "copy-to-s3"]:
def process(name: str) -> None:
with c.test_case(name):
c.workflow(name)

workflows = ["secrets-manager", "aws-connection", "copy-to-s3"]
c.test_parts(workflows, process)


def workflow_secrets_manager(c: Composition) -> None:
c.up("localstack")

9 changes: 7 additions & 2 deletions test/balancerd/mzcompose.py
@@ -33,6 +33,7 @@
from materialize.mzcompose.services.balancerd import Balancerd
from materialize.mzcompose.services.frontegg import FronteggMock
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.test_certs import TestCerts
from materialize.mzcompose.services.testdrive import Testdrive

@@ -118,6 +119,7 @@ def app_password(email: str) -> str:
"secrets:/secrets",
],
),
Mz(app_password=""),
Materialized(
options=[
# Enable TLS on the public port to verify that balancerd is connecting to the balancerd
@@ -185,11 +187,14 @@ def pg8000_sql_cursor(
def workflow_default(c: Composition) -> None:
c.down(destroy_volumes=True)

for name in c.workflows:
def process(name: str) -> None:
if name in ["default", "plaintext"]:
continue
return
with c.test_case(name):
c.workflow(name)

c.test_parts(list(c.workflows.keys()), process)

with c.test_case("plaintext"):
c.workflow("plaintext")


12 changes: 5 additions & 7 deletions test/bounded-memory/mzcompose.py
@@ -1091,16 +1091,14 @@ class KafkaScenario(Scenario):


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
for name in c.workflows:
if name == "default":
continue

if name == "minimization-search":
continue

def process(name: str) -> None:
if name in ["default", "minimization-search"]:
return
with c.test_case(name):
c.workflow(name)

c.test_parts(list(c.workflows.keys()), process)


def workflow_main(c: Composition, parser: WorkflowArgumentParser) -> None:
"""Process various datasets in a memory-constrained environment in order

9 changes: 6 additions & 3 deletions test/chbench/mzcompose.py
@@ -19,6 +19,7 @@
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.metabase import Metabase
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.zookeeper import Zookeeper

@@ -28,6 +29,7 @@
SchemaRegistry(),
Debezium(),
MySql(root_password="rootpw"),
Mz(app_password=""),
Materialized(),
Metabase(),
Service(
@@ -42,13 +44,14 @@


def workflow_default(c: Composition) -> None:
for name in c.workflows:
def process(name: str) -> None:
if name == "default":
continue

return
with c.test_case(name):
c.workflow(name)

c.test_parts(list(c.workflows.keys()), process)


def workflow_no_load(c: Composition, parser: WorkflowArgumentParser) -> None:
"""Run CH-benCHmark without any load on Materialize"""

9 changes: 6 additions & 3 deletions test/cloud-canary/mzcompose.py
@@ -243,7 +243,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:

args = parser.parse_args()

globs = list(
files = list(
itertools.chain.from_iterable(
[
glob.glob(file_glob, root_dir=MZ_ROOT / "test" / "cloud-canary")
@@ -275,15 +275,18 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
# Takes about 40 min to spin up
redpanda = (
Redpanda(c, cleanup=args.cleanup)
if any(["redpanda" in filename for filename in globs])
if any(["redpanda" in filename for filename in files])
else None
)

try:
print("Running .td files ...")
td(c, text="> CREATE CLUSTER canary_sources SIZE '25cc'")
for filename in globs:

def process(filename: str) -> None:
td(c, filename, redpanda=redpanda)

c.test_parts(files, process)
test_failed = False
finally:
if args.cleanup and redpanda is not None: