Filter data #181

Merged · 13 commits · Nov 13, 2024
Changes from 5 commits
291 changes: 146 additions & 145 deletions .github/workflows/process_data.py
@@ -1,3 +1,4 @@
import json
import logging
import os
from urllib.parse import urljoin, urlparse
@@ -10,101 +11,87 @@
KEY_IDENTITY = os.getenv("KEY_IDENTITY")
KEY_CREDENTIAL = os.getenv("KEY_CREDENTIAL")
ITEM_SET_ID = os.getenv("ITEM_SET_ID")
CSV_PATH = os.getenv("CSV_PATH", "_data/sgb-metadata-csv.csv")
JSON_PATH = os.getenv("JSON_PATH", "_data/sgb-metadata-json.json")

# Set up logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


# Function to get items from a collection
def get_items_from_collection(collection_id):
url = urljoin(OMEKA_API_URL, "items")
all_items = []
params = {
"item_set_id": collection_id,
"key_identity": KEY_IDENTITY,
"key_credential": KEY_CREDENTIAL,
"per_page": 100,
}

while True:
response = requests.get(url, params=params)
if response.status_code != 200:
logging.error(f"Error: {response.status_code}")
break
items = response.json()
all_items.extend(items)
next_url = None
for link in response.links:
if response.links[link]["rel"] == "next":
next_url = response.links[link]["url"]
break
if not next_url:
break
url = next_url
return all_items


# Function to get media for an item
def get_media(item_id):
url = urljoin(OMEKA_API_URL, f"media?item_id={item_id}")
params = {"key_identity": KEY_IDENTITY, "key_credential": KEY_CREDENTIAL}
response = requests.get(url, params=params)
if response.status_code != 200:
logging.error(f"Error: {response.status_code}")
return None
return response.json()
# --- Helper Functions for Data Extraction ---
def is_valid_url(url):
"""Checks if a URL is valid."""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except ValueError:
return False
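
A quick behavioural sketch of the new helper (illustrative, not part of the diff):

    is_valid_url("https://example.org/files/a.jpg")  # -> True
    is_valid_url("objects/a.jpg")                    # -> False (no scheme/netloc)
    is_valid_url("http://[::1")                      # -> False (urlparse raises ValueError)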


# Function to download file
def download_file(url, dest_path):
"""Downloads a file from a given URL to the specified destination path."""
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
try:
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(dest_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
except requests.exceptions.HTTPError as http_err:
logging.error(f"HTTP error occurred: {http_err}")
except Exception as err:
logging.error(f"Other error occurred: {err}")
except requests.exceptions.RequestException as err:
logging.error(f"File download error: {err}")


def get_paginated_items(url, params):
"""Fetches all items from a paginated API endpoint."""
items = []
while url:
try:
response = requests.get(url, params=params)
response.raise_for_status()
except requests.exceptions.RequestException as err:
logging.error(f"Error fetching items: {err}")
break
items.extend(response.json())
url = response.links.get("next", {}).get("url")
params = None

return items
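
The rewrite leans on requests exposing the HTTP Link header as response.links, a dict already keyed by rel (so the old manual scan over response.links was redundant), and clears params after the first request because the next URL carries the full query string. A hedged usage sketch, assuming the environment variables above are set:

    items = get_paginated_items(
        urljoin(OMEKA_API_URL, "items"),
        {
            "item_set_id": ITEM_SET_ID,
            "key_identity": KEY_IDENTITY,
            "key_credential": KEY_CREDENTIAL,
            "per_page": 100,
        },
    )
    logging.info(f"Fetched {len(items)} items")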

# Function to check if URL is valid
def is_valid_url(url):
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except ValueError:
return False

def get_items_from_collection(collection_id):
"""Fetches all items from a specified collection."""
params = {
"item_set_id": collection_id,
"key_identity": KEY_IDENTITY,
"key_credential": KEY_CREDENTIAL,
"per_page": 100,
}
return get_paginated_items(urljoin(OMEKA_API_URL, "items"), params)

# Helper functions to extract data
def extract_prop_value(props, prop_id):
return next(
(
prop.get("@value", "")
for prop in props
if prop.get("property_id") == prop_id
),
"",

def get_media(item_id):
"""Fetches media associated with a specific item ID."""
params = {"key_identity": KEY_IDENTITY, "key_credential": KEY_CREDENTIAL}
return get_paginated_items(
urljoin(OMEKA_API_URL, f"media?item_id={item_id}"), params
)


def extract_prop_uri(props, prop_id):
return next(
(
f"[{prop.get('o:label', '')}]({prop.get('@id', '')})"
for prop in props
if prop.get("property_id") == prop_id
),
"",
)
# --- Data Extraction and Transformation Functions ---
def extract_property(props, prop_id, as_uri=False):
"""Extracts a property value or URI from properties based on property ID."""
for prop in props:
if prop.get("property_id") == prop_id:
if as_uri:
return f"[{prop.get('o:label', '')}]({prop.get('@id', '')})"
return prop.get("@value", "")
return ""


def extract_combined_list(props):
def extract_combined_values(props):
"""Combines text values and URIs from properties into a single list."""
values = [
prop.get("@value", "").replace(";", "&#59")
for prop in props
@@ -115,50 +102,28 @@ def extract_combined_list(props):
for prop in props
if "@id" in prop
]
combined = values + uris
return values + uris


def extract_combined_values_csv(props):
"""Combines text values and URIs into a semicolon-separated string."""
combined = extract_combined_values(props)
return ";".join(combined)


def extract_item_data(item):
# Download the thumbnail image if available and valid
image_url = item.get("thumbnail_display_urls", {}).get("large", "")
local_image_path = ""
def download_thumbnail(image_url):
"""Downloads the thumbnail image if the URL is valid."""
if image_url and is_valid_url(image_url):
filename = os.path.basename(image_url)
local_image_path = f"objects/{filename}"
if not os.path.exists(local_image_path):
download_file(image_url, local_image_path)

logging.info(f"Item ID: {item['o:id']}")

return {
"objectid": extract_prop_value(item.get("dcterms:identifier", []), 10),
"parentid": "",
"title": extract_prop_value(item.get("dcterms:title", []), 1),
"description": extract_prop_value(item.get("dcterms:description", []), 4),
"subject": extract_combined_list(item.get("dcterms:subject", [])),
"era": extract_prop_value(item.get("dcterms:temporal", []), 41),
"isPartOf": extract_combined_list(item.get("dcterms:isPartOf", [])),
"creator": extract_combined_list(item.get("dcterms:creator", [])),
"publisher": extract_combined_list(item.get("dcterms:publisher", [])),
"source": extract_combined_list(item.get("dcterms:source", [])),
"date": extract_prop_value(item.get("dcterms:date", []), 7),
"type": extract_prop_uri(item.get("dcterms:type", []), 8),
"format": extract_prop_value(item.get("dcterms:format", []), 9),
"extent": extract_prop_value(item.get("dcterms:extent", []), 25),
"language": extract_prop_value(item.get("dcterms:language", []), 12),
"relation": extract_combined_list(item.get("dcterms:relation", [])),
"rights": extract_prop_value(item.get("dcterms:rights", []), 15),
"license": extract_prop_value(item.get("dcterms:license", []), 49),
"display_template": "compound_object",
"object_location": "",
"image_small": local_image_path,
"image_thumb": local_image_path,
"image_alt_text": item.get("o:alt_text", ""),
}
return local_image_path
return ""


def infer_display_template(format_value):
"""Infers the display template type based on the format value."""
if "image" in format_value.lower():
return "image"
elif "pdf" in format_value.lower():
@@ -169,18 +134,48 @@
return "record"


def extract_item_data(item):
"""Extracts relevant data from an item and downloads its thumbnail if available."""
local_image_path = download_thumbnail(
item.get("thumbnail_display_urls", {}).get("large", "")
)

return {
"objectid": extract_property(item.get("dcterms:identifier", []), 10),
"parentid": "",
"title": extract_property(item.get("dcterms:title", []), 1),
"description": extract_property(item.get("dcterms:description", []), 4),
"subject": extract_combined_values(item.get("dcterms:subject", [])),
"era": extract_property(item.get("dcterms:temporal", []), 41),
"isPartOf": extract_combined_values(item.get("dcterms:isPartOf", [])),
"creator": extract_combined_values(item.get("dcterms:creator", [])),
"publisher": extract_combined_values(item.get("dcterms:publisher", [])),
"source": extract_combined_values(item.get("dcterms:source", [])),
"date": extract_property(item.get("dcterms:date", []), 7),
"type": extract_property(item.get("dcterms:type", []), 8, as_uri=True),
"format": extract_property(item.get("dcterms:format", []), 9),
"extent": extract_property(item.get("dcterms:extent", []), 25),
"language": extract_property(item.get("dcterms:language", []), 12),
"relation": extract_combined_values(item.get("dcterms:relation", [])),
"rights": extract_property(item.get("dcterms:rights", []), 15),
"license": extract_property(item.get("dcterms:license", []), 49),
"display_template": "compound_object",
"object_location": "",
"image_small": local_image_path,
"image_thumb": local_image_path,
"image_alt_text": item.get("o:alt_text", ""),
}


def extract_media_data(media, item_dc_identifier):
format_value = extract_prop_value(media.get("dcterms:format", []), 9)
"""Extracts relevant data from a media item associated with a specific item."""
format_value = extract_property(media.get("dcterms:format", []), 9)
display_template = infer_display_template(format_value)

# Download the thumbnail image if available and valid
image_url = media.get("thumbnail_display_urls", {}).get("large", "")
local_image_path = ""
if image_url and is_valid_url(image_url):
filename = os.path.basename(image_url)
local_image_path = f"objects/{filename}"
if not os.path.exists(local_image_path):
download_file(image_url, local_image_path)
local_image_path = download_thumbnail(
media.get("thumbnail_display_urls", {}).get("large", "")
)

# Extract media data
object_location = (
@@ -191,24 +186,24 @@ def extract_media_data(media, item_dc_identifier):
logging.info(f"is_public: {media.get('o:is_public')}")

return {
"objectid": extract_prop_value(media.get("dcterms:identifier", []), 10),
"objectid": extract_property(media.get("dcterms:identifier", []), 10),
"parentid": item_dc_identifier,
"title": extract_prop_value(media.get("dcterms:title", []), 1),
"description": extract_prop_value(media.get("dcterms:description", []), 4),
"subject": extract_combined_list(media.get("dcterms:subject", [])),
"era": extract_prop_value(media.get("dcterms:temporal", []), 41),
"isPartOf": extract_combined_list(media.get("dcterms:isPartOf", [])),
"creator": extract_combined_list(media.get("dcterms:creator", [])),
"publisher": extract_combined_list(media.get("dcterms:publisher", [])),
"source": extract_combined_list(media.get("dcterms:source", [])),
"date": extract_prop_value(media.get("dcterms:date", []), 7),
"type": extract_prop_uri(media.get("dcterms:type", []), 8),
"title": extract_property(media.get("dcterms:title", []), 1),
"description": extract_property(media.get("dcterms:description", []), 4),
"subject": extract_combined_values(media.get("dcterms:subject", [])),
"era": extract_property(media.get("dcterms:temporal", []), 41),
"isPartOf": extract_combined_values(media.get("dcterms:isPartOf", [])),
"creator": extract_combined_values(media.get("dcterms:creator", [])),
"publisher": extract_combined_values(media.get("dcterms:publisher", [])),
"source": extract_combined_values(media.get("dcterms:source", [])),
"date": extract_property(media.get("dcterms:date", []), 7),
"type": extract_property(media.get("dcterms:type", []), 8, as_uri=True),
"format": format_value,
"extent": extract_prop_value(media.get("dcterms:extent", []), 25),
"language": extract_prop_value(media.get("dcterms:language", []), 12),
"relation": extract_combined_list(media.get("dcterms:relation", [])),
"rights": extract_prop_value(media.get("dcterms:rights", []), 15),
"license": extract_prop_value(media.get("dcterms:license", []), 49),
"extent": extract_property(media.get("dcterms:extent", []), 25),
"language": extract_property(media.get("dcterms:language", []), 12),
"relation": extract_combined_values(media.get("dcterms:relation", [])),
"rights": extract_property(media.get("dcterms:rights", []), 15),
"license": extract_property(media.get("dcterms:license", []), 49),
"display_template": display_template,
"object_location": object_location,
"image_small": local_image_path,
@@ -217,37 +212,43 @@
}


# Main function to download item set and generate CSV
# --- Main Processing Function ---
def main():
# Download item set
collection_id = ITEM_SET_ID
items_data = get_items_from_collection(collection_id)
# Fetch item data
items_data = get_items_from_collection(ITEM_SET_ID)

# Extract item data
item_records = []
media_records = []
# Process each item and associated media
item_records, media_records = [], []
for item in items_data:
item_record = extract_item_data(item)
item_records.append(item_record)

# Extract media data for each item
item_dc_identifier = item_record["objectid"]
media_data = get_media(item.get("o:id", ""))
# Process media data for each item
media_data = get_media(item["o:id"])
if media_data:
for media in media_data:
media_records.append(extract_media_data(media, item_dc_identifier))
media_records.append(extract_media_data(media, item_record["objectid"]))

# Combine item and media records
combined_records = item_records + media_records
# Save data to CSV and JSON formats
save_to_files(item_records + media_records, CSV_PATH, JSON_PATH)

# Create DataFrame
df = pd.DataFrame(combined_records)

# Save to CSV
csv_path = "_data/sgb-metadata.csv"
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
df.to_csv(csv_path, index=False)
def save_to_files(records, csv_path, json_path):
"""Saves data to both CSV and JSON files."""
with open(json_path, "w", encoding="utf-8") as f:
json.dump(records, f, ensure_ascii=False)
logging.info(f"JSON file has been saved to {json_path}")

# Convert list of records to a DataFrame and save as CSV
records = [
{
key: ";".join(value) if isinstance(value, list) else value
for key, value in record.items()
}
for record in records
]
df = pd.DataFrame(records)
df.to_csv(csv_path, index=False)
logging.info(f"CSV file has been saved to {csv_path}")


3 changes: 2 additions & 1 deletion _config.yml
@@ -36,7 +36,8 @@ lang: de
#
# Set the metadata for your collection (the name of the CSV file in your _data directory that describes the objects in your collection)
# Use the filename of your CSV **without** the ".csv" extension! E.g. _data/demo-metadata.csv --> "demo-metadata"
metadata: sgb-metadata
metadata: sgb-metadata-csv
metadata-json: sgb-metadata-json
# page generation settings [optional!]
# [optional: only used if you need to tweak CB defaults or generate from more than one data file]
# page_gen: