diff --git a/src/ccode/job.c b/src/ccode/job.c index 0d63982..731761e 100755 --- a/src/ccode/job.c +++ b/src/ccode/job.c @@ -6,31 +6,23 @@ #include #include +#include + +typedef struct _worker_job { + jobs* j; + int id; +} worker_job; static void* worker_thread(void* arg) { - jobs* j = (jobs*)arg; - while (1) { - if (j->finished >= j->total) { - break; - } - int i; - bool e = false; /* search for job */ - for (i = 0; i < j->total; ++i) { - if (j->jobs[i].callback != NULL) { - void (*callback)(void*, ...) = j->jobs[i].callback; - j->jobs[i].callback = NULL; - callback((void*)j->jobs[i].ctx, (void*)j->jobs[i].args); - j->finished++; - j->current--; - e = true; - break; - } - } - if(!e){ - break; - } + worker_job* jb= (worker_job*)arg; + jobs *j = jb->j; + int i; + for (i = jb->id; i < j->total; i+=j->parallel) { + fdebug("Run job: %d\t%d\t\t%d\t%d\n", j->total, j->parallel, i, jb->id); + j->jobs[i].callback((void*)j->jobs[i].ctx, (void*)j->jobs[i].args); + j->finished++; } - pthread_exit(NULL); + return NULL; } void jobs_unref(jobs *j) { @@ -49,18 +41,23 @@ void jobs_add(jobs* j, void (*callback)(void*, ...), void* ctx, void* args, ...) j->jobs[j->total++] = new_job; j->current++; pthread_cond_signal(&j->cond); + fdebug("Add job:%d\t%d\t%d\n", j->total, j->max, j->parallel); } } void jobs_run(jobs* j) { - pthread_t* threads = (pthread_t*)malloc(j->parallel * sizeof(pthread_t)); + pthread_t* threads = (pthread_t*)calloc(j->parallel, sizeof(pthread_t)); int i; for (i = 0; i < j->parallel; ++i) { - pthread_create(&threads[i], NULL, worker_thread, (void*)j); + worker_job *jb = (worker_job*)calloc(1,sizeof(worker_job)); + jb->j = j; + jb->id = i; + pthread_create(&threads[i], NULL, worker_thread, (void*)jb); } for (i = 0; i < j->parallel; ++i) { pthread_join(threads[i], NULL); } + fdebug("Done jobs:%d\t%d\t%d\n", j->finished, j->max, j->parallel); free(threads); } @@ -73,5 +70,6 @@ jobs* jobs_new() { j->parallel = JOB_PARALLEL; j->jobs = (job*)malloc(j->max * sizeof(job)); pthread_cond_init(&j->cond, NULL); + fdebug("New jobs"); return j; } diff --git a/src/data/repository.vala b/src/data/repository.vala index 42b3757..9f69eb4 100755 --- a/src/data/repository.vala +++ b/src/data/repository.vala @@ -325,22 +325,80 @@ public string[] list_available_packages () { return ret.get(); } + +private string[] index_a; +private void create_index_single(repo_index_ctx *ctx) { + array a = new array(); + var tar = new archive (); + var yaml = new yamlfile (); + string file = ctx->file; + string path = ctx->path; + // name lists + string[] srcs = {}; + string[] pkgs = {}; + // tmp variable + string nam = ""; + string dat = ""; + string rel = ""; + string md5sum = calculate_md5sum (file); + string sha256sum = calculate_sha256sum (file); + uint64 size = filesize (file); + tar.load (file); + file = file[path.length:]; + info ("Index: " + file); + string metadata = tar.readfile ("metadata.yaml"); + string ymparea = yaml.get_area (metadata, "ymp"); + if (!get_bool ("ignore-check")) { + if (yaml.has_area (ymparea, "source")) { + dat = yaml.get_area (ymparea, "source"); + nam = yaml.get_value (dat, "name"); + rel = yaml.get_value (dat, "release"); + if (nam + "-" + rel in srcs) { + warning (_ ("A source has multiple versions: %s").printf (nam)); + } + srcs += nam + "-" + rel; + } + if (yaml.has_area (ymparea, "package")) { + dat = yaml.get_area (ymparea, "package"); + nam = yaml.get_value (dat, "name"); + rel = yaml.get_value (dat, "release"); + if (nam + "-" + rel in pkgs) { + warning (_ ("A package has multiple versions: %s").printf (nam)); + } + pkgs += nam + "-" + rel; + } + } + foreach (string line in ssplit (ymparea, "\n")) { + if (line == "" || line == null) { + continue; + } + a.add(" %s\n".printf(line)); + } + a.add(" md5sum: %s\n".printf(md5sum)); + a.add(" sha256sum: %s\n".printf(sha256sum)); + a.add(" size: %s\n".printf(size.to_string ())); + a.add(" uri: %s\n\n".printf(file)); + index_a[ctx->number] = a.get_string(); +} + +private class repo_index_ctx { + public string path; + public string file; + public int number; +} + //DOC: `string create_index_data (string fpath):` //DOC: generate remote repository index data public string create_index_data (string fpath) { - array a = new array(); + var a = new array(); a.add("index:\n"); - string md5sum = ""; - string sha256sum = ""; - uint64 size=0; + string path = srealpath (fpath); string index_name = get_value ("name"); if (index_name == "") { error_add (_ ("Index name is not defined. Please use --name=xxx")); error (1); } - var tar = new archive (); - var yaml = new yamlfile (); a.add(" name: %s\n".printf(index_name)); foreach (string file in find (path)) { if (endswith (file, ".ymp")) { @@ -360,56 +418,25 @@ public string create_index_data (string fpath) { } } } - // name lists - string[] srcs = {}; - string[] pkgs = {}; - // tmp variable - string nam = ""; - string dat = ""; - string rel = ""; - string metadata = ""; - string ymparea = ""; + + jobs j = new jobs(); + int i=0; foreach (string file in find (path)) { - if (endswith (file, ".ymp")) { - md5sum = calculate_md5sum (file); - sha256sum = calculate_sha256sum (file); - size = filesize (file); - tar.load (file); - file = file[path.length:]; - info ("Index: " + file); - metadata = tar.readfile ("metadata.yaml"); - ymparea = yaml.get_area (metadata, "ymp"); - if (!get_bool ("ignore-check")) { - if (yaml.has_area (ymparea, "source")) { - dat = yaml.get_area (ymparea, "source"); - nam = yaml.get_value (dat, "name"); - rel = yaml.get_value (dat, "release"); - if (nam + "-" + rel in srcs) { - warning (_ ("A source has multiple versions: %s").printf (nam)); - } - srcs += nam + "-" + rel; - } - if (yaml.has_area (ymparea, "package")) { - dat = yaml.get_area (ymparea, "package"); - nam = yaml.get_value (dat, "name"); - rel = yaml.get_value (dat, "release"); - if (nam + "-" + rel in pkgs) { - warning (_ ("A package has multiple versions: %s").printf (nam)); - } - pkgs += nam + "-" + rel; - } - } - foreach (string line in ssplit (ymparea, "\n")) { - if (line == "" || line == null) { - continue; - } - a.add(" %s\n".printf(line)); - } - a.add(" md5sum: %s\n".printf(md5sum)); - a.add(" sha256sum: %s\n".printf(sha256sum)); - a.add(" size: %s\n".printf(size.to_string ())); - a.add(" uri: %s\n\n".printf(file)); + if (!endswith (file, ".ymp")) { // skip for non ymp files + continue; } + // create context for index + repo_index_ctx *ctx = new repo_index_ctx(); + ctx->path = path; // index path + ctx->file= file; // file path + ctx->number=i; // file index + j.add((void*)create_index_single, ctx, ctx); // run async + i++; // increase index + } + index_a = new string[i]; // allocate string array + j.run(); // run all jobs + for(int ii = 0;ii < i;ii++){ + a.add(index_a[ii]); // copy all data into array library } return a.get_string(); }