-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsnv_indel_call.py
executable file
·134 lines (112 loc) · 5.51 KB
/
snv_indel_call.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
"""
VCF filter; make calls based on supporting depth
"""
from __future__ import print_function
import argparse
import vcf
import vcf.parser
import sys
import numpy
import scipy.stats
def call_from_depths(totdepth, evidence, error_rate, prob_threshold):
"""
Null hypothesis: counts are consistent with a binomial probabilty of error_rate
Returns: True if we can reject the null hypothesis w/ probability 1-prob_threshold
False otherwise
"""
altdepth = sum(evidence)
prob = 1.-scipy.stats.binom.cdf(altdepth, totdepth, error_rate)
return prob < prob_threshold
def reject_from_strandbias(totdepth, evidence, prob_threshold, mindepth=30):
if len(evidence) == 1:
return False # can't reject; don't have strand information
if sum(evidence) < mindepth:
return False
prob = 1.-scipy.stats.binom.cdf(max(evidence), sum(evidence), 0.66)
return prob < prob_threshold
def germline_hom_het(n_depth, n_evidence, t_depth, t_evidence, alpha):
# consistent w/ homozygous or het? True
if scipy.stats.binom_test(sum(n_evidence), n_depth, 1.) >= alpha:
return True
if scipy.stats.binom_test(sum(n_evidence), n_depth, .95) >= alpha:
return True
if scipy.stats.binom_test(sum(n_evidence), n_depth, 0.5) >= alpha:
return True
if scipy.stats.binom_test(sum(n_evidence), n_depth, 0.45) >= alpha:
return True
return False
def reject_from_normal_evidence_vs_noise(n_depth, n_evidence, t_depth, t_evidence, error_rate):
phat_tumour = sum(t_evidence)*1./t_depth
phat_normal = sum(n_evidence)*1./n_depth
if phat_normal == 0.:
return False
if phat_normal >= phat_tumour/2:
return True
# is the normal evidence more consistent with the tumour evidence (over a factor of 2, in case of LOH),
# or noise?
oddsratio, prob_normal = scipy.stats.fisher_exact([[sum(t_evidence)/2, sum(n_evidence)], [t_depth-sum(t_evidence)/2, n_depth-sum(n_evidence)]])
prob_noise = 1.-scipy.stats.binom.cdf(sum(n_evidence), n_depth, error_rate)
if prob_normal >= prob_noise:
return True
return False
def reject_from_normal_evidence(n_depth, n_evidence, t_depth, t_evidence, alpha):
"""
Null hypothesis: germline results are consistent with being drawn from same
binomial distribution as tumour.
Returns: True if we cannot reject the null w/ probability 1-alpha
False otherwise
Use fisher exact test to determine if these are consistent
"""
phat_tumour = sum(t_evidence)*1./t_depth
phat_normal = sum(n_evidence)*1./n_depth
if phat_normal == 0.:
return False
if phat_normal >= phat_tumour/2:
return True
oddsratio, prob = scipy.stats.fisher_exact([[sum(t_evidence)/2, sum(n_evidence)], [t_depth-sum(t_evidence)/2, n_depth-sum(n_evidence)]])
if prob < alpha:
return False
else:
return True
return True
def filter_calls():
parser = argparse.ArgumentParser( description='Set genotypes based on DP4 scores')
parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin)
parser.add_argument('-o', '--output', type=argparse.FileType('w'), default=sys.stdout)
parser.add_argument('-e', '--error', type=float, default=0.01, help='Error rate')
parser.add_argument('-t', '--callthreshold', type=float, default=0.02, help='Max prob to call')
parser.add_argument('-s', '--strandbias', type=float, default=0.10, help='minimum strand ratio')
parser.add_argument('-m', '--mindepth', type=int, default=10, help='minimum total depth')
parser.add_argument('-g', '--germlineprob', type=float, default=0.02, help='Maximum prob of germline')
args = parser.parse_args()
vcf_reader = vcf.Reader(args.input)
vcf_reader.infos['Validation_status'] = vcf.parser._Info(id='Validation_status', num='.', type='String',
desc='Status from validation data',
source=None, version=None)
vcf_writer = vcf.Writer(args.output, vcf_reader)
for record in vcf_reader:
normal_reads = int(record.INFO['NormalReads'][0])
tumour_reads = int(record.INFO['TumourReads'][0])
normal_evidence = [int(x) for x in record.INFO['NormalEvidenceReads']]
tumour_evidence = [int(x) for x in record.INFO['TumourEvidenceReads']]
if min(normal_reads, tumour_reads) < args.mindepth:
record.FILTER = ['LOWDEPTH']
record.INFO['Validation_status'] = 'LOWDEPTH'
vcf_writer.write_record(record)
continue
record.FILTER = []
if sum(tumour_evidence) < 7 or not call_from_depths(tumour_reads, tumour_evidence, args.error, args.callthreshold):
record.FILTER = ['NOTSEEN']
if (tumour_reads > 0) > 0 and reject_from_strandbias(tumour_reads, tumour_evidence, args.strandbias):
record.FILTER += ['STRANDBIAS']
if germline_hom_het(normal_reads, normal_evidence, tumour_reads, tumour_evidence, args.germlineprob):
record.FILTER += ['GERMLINE']
elif reject_from_normal_evidence_vs_noise(normal_reads, normal_evidence, tumour_reads, tumour_evidence, args.error):
record.FILTER += ['NORMALEVIDENCE']
if len(record.FILTER) == 0:
record.FILTER = ['PASS']
record.INFO['Validation_status'] = ','.join(record.FILTER)
vcf_writer.write_record(record)
if __name__ == "__main__":
filter_calls()