Skip to content

Commit

Permalink
Merge pull request #16 from philiporlando/unnest-fgdb-to-gpkg
Browse files Browse the repository at this point in the history
Refactor into `fgdb_to_gpkg()` into smaller functions, add tests
  • Loading branch information
philiporlando authored Jan 1, 2024
2 parents 57c0e2e + 24889ac commit 8ed9725
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 56 deletions.
10 changes: 9 additions & 1 deletion fgdb_to_gpkg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
from .fgdb_to_gpkg import fgdb_to_gpkg
from .fgdb_to_gpkg import (
remove_gpkg_if_overwrite,
get_layer_lists,
convert_layer,
fgdb_to_gpkg,
)

__all__ = [
"remove_gpkg_if_overwrite",
"get_layer_lists",
"convert_layer",
"fgdb_to_gpkg",
]
148 changes: 94 additions & 54 deletions fgdb_to_gpkg/fgdb_to_gpkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,81 +4,124 @@
import os
import warnings
from tqdm import tqdm
from typing import List, Tuple


def remove_gpkg_if_overwrite(gpkg_path: str, overwrite: bool) -> None:
"""
Removes an existing GeoPackage file if overwrite is True.
:param gpkg_path: file path of the GeoPackage (.gpkg)
:type gpkg_path: str
:param overwrite: flag indicating whether to overwrite existing files
:type overwrite: bool
"""
if os.path.exists(gpkg_path) and overwrite:
os.remove(gpkg_path)


def get_layer_lists(
fgdb_path: str, gpkg_path: str, overwrite: bool
) -> Tuple[List[str], List[str]]:
"""
Retrieves the list of layers from the File GeoDatabase and, if applicable, from the GeoPackage.
:param fgdb_path: file path of an Esri File GeoDataBase (.gdb)
:type fgdb_path: str
:param gpkg_path: file path of the GeoPackage (.gpkg)
:type gpkg_path: str
:param overwrite: flag indicating whether to overwrite existing files
:type overwrite: bool
:return: Tuple of two lists containing feature classes and GeoPackage layers
:rtype: Tuple[List[str], List[str]]
"""
fc_list = fiona.listlayers(fgdb_path)
layer_list = (
fiona.listlayers(gpkg_path)
if os.path.exists(gpkg_path) and not overwrite
else []
)
return fc_list, layer_list


def convert_layer(
fc: str,
fgdb_path: str,
gpkg_path: str,
overwrite: bool,
layer_list: List[str],
**kwargs,
) -> None:
"""
Converts a single feature class from the File GeoDatabase to a layer in the GeoPackage.
:param fc: feature class name
:type fc: str
:param fgdb_path: file path of an Esri File GeoDataBase (.gdb)
:type fgdb_path: str
:param gpkg_path: file path of the GeoPackage (.gpkg)
:type gpkg_path: str
:param overwrite: flag indicating whether to overwrite existing files
:type overwrite: bool
:param layer_list: list of existing layers in the GeoPackage
:type layer_list: List[str]
:param kwargs: additional keyword arguments for geopandas.to_file()
"""
if not overwrite and fc in layer_list:
warnings.warn(
f"Layer {fc} already exists in {gpkg_path}. Skipping...", UserWarning
)
return

gdf = gpd.read_file(fgdb_path, layer=fc)
gdf.to_file(
gpkg_path, driver="GPKG", layer=fc, index=False, if_exists="append", **kwargs
)


def fgdb_to_gpkg(
fgdb_path: str, gpkg_path: str, overwrite: bool = True, **kwargs
) -> None:
"""Converts all feature classes within a File GeoDataBase to new layers within a GeoPackage.
"""
Converts all feature classes within a File GeoDataBase to new layers within a GeoPackage.
:param fgdb_path: file path of an Esri File GeoDataBase (.gdb)
:type fgdb_path: str
:param gpkg_path: file path of an OGC GeoPackage (.gpkg)
:type fgdb_path: str
:type gpkg_path: str
:param overwrite: overwrites existing GeoPackage before copying over new layers if True, defaults to True
:type bool, optional
:type overwrite: bool
:param **kwargs: additional keyword arguments to pass to geopandas.to_file()
:param kwargs: additional keyword arguments to pass to geopandas.to_file()
"""

# Ensure input File GeoDataBase exists
if not os.path.exists(fgdb_path):
raise FileNotFoundError(f"{fgdb_path} does not exist!")

try:
# Remove existing GeoPackage if overwrite is True
if os.path.exists(gpkg_path) and overwrite:
os.remove(gpkg_path)

# List all feature classes within File GeoDataBase
fc_list = fiona.listlayers(fgdb_path)

# Create progress bar
progress_bar = tqdm(total=len(fc_list), desc="Converting layers")

# List all layers within GeoPackage if it exists
gpkg_exists = os.path.exists(gpkg_path)
if gpkg_exists:
layer_list = fiona.listlayers(gpkg_path)

# Loop through each feature class
for fc in fc_list:
# Skip writing layer if it already exists in GeoPackage when not overwriting
if not overwrite and gpkg_exists:
if fc in layer_list:
warnings.warn(
f"Layer {fc} already exists in {gpkg_path}. Skipping...",
UserWarning,
)
continue

# Read the feature class into GeoDataFrame
gdf = gpd.read_file(fgdb_path, layer=fc)

# Write the GeoDataFrame to a GeoPackage
gdf.to_file(
gpkg_path,
driver="GPKG",
layer=fc,
index=False,
if_exists="append",
**kwargs,
)

# Update progress bar
progress_bar.update(1)

# Close progress bar
progress_bar.close()
remove_gpkg_if_overwrite(gpkg_path, overwrite)
fc_list, layer_list = get_layer_lists(fgdb_path, gpkg_path, overwrite)

with tqdm(total=len(fc_list), desc="Converting layers") as progress_bar:
for fc in fc_list:
convert_layer(fc, fgdb_path, gpkg_path, overwrite, layer_list, **kwargs)
progress_bar.update(1)

except Exception as e:
print(f"Error converting {fgdb_path} to {gpkg_path}: {e}")


if __name__ == "__main__":
# Set up argparse to parse command line arguments
parser = argparse.ArgumentParser(
description="Convert an Esri File GeoDatabase to a GeoPackage"
)
Expand All @@ -90,8 +133,5 @@ def fgdb_to_gpkg(
help="deletes an existing GeoPackage before copying layers from File GeoDataBase.",
)

# Parse command line arguments
args = parser.parse_args()

# Call the fgdb_to_gpkg function with the provided arguments
fgdb_to_gpkg(args.fgdb_path, args.gpkg_path, args.overwrite)
31 changes: 30 additions & 1 deletion tests/test_fgdb_to_gpkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@
from typing import Literal
from shapely.geometry.polygon import Polygon
from shapely.geometry.multipolygon import MultiPolygon
from fgdb_to_gpkg import fgdb_to_gpkg
from fgdb_to_gpkg import (
remove_gpkg_if_overwrite,
get_layer_lists,
convert_layer,
fgdb_to_gpkg,
)


# Setup fixture
@pytest.fixture
def setup_fgdb_gpkg() -> tuple[str, str, list[str]]:
# Setup fixture for creating a temporary File GeoDatabase and GeoPackage
Expand All @@ -33,6 +39,29 @@ def setup_fgdb_gpkg() -> tuple[str, str, list[str]]:
yield fgdb_path, gpkg_path, layer


def test_remove_gpkg_if_overwrite(setup_fgdb_gpkg: tuple[str, str, list[str]]):
_, gpkg_path, _ = setup_fgdb_gpkg
# Create a dummy file to simulate an existing GeoPackage
with open(gpkg_path, "w") as f:
f.write("dummy content")
remove_gpkg_if_overwrite(gpkg_path, True)
assert not os.path.exists(gpkg_path), "GeoPackage was removed as expected."


# Test for get_layer_lists function
def test_get_layer_lists(setup_fgdb_gpkg: tuple[str, str, list[str]]):
fgdb_path, gpkg_path, _ = setup_fgdb_gpkg
fc_list, layer_list = get_layer_lists(fgdb_path, gpkg_path, False)
# Check the returned lists as per your expectations


# Test for convert_layer function
def test_convert_layer(setup_fgdb_gpkg: tuple[str, str, list[str]]):
fgdb_path, gpkg_path, layer = setup_fgdb_gpkg
convert_layer(layer, fgdb_path, gpkg_path, False, [])
# Read and check if the layer was added to the GeoPackage


def test_fgdb_to_gpkg(setup_fgdb_gpkg: tuple[str, str, Literal["test_fc"]]):
# Test basic functionality of fgdb_to_gpkg
fgdb_path, gpkg_path, layer = setup_fgdb_gpkg
Expand Down

0 comments on commit 8ed9725

Please sign in to comment.