Source code for oligo.tools
from __future__ import print_function, division
from collections import namedtuple
import os
import re
import subprocess
import pandas as pd
import pysam
from Bio import SeqIO
GENOME_MAP = {
"mm": {"species": "Mus musculus", "docker_lib_file": "/usr/local/mouse.hmm"},
"hg": {"species": "Homo sapiens", "docker_lib_file": "/usr/local/humans.hmm"},
}
USE_CUSTOM_RM_LIB = os.getenv("USE_CUSTOM_RM_LIB") in ("true", "1")
blat_param = '-stepSize=5 -minScore=10 -minIdentity=0 -repMatch=999999'
star_param = '--readFilesIn {} --genomeDir {} --runThreadN 4 --genomeLoad NoSharedMemory ' \
'--outFilterMultimapScoreRange 1000 --outFilterMultimapNmax ' \
'100000 --outFilterMismatchNmax 110 --seedSearchStartLmax 4 ' \
'--seedSearchLmax 20 --alignIntronMax 10 --seedPerWindowNmax ' \
'15 --seedMultimapNmax 11000 --winAnchorMultimapNmax 200 ' \
'--limitOutSAMoneReadBytes 400000 --outFileNamePrefix oligos_'
pat = re.compile('^[A-Z]')
[docs]
class Tools(object):
"""
Parameters
----------
genome : {'mm9', 'mm10', 'hg18', 'hg19', 'hg38'}
Genome build
fa : str
Path to reference genome fasta
Attributes
----------
blat : bool
Check off-target binding using BLAT instead of STAR (not
recommended for large designs), default = False
fasta : str
Name of fasta file for oligo sequences, default = oligo_seqs.fa
oligo_seqs : dict
Contains all oligo sequences after generating oligos
"""
def __init__(self, genome, fa, config_path, blat=False):
self.genome = genome
self.fa = fa
self.paths = dict((x.strip().split(' = ') for x in open(config_path) if pat.match(x)))
self.blat = blat
self.fasta = 'oligo_seqs.fa'
if self.__class__.__name__ != 'Tools':
print('Loading reference fasta file...')
self.genome_seq = SeqIO.to_dict(SeqIO.parse(fa, 'fasta'))
print('\t...complete')
[docs]
def _create_attr(self, oligo):
"""Creates `oligo`, `oligo_seqs` and `_assoc` attributes"""
self.oligo = oligo
self.oligo_seqs = {}
self._assoc = {}
[docs]
def write_fasta(self):
"""Writes `oligo_seqs` attribute to fasta file"""
with open(self.fasta, 'w') as fa_w:
for key, value in self.oligo_seqs.items():
fa_w.write('>{}\n{}\n'.format(key, value))
print('Wrote oligos to {}'.format(self.fasta))
return None
[docs]
def detect_repeats(self):
"""Detects repeat sequences in oligos, using RepeatMasker"""
options = ('RM_PATH', 'RepeatMasker', 'RepeatMasker',
'rm_log.txt', ''.join((self.fasta, '.out')))
genome_id = self.genome.lower()[:2]
cmd = f"-noint -s "
if USE_CUSTOM_RM_LIB:
cmd += f"-lib {GENOME_MAP[genome_id]['docker_lib_file']} "
else:
cmd += f"-species {GENOME_MAP[genome_id]['species']} "
cmd += f"{self.fasta}"
msg = 'Checking for repeat sequences in oligos,'
self._run_command(options, cmd, msg)
return self
[docs]
def align_to_genome(self, s_idx=''):
"""Aligns oligos to the genome using BLAT or STAR
Parameters
----------
s_idx : str
Path to the directory containing the STAR index for this
genome (not required if blat=True)
Raises
------
AttributeError
If `blat` = False but `s_idx` is not specified
FileNotFoundError
If a fasta file with the name specified by the `fasta`
attribute is not found
"""
if (not self.blat) and (not s_idx):
raise AttributeError('Path to STAR index must be set if '
'blat=False')
if not os.path.exists(self.fasta):
raise FileNotFoundError('A valid FASTA file with the name {} was '
'not found'.format(self.fasta))
if self.blat:
blat_out = 'blat_out.psl'
options = ('BLAT_PATH', 'blat', 'BLAT', 'blat_log.txt', blat_out)
cmd = ' '.join((blat_param, self.fa, self.fasta, blat_out))
else:
options = ('STAR_PATH', 'STAR', 'STAR', 'star_log.txt',
'oligos_Aligned.out.sam')
cmd = star_param.format(self.fasta, s_idx)
msg = 'Aligning oligos to the genome,'
self._run_command(options, cmd, msg)
return None
[docs]
def extract_repeats(self):
"""Extracts information of repeat content from RepeatMasker output
file for every oligo
"""
try:
self._oligo_stats
except AttributeError:
self._populate_oligo_stats()
# TODO: detect existence of .fa.out file here and echo the RM log contents if not found
with open('.'.join((self.fasta, 'out'))) as repeats_file:
if len(repeats_file.readlines())>1:
repeats_file.seek(0)
for _ in range(3):
next(repeats_file)
for line in repeats_file:
parts = re.split("\s+", line.strip())
oligo_name = parts[4]
repeat_type = parts[9]
fragment_side = re.split("\W+", oligo_name)[5]
if len(fragment_side)>1:
oligo_name = oligo_name.split('_')[0]
qstart, qstop = map(int, (parts[5:7]))
length = (qstop-qstart) + 1
if length > self._oligo_stats[oligo_name]['repeat_length']:
self._oligo_stats[oligo_name]['repeat_length'] = length
self._oligo_stats[oligo_name]['repeat_type'] = repeat_type
msg = 'Repeat scores calculated'
else:
msg = 'No repeats detected'
print(msg)
return self
[docs]
def calculate_density(self,
sam='oligos_Aligned.out.sam',
blat_file='blat_out.psl'):
"""Calculates the repeat scores and off-target binding for
each oligo based on their scores from RepeatMasker and
STAR/BLAT. Outputs results to `oligo_info.txt`.
Parameters
----------
sam : str
Path to STAR alignment (.sam) file from `align_to_genome`
(not required if `blat`=True), default = oligos_Aligned.out.sam
blat_file : str
Path to BLAT alignment (.psl) file from `align_to_genome`
(not required if `blat`=False), default = blat_out.psl
"""
try:
self._oligo_stats
except AttributeError:
self._populate_oligo_stats()
if self.blat:
with open(blat_file) as f:
for _ in range(5):
next(f)
for line in f:
parts = re.split("\s+", line.strip())
oligo_name = parts[9]
qgapbases, qstart, qend = map(int, (parts[5], parts[11],
parts[12]))
self._oligo_stats[oligo_name]['multimap'] += 1
self._oligo_stats[oligo_name]['matches'] += (int(qend) -
int(qstart)) + 1
self._oligo_stats[oligo_name]['mismatches'] += int(qgapbases)
else:
sf = pysam.AlignmentFile(sam, 'r')
for r in sf.fetch(until_eof=True):
oligo_name = r.query_name
if self._oligo_stats[oligo_name]['multimap'] == 0:
self._oligo_stats[oligo_name]['multimap'] = r.get_tag('NH')
for block in r.cigartuples:
if block[0] == 0:
self._oligo_stats[oligo_name]['matches'] += block[1]
elif (block[0] == 1) | (block[0] == 2):
self._oligo_stats[oligo_name]['mismatches'] += block[1]
for oligo in self._oligo_stats:
score = self._oligo_stats[oligo]['matches'] - self._oligo_stats[oligo]['mismatches']
density = score / len(self._oligo_stats[oligo]['sequence'])
self._oligo_stats[oligo]['density'] = float("{0:.2f}".format(density))
print('Density scores calculated')
return self
[docs]
def write_oligo_info(self):
"""Writes oligo stats to oligo_info.txt"""
p = re.compile('\W+')
with open('oligo_info.txt', 'w') as output:
output.write('chr\tstart\tstop\tfragment_start\tfragment_stop\t'
'side_of_fragment\tsequence\ttotal_number_of_alignments\t'
'density_score\trepeat_length\trepeat_class\tGC%\t'
'associations\n')
for oligo, stats in self._oligo_stats.items():
oligo_parts = (chrom, read_start, read_stop, frag_start,
frag_stop, frag_side) = p.split(oligo)
has_fragment = True
if (frag_start, frag_stop, frag_side) == ('000', '000', 'X'):
has_fragment = False
#frag_start, frag_stop, frag_side = '.' * 3
oligo_parts[3:] = '.' * 3
try:
self._assoc
except AttributeError:
associations = '.'
else:
if self._assoc:
if has_fragment:
coor = '{}:{}-{}'.format(chrom, frag_start, frag_stop)
else:
coor = '{}:{}-{}'.format(chrom, read_start, read_stop)
associations = self._assoc.get(coor, '.')
else:
associations = '.'
keys = ('sequence', 'multimap', 'density', 'repeat_length',
'repeat_type', 'GC%')
to_write = oligo_parts + [str(stats[x])
for x in keys] + [associations]
output.write('{}\n'.format('\t'.join(to_write)))
sorted_df = self._sort_file()
sorted_df.to_csv('oligo_info.txt', sep='\t', index=False, na_rep='NA')
print('Oligo information written to oligo_info.txt')
return None
[docs]
def _run_command(self, options, cmd, msg):
"""Runs a command using subprocess"""
CmdOptions = namedtuple('CmdOptions', ['paths_key', 'exe', 'name',
'log_file', 'output_file'])
run_options = CmdOptions._make(options)
path = os.path.join(self.paths[run_options.paths_key], run_options.exe)
print('{} with {}...'.format(msg, run_options.name))
log = open(run_options.log_file, 'w')
subprocess.call(' '.join((path, cmd)), shell=True, stdout=log,
stderr=log)
log.close()
print('\t...complete. Output written to {}'.format(
run_options.output_file))
return None
[docs]
def _populate_oligo_stats(self):
"""Populates _oligo_stats attribute with default values"""
self._oligo_stats = {}
with open(self.fasta) as fasta_file:
for line in fasta_file:
oligo_name = line.lstrip('>').strip()
read_seq = next(fasta_file).strip()
self._oligo_stats[oligo_name] = {
'sequence': read_seq,
'multimap': 0,
'density': 0,
'repeat_length': 0,
'repeat_type': 'NA',
'GC%': self._get_gc(read_seq),
'matches': 0,
'mismatches': 0,
}
return None
[docs]
def _get_gc(self, x):
"""Calculates GC percentage of a DNA sequence"""
gc_decimal = (x.count('C') + x.count('G'))/len(x)
gc_decimal = float("{0:.2f}".format(gc_decimal))
gc_perc = int(gc_decimal*100)
return gc_perc
[docs]
def _sort_file(self):
"""Sorts oligo output file"""
df = pd.read_table('oligo_info.txt', header=0)
#df['chr'] = [int(x[3:]) for x in df['chr']] # this threw an error for chromosomes X and Y
df.sort_values(['chr', 'start'], inplace=True)
#df['chr'] = 'chr' + df['chr'].map(str)
return df
def __repr__(self):
return '{}(genome={}, fa={}, blat={})'.format(self.__class__.__name__,
self.genome,
self.fa,
self.blat)