mkcomposefs: add --threads to compute digests and fill the store in parallel

The files to process are first collected into a work list. A pool of
threads (the calling thread included) then pulls items from a cursor that
is protected by a mutex. The first failing item cancels the remaining work
and its errno is handed back to the caller.

NOTE(review): this patch was recovered from a copy in which line breaks,
indentation and the <...> targets of #include lines had been lost.
Whitespace in context and removed lines is reconstructed and should be
checked against the tree. The include hunk carries no leading context for
that reason, and the "index" blob ids predate the changes made here.

diff --git a/man/mkcomposefs.md b/man/mkcomposefs.md
index b903e28..54d6c99 100644
--- a/man/mkcomposefs.md
+++ b/man/mkcomposefs.md
@@ -70,6 +70,10 @@ will be a mountable composefs image.
     actual image format version used will be adjusted upwards if that
     is beneficial for the image, up to the max version.
 
+**\-\-threads**
+:   Number of threads used to compute file digests and to copy files into
+    the digest store. The default is the number of available processors.
+
 # FORMAT VERSIONING
 
 Composefs images are binary reproduceable, meaning that for a given
diff --git a/tools/mkcomposefs.c b/tools/mkcomposefs.c
index 4cc7df1..a34c6c1 100644
--- a/tools/mkcomposefs.c
+++ b/tools/mkcomposefs.c
@@ -40,3 +40,6 @@
+#include <limits.h>
+#include <pthread.h>
+#include <sys/sysinfo.h>
 
 static void oom(void)
 {
@@ -65,7 +68,8 @@ static __attribute__((format(printf, 1, 2))) char *make_error(const char *fmt, .
 #define OPT_USER_XATTRS 112
 #define OPT_FROM_FILE 113
 #define OPT_MIN_VERSION 114
-#define OPT_MAX_VERSION 115
+#define OPT_THREADS 115
+#define OPT_MAX_VERSION 116
 
 static size_t split_at(const char **start, size_t *length, char split_char,
 		       bool *partial)
@@ -872,43 +876,363 @@ static ssize_t write_cb(void *_file, void *buf, size_t count)
 	return fwrite(buf, 1, count, file);
 }
 
-static int fill_store(struct lcfs_node_s *node, const char *path,
-		      const char *digest_store_path)
-{
-	cleanup_free char *tmp_path = NULL;
-	const char *fname;
-	int ret;
-
-	fname = lcfs_node_get_name(node);
-	if (fname) {
-		ret = join_paths(&tmp_path, path, fname);
-		if (ret < 0)
-			return ret;
-		path = tmp_path;
-	}
-
-	if (lcfs_node_dirp(node)) {
-		size_t n_children = lcfs_node_get_n_children(node);
-		for (size_t i = 0; i < n_children; i++) {
-			struct lcfs_node_s *child = lcfs_node_get_child(node, i);
-			ret = fill_store(child, path, digest_store_path);
-			if (ret < 0)
-				return ret;
-		}
-	} else if ((lcfs_node_get_mode(node) & S_IFMT) == S_IFREG &&
-		   lcfs_node_get_content(node) == NULL &&
-		   lcfs_node_get_payload(node) != NULL) {
-		const char *payload = lcfs_node_get_payload(node);
-
-		ret = copy_file_with_dirs_if_needed(path, digest_store_path,
-						    payload, true);
-		if (ret < 0)
-			return ret;
-	}
-
-	return 0;
-}
+/* One unit of parallel work: a tree node and the path of its source file. */
+struct work_item {
+	struct lcfs_node_s *node;
+	char *path; /* owned copy, released by cleanup_work_items() */
+};
+
+/* Growable array of work items; zero-initialize it to get an empty list. */
+struct work_item_header {
+	struct work_item *items;
+	int count;
+	int capacity;
+};
+
+/*
+ * Append (node, path) to the list, doubling the array when it is full.
+ * The path is duplicated. Returns false when memory cannot be allocated,
+ * in which case the list is left unchanged.
+ */
+static bool add_to_work_collection(struct work_item_header *header,
+				   struct lcfs_node_s *node, const char *path)
+{
+	if (!header)
+		return false;
+
+	if (header->count == header->capacity) {
+		int new_capacity;
+		struct work_item *new_items;
+
+		if (header->capacity > INT_MAX / 2) {
+			errno = ENOMEM;
+			return false;
+		}
+
+		/* Commit the new capacity only after the array really grew. */
+		new_capacity = header->capacity == 0 ? 16 : header->capacity * 2;
+		new_items = reallocarray(header->items, new_capacity,
+					 sizeof(*header->items));
+		if (new_items == NULL) {
+			errno = ENOMEM;
+			return false;
+		}
+
+		header->items = new_items;
+		header->capacity = new_capacity;
+	}
+
+	char *path_copy = strdup(path);
+	if (path_copy == NULL)
+		return false;
+
+	header->items[header->count].node = node;
+	header->items[header->count].path = path_copy;
+	++header->count;
+
+	return true;
+}
+
+/* Free every duplicated path and the item array, leaving an empty list. */
+static void cleanup_work_items(struct work_item_header *header)
+{
+	if (!header)
+		return;
+
+	for (int i = 0; i < header->count; ++i)
+		free(header->items[i].path);
+
+	free(header->items);
+	header->items = NULL;
+	header->count = 0;
+	header->capacity = 0;
+}
+
+/*
+ * Recursively queue every regular file at or below node that has a payload
+ * but no inline content. path is the directory that contains node.
+ * Returns false on failure.
+ */
+static bool construct_copy_data(struct lcfs_node_s *node,
+				struct work_item_header *header, const char *path)
+{
+	cleanup_free char *tmp_path = NULL;
+	const char *fname = lcfs_node_get_name(node);
+
+	if (fname) {
+		if (join_paths(&tmp_path, path, fname) < 0)
+			return false;
+
+		path = tmp_path;
+	}
+
+	if (lcfs_node_dirp(node)) {
+		const size_t n_children = lcfs_node_get_n_children(node);
+		for (size_t i = 0; i < n_children; i++) {
+			if (!construct_copy_data(lcfs_node_get_child(node, i),
+						 header, path))
+				return false;
+		}
+	} else if ((lcfs_node_get_mode(node) & S_IFMT) == S_IFREG &&
+		   lcfs_node_get_content(node) == NULL &&
+		   lcfs_node_get_payload(node) != NULL) {
+		if (!add_to_work_collection(header, node, path))
+			return false;
+	}
+
+	return true;
+}
+
+/*
+ * Recursively queue every regular file at or below node. path is the
+ * directory that contains node. Returns false on failure.
+ */
+static bool construct_compute_data(struct lcfs_node_s *node,
+				   struct work_item_header *header, const char *path)
+{
+	cleanup_free char *tmp_path = NULL;
+	const char *fname = lcfs_node_get_name(node);
+
+	if (fname) {
+		if (join_paths(&tmp_path, path, fname) < 0)
+			return false;
+
+		path = tmp_path;
+	}
+
+	if ((lcfs_node_get_mode(node) & S_IFMT) == S_IFREG) {
+		if (!add_to_work_collection(header, node, path))
+			return false;
+	}
+
+	if (!lcfs_node_dirp(node))
+		return true;
+
+	const size_t n_children = lcfs_node_get_n_children(node);
+	for (size_t i = 0; i < n_children; i++) {
+		if (!construct_compute_data(lcfs_node_get_child(node, i), header,
+					    path))
+			return false;
+	}
+
+	return true;
+}
+
+/* Cursor shared by all workers; every field is protected by the mutex. */
+struct work_item_iterator {
+	pthread_mutex_t mutex_node_iterator;
+	int current_item;
+	int error; /* errno reported with the first cancellation, 0 if none */
+	bool cancel_request;
+};
+
+/*
+ * Hand out the next unprocessed item. Returns NULL once the list is
+ * exhausted or a cancellation has been requested.
+ */
+static struct work_item *get_next_work_item(struct work_item_header *header,
+					    struct work_item_iterator *iterator)
+{
+	struct work_item *ret = NULL;
+
+	if (!iterator)
+		return NULL;
+
+	pthread_mutex_lock(&iterator->mutex_node_iterator);
+	if (!iterator->cancel_request && iterator->current_item < header->count)
+		ret = &header->items[iterator->current_item++];
+	pthread_mutex_unlock(&iterator->mutex_node_iterator);
+
+	return ret;
+}
+
+/*
+ * Stop handing out further items. The error of the first request is kept,
+ * because errno is per-thread and would otherwise never reach the caller.
+ */
+static void request_cancel(struct work_item_iterator *iterator, int error)
+{
+	pthread_mutex_lock(&iterator->mutex_node_iterator);
+	if (!iterator->cancel_request) {
+		iterator->cancel_request = true;
+		iterator->error = error;
+	}
+	pthread_mutex_unlock(&iterator->mutex_node_iterator);
+}
+
+/* Per-item callback run by the workers; returns 0 on success. */
+typedef int (*THREAD_PROCESS_PROC)(struct work_item *, void *);
+
+/* Copy the file behind one item into the store at digest_store_path. */
+static int process_copy(struct work_item *item, void *digest_store_path)
+{
+	if (copy_file_with_dirs_if_needed(item->path, digest_store_path,
+					  lcfs_node_get_payload(item->node),
+					  true) < 0)
+		return -1;
+
+	return 0;
+}
+
+/*
+ * Update one node from the content of its file, computing the digest.
+ * NOTE(review): a non-zero return of lcfs_node_set_from_content() is treated
+ * as success here; confirm that this matches its actual contract.
+ */
+static int process_compute(struct work_item *item, void *data)
+{
+	(void)data;
+
+	return lcfs_node_set_from_content(item->node, AT_FDCWD, item->path,
+					  LCFS_BUILD_COMPUTE_DIGEST |
+						  LCFS_BUILD_BY_DIGEST) ?
+		       0 :
+		       -1;
+}
+
+/* Arguments shared by every worker thread. */
+struct thread_data {
+	THREAD_PROCESS_PROC proc;
+	struct work_item_header *header;
+	struct work_item_iterator *iterator;
+	void *data;
+};
+
+/*
+ * Worker loop: process items until none are handed out any more. An item
+ * that fails cancels the remaining work.
+ */
+static void *thread_proc(void *data)
+{
+	struct thread_data *info = data;
+	struct work_item *item;
+
+	while ((item = get_next_work_item(info->header, info->iterator)) != NULL) {
+		if (!item->node)
+			break;
+
+		if (info->proc(item, info->data) != 0) {
+			request_cancel(info->iterator, errno);
+			break;
+		}
+	}
+
+	return NULL;
+}
+
+/*
+ * Run proc on every item of header with up to requested_threads threads,
+ * the calling thread included. Returns true when no item failed; otherwise
+ * returns false and, when known, sets errno to the first reported error.
+ */
+static bool execute_in_threads(const int requested_threads,
+			       struct work_item_header *header,
+			       THREAD_PROCESS_PROC proc, void *data)
+{
+	struct work_item_iterator iterator = {
+		.current_item = 0,
+		.error = 0,
+		.cancel_request = false,
+	};
+	struct thread_data thread_info = {
+		.proc = proc,
+		.header = header,
+		.iterator = &iterator,
+		.data = data,
+	};
+	pthread_t *threads = NULL;
+	int started = 0;
+	int rc;
+
+	/* pthread functions return the error number instead of setting errno. */
+	rc = pthread_mutex_init(&iterator.mutex_node_iterator, NULL);
+	if (rc != 0) {
+		errno = rc;
+		return false;
+	}
+
+	/* The caller takes part in the work, and idle workers are pointless. */
+	int thread_count = requested_threads - 1;
+	if (thread_count > header->count - 1)
+		thread_count = header->count - 1;
+
+	if (thread_count >= 1) {
+		threads = calloc(thread_count, sizeof(*threads));
+		if (threads == NULL) {
+			pthread_mutex_destroy(&iterator.mutex_node_iterator);
+			errno = ENOMEM;
+			return false;
+		}
+
+		/*
+		 * If a thread cannot be created, carry on with those that did
+		 * start; the calling thread drains whatever work is left.
+		 */
+		while (started < thread_count &&
+		       pthread_create(&threads[started], NULL, thread_proc,
+				      &thread_info) == 0)
+			started++;
+	}
+
+	/* Let this thread also process items instead of just waiting. */
+	thread_proc(&thread_info);
+
+	/* Join only the threads that were actually created. */
+	for (int i = 0; i < started; i++)
+		pthread_join(threads[i], NULL);
+	free(threads);
+
+	/* All workers are gone, so the iterator can be read without locking. */
+	const bool cancelled = iterator.cancel_request;
+	const int error = iterator.error;
+	pthread_mutex_destroy(&iterator.mutex_node_iterator);
+
+	if (cancelled && error != 0)
+		errno = error;
+	return !cancelled;
+}
+
+/*
+ * Process every regular file at or below node with process_compute(), using
+ * thread_count threads. Returns 0 on success and -1 on failure.
+ */
+static int compute_digest(const int thread_count, struct lcfs_node_s *node,
+			  const char *path)
+{
+	struct work_item_header header = { 0 };
+	const bool ok = construct_compute_data(node, &header, path) &&
+			execute_in_threads(thread_count, &header, process_compute,
+					   NULL);
+	const int saved_errno = errno;
+
+	/* Release the list on every path, keeping errno for the caller. */
+	cleanup_work_items(&header);
+	errno = saved_errno;
+
+	return ok ? 0 : -1;
+}
+
+/*
+ * Copy every file queued by construct_copy_data() into digest_store_path,
+ * using thread_count threads. Returns 0 on success and -1 on failure.
+ */
+static int fill_store(const int thread_count, struct lcfs_node_s *node,
+		      const char *path, const char *digest_store_path)
+{
+	struct work_item_header header = { 0 };
+	/* The callback argument is a void pointer; the store path is only read. */
+	const bool ok = construct_copy_data(node, &header, path) &&
+			execute_in_threads(thread_count, &header, process_copy,
+					   (void *)digest_store_path);
+	const int saved_errno = errno;
+
+	/* Release the list on every path, keeping errno for the caller. */
+	cleanup_work_items(&header);
+	errno = saved_errno;
+
+	return ok ? 0 : -1;
+}
 
 static void digest_to_string(const uint8_t *csum, char *buf)
 {
 	static const char hexchars[] = "0123456789abcdef";
@@ -922,6 +1246,14 @@ static void digest_to_string(const uint8_t *csum, char *buf)
 	buf[j] = '\0';
 }
 
+/* Default thread count: the processor count from get_nprocs(), at least 1. */
+static int get_cpu_count(void)
+{
+	const int count = get_nprocs();
+
+	return count > 0 ? count : 1;
+}
+
 static void usage(const char *argv0)
 {
 	const char *bin = basename(argv0);
@@ -937,8 +1269,10 @@ static void usage(const char *argv0)
 		"  --print-digest-only   Print the digest of the image, don't write image\n"
 		"  --from-file           The source is a dump file, not a directory\n"
 		"  --min-version=N       Use this minimal format version (default=%d)\n"
-		"  --max-version=N       Use this maxium format version (default=%d)\n",
-		bin, LCFS_DEFAULT_VERSION_MIN, LCFS_DEFAULT_VERSION_MAX);
+		"  --max-version=N       Use this maxium format version (default=%d)\n"
+		"  --threads=N           Use this to calculate digest and copy files in threads (default=%d)\n",
+		bin, LCFS_DEFAULT_VERSION_MIN, LCFS_DEFAULT_VERSION_MAX,
+		get_cpu_count());
 }
 
 int main(int argc, char **argv)
@@ -1004,6 +1338,12 @@ int main(int argc, char **argv)
 			flag: NULL,
 			val: OPT_MIN_VERSION
 		},
+		{
+			name: "threads",
+			has_arg: required_argument,
+			flag: NULL,
+			val: OPT_THREADS
+		},
 		{},
 	};
 	struct lcfs_write_options_s options = { 0 };
@@ -1025,13 +1365,11 @@ int main(int argc, char **argv)
 	long min_version = 0;
 	long max_version = 0;
 	char *end;
+	int threads = get_cpu_count();
 
 #ifdef FUZZER
 #endif
 
-	/* We always compute the digest and reference by digest */
-	buildflags |= LCFS_BUILD_COMPUTE_DIGEST | LCFS_BUILD_BY_DIGEST;
-
 	while ((opt = getopt_long(argc, argv, ":CR", longopts, NULL)) != -1) {
 		switch (opt) {
 		case OPT_USE_EPOCH:
@@ -1074,6 +1412,20 @@ int main(int argc, char **argv)
 				exit(EXIT_FAILURE);
 			}
 			break;
+		case OPT_THREADS: {
+			long count;
+
+			/* Parse as long so out-of-range values are rejected, not truncated. */
+			errno = 0;
+			count = strtol(optarg, &end, 10);
+			if (*optarg == 0 || *end != 0 || errno == ERANGE ||
+			    count <= 0 || count > INT_MAX) {
+				fprintf(stderr, "Invalid threads count %s\n", optarg);
+				exit(EXIT_FAILURE);
+			}
+			threads = (int)count;
+			break;
+		}
 		case ':':
 			fprintf(stderr, "option needs a value\n");
 			exit(EXIT_FAILURE);
@@ -1160,12 +1512,21 @@ int main(int argc, char **argv)
 		if (close_input)
 			fclose(input);
 	} else {
+		/* Digest calculation and inlining are done afterwards, in parallel */
+		buildflags &= ~LCFS_BUILD_COMPUTE_DIGEST;
+		buildflags &= ~LCFS_BUILD_BY_DIGEST;
+		buildflags |= LCFS_BUILD_NO_INLINE;
+
 		root = lcfs_build(AT_FDCWD, src_path, buildflags, &failed_path);
 		if (root == NULL)
 			err(EXIT_FAILURE, "error accessing %s", failed_path);
 
+		/* compute_digest() does not set failed_path, so name the source tree */
+		if (compute_digest(threads, root, src_path) < 0)
+			err(EXIT_FAILURE, "error computing digest %s", src_path);
+
 		if (digest_store_path &&
-		    fill_store(root, src_path, digest_store_path) < 0)
+		    fill_store(threads, root, src_path, digest_store_path) < 0)
 			err(EXIT_FAILURE, "cannot fill store");
 	}
 