-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstats_by_project_dag.py
121 lines (109 loc) · 5.21 KB
/
stats_by_project_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
"""
Airflow DAG to run stats by project giving projectID, recipe parameters
"""
with DAG(
    dag_id="stats_by_project",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["stats_by_project"],
) as dag:
    # run_stats() reads its input from kwargs["params"], e.g.:
    # {"project_directory": "/igo/staging/FASTQ/RUTH_0141_AH27NGDSX5/Project_13586_B",
    #  "recipe": "RNASeq_PolyA", "species": "human"}

    # Interpreter and scripts used for the jobs submitted with bsub.
    _BSUB_PYTHON = "/igo/work/nabors/tools/venvpy3/bin/python"
    _CELLRANGER_MULTI_SCRIPT = "/igo/work/igo/igo-demux/scripts/cellranger_multi.py"
    _ONT_STATS_SCRIPT = "/igo/work/igo/igo-demux/scripts/ont_stats.py"

    def _project_id_from_directory(project_directory):
        """Return the last path component of *project_directory* (e.g. ``Project_13586_B``).

        A trailing ``/`` is ignored, so ``.../Project_13586_B/`` and
        ``.../Project_13586_B`` yield the same id instead of an empty string.
        """
        import os

        return os.path.basename(project_directory.rstrip("/"))

    def _run_shell(cmd):
        """Print *cmd*, then run it through the shell.

        A non-zero exit status does not fail the task; it is only reported, so
        one failed submission does not stop the remaining ones.

        Returns:
            The ``subprocess.CompletedProcess`` for *cmd*.
        """
        import subprocess

        print(cmd)
        result = subprocess.run(cmd, shell=True)
        if result.returncode != 0:
            print("WARNING: command exited with status {}: {}".format(result.returncode, cmd))
        return result

    def _tag_config_files(config_dir, project_id):
        """Rename the cell-hashing / feature-barcoding files found in *config_dir*.

        Each entry is classified with ``scripts.cellranger_multi.check_file_type``:
        a "ch" file is renamed to ``<project_id>_cell_hash.xlsx`` and an "fb" file
        to ``<project_id>_feature_barcoding.xlsx``; other entries are left alone.

        Returns:
            ``(has_cell_hash, has_feature_barcoding)`` -- whether a "ch" / "fb"
            file was found.
        """
        import os
        import scripts.cellranger_multi

        file_names = os.listdir(config_dir)
        print(file_names)
        has_cell_hash = False
        has_feature_barcoding = False
        for file_name in file_names:
            file_path = config_dir + "/" + file_name
            file_type = scripts.cellranger_multi.check_file_type(file_path)
            if file_type == "ch":
                has_cell_hash = True
                new_path = "{}/{}_cell_hash.xlsx".format(config_dir, project_id)
            elif file_type == "fb":
                has_feature_barcoding = True
                new_path = "{}/{}_feature_barcoding.xlsx".format(config_dir, project_id)
            else:
                continue
            os.rename(file_path, new_path)
            print(f"File renamed from {file_path} to {new_path}")
        return has_cell_hash, has_feature_barcoding

    def _launch_cellranger_multi(project_directory, project_id, species):
        """Submit one ``cellranger_multi.py`` bsub job per entry in *project_directory*.

        *project_directory* has to be the gene-expression project folder; each of
        its entries is expected to be named ``Sample_<sample>``.
        """
        import os
        import shlex
        import scripts.cellranger_multi

        config_id = project_id[8:]  # project id without the "Project_" prefix

        # Copy the multi config from the shared drive to the cluster. Only the
        # parameter-derived part is quoted; the location constants reach the shell
        # exactly as they are defined.
        _run_shell("cp -R {}{} {}".format(
            scripts.cellranger_multi.ORIGIN_DRIVE_LOCATION,
            shlex.quote(config_id),
            scripts.cellranger_multi.DRIVE_LOCATION,
        ))

        # LIMS request info is not accurate at the moment, so detect cell hashing
        # (ch) and feature barcoding (fb) from the copied files instead.
        has_cell_hash, has_feature_barcoding = _tag_config_files(
            scripts.cellranger_multi.DRIVE_LOCATION + config_id, project_id
        )

        # The bsub -o log path below is relative; presumably it is meant to resolve
        # under STATS_AREA (NOTE(review): this changes the worker's process cwd).
        os.chdir(scripts.cellranger_multi.STATS_AREA)
        archive = "delivery" in project_directory

        for sample_dir in os.listdir(project_directory):
            sample = sample_dir[7:]  # remove "Sample_" prefix
            sample_set = scripts.cellranger_multi.gather_sample_set_info(sample)

            # Re-assign the ch/fb entries according to the files actually found.
            # The value is recomputed for every sample, so when LIMS gives neither
            # it is None (and skipped below) rather than a previous sample's value.
            ch_value = sample_set.pop("ch")
            fb_value = sample_set.pop("fb")
            sample_name = ch_value if ch_value is not None else fb_value
            if has_cell_hash:
                sample_set["ch"] = sample_name
            if has_feature_barcoding:
                sample_set["fb"] = sample_name

            job_name = "{}_{}_multi".format(project_id, sample)
            args = [
                "bsub",
                "-J", shlex.quote(job_name),
                "-o", shlex.quote(job_name + ".out"),
                _BSUB_PYTHON,
                _CELLRANGER_MULTI_SCRIPT,
            ]
            args.extend(
                shlex.quote("-{}={}".format(key, value))
                for key, value in sample_set.items()
                if value is not None
            )
            args.append(shlex.quote("-genome={}".format(species)))
            if archive:
                args.append("-archive")
            _run_shell(" ".join(args))

    def run_stats(ds, **kwargs):
        """Run stats for the project given in the task params.

        Reads ``project_directory``, ``recipe`` and ``species`` from
        ``kwargs["params"]`` and dispatches on ``recipe``:

        * ``"10X_multi"`` -> per-sample cellranger multi bsub jobs
        * contains ``"SC_Chromium"`` or ``"ST_Visium"`` ->
          ``scripts.cellranger.launch_cellranger_by_project_location``
        * contains ``"Nanopore"`` -> one ``ont_stats.py`` bsub job
        * ``"demux_stats"`` -> ``scripts.get_total_reads_from_demux.by_project_location``
        * anything else -> ``scripts.calculate_stats.main``

        Args:
            ds: Airflow context value; not used.
            **kwargs: Airflow task context; only ``kwargs["params"]`` is read.

        Returns:
            A status message naming the project directory.
        """
        import shlex
        import scripts.calculate_stats
        import scripts.cellranger
        import scripts.cellranger_multi
        import scripts.get_total_reads_from_demux

        params = kwargs["params"]
        project_directory = params["project_directory"]
        recipe = params["recipe"]
        species = params["species"]
        print("running stats for project in this directory {}".format(project_directory))

        project_id = _project_id_from_directory(project_directory)
        if recipe == "10X_multi":
            _launch_cellranger_multi(project_directory, project_id, species)
        elif "SC_Chromium" in recipe or "ST_Visium" in recipe:
            scripts.cellranger.launch_cellranger_by_project_location(project_directory, recipe, species)
        elif "Nanopore" in recipe:
            _run_shell(" ".join([
                "bsub",
                "-J", shlex.quote("ont_stats_" + project_id),
                "-n", "16",
                "-M", "16",
                _BSUB_PYTHON,
                _ONT_STATS_SCRIPT,
                shlex.quote(project_directory),
            ]))
        elif recipe == "demux_stats":
            scripts.get_total_reads_from_demux.by_project_location(project_directory)
        else:
            scripts.calculate_stats.main([project_directory, recipe, species])
        return "Stats done for project in this directory {}".format(project_directory)

    # provide_context is not passed: Airflow 2 supplies the context automatically.
    run_stats_by_project = PythonOperator(
        task_id="stats_by_project",
        python_callable=run_stats,
        email_on_failure=True,
        email="skigodata@mskcc.org",
        dag=dag,
    )