forked from aquaskyline/MICA-aligner
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMICA-PE-WriteThread.h
214 lines (181 loc) · 8.32 KB
/
MICA-PE-WriteThread.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
//
// MICA-PE-WriteThread.h
//
// mica
//
// Copyright (C) 2013, HKU
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//
///////////////////////////////////////////////////////////////////////////////////////////////
#ifndef __MICA_PE_WRITETHREAD_H__
#define __MICA_PE_WRITETHREAD_H__
///////////////////////////////////////
// Including Standard Libraries
///////////////////////////////////////
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <omp.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
///////////////////////////////////////
// Including SOAP2-DP Libraries
///////////////////////////////////////
#include "2bwt-flex/ListConsumer.h"
#include "2bwt-flex/2BWT-PEAlgnmt.h"
#include "2bwt-flex/2bwt-lib/Timing.h"
#include "2bwt-flex/utilities/Logging.h"
#include "2bwt-flex/utilities/SimpleQueue.h"
///////////////////////////////////////
// Including MIC Libraries
///////////////////////////////////////
#include "MIC-SRA2BWTMdl.h"
#include "MIC-SRAAlgnmt.h"
#include "MIC-PEAlgnmt.h"
#include "MIC-DPAlgnmt.h"
#include "MemMan.h"
#include "MicMappingQuality.h"
#include "MICA-PE-ReadThread.h"
///////////////////////////////////////
// DEBUG FLAGS
///////////////////////////////////////
//Define the below parameter to output the write thread
//audit events into standard output
//#define DEBUG_MICA_PEWT_LOGGING
//Define the below parameter to output the write thread
//read handling event into standard output
//#define DEBUG_MICA_PEWT_READ_HANDLING
#define MICA_PE_WRITE_THREAD_MAX_BATCH 3
#define MICA_PE_WRITE_THREAD_HEALTH_OK 0
#define MICA_PE_WRITE_THREAD_HEALTH_DEPLETED 1
#define MICA_PE_WRITE_THREAD_PAYLOAD_OUTPUT_TYPE_UNKNOWN 0
#define MICA_PE_WRITE_THREAD_PAYLOAD_OUTPUT_TYPE_MIC_MIX 1
#define MICA_PE_WRITE_THREAD_PAYLOAD_OUTPUT_TYPE_SKIP_MIC 2
#define MICA_PE_WRITE_HEALTH_STRING_MAX_LENGTH 20
typedef struct PEWTArgs {
SRAArguments * sraArgTemplate;
PEArguments * peArgTemplate;
unsigned char * charMap;
unsigned int maxNumReadPerCore;
int maxNumOfMICThreads;
int InputAlignmentModel;
} PEWTArgs;
typedef struct PEWTPerfStats {
unsigned long long readNumHandled;
unsigned long long readNumByHandlers[MIC_PE_OUTPUT_STATUS_COUNT];
double processTime;
} PEWTPerfStats;
typedef struct PEATPerfStats {
unsigned long long readNumHandled;
unsigned long long readNumAligned;
double processTime;
} PEATPerfStats;
typedef struct PEAlgnmtThreadArg {
void * writeThread;
int threadIdx;
PEATPerfStats algnmtThreadStats;
} PEAlgnmtThreadArg;
typedef struct PEWriteThread {
// Global MICA-PE Argument Configuration
PEWTArgs pewtArguments;
// Statistics
unsigned long long consPairCount;
unsigned long long consPairRead;
unsigned long long cpuConsSaCount;
unsigned long long cpuConsReadCount;
unsigned long long consReadCount;
double totalOutputTime;
// POSIX Thread
int numAlignmentThread;
pthread_t outputThreadBody;
pthread_t * alignmentThreadBody;
PEAlgnmtThreadArg * algnmtThreadArg;
// POSIX Mutex and Condition Variables
pthread_mutex_t mutexThreadHealth;
pthread_mutex_t mutexFreeQueue;
pthread_cond_t condFreeQueue;
pthread_mutex_t mutexAlignmentQueue;
pthread_cond_t condAlignmentQueue;
pthread_mutex_t mutexOutputQueue;
pthread_cond_t condOutputQueue;
////////////////////////////////////////
// Thread Payload - Configuration
////////////////////////////////////////
unsigned int queryInBatch[MICA_PE_WRITE_THREAD_MAX_BATCH];
unsigned int batchSize[MICA_PE_WRITE_THREAD_MAX_BATCH];
unsigned int numThreads[MICA_PE_WRITE_THREAD_MAX_BATCH];
unsigned int firstReadIdx[MICA_PE_WRITE_THREAD_MAX_BATCH];
int algnFuncReasonFlag[MICA_PE_WRITE_THREAD_MAX_BATCH];
////////////////////////////////////////
// Thread Payload - Input
////////////////////////////////////////
char * readNameBufferFrame[MICA_PE_WRITE_THREAD_MAX_BATCH];
unsigned char * readBodyBufferFrame[MICA_PE_WRITE_THREAD_MAX_BATCH];
uint16_t * readLengthBufferFrame[MICA_PE_WRITE_THREAD_MAX_BATCH];
char * readQualityBufferFrame[MICA_PE_WRITE_THREAD_MAX_BATCH];
char * mateNameBufferFrame[MICA_PE_WRITE_THREAD_MAX_BATCH];
unsigned char * mateBodyBufferFrame[MICA_PE_WRITE_THREAD_MAX_BATCH];
uint16_t * mateLengthBufferFrame[MICA_PE_WRITE_THREAD_MAX_BATCH];
char * mateQualityBufferFrame[MICA_PE_WRITE_THREAD_MAX_BATCH];
PEReadOutput * peBatchOutput[MICA_PE_WRITE_THREAD_MAX_BATCH];
PEInputParam * peInputParam[MICA_PE_WRITE_THREAD_MAX_BATCH];
////////////////////////////////////////
// Thread Payload - Output
////////////////////////////////////////
uint8_t outputType[MICA_PE_WRITE_THREAD_MAX_BATCH];
uint16_t * occCount[MICA_PE_WRITE_THREAD_MAX_BATCH];
uint8_t * outputBufferStatus[MICA_PE_WRITE_THREAD_MAX_BATCH];
unsigned int * outputBufferFrame[MICA_PE_WRITE_THREAD_MAX_BATCH];
MICSRAOccMetadata * outputBufferMeta[MICA_PE_WRITE_THREAD_MAX_BATCH];
MICDPOccurrence * dpOutputBufferFrame[MICA_PE_WRITE_THREAD_MAX_BATCH];
PEMappingQuality * mappingQualities[MICA_PE_WRITE_THREAD_MAX_BATCH];
SRAOCCCollector * cpuSraOccCollector[MICA_PE_WRITE_THREAD_MAX_BATCH];
DPOCCCollector * cpuDpOccCollector[MICA_PE_WRITE_THREAD_MAX_BATCH];
// Ring Buffer Information & Thread Health
int bufferSize;
SimpleQueue * freeQueue;
SimpleQueue * alignmentQueue;
SimpleQueue * outputQueue;
uint8_t threadHealth;
int parentControllerIdx;
// Health string and stats
char healthString[MICA_PE_WRITE_HEALTH_STRING_MAX_LENGTH];
PEWTPerfStats performanceStats;
} PEWriteThread;
PEWriteThread * PEWTCreate(unsigned int maxNumReadPerCore, int maxNumOfMICThreads, int numWriteThreadBuffer, int numAlignmentThread, int parentControllerIdx);
void PEWTRegisterArguments(PEWriteThread * writeThread,
SRAArguments * sraArgs, PEArguments * peArgs,
int InputAlignmentModel,
unsigned char * charMap);
void PEWTCloneBuffer(PEWriteThread * writeThread,
unsigned int numThreads, unsigned int batchSize, unsigned int firstReadIdx, unsigned int queryInBatch,
char * readNameBufferFrame, unsigned char * readBodyBufferFrame, uint16_t * readLengthBufferFrame,
char * mateNameBufferFrame, unsigned char * mateBodyBufferFrame, uint16_t * mateLengthBufferFrame,
char * readQualityBufferFrame, char * mateQualityBufferFrame,
PEInputParam * peInputParam, PEReadOutput * peBatchOutput,
uint8_t payloadOutputType,
uint16_t * occCount, uint8_t * outputBufferStatus, unsigned int * outputBufferFrame,
MICSRAOccMetadata * outputBufferMeta, MICDPOccurrence * dpOutputBufferFrame, PEMappingQuality * mappingQualities,
int algnFuncReasonFlag);
void PEWTRoll(PEWriteThread * writeThread);
void PEWTSignal(PEWriteThread * writeThread, int threadHealth);
void PEWTThreadJoin(PEWriteThread * writeThread);
void PEWTPollHealthString(PEWriteThread * writeThread);
char * PEWTGetHealthText(PEWriteThread * writeThread);
void PEWTFree(PEWriteThread * writeThread);
#endif