From bb8c8c820534e2ef60b3f3a7da89d66c5e9158d3 Mon Sep 17 00:00:00 2001 From: Vojtech Trefny Date: Fri, 19 Jan 2024 15:41:10 +0100 Subject: [PATCH] Add a new function to check if a filesystem is empty This will be used by Anaconda to determine whether a given filesystem can be used for / without reformating. --- blivet/devicelibs/btrfs.py | 21 ++++++ blivet/formats/fs.py | 64 +++++++++++++++++++ .../storage_tests/devices_test/btrfs_test.py | 61 ++++++++++++++++++ tests/storage_tests/formats_test/fs_test.py | 34 ++++++++++ 4 files changed, 180 insertions(+) diff --git a/blivet/devicelibs/btrfs.py b/blivet/devicelibs/btrfs.py index 8ef989675..0e9fb5fcf 100644 --- a/blivet/devicelibs/btrfs.py +++ b/blivet/devicelibs/btrfs.py @@ -20,7 +20,15 @@ # Author(s): David Lehman # +import os + +import gi +gi.require_version("BlockDev", "3.0") + +from gi.repository import BlockDev + from . import raid +from ..errors import BTRFSError from ..size import Size from ..tasks import availability @@ -45,3 +53,16 @@ def is_btrfs_name_valid(name): return '\x00' not in name + + +def get_mountpoint_subvolumes(mountpoint): + """ Get list of subvolume names on given mounted btrfs filesystem + """ + if not os.path.ismount(mountpoint): + raise ValueError("%s doesn't seem to be a mountpoint" % mountpoint) + try: + subvols = BlockDev.btrfs.list_subvolumes(mountpoint) + except BlockDev.BtrfsError as e: + raise BTRFSError(str(e)) + else: + return [s.path for s in subvols] diff --git a/blivet/formats/fs.py b/blivet/formats/fs.py index 9148f4ed6..932307a58 100644 --- a/blivet/formats/fs.py +++ b/blivet/formats/fs.py @@ -28,6 +28,7 @@ import uuid as uuid_mod import random import stat +from contextlib import contextmanager from parted import fileSystemType, PARTITION_BOOT @@ -61,6 +62,7 @@ from ..i18n import N_ from .. import udev from ..mounts import mounts_cache +from ..devicelibs import btrfs from .fslib import kernel_filesystems, FSResize @@ -97,6 +99,9 @@ class FS(DeviceFormat): # support for resize: grow/shrink, online/offline _resize_support = 0 + # directories that even a newly created empty filesystem can contain (e.g. lost+found) + _system_dirs = [] + config_actions_map = {"label": "write_label", "mountpoint": "change_mountpoint"} @@ -557,6 +562,49 @@ def test_mount(self): return ret + @contextmanager + def _do_temp_mount(self): + if not self.exists: + raise FSError("format doesn't exist") + + if not self.mountable: + raise FSError("format cannot be mounted") + + if not self.device: + raise FSError("no device associated with the format") + + if self.status: + yield self.system_mountpoint + + else: + tmpdir = tempfile.mkdtemp(prefix="blivet-tmp.%s" % os.path.basename(self.device)) + try: + util.mount(device=self.device, mountpoint=tmpdir, fstype=self.type, + options=self.mountopts) + except FSError as e: + log.debug("temp mount failed: %s", e) + raise + + try: + yield tmpdir + finally: + util.umount(mountpoint=tmpdir) + os.rmdir(tmpdir) + + @property + def is_empty(self): + """ Check whether this filesystem os empty or not + + Note: If the filesystem is not mounted, this will temporarily mount it + to a temporary directory. + """ + + with self._do_temp_mount() as mnt: + content = os.listdir(mnt) + if content and not all(c in self._system_dirs for c in content): + return False + return True + def _pre_setup(self, **kwargs): """ Check to see if the filesystem should be mounted. @@ -893,6 +941,7 @@ class Ext2FS(FS): _writeuuid_class = fswriteuuid.Ext2FSWriteUUID parted_system = fileSystemType["ext2"] _metadata_size_factor = 0.93 # ext2 metadata may take 7% of space + _system_dirs = ["lost+found"] def _post_setup(self, **kwargs): super(Ext2FS, self)._post_setup(**kwargs) @@ -1037,6 +1086,21 @@ def container_uuid(self): def container_uuid(self, uuid): self.vol_uuid = uuid + @property + def is_empty(self): + """ Check whether this filesystem os empty or not + + Note: If the filesystem is not mounted, this will temporarily mount it + to a temporary directory. + """ + + with self._do_temp_mount() as mnt: + content = os.listdir(mnt) + subvols = btrfs.get_mountpoint_subvolumes(mnt) + if content and not all(c in self._system_dirs + subvols for c in content): + return False + return True + register_device_format(BTRFS) diff --git a/tests/storage_tests/devices_test/btrfs_test.py b/tests/storage_tests/devices_test/btrfs_test.py index 7f981dfe2..ab693653b 100644 --- a/tests/storage_tests/devices_test/btrfs_test.py +++ b/tests/storage_tests/devices_test/btrfs_test.py @@ -1,4 +1,5 @@ import os +import tempfile import unittest from ..storagetestcase import StorageTestCase @@ -152,3 +153,63 @@ def test_btrfs_raid_raid0(self): def test_btrfs_raid_raid1(self): self._test_btrfs_raid(blivet.devicelibs.raid.RAID1) + + def test_btrfs_fs_is_empty(self): + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + + self.storage.initialize_disk(disk) + + part = self.storage.new_partition(size=blivet.size.Size("1 GiB"), fmt_type="btrfs", + parents=[disk]) + self.storage.create_device(part) + + blivet.partitioning.do_partitioning(self.storage) + + vol = self.storage.new_btrfs(name=self.volname, parents=[part]) + self.storage.create_device(vol) + + self.assertIsNotNone(vol.uuid) + + sub1 = self.storage.new_btrfs_sub_volume(parents=[vol], name="blivetTestSubVol1") + self.storage.create_device(sub1) + + sub2 = self.storage.new_btrfs_sub_volume(parents=[vol], name="blivetTestSubVol2") + self.storage.create_device(sub2) + + sub3 = self.storage.new_btrfs_sub_volume(parents=[sub2], name="blivetTestSubVol2/blivetTestSubVol3") + self.storage.create_device(sub3) + + self.storage.do_it() + self.storage.reset() + self.storage.reset() + + vol = self.storage.devicetree.get_device_by_name(self.volname) + self.assertIsNotNone(vol) + + self.assertTrue(vol.format.is_empty) + for sub in vol.subvolumes: + self.assertTrue(sub.format.is_empty) + + # create a new directory in the second subvolume + with tempfile.TemporaryDirectory() as mountpoint: + vol.format.mount(mountpoint=mountpoint) + os.makedirs(os.path.join(mountpoint, "blivetTestSubVol2/test")) + vol.format.unmount() + + self.assertTrue(vol.format.is_empty) + + # first subvolume is empty + sub1 = self.storage.devicetree.get_device_by_name("blivetTestSubVol1") + self.assertIsNotNone(sub1) + self.assertTrue(sub1.format.is_empty) + + # second subvolume is NOT empty + sub2 = self.storage.devicetree.get_device_by_name("blivetTestSubVol2") + self.assertIsNotNone(sub2) + self.assertFalse(sub2.format.is_empty) + + # third subvolume is also empty + sub3 = self.storage.devicetree.get_device_by_name("blivetTestSubVol2/blivetTestSubVol3") + self.assertIsNotNone(sub3) + self.assertTrue(sub3.format.is_empty) diff --git a/tests/storage_tests/formats_test/fs_test.py b/tests/storage_tests/formats_test/fs_test.py index e566670d5..1a0656ac7 100644 --- a/tests/storage_tests/formats_test/fs_test.py +++ b/tests/storage_tests/formats_test/fs_test.py @@ -87,6 +87,23 @@ def test_create_options(self): out = capture_output(["blkid", "-sUUID", "-ovalue", self.loop_devices[0]]) self.assertEqual(out.strip(), uuid) + def test_fs_is_empty(self): + an_fs = self._fs_class() + if not an_fs.formattable: + self.skipTest("can not create filesystem %s" % an_fs.name) + an_fs.device = self.loop_devices[0] + self.assertIsNone(an_fs.create()) + + self.assertTrue(an_fs.is_empty) + + with tempfile.TemporaryDirectory() as mountpoint: + an_fs.mount(mountpoint=mountpoint) + os.makedirs(os.path.join(mountpoint, "test")) + an_fs.unmount() + self.assertFalse(an_fs.is_empty) + + self.assertFalse(an_fs.is_empty) + class FATFSTestCase(fstesting.FSAsRoot): _fs_class = fs.FATFS @@ -195,6 +212,23 @@ def test_too_big2(self): # XXX this tests assumes that resizing to max size - 1 B will fail, but xfs_grow won't self.skipTest("Not checking resize for this test category.") + def test_fs_is_empty(self): + an_fs = self._fs_class() + if not an_fs.formattable: + self.skipTest("can not create filesystem %s" % an_fs.name) + an_fs.device = self.loop_devices[0] + self.assertIsNone(an_fs.create()) + + self.assertTrue(an_fs.is_empty) + + with tempfile.TemporaryDirectory() as mountpoint: + an_fs.mount(mountpoint=mountpoint) + os.makedirs(os.path.join(mountpoint, "test")) + an_fs.unmount() + self.assertFalse(an_fs.is_empty) + + self.assertFalse(an_fs.is_empty) + class HFSPlusTestCase(fstesting.FSAsRoot): _fs_class = fs.HFSPlus