Skip to content

Commit

Permalink
[DPE-3167] 5/edge: Correct retry before initialising users and update…
Browse files Browse the repository at this point in the history
… CI (#230)

* Update release workflow for 5/edge:

use data platform workflows

* Remove unused release.yaml file

* Update version of used data platform workflows to version 6

* Update user creation logic

* Update .github/workflows/release_5_edge.yaml

Co-authored-by: Carl Csaposs <[email protected]>

* Use constant instead of hardcoded number

* Fix lint errors

* Fix unit tests

* WIP

* Update user creation

* WIP: Fix user creation and unit tests

* fix unit tests

* Fix HA tests

* Increase timeout before checking for the

cluster status after the network was restored

* Fix lint errors

* Pin charmcraft revision

* Update workflow versions to latest

* Fix indentation in build scripts

* Pin charmcraft version

* Address review comments

---------

Co-authored-by: Ubuntu <[email protected]>
Co-authored-by: Carl Csaposs <[email protected]>
Co-authored-by: Mia Altieri <[email protected]>
  • Loading branch information
4 people authored Dec 14, 2023
1 parent f3ccf7d commit eba4782
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 49 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ jobs:
matrix:
charms: [".", "tests/integration/ha_tests/application_charm", "tests/integration/relation_tests/application-charm"]
name: Build ${{matrix.charms}} charm
uses: canonical/data-platform-workflows/.github/workflows/build_charm_without_cache.yaml@v5
uses: canonical/data-platform-workflows/.github/workflows/build_charm_without_cache.yaml@v8
with:
charmcraft-snap-channel: "latest/edge"
charmcraft-snap-revision: 1349
path-to-charm-directory: ${{matrix.charms}}


Expand Down
37 changes: 19 additions & 18 deletions .github/workflows/release_5_edge.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,22 @@ jobs:
- lib-check
uses: ./.github/workflows/ci.yaml

release-to-charmhub:
name: Release to CharmHub
needs:
- lib-check
- ci-tests
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Upload charm to charmhub
uses: canonical/charming-actions/[email protected]
with:
credentials: "${{ secrets.CHARMHUB_TOKEN }}"
github-token: "${{ secrets.GITHUB_TOKEN }}"
channel: "5/edge"
build:
name: Build charm
uses: canonical/data-platform-workflows/.github/workflows/build_charm_without_cache.yaml@v8
with:
charmcraft-snap-revision: 1349

release:
name: Release charm
needs:
- ci-tests
- build
uses: canonical/data-platform-workflows/.github/workflows/release_charm.yaml@v8
with:
channel: 5/edge
artifact-name: ${{ needs.build.outputs.artifact-name }}
secrets:
charmhub-token: ${{ secrets.CHARMHUB_TOKEN }}
permissions:
contents: write # Needed to create GitHub release
2 changes: 1 addition & 1 deletion .github/workflows/sync_issue_to_jira.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
jobs:
sync:
name: Sync GitHub issue to Jira
uses: canonical/data-platform-workflows/.github/workflows/sync_issue_to_jira.yaml@v2
uses: canonical/data-platform-workflows/.github/workflows/sync_issue_to_jira.yaml@v8
with:
jira-base-url: https://warthogs.atlassian.net
jira-project-key: DPE
Expand Down
27 changes: 18 additions & 9 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
ServiceInfo,
)
from pymongo.errors import PyMongoError
from tenacity import before_log, retry, stop_after_attempt, wait_fixed
from tenacity import RetryError, before_log, retry, stop_after_attempt, wait_fixed

from config import Config
from exceptions import AdminUserCreationError, MissingSecretError
Expand All @@ -76,6 +76,10 @@
UNIT_SCOPE = Config.Relations.UNIT_SCOPE
Scopes = Config.Relations.Scopes

USER_CREATING_MAX_ATTEMPTS = 5
USER_CREATION_COOLDOWN = 30
REPLICA_SET_INIT_CHECK_TIMEOUT = 10


class MongoDBCharm(CharmBase):
"""A Juju Charm to deploy MongoDB on Kubernetes."""
Expand Down Expand Up @@ -402,7 +406,12 @@ def _on_start(self, event) -> None:
return

self._initialise_replica_set(event)
self._initialise_users(event)
try:
self._initialise_users(event)
except RetryError:
logger.error("Failed to initialise users. Deferring start event.")
event.defer()
return

# mongod is now active
self.unit.status = ActiveStatus()
Expand Down Expand Up @@ -607,9 +616,8 @@ def _on_secret_changed(self, event):

# BEGIN: user management
@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(5),
reraise=True,
stop=stop_after_attempt(USER_CREATING_MAX_ATTEMPTS),
wait=wait_fixed(USER_CREATION_COOLDOWN),
before=before_log(logger, logging.DEBUG),
)
def _initialise_users(self, event: StartEvent) -> None:
Expand All @@ -631,7 +639,6 @@ def _initialise_users(self, event: StartEvent) -> None:
return

logger.info("User initialization")

try:
self._init_operator_user()
self._init_backup_user()
Expand All @@ -641,12 +648,14 @@ def _initialise_users(self, event: StartEvent) -> None:
self.users_initialized = True
except ExecError as e:
logger.error("Deferring on_start: exit code: %i, stderr: %s", e.exit_code, e.stderr)
event.defer()
return
raise # we need to raise to make retry work
except PyMongoError as e:
logger.error("Deferring on_start since: error=%r", e)
raise # we need to raise to make retry work
except AdminUserCreationError:
logger.error("Deferring on_start: Failed to create operator user.")
event.defer()
return
raise # we need to raise to make retry work

@retry(
stop=stop_after_attempt(3),
Expand Down
44 changes: 33 additions & 11 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,15 @@ async def mongod_ready(ops_test: OpsTest, unit: int) -> bool:


async def get_replica_set_primary(
ops_test: OpsTest, excluded: List[str] = [], application_name=APP_NAME
ops_test: OpsTest,
excluded: List[str] = [],
application_name=APP_NAME,
use_subprocess_to_get_password=False,
) -> Optional[Unit]:
"""Returns the primary unit name based no the replica set host."""
with await get_mongo_client(ops_test, excluded) as client:
with await get_mongo_client(
ops_test, excluded, use_subprocess_to_get_password=use_subprocess_to_get_password
) as client:
data = client.admin.command("replSetGetStatus")
unit_name = host_to_unit(primary_host(data))

Expand All @@ -347,35 +352,50 @@ async def count_primaries(ops_test: OpsTest) -> int:
return len([member for member in data["members"] if member["stateStr"] == "PRIMARY"])


async def fetch_replica_set_members(
    ops_test: OpsTest, use_subprocess_to_get_password=False
) -> List[str]:
    """Fetches the hosts listed as replica set members in the MongoDB replica set configuration.

    Args:
        ops_test: reference to deployment.
        use_subprocess_to_get_password: whether to use subprocess to get password.

    Returns:
        The member host addresses, with ports stripped.
    """
    # Ask any reachable unit for the replica set configuration.
    mongo = await get_mongo_client(
        ops_test, use_subprocess_to_get_password=use_subprocess_to_get_password
    )
    with mongo as client:
        config_reply = client.admin.command("replSetGetConfig")

    hosts = []
    for member in config_reply["config"]["members"]:
        # Each member "host" entry is "<address>:<port>"; keep only the address.
        hosts.append(member["host"].split(":")[0])
    return hosts


async def get_direct_mongo_client(
    ops_test: OpsTest, unit: str, use_subprocess_to_get_password=False
) -> MongoClient:
    """Returns a direct mongodb client to specific unit.

    Args:
        ops_test: reference to deployment.
        unit: unit name (e.g. "mongodb-k8s/0") to connect to directly.
        use_subprocess_to_get_password: whether to use subprocess to get password.
    """
    # Unit names look like "<app>/<id>"; the numeric suffix selects the host.
    unit_id = int(unit.split("/")[1])
    connection_uri = await mongodb_uri(
        ops_test,
        [unit_id],
        use_subprocess_to_get_password=use_subprocess_to_get_password,
    )
    return MongoClient(connection_uri, directConnection=True)


async def get_mongo_client(
    ops_test: OpsTest,
    excluded: Optional[List[str]] = None,
    use_subprocess_to_get_password=False,
) -> MongoClient:
    """Returns a direct mongodb client potentially passing over some of the units.

    Args:
        ops_test: reference to deployment.
        excluded: unit names to skip when choosing a unit to connect to.
        use_subprocess_to_get_password: whether to use subprocess to get password.

    Raises:
        AssertionError: if no active, non-excluded unit could be found.
    """
    # Avoid the mutable-default-argument pitfall: normalise to a fresh list.
    excluded = excluded or []
    mongodb_name = await get_application_name(ops_test, APP_NAME)
    for unit in ops_test.model.applications[mongodb_name].units:
        if unit.name not in excluded and unit.workload_status == "active":
            url = await mongodb_uri(
                ops_test,
                [int(unit.name.split("/")[1])],
                use_subprocess_to_get_password=use_subprocess_to_get_password,
            )
            return MongoClient(url, directConnection=True)
    # Explicit raise instead of `assert False` so the check survives `python -O`.
    raise AssertionError("No fitting unit could be found")


Expand Down Expand Up @@ -657,7 +677,9 @@ async def wait_until_unit_in_status(
ops_test: OpsTest, unit_to_check: Unit, online_unit: Unit, status: str
) -> None:
"""Waits until a replica is in the provided status as reported by MongoDB or timeout occurs."""
with await get_direct_mongo_client(ops_test, online_unit.name) as client:
with await get_direct_mongo_client(
ops_test, online_unit.name, use_subprocess_to_get_password=True
) as client:
data = client.admin.command("replSetGetStatus")

for member in data["members"]:
Expand Down
14 changes: 11 additions & 3 deletions tests/integration/ha_tests/test_ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,28 +559,36 @@ async def test_network_cut(ops_test: OpsTest, continuous_writes, chaos_mesh):

# Wait until Mongodb actually detects isolated instance
logger.info(f"Waiting until Mongodb detects primary instance {primary.name} is not reachable")

await wait_until_unit_in_status(ops_test, primary, active_unit, "(not reachable/healthy)")

# verify new writes are continuing by counting the number of writes before and after a 5 second
# wait
logger.info("Validating writes are continuing to DB")
with await get_mongo_client(ops_test, excluded=[primary.name]) as client:
with await get_mongo_client(
ops_test, excluded=[primary.name], use_subprocess_to_get_password=True
) as client:
writes = client[TEST_DB][TEST_COLLECTION].count_documents({})
time.sleep(5)
more_writes = client[TEST_DB][TEST_COLLECTION].count_documents({})
assert more_writes > writes, "writes not continuing to DB"

# verify that a new primary got elected, old primary is still cut off
new_primary = await get_replica_set_primary(ops_test, excluded=[primary.name])
new_primary = await get_replica_set_primary(
ops_test, excluded=[primary.name], use_subprocess_to_get_password=True
)
assert new_primary.name != primary.name

# Remove networkchaos policy isolating instance from cluster
remove_instance_isolation(ops_test)

# we need to give juju some time to realize that the instance is back online
time.sleep(60)

await wait_until_unit_in_status(ops_test, primary, active_unit, "SECONDARY")

# verify presence of primary, replica set member configuration, and number of primaries
member_hosts = await fetch_replica_set_members(ops_test)
member_hosts = await fetch_replica_set_members(ops_test, use_subprocess_to_get_password=True)
assert set(member_hosts) == set(hostnames)
assert (
await count_primaries(ops_test) == 1
Expand Down
102 changes: 98 additions & 4 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import logging
import math
import subprocess
from datetime import datetime
from pathlib import Path
from random import choices
Expand Down Expand Up @@ -94,14 +95,17 @@ async def get_mongo_cmd(ops_test: OpsTest, unit_name: str):
return mongo_cmd


async def mongodb_uri(
    ops_test: OpsTest,
    unit_ids: Optional[List[int]] = None,
    use_subprocess_to_get_password=False,
) -> str:
    """Builds an operator-user MongoDB connection URI for the given units.

    Args:
        ops_test: reference to deployment.
        unit_ids: unit ids whose addresses become URI hosts; defaults to UNIT_IDS.
        use_subprocess_to_get_password: whether to fetch the password with a
            ``juju`` subprocess call instead of the async action helper.

    Returns:
        A ``mongodb://`` URI authenticating as the ``operator`` user.
    """
    if unit_ids is None:
        unit_ids = UNIT_IDS

    addresses = [await get_address_of_unit(ops_test, unit_id) for unit_id in unit_ids]
    hosts = ",".join(addresses)
    if use_subprocess_to_get_password:
        password = get_password_using_subprocess(ops_test)
    else:
        password = await get_password(ops_test, 0)
    return f"mongodb://operator:{password}@{hosts}/admin"


Expand Down Expand Up @@ -344,3 +348,93 @@ async def get_secret_content(ops_test, secret_id) -> Dict[str, str]:
_, stdout, _ = await ops_test.juju(*complete_command.split())
data = json.loads(stdout)
return data[secret_id]["content"]["Data"]


def create_pod_if_not_exists(namespace, pod_name, container_name, image_name):
    """Create a pod if not already exists.

    Args:
        namespace: Kubernetes namespace to create the pod in.
        pod_name: name of the pod to check for / create.
        container_name: name of the single container in the pod spec.
        image_name: container image the pod should run.
    """
    logger.info("Checking or creating helper mongo pod ...")
    # Use argv lists (shell=False) so pod/namespace names are never interpreted
    # by a shell.
    get_pod_cmd = ["kubectl", "get", "pod", pod_name, "-n", namespace, "-o", "json"]
    result = subprocess.run(get_pod_cmd, capture_output=True, text=True)

    if result.returncode == 0:
        logger.info(f"pod '{pod_name}' in namespace '{namespace}' already exists.")
        return

    if "NotFound" in result.stderr:
        pod_manifest = {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {"name": pod_name, "namespace": namespace},
            "spec": {
                "restartPolicy": "Never",
                "containers": [
                    {
                        "name": container_name,
                        "image": image_name,
                        "command": ["/bin/bash"],
                        "stdin": True,
                        "tty": True,
                    }
                ],
            },
        }

        pod_manifest_json = json.dumps(pod_manifest)

        # Feed the manifest to kubectl on stdin instead of `echo '…' | kubectl`,
        # which breaks as soon as the JSON contains a single quote and invites
        # shell injection.
        create_result = subprocess.run(
            ["kubectl", "apply", "-f", "-"],
            input=pod_manifest_json,
            capture_output=True,
            text=True,
        )

        if create_result.returncode == 0:
            logger.info(f"pod '{pod_name}' created in namespace '{namespace}'.")
        else:
            logger.error(f"Failed to create pod: {create_result.stderr}")
    else:
        logger.error(f"Failed to check pod existence: {result.stderr}")


def is_pod_ready(namespace, pod_name):
    """Checks that the pod is ready.

    Args:
        namespace: Kubernetes namespace the pod lives in.
        pod_name: name of the pod to check.

    Returns:
        True if the pod reports a Ready=True condition, False otherwise.
    """
    # Use an argv list (shell=False) so the pod/namespace names are never
    # interpreted by a shell.
    get_pod_cmd = ["kubectl", "get", "pod", pod_name, "-n", namespace, "-o", "json"]
    result = subprocess.run(get_pod_cmd, capture_output=True, text=True)
    logger.info(f"Checking pod {pod_name} is ready...")
    if result.returncode != 0:
        return False

    pod_info = json.loads(result.stdout)
    for condition in pod_info["status"].get("conditions", []):
        if condition["type"] == "Ready" and condition["status"] == "True":
            return True
    return False


@retry(
    stop=stop_after_attempt(5),
    wait=wait_fixed(30),
    reraise=True,
)
def get_password_using_subprocess(ops_test: OpsTest, username="operator") -> str:
    """Use the charm action to retrieve the password from provided unit.

    Args:
        ops_test: reference to deployment.
        username: database user whose password to fetch (defaults to "operator").

    Returns:
        String with the password stored on the peer relation databag.

    Raises:
        Exception: if switching juju models or running the action fails.
    """
    # `juju run` targets the currently selected model, so switch first.
    cmd = ["juju", "switch", ops_test.model_name]
    # text=True so stderr/stdout are str, not bytes reprs, in error messages.
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        logger.error(
            "Failed to get password. Can't switch to juju model: '%s'. Error '%s'",
            ops_test.model_name,
            result.stderr,
        )
        raise Exception(f"Failed to get password: {result.stderr}")
    cmd = ["juju", "run", f"{APP_NAME}/leader", "get-password", f"username={username}"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        logger.error("get-password command returned non 0 exit code: %s", result.stderr)
        raise Exception(f"get-password command returned non 0 exit code: {result.stderr}")
    try:
        # Action output ends with "password: <value>"; keep the trailing value.
        password = result.stdout.split("password:")[-1].strip()
    except Exception as e:
        logger.error("Failed to get password: %s", e)
        # Chain the original cause so the real parsing failure isn't lost.
        raise Exception(f"Failed to get password: {e}") from e
    return password
Loading

0 comments on commit eba4782

Please sign in to comment.