diff --git a/man/mkcomposefs.md b/man/mkcomposefs.md
index b903e28..f0c4512 100644
--- a/man/mkcomposefs.md
+++ b/man/mkcomposefs.md
@@ -70,6 +70,10 @@ will be a mountable composefs image.
     actual image format version used will be adjusted upwards if that is
     beneficial for the image, up to the max version.
 
+**\-\-threads**=*count*
+:   Number of threads to use for computing file digests and copying files.
+    Defaults to the number of available processors when *--threads* is not specified.
+
 # FORMAT VERSIONING
 
 Composefs images are binary reproduceable, meaning that for a given
diff --git a/tools/mkcomposefs.c b/tools/mkcomposefs.c
index a5074e3..55697fb 100644
--- a/tools/mkcomposefs.c
+++ b/tools/mkcomposefs.c
@@ -37,6 +37,8 @@
 #include
 #include
 #include
+#include <pthread.h>
+#include <sys/sysinfo.h>
 
 static void oom(void)
 {
@@ -65,7 +67,8 @@ static __attribute__((format(printf, 1, 2))) char *make_error(const char *fmt, .
 #define OPT_USER_XATTRS 112
 #define OPT_FROM_FILE 113
 #define OPT_MIN_VERSION 114
-#define OPT_MAX_VERSION 115
+#define OPT_THREADS 115
+#define OPT_MAX_VERSION 116
 
 static size_t split_at(const char **start, size_t *length, char split_char,
 		       bool *partial)
@@ -893,43 +896,315 @@ static ssize_t write_cb(void *_file, void *buf, size_t count)
 	return fwrite(buf, 1, count, file);
 }
 
-static int fill_store(struct lcfs_node_s *node, const char *path,
-		      const char *digest_store_path)
+struct work_item {
+	struct lcfs_node_s *node;
+	char *path;
+};
+
+struct work_collection {
+	struct work_item *items;
+	int count;
+	int capacity;
+};
+
+static int add_to_work_collection(struct work_collection *collection,
+				  struct lcfs_node_s *node, const char *path)
 {
-	cleanup_free char *tmp_path = NULL;
-	const char *fname;
-	int ret;
+	if (!collection) {
+		errno = EINVAL;
+		return -1;
+	}
+	if (collection->count == collection->capacity) {
+		int new_capacity = collection->count == 0 ? 16 : collection->capacity * 2;
+		struct work_item *new_children;
+		new_children =
+			reallocarray(collection->items,
+				     sizeof(*collection->items), new_capacity);
+		if (new_children == NULL) {
+			errno = ENOMEM;
+			return -1;
+		}
+
+		collection->items = new_children;
+		collection->capacity = new_capacity;
+	}
+	collection->items[collection->count].path = strdup(path);
+	if (collection->items[collection->count].path == NULL) {
+		errno = ENOMEM;
+		return -1;
+	}
+	collection->items[collection->count].node = lcfs_node_ref(node);
+
+	++collection->count;
+
+	return 0;
+}
+
+static void cleanup_work_items(struct work_collection *collection)
+{
+	if (!collection)
+		return;
+
+	for (int i = 0; i < collection->count; ++i) {
+		free(collection->items[i].path);
+		lcfs_node_unref(collection->items[i].node);
+	}
+
+	free(collection->items);
+}
 
-	fname = lcfs_node_get_name(node);
+static int construct_copy_data(struct lcfs_node_s *node,
+			       struct work_collection *collection, char *path)
+{
+	cleanup_free char *tmp_path = NULL;
+	const char *fname = lcfs_node_get_name(node);
 
 	if (fname) {
-		ret = join_paths(&tmp_path, path, fname);
-		if (ret < 0)
-			return ret;
+		if (join_paths(&tmp_path, path, fname) < 0)
+			return -1;
+
 		path = tmp_path;
 	}
 
 	if (lcfs_node_dirp(node)) {
-		size_t n_children = lcfs_node_get_n_children(node);
+		const size_t n_children = lcfs_node_get_n_children(node);
 
 		for (size_t i = 0; i < n_children; i++) {
-			struct lcfs_node_s *child = lcfs_node_get_child(node, i);
-			ret = fill_store(child, path, digest_store_path);
-			if (ret < 0)
-				return ret;
+			if (construct_copy_data(lcfs_node_get_child(node, i),
+						collection, path) < 0) {
+				return -1;
+			}
 		}
 	} else if ((lcfs_node_get_mode(node) & S_IFMT) == S_IFREG &&
 		   lcfs_node_get_content(node) == NULL &&
 		   lcfs_node_get_payload(node) != NULL) {
-		const char *payload = lcfs_node_get_payload(node);
+		if (add_to_work_collection(collection, node, path) < 0) {
+			return -1;
+		}
+	}
 
-		ret = copy_file_with_dirs_if_needed(path, digest_store_path,
-						    payload, true);
-		if (ret < 0)
-			return ret;
+	return 0;
+}
+
+static int construct_compute_data(struct lcfs_node_s *node,
+				  struct work_collection *collection,
+				  const char *path)
+{
+	cleanup_free char *tmp_path = NULL;
+	const char *fname = lcfs_node_get_name(node);
+
+	if (fname) {
+		if (join_paths(&tmp_path, path, fname) < 0)
+			return -1;
+
+		path = tmp_path;
+	}
+
+	if ((lcfs_node_get_mode(node) & S_IFMT) == S_IFREG) {
+		if (add_to_work_collection(collection, node, path) < 0) {
+			return -1;
+		}
+	}
+
+	if (!lcfs_node_dirp(node))
+		return 0;
+
+	size_t n_children = lcfs_node_get_n_children(node);
+	for (size_t i = 0; i < n_children; i++) {
+		struct lcfs_node_s *child = lcfs_node_get_child(node, i);
+		if (construct_compute_data(child, collection, path) < 0) {
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+struct work_item_iterator {
+	pthread_mutex_t mutex_node_iterator;
+	int current_item;
+	int errorcode;
+	bool cancel_request;
+};
+
+static struct work_item *get_next_work_item(struct work_collection *collection,
+					    struct work_item_iterator *iterator)
+{
+	if (!iterator || !collection)
+		return NULL;
+
+	bool cancel = false;
+	struct work_item *ret = NULL;
+
+	pthread_mutex_lock(&(iterator->mutex_node_iterator));
+	if (iterator->cancel_request)
+		cancel = true;
+	else if (iterator->current_item < collection->count) {
+		ret = &(collection->items[iterator->current_item]);
+		iterator->current_item++;
+	}
+	pthread_mutex_unlock(&(iterator->mutex_node_iterator));
+	return cancel ? NULL : ret;
+}
+
+static void request_cancel(struct work_item_iterator *iterator, int errorcode)
+{
+	pthread_mutex_lock(&(iterator->mutex_node_iterator));
+	// Record only the first cancel's error code
+	if (!iterator->cancel_request) {
+		iterator->cancel_request = true;
+		iterator->errorcode = errorcode;
 	}
+	pthread_mutex_unlock(&(iterator->mutex_node_iterator));
+}
+
+typedef int (*THREAD_PROCESS_PROC)(struct work_item *, void *);
+
+static int process_copy(struct work_item *item, void *digest_store_path)
+{
+	return copy_file_with_dirs_if_needed(item->path,
+					     (const char *)digest_store_path,
+					     lcfs_node_get_payload(item->node),
+					     true);
+}
+
+static int process_compute(struct work_item *item, void *data)
+{
+	int buildflag = (int)(long)data;
+	return lcfs_node_set_from_content(item->node, AT_FDCWD, item->path, buildflag);
+}
+
+struct thread_data {
+	THREAD_PROCESS_PROC proc;
+	struct work_collection *collection;
+	struct work_item_iterator *iterator;
+	void *data;
+};
+
+static void *thread_proc(void *data)
+{
+	struct thread_data *info = (struct thread_data *)data;
+
+	while (true) {
+		struct work_item *item =
+			get_next_work_item(info->collection, info->iterator);
+
+		if (!item)
+			return 0;
+
+		if (!item->node) {
+			request_cancel(info->iterator, EINVAL);
+			return 0;
+		}
+		if (info->proc(item, info->data) != 0) {
+			request_cancel(info->iterator, errno);
+			return 0;
+		}
+	}
 
 	return 0;
 }
 
+static int execute_in_threads(const int requested_threads,
+			      struct work_collection *collection,
+			      THREAD_PROCESS_PROC proc, void *data)
+{
+	struct work_item_iterator iterator;
+	int ret = pthread_mutex_init(&iterator.mutex_node_iterator, NULL);
+	if (0 != ret) {
+		errno = ret;
+		return -1;
+	}
+
+	iterator.current_item = 0;
+	iterator.errorcode = 0;
+	iterator.cancel_request = false;
+
+	struct thread_data thread_info;
+	thread_info.data = data;
+	thread_info.proc = proc;
+	thread_info.collection = collection;
+	thread_info.iterator = &iterator;
+
+	cleanup_free pthread_t *threads = NULL;
+	const int thread_count = requested_threads - 1;
+	if (thread_count >= 1) {
+		threads = calloc(thread_count, sizeof(pthread_t));
+		if (threads == NULL) {
+			errno = ENOMEM;
+			return -1;
+		}
+
+		for (int i = 0; i < thread_count; i++) {
+			ret = pthread_create(&threads[i], NULL, thread_proc,
+					     &thread_info);
+			if (ret != 0) {
+				request_cancel(&iterator, ret);
+				for (int j = 0; j < i; ++j) {
+					// not checking the return value, we are already on an error path
+					pthread_join(threads[j], NULL);
+				}
+				errno = ret;
+				return -1;
+			}
+		}
+	}
+
+	// Let this thread also process items instead of waiting for the worker threads
+	thread_proc(&thread_info);
+
+	if (thread_count >= 1) {
+		for (int i = 0; i < thread_count; i++) {
+			ret = pthread_join(threads[i], NULL);
+			if (ret != 0) {
+				// record the error code and continue joining the remaining threads
+				request_cancel(&iterator, ret);
+			}
+		}
+	}
+	if (iterator.cancel_request) {
+		errno = iterator.errorcode;
+	}
+	return iterator.cancel_request ? -1 : 0;
+}
+
+static int compute_digest(const int thread_count, struct lcfs_node_s *node,
+			  const char *path, int buildflag)
+{
+	struct work_collection collection;
+	collection.items = NULL;
+	collection.capacity = 0;
+	collection.count = 0;
+
+	if (construct_compute_data(node, &collection, path) < 0) {
+		return -1;
+	}
+
+	int ret = execute_in_threads(thread_count, &collection, process_compute,
+				     (void *)(long)buildflag);
+	cleanup_work_items(&collection);
+
+	return ret;
+}
+
+static int fill_store(const int thread_count, struct lcfs_node_s *node,
+		      const char *path, const char *digest_store_path)
+{
+	struct work_collection collection;
+	collection.items = NULL;
+	collection.capacity = 0;
+	collection.count = 0;
+
+	if (construct_copy_data(node, &collection, (char *)path) < 0) {
+		return -1;
+	}
+
+	int ret = execute_in_threads(thread_count, &collection, process_copy,
+				     (void *)digest_store_path);
+	cleanup_work_items(&collection);
+
+	return ret;
+}
+
 static void digest_to_string(const uint8_t *csum, char *buf)
 {
 	static const char hexchars[] = "0123456789abcdef";
@@ -943,6 +1218,11 @@ static void digest_to_string(const uint8_t *csum, char *buf)
 	buf[j] = '\0';
 }
 
+static int get_cpu_count(void)
+{
+	return get_nprocs();
+}
+
 static void usage(const char *argv0)
 {
 	const char *bin = basename(argv0);
@@ -958,8 +1238,10 @@ static void usage(const char *argv0)
 	       "  --print-digest-only   Print the digest of the image, don't write image\n"
 	       "  --from-file           The source is a dump file, not a directory\n"
 	       "  --min-version=N       Use this minimal format version (default=%d)\n"
-	       "  --max-version=N       Use this maxium format version (default=%d)\n",
-	       bin, LCFS_DEFAULT_VERSION_MIN, LCFS_DEFAULT_VERSION_MAX);
+	       "  --max-version=N       Use this maximum format version (default=%d)\n"
+	       "  --threads=N           Use this many threads to compute digests and copy files (default=%d)\n",
+	       bin, LCFS_DEFAULT_VERSION_MIN, LCFS_DEFAULT_VERSION_MAX,
+	       get_cpu_count());
 }
 
 int main(int argc, char **argv)
@@ -1025,6 +1307,12 @@ int main(int argc, char **argv)
 			flag: NULL,
 			val: OPT_MIN_VERSION
 		},
+		{
+			name: "threads",
+			has_arg: required_argument,
+			flag: NULL,
+			val: OPT_THREADS
+		},
 		{},
 	};
 	struct lcfs_write_options_s options = { 0 };
@@ -1046,6 +1334,7 @@
 	long min_version = 0;
 	long max_version = 0;
 	char *end;
+	int threads = get_cpu_count();
 
 #ifdef FUZZER
 #endif
@@ -1095,6 +1384,18 @@
 				exit(EXIT_FAILURE);
 			}
 			break;
+
+		case OPT_THREADS:
+			threads = strtol(optarg, &end, 10);
+			if (*optarg == 0 || *end != 0) {
+				fprintf(stderr, "Invalid threads count %s\n", optarg);
+				exit(EXIT_FAILURE);
+			}
+			if (threads <= 0) {
+				fprintf(stderr, "Invalid threads count %d\n", threads);
+				exit(EXIT_FAILURE);
+			}
+			break;
 		case ':':
 			fprintf(stderr, "option needs a value\n");
 			exit(EXIT_FAILURE);
@@ -1181,12 +1482,21 @@
 		if (close_input)
 			fclose(input);
 	} else {
-		root = lcfs_build(AT_FDCWD, src_path, buildflags, &failed_path);
+		// Digest calculation and content inlining are done in parallel below,
+		// so skip them during the initial build
+		int buildflag_copy = buildflags;
+		buildflag_copy &= ~LCFS_BUILD_COMPUTE_DIGEST;
+		buildflag_copy &= ~LCFS_BUILD_BY_DIGEST;
+		buildflag_copy |= LCFS_BUILD_NO_INLINE;
+
+		root = lcfs_build(AT_FDCWD, src_path, buildflag_copy, &failed_path);
 		if (root == NULL)
 			err(EXIT_FAILURE, "error accessing %s", failed_path);
 
+		if (compute_digest(threads, root, src_path, buildflags) < 0)
+			err(EXIT_FAILURE, "error computing digest %s", src_path);
+
 		if (digest_store_path &&
-		    fill_store(root, src_path, digest_store_path) < 0)
+		    fill_store(threads, root, src_path, digest_store_path) < 0)
 			err(EXIT_FAILURE, "cannot fill store");
 	}
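
The standalone sketch below is editorial and not part of the patch. It illustrates, under the assumption of POSIX threads, the work-distribution pattern that execute_in_threads() and get_next_work_item() implement above: a mutex-guarded cursor over a pre-built array of work items, drained by requested_threads - 1 worker threads plus the calling thread. All sketch_* names are hypothetical.

#include <pthread.h>
#include <stdio.h>

struct sketch_queue {
	pthread_mutex_t lock;
	int next;
	int count;
	int *items;
};

static void *sketch_worker(void *arg)
{
	struct sketch_queue *q = arg;

	for (;;) {
		int idx = -1;

		/* Hand out the next unprocessed index under the lock,
		 * mirroring get_next_work_item() in the patch. */
		pthread_mutex_lock(&q->lock);
		if (q->next < q->count)
			idx = q->next++;
		pthread_mutex_unlock(&q->lock);

		if (idx < 0)
			return NULL; /* queue drained */

		printf("processing item %d\n", q->items[idx]);
	}
}

int main(void)
{
	int items[] = { 1, 2, 3, 4, 5, 6, 7, 8 };
	struct sketch_queue q = { .lock = PTHREAD_MUTEX_INITIALIZER,
				  .next = 0,
				  .count = 8,
				  .items = items };
	pthread_t extra[3]; /* like requested_threads - 1 in execute_in_threads() */

	for (int i = 0; i < 3; i++)
		pthread_create(&extra[i], NULL, sketch_worker, &q);

	/* The calling thread also drains the queue instead of only waiting. */
	sketch_worker(&q);

	for (int i = 0; i < 3; i++)
		pthread_join(extra[i], NULL);
	return 0;
}

With the patch applied, the worker count is chosen at image-build time, for example with an invocation along the lines of mkcomposefs --threads=4 rootfs/ rootfs.cfs (paths here are illustrative); when --threads is omitted, mkcomposefs falls back to the processor count reported by get_nprocs() via get_cpu_count().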