Skip to content

Commit

Permalink
speedup job library and index create is now parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
sulincix committed May 30, 2024
1 parent df0cdc3 commit 89473bd
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 78 deletions.
46 changes: 22 additions & 24 deletions src/ccode/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,23 @@
#include <stdarg.h>

#include <jobs.h>
#include <logger.h>

typedef struct _worker_job {
jobs* j;
int id;
} worker_job;

static void* worker_thread(void* arg) {
jobs* j = (jobs*)arg;
while (1) {
if (j->finished >= j->total) {
break;
}
int i;
bool e = false; /* search for job */
for (i = 0; i < j->total; ++i) {
if (j->jobs[i].callback != NULL) {
void (*callback)(void*, ...) = j->jobs[i].callback;
j->jobs[i].callback = NULL;
callback((void*)j->jobs[i].ctx, (void*)j->jobs[i].args);
j->finished++;
j->current--;
e = true;
break;
}
}
if(!e){
break;
}
worker_job* jb= (worker_job*)arg;
jobs *j = jb->j;
int i;
for (i = jb->id; i < j->total; i+=j->parallel) {
fdebug("Run job: %d\t%d\t\t%d\t%d\n", j->total, j->parallel, i, jb->id);
j->jobs[i].callback((void*)j->jobs[i].ctx, (void*)j->jobs[i].args);
j->finished++;
}
pthread_exit(NULL);
return NULL;
}

void jobs_unref(jobs *j) {
Expand All @@ -49,18 +41,23 @@ void jobs_add(jobs* j, void (*callback)(void*, ...), void* ctx, void* args, ...)
j->jobs[j->total++] = new_job;
j->current++;
pthread_cond_signal(&j->cond);
fdebug("Add job:%d\t%d\t%d\n", j->total, j->max, j->parallel);
}
}

void jobs_run(jobs* j) {
pthread_t* threads = (pthread_t*)malloc(j->parallel * sizeof(pthread_t));
pthread_t* threads = (pthread_t*)calloc(j->parallel, sizeof(pthread_t));
int i;
for (i = 0; i < j->parallel; ++i) {
pthread_create(&threads[i], NULL, worker_thread, (void*)j);
worker_job *jb = (worker_job*)calloc(1,sizeof(worker_job));
jb->j = j;
jb->id = i;
pthread_create(&threads[i], NULL, worker_thread, (void*)jb);
}
for (i = 0; i < j->parallel; ++i) {
pthread_join(threads[i], NULL);
}
fdebug("Done jobs:%d\t%d\t%d\n", j->finished, j->max, j->parallel);
free(threads);
}

Expand All @@ -73,5 +70,6 @@ jobs* jobs_new() {
j->parallel = JOB_PARALLEL;
j->jobs = (job*)malloc(j->max * sizeof(job));
pthread_cond_init(&j->cond, NULL);
fdebug("New jobs");
return j;
}
135 changes: 81 additions & 54 deletions src/data/repository.vala
Original file line number Diff line number Diff line change
Expand Up @@ -325,22 +325,80 @@ public string[] list_available_packages () {
return ret.get();
}


private string[] index_a;
private void create_index_single(repo_index_ctx *ctx) {
array a = new array();
var tar = new archive ();
var yaml = new yamlfile ();
string file = ctx->file;
string path = ctx->path;
// name lists
string[] srcs = {};
string[] pkgs = {};
// tmp variable
string nam = "";
string dat = "";
string rel = "";
string md5sum = calculate_md5sum (file);
string sha256sum = calculate_sha256sum (file);
uint64 size = filesize (file);
tar.load (file);
file = file[path.length:];
info ("Index: " + file);
string metadata = tar.readfile ("metadata.yaml");
string ymparea = yaml.get_area (metadata, "ymp");
if (!get_bool ("ignore-check")) {
if (yaml.has_area (ymparea, "source")) {
dat = yaml.get_area (ymparea, "source");
nam = yaml.get_value (dat, "name");
rel = yaml.get_value (dat, "release");
if (nam + "-" + rel in srcs) {
warning (_ ("A source has multiple versions: %s").printf (nam));
}
srcs += nam + "-" + rel;
}
if (yaml.has_area (ymparea, "package")) {
dat = yaml.get_area (ymparea, "package");
nam = yaml.get_value (dat, "name");
rel = yaml.get_value (dat, "release");
if (nam + "-" + rel in pkgs) {
warning (_ ("A package has multiple versions: %s").printf (nam));
}
pkgs += nam + "-" + rel;
}
}
foreach (string line in ssplit (ymparea, "\n")) {
if (line == "" || line == null) {
continue;
}
a.add(" %s\n".printf(line));
}
a.add(" md5sum: %s\n".printf(md5sum));
a.add(" sha256sum: %s\n".printf(sha256sum));
a.add(" size: %s\n".printf(size.to_string ()));
a.add(" uri: %s\n\n".printf(file));
index_a[ctx->number] = a.get_string();
}

private class repo_index_ctx {
public string path;
public string file;
public int number;
}

//DOC: `string create_index_data (string fpath):`
//DOC: generate remote repository index data
public string create_index_data (string fpath) {
array a = new array();
var a = new array();
a.add("index:\n");
string md5sum = "";
string sha256sum = "";
uint64 size=0;

string path = srealpath (fpath);
string index_name = get_value ("name");
if (index_name == "") {
error_add (_ ("Index name is not defined. Please use --name=xxx"));
error (1);
}
var tar = new archive ();
var yaml = new yamlfile ();
a.add(" name: %s\n".printf(index_name));
foreach (string file in find (path)) {
if (endswith (file, ".ymp")) {
Expand All @@ -360,56 +418,25 @@ public string create_index_data (string fpath) {
}
}
}
// name lists
string[] srcs = {};
string[] pkgs = {};
// tmp variable
string nam = "";
string dat = "";
string rel = "";
string metadata = "";
string ymparea = "";

jobs j = new jobs();
int i=0;
foreach (string file in find (path)) {
if (endswith (file, ".ymp")) {
md5sum = calculate_md5sum (file);
sha256sum = calculate_sha256sum (file);
size = filesize (file);
tar.load (file);
file = file[path.length:];
info ("Index: " + file);
metadata = tar.readfile ("metadata.yaml");
ymparea = yaml.get_area (metadata, "ymp");
if (!get_bool ("ignore-check")) {
if (yaml.has_area (ymparea, "source")) {
dat = yaml.get_area (ymparea, "source");
nam = yaml.get_value (dat, "name");
rel = yaml.get_value (dat, "release");
if (nam + "-" + rel in srcs) {
warning (_ ("A source has multiple versions: %s").printf (nam));
}
srcs += nam + "-" + rel;
}
if (yaml.has_area (ymparea, "package")) {
dat = yaml.get_area (ymparea, "package");
nam = yaml.get_value (dat, "name");
rel = yaml.get_value (dat, "release");
if (nam + "-" + rel in pkgs) {
warning (_ ("A package has multiple versions: %s").printf (nam));
}
pkgs += nam + "-" + rel;
}
}
foreach (string line in ssplit (ymparea, "\n")) {
if (line == "" || line == null) {
continue;
}
a.add(" %s\n".printf(line));
}
a.add(" md5sum: %s\n".printf(md5sum));
a.add(" sha256sum: %s\n".printf(sha256sum));
a.add(" size: %s\n".printf(size.to_string ()));
a.add(" uri: %s\n\n".printf(file));
if (!endswith (file, ".ymp")) { // skip for non ymp files
continue;
}
// create context for index
repo_index_ctx *ctx = new repo_index_ctx();
ctx->path = path; // index path
ctx->file= file; // file path
ctx->number=i; // file index
j.add((void*)create_index_single, ctx, ctx); // run async
i++; // increase index
}
index_a = new string[i]; // allocate string array
j.run(); // run all jobs
for(int ii = 0;ii < i;ii++){
a.add(index_a[ii]); // copy all data into array library
}
return a.get_string();
}

0 comments on commit 89473bd

Please sign in to comment.