-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpzpaq.cpp
894 lines (803 loc) · 28.3 KB
/
pzpaq.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
/* pzpaq.cpp v0.01 - Parallel ZPAQ compressor
(C) 2011, Dell Inc. Written by Matt Mahoney
LICENSE
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 3 of
the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details. Visit
<http://www.gnu.org/copyleft/gpl.html>.
pzpaq is a parallel ZPAQ compatible compressor. It compresses
or decompresses multiple files in parallel for better speed.
It can also compress a file or a solid archive in smaller blocks
for better speed at some cost in compressed size.
Command interface is similar to compress, gzip, or bzip2.
See usage() below for brief description.
See http://mattmahoney.net/dc/pzpaq.html for complete documentation.
See http://mattmahoney.net/dc/zpaq.html for the latest version
of this software and for libzpaq which you will need to compile.
To compile in Linux:
g++ -O3 -DNDEBUG pzpaq.cpp libzpaq.cpp libzpaqo.cpp -lpthread
To compile in Windows
g++ -O3 -DNDEBUG pzpaq.cpp libzpaq.cpp libzpaqo.cpp -lpthreadGC2 -DX32
When compiled with -DX32, the program will not work with files over 2 GB.
For Windows you also need these files from pthreads-win32 from
http://sourceware.org/pthreads-win32/
libpthreadGC2.a in C:\mingw\lib or -L
pthread.h in C:\mingw\include or -I
sched.h
semaphore.h
pthreadGC2.dll in PATH (to run)
*/
#include "libzpaq.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <string>
#include <vector>
// Print the help text to stderr and terminate the program with status 1.
void usage() {
  // The whole message is one literal; the X32 note is only compiled in
  // for builds made with -DX32.
  static const char* const help_text=
    "pzpaq 0.01 - Parallel ZPAQ compressor\n"
    "(C) 2011, Dell Inc. Written by Matt Mahoney\n"
    "This is free software under GPL v3. http://www.gnu.org/copyleft/gpl.html\n"
    "\n"
    "Usage: pzpaq [-options]... [files]...\n"
    "Default is to compress, replacing each file with file.zpaq\n"
    "If no files are specified, then compress stdin to stdout. Options:\n"
    "-123 Compress fast, mid, or max (default -2 = mid)\n"
    "-bN Compress in N byte blocks, -b0=infinite (default = size/threads)\n"
    "-c Concatenate to standard output, keep input files\n"
    "-d Decompress, replacing file.zpaq with file\n"
    "-e Extract to current directory using saved names, keep input files\n"
    "-h Help (print this message)\n"
    "-k Keep (don't delete) input files\n"
    "-l List compressed file contents\n"
    "-mN Memory limit of N MB (default -m500)\n"
    "-sS Suffix S1,S2... for temporary files (default -s.tmp)\n"
    "-tN (De)compress blocks in parallel using N Threads (default -t2)\n"
    "-v Verbose\n"
    "-x Extract to original directory using saved paths, keep input files\n"
    "-- Stop option processing\n"
#ifdef X32
    "Does not work with files larger than 2 GB\n"
#endif
    ;
  fputs(help_text, stderr);
  exit(1);
}
// If fseeko() and ftello() are not supported then compile
// with -DX32 to use fseek() and ftell(). pzpaq will not work
// with files larger than 2 GB in this case.
// The macros below replace every later use of fseeko/ftello in this file
// with the plain fseek/ftell calls.
#ifdef X32
#define fseeko(f,n,w) fseek(f,n,w)
#define ftello(f) ftell(f)
#endif
// Call f and check that the return code is 0. If it is not, report the
// source line, the text of the call and the code on stderr and carry on.
// The body is wrapped in do { } while (0) so that "check(x);" is a single
// statement and can be used safely inside an unbraced if/else. The result
// is stored under a name unlikely to clash with a caller's variable, and
// the argument is parenthesized so any expression can be passed.
#define check(f) do { \
  const int check_rc_=(f); \
  if (check_rc_) \
    fprintf(stderr, "Line %d: %s: error %d\n", __LINE__, #f, check_rc_); \
} while (0)
// Signed size of a string or vector, so that loops can use int indexes
// and count down past 0 without unsigned wrap-around.
//
// The generic template is kept for any container with a size() member.
// The two overloads below are more specialized than std::size(), which
// C++17 compilers also find through argument dependent lookup for
// std::string and std::vector arguments; without them an unqualified
// size(x) call is ambiguous when compiled as C++17 or later.
template <typename T> int size(const T& x) {
  return static_cast<int>(x.size());
}

// Signed length of a std::string.
inline int size(const std::string& x) {
  return static_cast<int>(x.size());
}

// Signed element count of a std::vector.
template <typename T> int size(const std::vector<T>& x) {
  return static_cast<int>(x.size());
}
// Options readable by all threads. They are assigned while main() parses
// the command line and computes the default block size, and are read by
// the compress/decompress code afterwards.
int command='2'; // -123dexl (compress level '1'..'3', 'd'/'e'/'x' decompress, 'l' list)
const int MIN_BOPT=0x1000; // smallest block size main() picks by default
const int MAX_BOPT=0x7fffffff; // largest block size main() picks by default
int bopt=-1; // -b block size, 0 = infinite, -1 = default size/topt
bool copt=false; // -c (output to stdout)
bool kopt=false; // -k (keep input files), also forced on by -c, -e and -x
int mopt=500; // -m memory limit in MB, compared against the sum of Job::memory
const char* sopt=".tmp"; // -s (temp file extension, followed by the part number)
int topt=2; // -t, at least 1 (number of threads)
bool verbose=false; // -v (progress messages on stderr)
// Possible job states. A thread is initialized as READY. When main()
// is ready to start the thread it is set to RUNNING and runs it. When
// the thread finishes, it sets its state to FINISHED, signals
// main (using cv, protected by mutex), and exits with a status
// of 0 or a pointer to an error message as a static string.
// main then waits on the thread, receives the return status, and
// updates the state to OK or ERR.
typedef enum {READY, RUNNING, FINISHED, ERR, OK} State;
// cv is signaled by thread() after it stores FINISHED; main() waits on it.
pthread_cond_t cv=PTHREAD_COND_INITIALIZER; // to signal FINISHED
// mutex is held by main() while it schedules and by thread() while it
// updates Job::state.
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; // protects cv
// A filename and a size. The filename pointer is stored, not copied, so
// the string it refers to must outlive the FileSize.
struct FileSize {
  const char* filename;  // input file, "" for stdin
  off_t size;            // input size, -1 if unknown

  // s is taken as off_t (not int) so that a size that only fits in off_t
  // is stored without truncation. Implicit conversion from a filename is
  // relied on by callers, so the constructor is intentionally not explicit.
  FileSize(const char* f=0, off_t s=-1): filename(f), size(s) {}
};
// Instructions to thread to compress or decompress one block.
// Jobs are built by main(), handed to thread() as the pthread argument,
// and inspected again by main() after the thread has been joined.
struct Job {
State state; // job state, protected by mutex
std::vector<FileSize> input; // list of files to input
std::string output; // output file, "" for stdout, saved names override
off_t start; // where to start input of first file
int memory; // how much memory is needed in MB (for scheduler)
int part; // position in sequence for concatenation, 0=first
// When part is nonzero the data is written to output+sopt+part and
// appended to output by main() afterwards.
pthread_t tid; // thread ID (for scheduler)
Job(); // sets state=READY and zeroes start, memory and part
void print(int i) const; // dump the job to stderr, labeled with index i
};
// Construct a job that is ready to be scheduled: no input, no output
// name, reading from offset 0, needing no memory, and being part 0.
Job::Job() {
  state=READY;
  start=0;
  memory=0;
  part=0;
  // tid is deliberately left unset; it only has a value once main()
  // has created the thread and state==RUNNING.
}
// Print contents
void Job::print(int i=0) const {
fprintf(stderr, "Job %d: state=%d start=%1.0f memory=%d part=%d output=%s\n",
i, state, double(start), memory, part, output.c_str());
for (int j=0; j<size(input); ++j)
fprintf(stderr, " %s %1.0f\n", input[j].filename, double(input[j].size));
}
// Error hook called by libzpaq and by this program. A non-null msg is
// reported on stderr. The pointer is then thrown in every case, including
// a null msg; thread() catches it and uses it as the thread's exit
// status, where 0 means OK and any other pointer means ERR.
void libzpaq::error(const char* msg) {
  if (msg!=0) {
    fprintf(stderr, "pzpaq error: %s\n", msg);
  }
  throw msg;
}
// Adapter that lets libzpaq read from or write to a stdio stream.
// The stream is not closed by this struct; whoever opened it closes it.
struct File: public libzpaq::Reader, public libzpaq::Writer {
  FILE* f;  // underlying stream, may be reassigned after construction

  File(FILE* stream=0): f(stream) {}

  // Read one byte, or EOF.
  int get() {
    return getc(f);
  }

  // Write one byte.
  void put(int c) {
    putc(c, f);
  }
};
// Stream adapter for libzpaq that also counts the bytes passing through.
// count may be reset by the caller at any time.
struct FileCount: public libzpaq::Reader, public libzpaq::Writer {
  FILE* f;       // underlying stream, not closed by this struct
  double count;  // bytes read plus bytes written so far

  FileCount(FILE* stream): f(stream), count(0) {}

  // Read one byte; EOF is returned but not counted.
  int get() {
    const int c=getc(f);
    if (c!=EOF) count+=1;
    return c;
  }

  // Write one byte and count it.
  void put(int c) {
    putc(c, f);
    count+=1;
  }
};
// libzpaq writer that collects its output in memory.
struct StringWriter: public libzpaq::Writer {
  std::string s;  // everything written so far

  // Append one byte to s.
  void put(int c) {
    s.push_back(char(c));
  }
};
// Stream adapter for libzpaq that feeds every byte read or written
// into a SHA-1 object, so the size and checksum of the data are
// available from sha1 afterwards.
struct FileSHA1: public libzpaq::Reader, public libzpaq::Writer {
  FILE* f;             // underlying stream, not closed by this struct
  libzpaq::SHA1 sha1;  // running hash of the bytes seen

  FileSHA1(FILE* stream=0): f(stream) {}

  // Read one byte and hash it; EOF is returned without being hashed.
  int get() {
    const int c=getc(f);
    if (c==EOF) return c;
    sha1.put(c);
    return c;
  }

  // Hash one byte, then write it.
  void put(int c) {
    sha1.put(c);
    putc(c, f);
  }
};
// Remove the path from filename: return what follows the last '/' or
// '\\'. A ':' in the second position (as in a drive prefix "C:") also
// counts as a separator when no slash follows it. A name without any
// separator is returned unchanged.
std::string strip(const std::string& filename) {
  // Index of the first character after the last slash, 0 if there is none.
  const std::string::size_type slash=filename.find_last_of("/\\");
  std::string::size_type cut=0;
  if (slash!=std::string::npos) cut=slash+1;
  // The drive colon only matters when no slash was found beyond it.
  if (cut<2 && filename.size()>1 && filename[1]==':') cut=2;
  return filename.substr(cut);
}
// Convert an integer to its decimal string, with a leading '-' for
// negative values.
// The digits are taken from x without negating it first, because -x
// overflows for the most negative off_t. They are produced least
// significant first and reversed once at the end.
std::string itos(off_t x) {
  if (x==0) return "0";
  const bool negative=x<0;
  std::string reversed;
  for (; x!=0; x/=10) {
    int digit=int(x%10);  // has the sign of x, so -9..9
    if (digit<0) digit=-digit;
    reversed+=char('0'+digit);
  }
  if (negative) reversed+='-';
  return std::string(reversed.rbegin(), reversed.rend());
}
// Append file2 to file1 and delete file2. "" means stdout for file1 and
// stdin for file2.
// Returns true only if every byte was read and written and the output
// was closed without error. On any failure the error is reported with
// perror(), file2 is left in place so that no data is lost, and false is
// returned. A failure to delete file2 after a successful copy is
// reported but does not change the result.
bool append(const char* file1, const char* file2) {
  if (verbose)
    fprintf(stderr, "Appending to %s from %s ... ", file1, file2);

  // Open both ends
  FILE* in=stdin;
  if (file2 && *file2) in=fopen(file2, "rb");
  if (!in) {
    perror(file2);
    return false;
  }
  FILE* out=stdout;
  if (file1 && *file1) out=fopen(file1, "ab");
  if (!out) {
    perror(file1);
    if (in!=stdin) fclose(in);
    return false;
  }

  // Copy, stopping at the first short write
  const int BUFSIZE=4096;
  char buf[BUFSIZE];
  bool ok=true;
  size_t n;
  while ((n=fread(buf, 1, BUFSIZE, in))>0) {
    if (fwrite(buf, 1, n, out)!=n) {
      perror(file1);
      ok=false;
      break;
    }
  }
  if (ok && ferror(in)) {  // fread stopped because of an error, not EOF
    perror(file2);
    ok=false;
  }
  if (verbose)
    fprintf(stderr, "%1.0f\n", double(ftello(out)));

  // Close. Buffered data is written by fclose, so it can still fail.
  if (out!=stdout && fclose(out)!=0) {
    if (ok) perror(file1);
    ok=false;
  }
  if (in!=stdin) fclose(in);

  // Only remove the source once its contents are safely appended
  if (ok && in!=stdin && remove(file2))
    perror(file2);
  return ok;
}
// Decompress. The input is a list of files to decompress,
// a size for each, an output file name, and a starting offset for
// the first file. A size of -1 means that all blocks should be
// decompressed, or else just the first block of the first file
// starting at the specified offset. An input file name of ""
// means to read from standard input. An output file name of ""
// means standard output. If the command is -e or -x and not -c then
// filenames stored in the archive override the output filename.
// Decompression fails and the rest of the job is abandoned under
// the following conditions: an output file cannot be created
// (for example the path does not exist), an input file is not readable,
// or the input is corrupted, or a bad checksum is detected,
// or no compressed input is found.
// Failure is reported through libzpaq::error(), which throws a
// const char* message. Files opened here are closed on every exit path.
void decompress(const Job& job) {

  // Closes the referenced stream when the scope is left, normally or by
  // an exception, unless it is null, stdin or stdout. It refers to the
  // FILE* by reference so it always sees the stream currently in use.
  struct ScopedClose {
    FILE*& f;
    explicit ScopedClose(FILE*& f_): f(f_) {}
    ~ScopedClose() {
      if (f && f!=stdin && f!=stdout) fclose(f);
    }
  };

  // Decompress each file
  for (int i=0; i<size(job.input); ++i) {

    // Open input. An empty name keeps stdin. The test is on the first
    // character of the name, independent of the position i in the job.
    File in(stdin);
    ScopedClose close_in(in.f);
    const FileSize& fs=job.input[i];
    if (!fs.filename) libzpaq::error("null filename");
    if (fs.filename && fs.filename[0])
      in.f=fopen(fs.filename, "rb");
    if (!in.f) {
      perror(fs.filename);
      libzpaq::error("cannot read file");
    }

    // Find start of block in first file
    if (i==0 && job.start>0 && fseeko(in.f, job.start, SEEK_SET))
      libzpaq::error("fseeko");

    // Decompress file. Parts after the first go to a temporary file
    // named output+sopt+part.
    libzpaq::Decompresser d;
    d.setInput(&in);
    std::string output=job.output;
    if (job.part) output+=sopt+itos(job.part);
    File out(0);  // opened lazily when the first segment is decoded
    ScopedClose close_out(out.f);
    while (d.findBlock()) {
      StringWriter filename, comment;
      while (d.findFilename(&filename)) {
        d.readComment(&comment);
        libzpaq::SHA1 sha1;
        d.setSHA1(&sha1);

        // Get new output filename
        if (filename.s!="" && !copt && command!='d') {
          if (command=='x')
            output=filename.s;
          else if (command=='e')
            output=strip(filename.s);
          if (verbose) {
            fprintf(stderr, "Decompressing %s %s -> %s\n",
              filename.s.c_str(), comment.s.c_str(), output.c_str());
          }
          // Close the previous file so the new name is opened below
          if (out.f && out.f!=stdout) {
            fclose(out.f);
            out.f=0;
          }
        }
        filename.s="";
        comment.s="";

        // Set output
        if (!out.f) {
          out.f=stdout;
          if (output!="") {
            out.f=fopen(output.c_str(), "wb");
            if (!out.f) {
              perror(output.c_str());
              libzpaq::error("file creation failed");
            }
          }
        }
        d.setOutput(&out);

        // Decompress segment
        d.decompress();
        if (verbose) {
          fprintf(stderr, "%s -> %s %1.0f\n",
            fs.filename, output.c_str(), sha1.size());
        }

        // Verify checksum. sha1string[0] is nonzero when a checksum
        // follows in sha1string[1..20].
        char sha1string[21];
        d.readSegmentEnd(sha1string);
        if (sha1string[0] && memcmp(sha1string+1, sha1.result(), 20)) {
          fprintf(stderr, "%s -> %s checksum error\n",
            fs.filename, output.c_str());
          libzpaq::error("checksum mismatch");
        }
      }

      // End of block. A known size means only one block was requested.
      if (fs.size!=-1)
        break;
    }

    // End of input file. No output stream means no segment was found.
    if (!out.f) {
      fprintf(stderr, "%s: ", fs.filename);
      libzpaq::error("no compressed data found");
    }
    // close_out and close_in close the streams here
  }
}
// Compress job.input to job.output in 1 block with each input file
// in a separate segment. For the special case of compressing from
// an unknown size and a block size specified in bopt, compress
// to multiple blocks of size bopt.
// Errors are reported through libzpaq::error(), which throws.
// NOTE(review): out.f and in.f are not closed when an error is thrown
// part way through; confirm whether that matters to callers.
void compress(const Job& job) {
// Get output file name
// A nonzero part is written to a temporary file named output+sopt+part.
std::string output=job.output;
if (job.part) output+=sopt+itos(job.part);
// Open output file
// out starts as stdout and is replaced when an output name is given;
// out.count accumulates the number of bytes written.
libzpaq::Compressor c;
FileCount out(stdout);
if (output!="") out.f=fopen(output.c_str(), "wb");
if (!out.f) {
perror(output.c_str());
libzpaq::error("output open failed");
}
c.setOutput(&out);
c.writeTag();
// Compress multiple files in one block, or multiple blocks if
// an input size is unknown and not finished.
for (bool done=false; !done;) {
// command is '1'..'3' here, so the level passed is 1..3
c.startBlock(command-'0');
// Compress one segment per input file. Save filename if start is 0.
// The comment is file size or "size+start" if start > 0.
for (int i=0; i<size(job.input); ++i) {
if (job.start>0 && i==0)
c.startSegment(0,
(itos(job.input[i].size)+"+"+itos(job.start)).c_str());
else
c.startSegment(job.input[i].filename,
itos(job.input[i].size).c_str());
// Called once per block, right after the first segment is started.
// NOTE(review): exact effect is defined by libzpaq - confirm there.
if (i==0)
c.postProcess();
// Open input file unless "" (stdin)
// in also hashes every byte read, for the segment checksum below
FileSHA1 in(stdin);
if (job.input[i].filename[0]) in.f=fopen(job.input[i].filename, "rb");
if (!in.f) {
perror(job.input[i].filename);
libzpaq::error("input open failed");
}
c.setInput(&in);
// Only the first file of a job can start at a nonzero offset
if (i==0 && job.start>0) {
if (fseeko(in.f, job.start, SEEK_SET))
libzpaq::error("fseeko failed");
}
if (verbose) {
fprintf(stderr, "Compressing %s", job.input[i].filename);
if (i==0 && job.start>0)
fprintf(stderr, "+%1.0f", double(job.start));
fprintf(stderr, " %1.0f -> %s\n",
double(job.input[i].size), output.c_str());
}
// Compress 1 block or to EOF if -b0
// Unknown size with a block size set: request bopt bytes and stop
// the outer loop once fewer than bopt bytes were actually read.
if (bopt>0 && job.input[i].size<0) {
c.compress(bopt);
done=in.sha1.size()<bopt;
}
// Otherwise request the scheduled size, or -1 when it exceeds bopt.
// NOTE(review): -1 presumably means "to end of input" - confirm in libzpaq.
else {
c.compress(job.input[i].size>bopt ? -1: int(job.input[i].size));
done=true;
}
if (verbose) {
fprintf(stderr, "%s %1.0f -> %s %1.0f\n", job.input[i].filename,
in.sha1.size(), output.c_str(), out.count);
}
// Store the checksum of the bytes read for this segment
c.endSegment(in.sha1.result());
if (in.f!=stdin) fclose(in.f);
}
c.endBlock();
}
// Close output
if (out.f!=stdout) fclose(out.f);
}
// List the contents of an archive to stdout
void list(const char* filename) {
FileCount in(stdin);
if (filename && *filename) {
printf("%s\n", filename);
in.f=fopen(filename, "rb");
if (!in.f) {
perror(filename);
return;
}
}
try {
libzpaq::Decompresser d;
in.count=1;
d.setInput(&in);
double memory=0;
StringWriter name, comment;
char s[21]; // checksum
for (int i=1; d.findBlock(&memory); ++i) {
printf("Block %d command %d needs %d MB\n",
i, d.getModel(), int((memory+999999.5)/1000000));
while (d.findFilename(&name)) {
d.readComment(&comment);
d.readSegmentEnd(s);
if (s[0])
printf(" %02x%02x%02x%02x ",
s[1]&255, s[2]&255, s[3]&255, s[4]&255);
else
printf(" ");
printf("%s %s -> %1.0f\n",
name.s.c_str(), comment.s.c_str(), in.count);
name.s="";
comment.s="";
in.count=0;
}
}
}
catch (const char* msg) {}
if (in.f!=stdin) fclose(in.f);
printf("\n");
}
// Worker thread
void *thread(void *arg) {
// Do the work and receive status in msg
Job* job=(Job*)arg;
const char* result=0; // error message unless OK
try {
if (isdigit(command))
compress(*job);
else if (command=='d' || command=='x' || command=='e')
decompress(*job);
}
catch (const char* msg) {
result=msg;
}
// Let controlling thread know we're done and the result
check(pthread_mutex_lock(&mutex));
job->state=FINISHED;
check(pthread_cond_signal(&cv));
check(pthread_mutex_unlock(&mutex));
return (void*)result;
}
int main(int argc, char** argv) {
// Start timer
time_t start_time=time(0);
// Process arguments
bool opt=true; // false after --
std::vector<FileSize> files; // list of files and sizes
for (int i=1; i<argc; ++i) {
if (opt && argv[i][0]=='-') {
bool arg=false; // option has an argument?
for (int j=1; !arg && argv[i][j]; ++j) {
switch(argv[i][j]) {
case '1':
case '2':
case '3':
case 'd':
case 'e':
case 'x':
case 'l': command=argv[i][j]; break;
case 'b': bopt=atoi(argv[i]+j+1); arg=true; break;
case 'c': copt=true; break;
case 'k': kopt=true; break;
case 'm': mopt=atoi(argv[i]+j+1); arg=true; break;
case 's': sopt=argv[i]+j+1; arg=true; break;
case 't': topt=atoi(argv[i]+j+1); arg=true; break;
case 'v': verbose=true; break;
case '-': opt=false; break;
default: usage(); // -h or others
}
}
}
else
files.push_back(FileSize(argv[i]));
}
if (topt<1) usage();
if (size(files)==0) {
topt=1; // can't multithread from stdin
files.push_back(""); // add stdin to list
}
kopt |= copt || command=='e' || command=='x';
// set stdin and stdout to binary mode in Windows
#ifndef unix
if (command!='l')
setmode(1, O_BINARY); // stdout
setmode(0, O_BINARY); // stdin
#endif
// Get file sizes, -1 = unknown. Remove nonexistent files
for (int i=0; i<size(files); ++i) {
assert(files[i].filename);
if (files[i].filename[0]) { // not stdin?
FILE* f=fopen(files[i].filename, "rb");
if (!f) { // remove nonexistent files
perror(files[i].filename);
for (int j=i+1; j<size(files); ++j)
files[j-1]=files[j];
files.pop_back();
}
else {
if (!fseeko(f, 0, SEEK_END))
files[i].size=ftello(f);
if (files[i].size==-1)
perror(files[i].filename);
fclose(f);
}
}
}
// Get default block size. If any sizes are unknown then
// default is -b0 (no blocks)
if (bopt<0 && isdigit(command)) {
off_t sum=0;
for (int i=0; i<size(files); ++i) {
if (files[i].size<0) { // unknown size
sum=-1;
break;
}
sum+=files[i].size;
}
if (sum<0)
bopt=0;
else {
off_t t=(sum+topt-1)/topt;
bopt=t<MAX_BOPT ? int(t) : MAX_BOPT;
if (bopt<MIN_BOPT) bopt=MIN_BOPT;
}
}
// Print processed command line
if (verbose) {
fprintf(stderr, "%s -%c -b%d %s %s -m%d -s%s -t%d -v",
argv[0], command, bopt, copt?"-c":"", kopt?"-k":"", mopt, sopt, topt);
for (int i=0; i<size(files); ++i)
fprintf(stderr, " %s", files[i].filename);
fprintf(stderr, "\n\n");
}
// List
if (command=='l') {
for (int i=0; i<size(files); ++i)
list(files[i].filename);
return 0;
}
// List of jobs
std::vector<Job> jobs;
// Schedule decompression for commands -d, -e, or -x.
// stdin is 1 job. Otherwise the input files are scanned for blocks
// and each block of each input file is one job.
// job.start is the offset of the start of the block.
// job.input has one file. It is "" for stdin or else
// the filename. The size is not important.
// If the first segment is not named or ignored by -d or -c then
// job.output and job.part determine the output file name.
// job.part is the distance in blocks to the block in
// the file that names the segment (at least 1) and
// job.output is that name. The name comes from the
// last named segment, with a path for -x or without for -e.
// If no named segment, or names are ignored by -d then
// job.output is derived by removing the .zpaq extension
// from the input filename, or appending sopt (.tmp) if there
// is no .zpaq extension, or is "" if -c or input is "" (stdin).
if (command=='d' || command=='e' || command=='x') {
int part=0;
std::string output;
if (copt) output=""; // -c
for (int i=0; i<size(files); ++i) {
try {
// stdin
if (files[i].size<0 || !files[i].filename || !files[i].filename[0]) {
Job job;
job.input.push_back(files[i]);
jobs.push_back(job);
}
else {
// Open input file
File in(fopen(files[i].filename, "rb"));
if (!in.f)
perror(files[i].filename);
else {
// Get initial output name by dropping .zpaq or adding .tmp
if (!copt) {
int l=strlen(files[i].filename);
if (l>5 && !strcmp(files[i].filename+l-5, ".zpaq"))
output=std::string(files[i].filename).substr(0, l-5);
else if (l>0)
output=std::string(files[i].filename)+sopt;
if (command=='e')
output=strip(output);
}
// Scan input for blocks
off_t offset=0;
libzpaq::Decompresser d;
d.setInput(&in);
double memory;
StringWriter filename;
if (!copt) part=0;
while (d.findBlock(&memory)) {
// Schedule a job for this block
Job job;
job.input.push_back(files[i]);
job.start=offset;
job.output=output;
job.memory=int((memory+999999.5)/1000000);
job.part=part;
// Update output by finding the last named segment
bool first_segment=true;
while (d.findFilename(&filename)) {
d.readComment();
d.readSegmentEnd();
offset=ftello(in.f)+1; // start of next block after EOB
if (filename.s!="" && command!='d' && !copt) {
if (command=='e')
output=strip(filename.s);
else if (command=='x')
output=filename.s;
part=0;
if (first_segment) {
job.part=0;
job.output=output;
}
}
first_segment=false;
filename.s="";
}
++part;
jobs.push_back(job);
}
fclose(in.f);
}
}
}
// In case of error, go on to the next input file
catch (const char* msg) {
fprintf(stderr, "%s: %s\n", files[i].filename, msg);
}
}
}
// Schedule compression according to -c, -b (copt, bopt)
// If -c -b0, then job is all input
// If -c -b, then a job is 1 block, splitting input files
// If -b0, then a job is 1 file
// If -b then a job is 1 block of 1 file
// Unknown file sizes like stdin are treated like size 0
if (isdigit(command)) {
const int memory[]={38, 112, 247}; // command -> MB needed
int fn=0; // number of files scheduled
off_t len=0; // number of bytes of files[fn] scheduled
int part=0; // number of jobs since start of file
while (fn<size(files)) { // until all input is scheduled
Job job; // Schedule 1 job per loop
job.start=len;
job.part=part++;
job.memory=memory[command-'1'];
if (!copt) job.output=files[fn].filename;
if (job.output!="") job.output+=".zpaq";
// If -c then add files to job until block is full
// else job is 1 block or rest of file whichever is smaller.
// If -b0 then block has infinite size.
// If file size is unknown then pretend it is 0.
for (off_t remaining=bopt-(bopt==0); remaining && fn<size(files);) {
job.input.push_back(files[fn]);
job.input.back().size-=len;
// Remaining space in block is at least as big as rest of file?
if (bopt==0 || remaining>=job.input.back().size) { // add it
remaining-=job.input.back().size;
++fn;
len=0;
if (!copt) part=0;
}
else { // fill block with part of file
len+=job.input.back().size=remaining;
remaining=0;
}
if (!copt)
break;
}
jobs.push_back(job);
}
}
// Print list of jobs
if (verbose) {
for (int i=0; i<size(jobs); ++i)
jobs[i].print(i);
}
// Loop until all jobs return OK or ERR: start a job whenever one
// is eligible. If none is eligible then wait for one to finish and
// try again. If none are eligible and none are running then it is
// an error.
int memory_count=0; // MB in use, not to exceed mopt
int thread_count=0; // number RUNNING, not to exceed topt
int job_count=0; // number of jobs with state OK or ERR
pthread_attr_t attr; // thread joinable attribute
check(pthread_attr_init(&attr));
check(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE));
// Aquire lock on jobs[i].state.
// Threads can access only while waiting on a FINISHED signal.
check(pthread_mutex_lock(&mutex));
while(job_count<size(jobs)) {
// If there is more than 1 thread then run the biggest jobs first
// that satisfies the memory bound. If 1 then take the next ready job
// that satisfies the bound. If no threads are running, then ignore
// the memory bound.
int bi=-1; // find a job to start
if (thread_count<topt) {
for (int i=0; i<size(jobs); ++i) {
if (jobs[i].state==READY
&& (thread_count==0 || jobs[i].memory+memory_count<=mopt)
&& (bi<0 || jobs[i].input[0].size>jobs[bi].input[0].size)) {
bi=i;
if (topt==1) break;
}
}
}
// If found then run it
if (bi>=0) {
jobs[bi].state=RUNNING;
++thread_count;
memory_count+=jobs[bi].memory;
check(pthread_create(&jobs[bi].tid, &attr, thread, &jobs[bi]));
}
// If no jobs can start then wait for one to finish
else {
if (thread_count<1) { // no jobs to wait on?
fprintf(stderr, "Not enough memory, try larger -m\n");
break;
}
check(pthread_cond_wait(&cv, &mutex)); // wait
// Join any finished threads. Usually that is the one
// that signaled it, but there may be others.
for (int i=0; i<size(jobs); ++i) {
if (jobs[i].state==FINISHED) {
void* status;
check(pthread_join(jobs[i].tid, &status));
jobs[i].state=status?ERR:OK;
++job_count;
--thread_count;
memory_count-=jobs[i].memory;
}
}
}
}
check(pthread_mutex_unlock(&mutex));
// Append temporary files if both tmp and destination are OK.
// If destination is ERR and tmp is OK then delete tmp.
for (int i=0; i<size(jobs); ++i) {
const int part=jobs[i].part;
if (part>0 && part<=i) {
std::string tmp=jobs[i].output+sopt+itos(part);
if (jobs[i].state==OK) {
if (jobs[i-part].state==OK)
append(jobs[i].output.c_str(), tmp.c_str());
else {
if (verbose)
fprintf(stderr, "Deleting %s\n", tmp.c_str());
if (remove(tmp.c_str()))
perror(tmp.c_str());
}
}
}
}
// Delete input files if there was no error in any output part
if (!kopt) {
for (int i=0; i<size(jobs); ++i) {
if (jobs[i].state==OK) {
for (int j=0; j<size(jobs[i].input); ++j) {
if ((j>0 || jobs[i].start==0) && jobs[i].input[j].filename[0]) {
if (verbose)
fprintf(stderr, "Deleting %s\n", jobs[i].input[j].filename);
if (remove(jobs[i].input[j].filename))
perror(jobs[i].input[j].filename);
}
}
}
}
}
// Report unfinished jobs and time
if (verbose) {
for (int i=0; i<size(jobs); ++i) {
if (jobs[i].state!=OK) {
fprintf(stderr, "failed: ");
jobs[i].print(i);
}
}
fprintf(stderr, "%1.0f seconds\n", double(time(0)-start_time));
}
return 0;
}