Skip to content

Commit

Permalink
Refactor get_updated_ona_data
Browse files Browse the repository at this point in the history
  • Loading branch information
truenomad committed Feb 6, 2025
1 parent f17ad71 commit 0dbc5ba
Showing 1 changed file with 80 additions and 60 deletions.
140 changes: 80 additions & 60 deletions R/get_ona.R
Original file line number Diff line number Diff line change
Expand Up @@ -697,28 +697,21 @@ generate_urls <- function(full_data, file_path,
#' @param urls character vector of url strings
#' @param api_token API token passed to `get_paginated_data()` with each URL
#' @return tibble with all data
call_urls <- function(urls, api_token) {
## use futures for parallel operations
doParallel::registerDoParallel(cores = (parallel::detectCores() - 2))
future::plan(future::multisession)
options(doFuture.rng.onMisuse = "ignore")

progressr::handlers("cli")

progressr::with_progress({
p <- progressr::progressor(along = urls)

results <- foreach::`%dopar%`(
foreach::foreach(
url = urls
),
{

results <- purrr::map_dfr(
urls,
function(url) {
p() # Update progress
# Retrieve data from the API
data <- get_paginated_data(api_url = url, api_token = api_token)

# Extract form ID from the URL using regex
form_id <- gsub(".*data/(\\d+).*", "\\1", url)

# If data is non-empty, add the form_id as a new column
if (nrow(data) > 0) {
data <- dplyr::mutate(
Expand All @@ -727,17 +720,14 @@ call_urls <- function(urls, api_token) {
date_last_updated = Sys.Date()
)
}

data
}
)
})

# Combine all the results into one tibble
combined_data <- dplyr::bind_rows(results)

gc() # Clean up memory

combined_data
results
}

#' Update ONA Data
Expand Down Expand Up @@ -774,13 +764,13 @@ get_updated_ona_data <- function(base_url = "https://api.whonghub.org",
data_file_name = "my_ona_data") {
# check base url validity
base_url <- validate_base_url(base_url)

if (is.null(file_path)) {
# Prompt the user to enter the file path
file_path <- readline(
"Enter the file path (or press Enter to use the current directory): "
)

# If the entered file path is empty or does not exist, default to
# the current working directory
if (nchar(file_path) == 0 || !file.exists(file_path)) {
Expand All @@ -790,14 +780,14 @@ get_updated_ona_data <- function(base_url = "https://api.whonghub.org",
message("Using the provided file path.")
}
}

# Check if the form id is available for download -----------------------------

resp_data <- prep_ona_data_endpoints(
base_url = base_url,
api_token = api_token
)

# Check if all form_ids are present in resp_data$id
if (!all(form_ids %in% unique(resp_data$id))) {
cli::cli_abort(
Expand All @@ -809,26 +799,26 @@ get_updated_ona_data <- function(base_url = "https://api.whonghub.org",
)
}
# Load existing data and build update URLs -------------------------------------

# construct file names for data
file_name <- paste0(file_path, "/", data_file_name, ".rds")

# Load existing data if it exists
if (file.exists(file_name)) {
# get data
full_data_orig <- poliprep::read(file_name)
} else {
full_data_orig <- data.frame()
}

urls <- generate_urls(
full_data_orig,
file_path, data_file_name, base_url, form_ids
)


# If specific columns are requested, add them to the URL query -----------------

if (!is.null(selected_columns)) {
# Convert selected_columns to JSON array string
if (!is.null(selected_columns)) {
Expand All @@ -837,31 +827,31 @@ get_updated_ona_data <- function(base_url = "https://api.whonghub.org",
} else {
fields_json <- NULL
}

url_list <- NULL

for (url in urls) {
# Build the full URL for each form id
results <- httr::modify_url(url, query = fields_json)

url_list[[url]] <- results
}
} else {
url_list <- urls
}

# Download data -------------------------------------------------------------

new_data <- call_urls(url_list, api_token = api_token) |>
# drop any empty columns
dplyr::select(
dplyr::where(
~ any(!is.na(.))
)
)

# update the existing data ---------------------------------------------------

# Combine new data with existing data
full_data <- dplyr::bind_rows(full_data_orig, new_data) |>
dplyr::arrange(
Expand All @@ -872,12 +862,12 @@ get_updated_ona_data <- function(base_url = "https://api.whonghub.org",
dplyr::group_by(`_id`, form_id_num) |>
dplyr::slice(1) |>
dplyr::ungroup()

# log results ----------------------------------------------------------------

if (log_results) {
logs <- NULL

for (form_id in unique(full_data$form_id_num)) {
if (nrow(full_data_orig) != 0) {
df <- full_data_orig |>
Expand All @@ -886,21 +876,32 @@ get_updated_ona_data <- function(base_url = "https://api.whonghub.org",
} else {
df <- data.frame()
}

df_new <- new_data |>
dplyr::filter(form_id_num == form_id) |>
janitor::remove_empty(which = "cols")



if (nrow(new_data) > 0) {
df_new <- new_data |>
dplyr::filter(form_id_num == form_id) |>
janitor::remove_empty(which = "cols")

df_new_tot_cols <- ncol(df_new)
df_new_tot_rows <- nrow(df_new)

} else {
df_new <- NULL
df_new_tot_cols <- 0
df_new_tot_rows <- 0
}

# Construct the log message
log_message <- data.frame(
form_id = form_id,
update_date = Sys.Date(),
total_columns = ncol(df_new),
total_rows = format(nrow(df) + nrow(df_new), big.mark = ","),
new_columns = ncol(df_new) - ncol(df),
new_rows = format(nrow(df_new), big.mark = ",")
total_columns = df_new_tot_cols,
total_rows = format(nrow(df) + df_new_tot_rows, big.mark = ","),
new_columns = df_new_tot_rows - ncol(df),
new_rows = format(df_new_tot_rows, big.mark = ",")
)

logs[[form_id]] <- log_message |>
dplyr::mutate(
new_rows = ifelse(
Expand All @@ -911,30 +912,49 @@ get_updated_ona_data <- function(base_url = "https://api.whonghub.org",
)
)
}

log_messages <- do.call(rbind, logs)


log_messages <- do.call(rbind, logs) |>
dplyr::mutate(
new_columns = as.character(new_columns),
new_rows = as.character(new_rows)
)

# construct file names for logging
log_file_name <- paste0(file_path, "/", "ona_data_update_log.rds")

if (file.exists(log_file_name)) {
log_data <- poliprep::read(log_file_name)
log_data <- poliprep::read(log_file_name)
log_data <- dplyr::bind_rows(log_data, log_messages) |>
dplyr::distinct()
} else {
log_data <- log_messages
}

# Save log file
poliprep::save(
janitor::clean_names(log_data), log_file_name
)

total_new_rows <- sum(as.numeric(log_messages$new_rows), na.rm = TRUE)

if (total_new_rows > 0) {
cli::cli_alert_success(
paste0(
"Data update completed. \n",
"Total new rows added: ",
crayon::yellow(total_new_rows)
)
)
} else {
cli::cli_alert_info("No new data available. Everything is up to date.")
}

}

# Return output message and save results -------------------------------------

# save full data
poliprep::save(full_data, file_name)

return(full_data)
}

0 comments on commit 0dbc5ba

Please sign in to comment.