Skip to content

Commit

Permalink
Merge pull request #14 from Puriney/master
Browse files Browse the repository at this point in the history
💪 robust design of workflow
  • Loading branch information
Puriney authored Mar 22, 2018
2 parents cb5900e + 348a3d9 commit 4296672
Showing 1 changed file with 84 additions and 85 deletions.
169 changes: 84 additions & 85 deletions celseq2/workflow/celseq2.snakemake
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
######################################################################
## Dependencies ##
from celseq2.helper import join_path, base_name, dir_name, print_logger
from celseq2.helper import rmfolder, mkfolder
from celseq2.helper import rmfolder, mkfolder, is_nonempty_file
from celseq2.helper import cook_sample_sheet, popen_communicate
from celseq2.prepare_annotation_model import cook_anno_model
from celseq2.count_umi import count_umi, _flatten_umi_set
Expand Down Expand Up @@ -53,6 +53,8 @@ FASTQ_QUAL_MIN_OF_BC = config.get('FASTQ_QUAL_MIN_OF_BC', None) # 10
CUT_LENGTH = config.get('CUT_LENGTH', None) # 35
## Alignment ##
ALIGNER = config.get('ALIGNER', None) # 'bowtie2', 'star'
assert (ALIGNER), 'Error: Specify aligner.'
assert (ALIGNER in ['bowtie2', 'star']), 'Error: Unknown aligner.'

## UMI Count ##
ALN_QUAL_MIN = config.get('ALN_QUAL_MIN', None) # 0
Expand Down Expand Up @@ -117,12 +119,15 @@ workdir: DIR_PROJ

rule all:
input:
csv = expand(join_path(DIR_PROJ, SUBDIR_EXPR, '{expid}', 'expr.csv'),
expid=list(set(sample_list))),
hdf = expand(join_path(DIR_PROJ, SUBDIR_EXPR, '{expid}', 'expr.h5'),
expid=list(set(sample_list))),
report = expand(join_path(DIR_PROJ, SUBDIR_REPORT, '{itemid}',
'alignment_'+ALIGNER+'.csv'), itemid=item_names),
'_done_UMI',
'_done_report',
'_done_annotation',
'_done_umimatrix_per_experiment',
run:
if glob.glob('celseq2_job*.sh*'):
shell('mv -f celseq2_job*.sh* {}'.format(SUBDIR_QSUB))



rule COUNT_MATRIX:
input:
Expand All @@ -147,7 +152,7 @@ rule COUNT_MATRIX:
shell(cmd)
print_logger('UMI-count matrix is saved at {}'.format(input.csv))


'''
rule celseq2:
message: 'Finished Entire Pipeline.'
input:
Expand Down Expand Up @@ -183,8 +188,6 @@ rule celseq2:
output:
touch('_done_celseq2')
run:
if glob.glob('celseq2_job*.sh*'):
shell('mv -f celseq2_job*.sh* {}'.format(SUBDIR_QSUB))
if ALIGNER == 'star':
shell('rm {}'.format('_done_star_genome_loaded'))
print('Free memory loaded by STAR', flush=True)
Expand All @@ -194,7 +197,7 @@ rule celseq2:
shell(cmd)

print_logger('Expression UMI matrix is saved at {}'.format(input.csv))

'''

rule setup_dir:
input: SAMPLE_TABLE_FPATH
Expand All @@ -221,29 +224,27 @@ rule setup_dir:
## HT-seq Count UMI ##
rule cook_annotation:
input:
flag = '_done_setupdir',
gff = GFF,
'_done_setupdir',
output:
anno = join_path(DIR_PROJ, SUBDIR_ANNO,
base_name(GFF) + '.pickle'),
flag = '_done_annotation',
message: 'Cooking Annotation'
run:
_ = cook_anno_model(input.gff, feature_atrr=FEATURE_ID,
_ = cook_anno_model(GFF, feature_atrr=FEATURE_ID,
feature_type=FEATURE_CONTENT,
stranded=True,
dumpto=output.anno,
verbose=verbose)
shell('touch {output.flag}')


# Combo-demultiplexing
rule combo_demultiplexing:
input:
flag2 = '_done_annotation',
# flag1 = '_done_setupdir',
flag1 = '_done_setupdir',
output:
dynamic(join_path(DIR_PROJ, SUBDIR_FASTQ, '{itemid}', '{bc}.fastq')),
fq = dynamic(join_path(DIR_PROJ, SUBDIR_FASTQ, '{itemid}', '{bc}.fastq')),
message: 'Performing combo-demultiplexing'
threads: len(item_names)
run:
Expand Down Expand Up @@ -283,8 +284,6 @@ rule combo_demultiplexing:


## Alignment ##
assert (ALIGNER), 'Error: Specify aligner.'
assert (ALIGNER in ['bowtie2', 'star']), 'Error: Unknown aligner.'

if ALIGNER == 'bowtie2':
rule align_bowtie2:
Expand Down Expand Up @@ -323,10 +322,7 @@ if ALIGNER == 'bowtie2':


if ALIGNER == 'star':
assert STAR_INDEX_DIR
rule star_load_genome:
input:
flag = '_done_annotation',
output:
flag = '_done_star_genome_loaded',
message: 'Loading genome to memory for STAR'
Expand All @@ -338,15 +334,14 @@ if ALIGNER == 'star':
cmd += '&& echo loaded >> {output.flag} '
shell(cmd)


if ALIGNER == 'star':
assert STAR
rule align_star:
input:
flag = '_done_star_genome_loaded',
flag = rules.star_load_genome.output.flag, #'_done_star_genome_loaded',
fq = join_path(DIR_PROJ, SUBDIR_FASTQ, '{itemid}', '{bc}.fastq'),
output:
sam = join_path(DIR_PROJ, SUBDIR_ALIGN, '{itemid}', '{bc}.sam'),
log = join_path(DIR_PROJ, SUBDIR_LOG, '{itemid}',
'Align-STAR_Cell-{bc}.log'),
starsam = join_path(DIR_PROJ, SUBDIR_ALIGN, '{itemid}', '{bc}',
'Aligned.out.sam'),
starlog = join_path(DIR_PROJ, SUBDIR_ALIGN, '{itemid}', '{bc}',
Expand All @@ -368,60 +363,28 @@ if ALIGNER == 'star':
shell(cmd)
shell('ln -s {output.starsam} {output.sam} ')
shell('touch -h {output.sam} ')
shell('ln -s {output.starlog} {output.log} ')
shell('touch -h {output.log} ')

rule parse_star_log:
input:
starlog = join_path(DIR_PROJ, SUBDIR_ALIGN, '{itemid}', '{bc}',
'Log.final.out'),
log = join_path(DIR_PROJ, SUBDIR_LOG, '{itemid}',
'Align-STAR_Cell-{bc}.log'),
output:
report = join_path(DIR_PROJ, SUBDIR_LOG, '{itemid}', ALIGNER,
'{bc}.pickle'),
run:
with open(input.starlog, 'r') as fin:
with open(input.log, 'r') as fin:
log_content = fin.readlines()
df = parse_star_report('\t'.join(log_content))
pickle.dump(df, open(output.report, 'wb'))


rule REPORT_ALIGNMENT_LOG:
input:
report = expand(join_path(DIR_PROJ, SUBDIR_REPORT, '{itemid}',
'alignment_'+ALIGNER+'.csv'),
itemid=item_names),

rule report_alignment_log:
input:
'_done_umimatrix_per_experiment',
df = dynamic(join_path(DIR_PROJ, SUBDIR_LOG, '{itemid}', ALIGNER,
'{bc}.pickle')),
output:
report = expand(join_path(DIR_PROJ, SUBDIR_REPORT, '{itemid}',
'alignment_'+ALIGNER+'.csv'), itemid=item_names),
run:
for item in item_names:
logs_per_item = []
logs_name_item = []
report_item = join_path(DIR_PROJ, SUBDIR_REPORT, item,
'alignment_'+ALIGNER+'.csv')

logs_fpath_item = [x for x in input.df if item in x]
for log_fpath in logs_fpath_item:
log_df = pickle.load(open(log_fpath, 'rb'))
logs_per_item.append(log_df)

log_name = base_name(log_fpath)
logs_name_item.append(log_name)

_ = merge_reports(reports=logs_per_item,
report_names=logs_name_item,
aligner_name=ALIGNER,
savetocsv=report_item)


rule count_umi:
input:
gff = join_path(DIR_PROJ, SUBDIR_ANNO,
base_name(GFF) + '.pickle'),
flag = '_done_annotation',
sam = join_path(DIR_PROJ, SUBDIR_ALIGN, '{itemid}', '{bc}.sam'),
output:
umicnt = join_path(DIR_PROJ, SUBDIR_UMI_CNT, '{itemid}', '{bc}.pkl'),
Expand Down Expand Up @@ -457,7 +420,7 @@ rule summarize_umi_matrix_per_item:
hdf_item = expand(join_path(DIR_PROJ, SUBDIR_EXPR,
'{expid}', '{itemid}', 'expr.h5'), zip,
expid=sample_list, itemid=item_names),
flag = join_path(DIR_PROJ, '_done_umimatrix_per_item'),
flag = '_done_umimatrix_per_item',
run:
_, all_genes = pickle.load(open(input.gff, 'rb'))
all_genes = sorted(all_genes)
Expand Down Expand Up @@ -541,32 +504,68 @@ rule summarize_umi_matrix_per_experiment:
shell('touch {output.flag} ')


rule summarize_alignment_diagnose:
rule REPORT_ALIGNMENT_LOG:
input:
report = expand(join_path(DIR_PROJ, SUBDIR_REPORT, '{itemid}',
'alignment_'+ALIGNER+'.csv'),
itemid=item_names),

rule report_alignment_log:
input:
aln_diag = dynamic(join_path(DIR_PROJ, SUBDIR_DIAG,
'{itemid}', '{bc}.pkl')),
df = dynamic(join_path(DIR_PROJ, SUBDIR_LOG, '{itemid}', ALIGNER,
'{bc}.pickle')),
output:
# Diagnose of alignment
alignment = expand(join_path(DIR_PROJ, SUBDIR_DIAG,
'{itemid}', 'alignment_diagnose.csv'),
itemid=item_names),
report = expand(join_path(DIR_PROJ, SUBDIR_REPORT, '{itemid}',
'alignment_'+ALIGNER+'.csv'), itemid=item_names),
flag = '_done_report',

run:
# item_id -> dict(cell_bc -> Counter(align))
item_aln_mat = defaultdict(dict)
for item in item_names:
logs_per_item = []
logs_name_item = []
report_item = join_path(DIR_PROJ, SUBDIR_REPORT, item,
'alignment_'+ALIGNER+'.csv')

for f in input.aln_diag:
bc_name = base_name(f) # BC-1-xxx
item_id = base_name(dir_name(f)) # item-1
item_aln_mat[item_id][bc_name] = pickle.load(open(f, 'rb'))
logs_fpath_item = [x for x in input.df if item in x]
for log_fpath in logs_fpath_item:
log_df = pickle.load(open(log_fpath, 'rb'))
logs_per_item.append(log_df)

log_name = base_name(log_fpath)
logs_name_item.append(log_name)

for item_id, aln_dict in item_aln_mat.items():
for bc, cnt in aln_dict.items():
aln_dict[bc] = pd.Series([cnt[x] for x in aln_diagnose_item],
index=aln_diagnose_item)
_ = merge_reports(reports=logs_per_item,
report_names=logs_name_item,
aligner_name=ALIGNER,
savetocsv=report_item)
shell('touch {output.flag} ')

aln_df = pd.DataFrame(aln_dict, index=aln_diagnose_item).fillna(0)
aln_df.to_csv(join_path(DIR_PROJ, SUBDIR_DIAG,
item_id, 'alignment_diagnose.csv'))
# rule summarize_alignment_diagnose:
# input:
# aln_diag = dynamic(join_path(DIR_PROJ, SUBDIR_DIAG,
# '{itemid}', '{bc}.pkl')),
# output:
# # Diagnose of alignment
# alignment = expand(join_path(DIR_PROJ, SUBDIR_DIAG,
# '{itemid}', 'alignment_diagnose.csv'),
# itemid=item_names),
# run:
# # item_id -> dict(cell_bc -> Counter(align))
# item_aln_mat = defaultdict(dict)

# for f in input.aln_diag:
# bc_name = base_name(f) # BC-1-xxx
# item_id = base_name(dir_name(f)) # item-1
# item_aln_mat[item_id][bc_name] = pickle.load(open(f, 'rb'))

# for item_id, aln_dict in item_aln_mat.items():
# for bc, cnt in aln_dict.items():
# aln_dict[bc] = pd.Series([cnt[x] for x in aln_diagnose_item],
# index=aln_diagnose_item)

# aln_df = pd.DataFrame(aln_dict, index=aln_diagnose_item).fillna(0)
# aln_df.to_csv(join_path(DIR_PROJ, SUBDIR_DIAG,
# item_id, 'alignment_diagnose.csv'))


rule cleanall:
Expand Down

0 comments on commit 4296672

Please sign in to comment.