Skip to content
Snippets Groups Projects
Commit 6d1cc041 authored by Kurtz, Prof. Dr. Stefan's avatar Kurtz, Prof. Dr. Stefan
Browse files

Added program to calculate ANI based on nucmer

difference to original version of Phybema: distance estimator
outputs result as matrix in PHYLIP format
parent 1d420bbc
No related branches found
No related tags found
No related merge requests found
......@@ -2,6 +2,7 @@
test:testsuite/test_rep.sh
@testsuite/test_rep.sh testset1
@echo "all tests passed"
clean:
@${RM} -r temp out
""" Calculation of Average nucleotide indentity (ANI)"""
import os
import numpy as np
from anicalc.fasta_iterator import fasta_next, write_sequence
from anicalc.nucmer import compute_delta, nucmer_calculate_alignments_list
def extend_path(program):
"""
Function to extend the a program to the full length path
with systems file path if required and checks if program is executable.
@param program: program filepath or name
@return: full filepath or error if not found or not executable
"""
fpath, _ = os.path.split(program)
if fpath:
if os.path.isfile(program) and os.access(program, os.X_OK):
return os.path.abspath(program)
raise FileNotFoundError(
"{} not found or not executable".format(program))
for path in os.environ["PATH"].split(os.pathsep):
exe_file = os.path.abspath(os.path.join(path, program))
if os.path.isfile(exe_file) and os.access(exe_file, os.X_OK):
return exe_file
raise FileNotFoundError("{} not found or not executable".format(program))
def create_fragments(infile, outputfilepath, min_fragment_size, fragment_size):
"""
Creates fragments based on an fasta file (for later alignments) Output files
get the suffixes '.fa' for the original sequences and '.fa.split' for
the fragments. Filenames based on FASTA-headers.
@param infile: (multi-) FASTA file with the input sequence(s)
@param outputfilepath: filepath were the output should be written to
@param min_fragment_size: minimal fragment size a fragment should have
@param fragment_size: Usual size of the output fragments
@return: list of created file prefixes
"""
files = list()
for header, sequence in fasta_next(infile):
save_header = header.split(' ')[0]
save_header = save_header.split('|')[0]
files.append(save_header)
outputfile = '{}/{}.fa'.format(outputfilepath, save_header)
with open(outputfile, 'w') as file:
file.write('>' + header + '\n')
write_sequence(file, sequence)
with open(outputfile + '.split', 'w') as file:
idx = 0
while len(sequence) > (fragment_size + min_fragment_size):
idx += 1
fragment = sequence[:fragment_size]
file.write('>' + str(idx) + '\n')
file.write(fragment + '\n')
sequence = sequence[fragment_size:]
file.write('>' + str(idx + 1) + '\n')
file.write(sequence + '\n')
return files
def calculate_ani_single(alignments_list):
"""
Calculates average nucleotide indentity (ANI) from a list of local
alignments. ANI is calculated from all alignment with are at least aligned
to 70% with maximal 30% non identities.
@param alignments_list: list of alignments represented as triple like
(alignment length, non identities, query sequence length)
@return: Average nucleotide identity
"""
ni_sum = 0
ani_matches = 0
for alignment_length, non_identities, query_length in alignments_list:
if (((alignment_length - non_identities) / query_length) > 0.3) and \
((alignment_length / query_length) >= 0.7):
ni_sum += (alignment_length - non_identities) / alignment_length
ani_matches += 1
ani = (ni_sum / ani_matches) if ani_matches > 0 else 0
return ani
def calculate_ani_nucmer(reference, files, tmpdir):
"""
Function to create alignments with nucmer and then calculate ANI for all
reference-query-pairs. Requires to have 'nucmer' and 'delta-filter' in your
path.
@param reference: Prefix of the reference file to use
(Filename should be like reference + '.fa')
@param files: List of rrefix of query filesto use
(Filename should be like query + '.fa.split')
@param tmpdir: Name of the directory where the reference and query files
are located.
(Filenames are expanded to tmpdir + '/' + filename)
@return: reference , same as input (for multiprocessing)
@return: numpy array holding all ANI
(np.nan for the alignment of reference to reference)
"""
nucmer_path = extend_path('nucmer')
delta_filter = extend_path('delta-filter')
file_paths = [compute_delta(nucmer_path, delta_filter,
tmpdir + '/' + reference + '.fa',
tmpdir + '/' + currentfile + '.fa.split',
tmpdir)
if currentfile != reference else None
for currentfile in files]
ani = (calculate_ani_single(nucmer_calculate_alignments_list(qfile))
if qfile else np.nan for qfile in file_paths)
return reference, np.fromiter(ani, np.float)
_CALCULATE_ANI_WITH_TOOL = {
'nucmer': calculate_ani_nucmer
}
def calculate_ani_with_tool(tool):
""" Wrapper for using different alignment tools"""
return _CALCULATE_ANI_WITH_TOOL[tool]
""" Provides Iterator over (multi-) FASTA file"""
import re
def write_sequence(filestream, sequence):
""" Writes FASTA sequence to filestream using fixed length 60 """
for startpos in range(0, len(sequence), 60):
filestream.write(sequence[startpos:startpos + 60] + '\n')
def print_sequence(seq, linelength=70):
""" Prints FASTA sequence seq to sdtout with linelength (default 70)"""
for startpos in range(0, len(seq), linelength):
print(seq[startpos:startpos + linelength])
def _seq_list2sequence(seq_list):
sequence = ' '.join(seq_list)
return re.sub(r'\s', "", sequence)
def fasta_next(filename):
"""
Iterator to get (header, sequence) tuple
from (multi-)FASTA file with filename.
"""
with open(filename, "r") as stream:
seq_list = list()
header = None
for line in stream:
if not re.search(r'^(>|\s*$|\s*#)', line):
seq_list.append(line.rstrip())
else:
match = re.search(r'^>(.*)', line)
if match is not None:
if header is not None:
yield header, _seq_list2sequence(seq_list)
seq_list.clear()
header = match.group(1)
if header is not None:
yield header, _seq_list2sequence(seq_list)
""" Module to run nucmer and parse its output"""
import subprocess
import os
import tempfile
import re
class Alignment:
""" Class holding information from a .delta file"""
def __init__(self, reference, query, ref_len, query_len):
self.reference = reference
self.query = query
self.ref_len = ref_len
self.query_len = query_len
self.alignments = list()
def __len__(self):
return sum([len(alg) for alg in self.alignments])
def add_segment(self, reference_coordinates, query_coordinates, *errors):
"""
Function to add an new aligned segment to an alignment object.
(not quite sure if an alignment in delta format can
hold more than one segments)
@param reference_coordinates: tuple, start and end position
in the reference sequence
@param query_coordinates: tuple, start and end position
in the query sequence
@param: errors: triple containing non identities, similarity errors and
stop codons (if protein sequence)
"""
new = AlignmentSegment(reference_coordinates,
query_coordinates, *errors)
self.alignments.append(new)
return new
def get_errors(self):
""" Returns sum of all non identities over all alignment segments"""
return sum([int(alg.errors) for alg in self.alignments])
class AlignmentSegment:
""" Class holding an alignment segment from .delta file """
def __init__(self, reference_coordinates, query_coordinates, *errors):
self.reference_coordinates = reference_coordinates
self.query_coordinates = query_coordinates
self.errors = int(errors[0])
self.similarity_errors = int(errors[1])
self.stop_codons = int(errors[2])
self.nexts = list()
def __len__(self):
return abs(int(self.query_coordinates[1]) -
int(self.query_coordinates[0])) + 1
def add_next(self, value):
""" Adds alignment information"""
self.nexts.append(value)
def compute_delta(nucmer_path, delta_filter_path, reference, query, tmp_dir):
"""
Computes .delta files with nucmer based on one reference FASTA and one query
multi-FASTA file. Files are automatically filtered using 'delta-filer'.
@param nucmer_path: absolute path to nucmer
@param delta_filter: absolute path to delta-filter
@param reference: absolute path to reference FASTA file
@param query: absolute path to query multi-FASTA file
@param tmp: absolute path to working directory
@return: the absolute path to the resulting filtered .delta file
"""
with tempfile.TemporaryDirectory(dir=tmp_dir) as tmpdir:
try:
subprocess.check_call([nucmer_path, '--threads=1', reference,
query], cwd=tmpdir, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except subprocess.CalledProcessError:
subprocess.check_call([nucmer_path, reference, query],
cwd=tmpdir, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
filtered_delta_path = '{}/{}_{}.delta'.format(
tmp_dir, os.path.basename(reference),
os.path.basename(query))
with open(filtered_delta_path, 'w') as filtered_delta:
subprocess.check_call([delta_filter_path, '-q', tmpdir + '/out.delta'],
stdout=filtered_delta)
return filtered_delta_path
def read_delta(filtered_delta_path):
""" Iterator to read an .delta file to Alignment objects """
current_alignment = None
with open(filtered_delta_path, 'r') as filtered_delta:
for line in filtered_delta:
line = line.rstrip()
if line[0] == '>':
if current_alignment is not None:
yield current_alignment
reference, query, ref_len, query_len = line.split(' ')
current_alignment = Alignment(reference[1:], query,
int(ref_len), int(query_len))
elif re.match(r'\d+\s\d+\s\d+\s\d+\s\d+\s\d+', line):
split_line = line.split(' ')
current_segment = current_alignment.add_segment(
split_line[0:2],
split_line[2:4],
*split_line[4:]
)
elif re.match(r'\d+', line):
split_line = line.split(' ')
if split_line[0] != '0':
current_segment.add_next(int(split_line[0]))
if current_alignment is not None:
yield current_alignment
def nucmer_calculate_alignments_list(file):
""" Function to create an list of triples as input for
anicalc.calculate_ani_single from an .delta file
"""
if file is not None:
output = list()
for alignment in read_delta(file):
output.append(
(len(alignment), alignment.get_errors(), alignment.query_len))
return output
return None
......@@ -11,15 +11,19 @@ from anicalc.anicalc import create_fragments, calculate_ani_with_tool
def convert_raw_results_to_phybema(raw_results):
num_sequences = len(raw_results)
print('# pairwise dist values for {} sequences'.format(num_sequences))
for idx, r in enumerate(raw_results):
print('{}\t{}'.format(idx,r[0]))
for i in range(num_sequences-1):
for j in range(i+1,num_sequences):
output = list()
output.append('{}'.format(num_sequences))
for i in range(num_sequences):
line = list()
line.append('{}'.format(raw_results[i][0]))
line.append('0.0')
for j in range(0,i):
ani1 = raw_results[i][1][j]
ani2 = raw_results[j][1][i]
avg_ani = (ani1+ani2)/2.0
print('dist {} {} {:.8f}'.format(i,j,1.0 - avg_ani))
line.append('{:.8f}'.format(1.0 - avg_ani))
output.append('\t'.join(line))
print('\n'.join(output))
DEBUG = False
......
#!/bin/sh
if test $# -ne 1
then
echo "Usage: $0 <inputfile>"
exit 1
fi
inputfile=$1
src/calculate_ani.py --phybema nucmer ${inputfile}
......@@ -7,7 +7,7 @@ then
fi
subdir=$1
src/phybema.py --tools mash andi -- testdata/${subdir}/
src/phybema.py --tools mash andi dnadiff -- testdata/${subdir}/
for prog in andi mash
do
for suffix in mat nh
......
4
tgen_1 0.0
tgen_2 0.0 0.00090939
tgen_3 0.0 0.00083935 0.00051365
tgen_4 0.0 0.00101731 0.00153718 0.00200906
(tgen_2,((tgen_4:0.00064,tgen_1:-0.00064):0.00022,tgen_3:0.00055):0.00029);
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment