Skip to content

Commit c150f8e

Browse files
committed
started alignment
Former-commit-id: 08032a3da12d833fbbb72d701a942f5655322b7c
1 parent b15b9a8 commit c150f8e

10 files changed

+362
-2
lines changed

Makefile

+5
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,8 @@ test:
2626
# Targets for installation
2727
install:
2828
$(PYTHON_BIN)/python setup.py install
29+
30+
# Targets for running fixture
31+
runfixture:
32+
rm -rf fixtures/output
33+
-spark-submit $(CURDIR)/apps/brisera_align.py $(CURDIR)/fixtures/s_suis.ser $(CURDIR)/fixtures/100k.ser $(CURDIR)/fixtures/output

apps/brisera_align.py

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""
2+
Spark application that takes a preprocessed query file and DNA sequence
3+
and uses seed-and-reduce (e.g. BLAST) to find alignments in a distributed
4+
fashion (similar to CloudBurst's implementation on Hadoop).
5+
6+
Spark implements RDDs - an efficient way of cacheing repeated data in
7+
memory, saving some of the overhead of disk IO that is requied in Hadoop,
8+
as such, Spark is a much faster processing platform for distributed
9+
alignments than Hadoop and Cloudburst.
10+
"""
11+
12+
##########################################################################
13+
## Imports
14+
##########################################################################
15+
16+
import sys
17+
import brisera
18+
19+
from brisera.utils import timeit
20+
from brisera.config import settings
21+
from pyspark import SparkConf, SparkContext
22+
23+
@timeit
24+
def run_brisera_alignment(sc, refpath, qrypath, outpath):
25+
"""
26+
Runs the complete alignment
27+
"""
28+
# Execute the alignments
29+
alignments, adelta = brisera.align_all(sc, refpath, qrypath)
30+
31+
# Filter best alignments
32+
if settings.filter_align:
33+
alignments, fdelta = brisera.filter_alignments(sc, alignments)
34+
else:
35+
fdelta = 0
36+
37+
# Write alignments to disk
38+
alignments.saveAsTextFile(outpath)
39+
40+
return alignments, adelta, fdelta
41+
42+
if __name__ == "__main__":
43+
44+
if len(sys.argv) != 4:
45+
sys.stderr.write("Usage: convert_fasta.py refpath qrypath outpath\n")
46+
sys.stderr.write(" all other settings are stored in brisera.yaml\n")
47+
sys.exit(-1)
48+
49+
conf = SparkConf().setAppName("Brisera Alignment")
50+
sc = SparkContext(conf=conf)
51+
52+
refpath = sys.argv[1]
53+
qrypath = sys.argv[2]
54+
outpath = sys.argv[3]
55+
56+
if settings.redundancy < 1:
57+
raise brisera.ImproperlyConfigured("Minimum redundancy is 1")
58+
59+
if settings.max_read_len > settings.overlap:
60+
raise brisera.ImproperlyConfigured("Increase overlap for %i length reads"
61+
" and reconvert FASTA file.", settings.max_read_len)
62+
63+
result, delta = run_brisera_alignment(sc, refpath, qrypath, outpath)
64+
alignments, adelta, fdelta = result
65+
print "Total execution time: %0.3f seconds" % delta
66+
print " Alignment time: %0.3f seconds" % adelta
67+
print " Filtering time: %0.3f seconds" % fdelta

brisera/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,7 @@
44
"""
55

66
__version__ = "1.0"
7+
8+
from brisera.exceptions import *
9+
from brisera.align import align_all
10+
from brisera.filter import filter_alignments

brisera/align.py

+169
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
"""
2+
Implements MerReduce alignment (seed-and-reduce) in a distributed fashion
3+
"""
4+
5+
##########################################################################
6+
## Imports
7+
##########################################################################
8+
9+
from brisera.utils import *
10+
from brisera.records import *
11+
from brisera.config import settings
12+
from brisera.exceptions import *
13+
14+
N = '.'
15+
16+
##########################################################################
17+
## MerAlignment
18+
##########################################################################
19+
20+
class MerAlignment(object):
21+
"""
22+
Performs both map and reduce alignment akin to CloudBurst
23+
"""
24+
25+
def __init__(self, **kwargs):
26+
setting = lambda name: kwargs.get(name, getattr(settings, name))
27+
28+
self.min_read_len = setting('min_read_len')
29+
self.max_read_len = setting('max_read_len')
30+
self.seed_len = setting('seed_len')
31+
self.flank_len = setting('flank_len')
32+
self.k = setting('k')
33+
self.redundancy = setting('redundancy')
34+
35+
def parse_record(self, data):
36+
key, val = data # Expand the key, value pair
37+
record = deserialize_record(val)
38+
sequence = record[0]
39+
offset = record[1]
40+
is_last = record[2]
41+
seqlen = len(sequence)
42+
43+
return key, sequence, offset, is_last, seqlen
44+
45+
def map_reference(self, arg):
46+
"""
47+
Input (id, (sequence, offset, tag))
48+
Yields tuples:
49+
(mer, (id, pos, tag, left, right, r))
50+
for each seed in the sequences that are passed in
51+
if tag = 0, also output the reverse complement sequences
52+
"""
53+
54+
key, sequence, offset, is_last, seqlen = self.parse_record(arg)
55+
56+
start = 0
57+
if offset != 0:
58+
# Not the first chunk, shift for room on left flank
59+
start = settings.overlap + 1 - self.flank_len - self.seed_len
60+
offset += start
61+
62+
# stop so the last mer will fit
63+
end = seqlen - self.seed_len + 1
64+
65+
if not is_last:
66+
# If not the last chunk, stop so the right flank fits as well
67+
end -= self.flank_len
68+
69+
# Emit the mers starting at every position
70+
for idx in xrange(start, end):
71+
72+
seed = sequence[start:self.seed_len]
73+
if N in seed:
74+
continue
75+
76+
offset += 1
77+
start += 1
78+
79+
leftstart = start - self.flank_len
80+
if leftstart < 0:
81+
leftstart = 0
82+
leftlen = start-leftstart
83+
84+
rightstart = start + self.seed_len
85+
rightend = rightstart + self.flank_len
86+
if rightend > seqlen:
87+
rightend = seqlen
88+
rightlen = rightend-rightstart
89+
90+
seed = sequence[start:start+self.seed_len]
91+
if self.redundancy > 1 and repseed(sequence, start, self.seed_len):
92+
for rdx in xrange(self.redundancy):
93+
r = rdx % self.redundancy
94+
yield (seed, (key, offset, is_last, leftstart, leftlen, rightstart, rightlen, r))
95+
else:
96+
yield (seed, (key, offset, is_last, leftstart, leftlen, rightstart, rightlen, 0))
97+
98+
def map_queries(self, arg):
99+
"""
100+
Input (id, (sequence, offset, tag))
101+
Yields tuples:
102+
(mer, (id, pos, tag, left, right, r))
103+
for each seed in the sequences that are passed in
104+
if tag = 0, also output the reverse complement sequences
105+
"""
106+
key, sequence, offset, is_last, seqlen = self.parse_record(arg)
107+
108+
if seqlen < self.min_read_len:
109+
raise ReadLengthException("read length %i < minimum read length %i", seqlen, self.min_read_len)
110+
elif seqlen > self.max_read_len:
111+
raise ReadLengthException("read length %i > maximum read length %i", seqlen, self.max_read_len)
112+
113+
numN = sum(1 for char in sequence if char == N)
114+
115+
for rc in xrange(2):
116+
117+
if numN > self.k:
118+
break
119+
120+
if rc == 1:
121+
# Reverse complement the sequence
122+
sequence = revc(sequence)
123+
is_rc = True
124+
else:
125+
is_rc = False
126+
127+
# emit non-overlapping mers
128+
for idx in xrange(0, seqlen, self.seed_len):
129+
seed = sequence[idx:idx+self.seed_len]
130+
if N in seed:
131+
continue
132+
133+
rightstart = idx+self.seed_len
134+
rightlen = seqlen - rightstart
135+
136+
if self.redundancy > 1 and repseed(sequence, idx, self.seed_len):
137+
r = key % self.redundancy
138+
yield (seed, (key, idx, is_last, 0, idx, rightstart, rightlen, r))
139+
else:
140+
yield (seed, (key, idx, is_last, 0, idx, rightstart, rightlen, 0))
141+
142+
##########################################################################
143+
## Spark Functionality
144+
##########################################################################
145+
146+
@timeit
147+
def align_all(sc, refpath, qrypath):
148+
"""
149+
Returns an RDD of alignments (no writes to disk)
150+
"""
151+
reference = sc.sequenceFile(refpath)
152+
queries = sc.sequenceFile(qrypath)
153+
alignment = MerAlignment()
154+
155+
# Perform mapping
156+
reference = reference.flatMap(alignment.map_reference)
157+
return reference
158+
159+
if __name__ == '__main__':
160+
from brisera.convert import *
161+
# path = fixture('s_suis.fa', 'cloudburst')
162+
path = fixture('100k.fa', 'cloudburst')
163+
chunker = FastaChunker(path)
164+
aligner = MerAlignment()
165+
for chunk in chunker.convert():
166+
# for record in aligner.map_reference(chunk):
167+
for record in aligner.map_queries(chunk):
168+
print record
169+
# break

brisera/config.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class BriseraConfiguration(confire.Configuration):
4444
max_read_len = 36
4545
k = 3
4646
allow_diff = False
47-
fliter_align = True
47+
filter_align = True
4848
block_size = 128
4949
redundancy = 16
5050

brisera/convert.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,19 @@
22
Handles the conversion of a FASTA sequence into a sequence format
33
"""
44

5+
##########################################################################
6+
## Imports
7+
##########################################################################
8+
59
import cPickle
610

711
from brisera.utils import fasta
812
from brisera.config import settings
13+
from brisera.records import serialize_record
14+
15+
##########################################################################
16+
## Chunker for RDDs
17+
##########################################################################
918

1019
class FastaChunker(object):
1120

@@ -62,7 +71,7 @@ def convert(self):
6271

6372
for idx, seq in self:
6473
for record in self.chunk(seq):
65-
yield (idx, cPickle.dumps(record, cPickle.HIGHEST_PROTOCOL))
74+
yield (idx, serialize_record(record))
6675

6776
def __iter__(self):
6877
"""

brisera/exceptions.py

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
"""
2+
Class hierarchy for exceptions in Brisera
3+
"""
4+
5+
class BriseraException(Exception):
6+
"""
7+
Top level class for Brisera exceptions
8+
"""
9+
pass
10+
11+
class ImproperlyConfigured(BriseraException):
12+
"""
13+
Something is wrong with a setting or configuration
14+
"""
15+
pass
16+
17+
class ReadLengthException(BriseraException):
18+
"""
19+
The read is not in bounds of the minimum and maximum read lengths
20+
"""
21+
pass

brisera/filter.py

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
"""
2+
Filters the best alignments from the computed alignments.
3+
"""
4+
5+
##########################################################################
6+
## Imports
7+
##########################################################################
8+
9+
from brisera.utils import *
10+
from brisera.config import settings
11+
12+
##########################################################################
13+
## Helper functions
14+
##########################################################################
15+
16+
@timeit
17+
def filter_alignments(sc, alignments):
18+
return alignments

brisera/records.py

+34
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,16 @@
22
Utilities to help create and serialize records in binary format
33
"""
44

5+
##########################################################################
6+
## Imports
7+
##########################################################################
8+
9+
import cPickle
10+
11+
##########################################################################
12+
## Module Constants
13+
##########################################################################
14+
515
DNA_BYTES = {
616
'A': 0x00,
717
'C': 0x01,
@@ -14,6 +24,10 @@
1424

1525
BYTES_DNA = dict((v, k) for (k,v) in DNA_BYTES.items())
1626

27+
##########################################################################
28+
## Helper functions
29+
##########################################################################
30+
1731
def dna_from_seq(dna, pos, length):
1832
if length == 0:
1933
return ""
@@ -46,6 +60,13 @@ def dna_from_seq(dna, pos, length):
4660

4761
return string
4862

63+
def repseed(seq, start, slen):
64+
first = seq[start]
65+
for idx in xrange(slen):
66+
if seq[idx+start] != first:
67+
return False
68+
return True
69+
4970
def record_from_bytes(raw):
5071

5172
last_chunk = raw[0] == 1
@@ -59,6 +80,19 @@ def record_from_bytes(raw):
5980

6081
return sequence, offset, last_chunk
6182

83+
def serialize_record(record):
84+
"""
85+
Convert a tuple into a binary string for use with SequenceFiles
86+
"""
87+
return cPickle.dumps(record, 0)
88+
89+
def deserialize_record(record):
90+
"""
91+
Read a binary record object and return the tuple
92+
"""
93+
record = record.encode('utf-8')
94+
return cPickle.loads(record)
95+
6296
if __name__ == '__main__':
6397
value = bytearray(b'\x01\x00\x00\x00\x00!\x14$AD@\x10B\x04DD"A@$$\x04"')
6498
print record_from_bytes(value)

0 commit comments

Comments
 (0)