"""
09 Jun 2015
"""
from __future__ import print_function
from builtins import next
import os
import re
import copy
import multiprocessing as mu
from warnings import warn
from tempfile import gettempdir, mkstemp
from subprocess import PIPE, STDOUT, Popen
from pysam import Samfile
from pytadbit.utils.file_handling import mkdir, which, is_fastq
from pytadbit.utils.file_handling import magic_open, get_free_space_mb
from pytadbit.parsers.sam_parser import parse_gem_3c, merge_sort
from pytadbit.mapping.restriction_enzymes import religateds
from pytadbit.mapping.restriction_enzymes import RESTRICTION_ENZYMES
from pytadbit.mapping.restriction_enzymes import map_re_sites
from pytadbit.mapping.restriction_enzymes import iupac2regex
try:
basestring
except NameError:
basestring = str
def transform_fastq(fastq_path, out_fastq, trim=None, r_enz=None, add_site=True,
min_seq_len=15, fastq=True, verbose=True,
light_storage=False, **kwargs):
"""
Given a FASTQ file it can split it into chunks of a given number of reads,
trim each read according to a start/end positions or split them into
restriction enzyme fragments
:param True add_site: when splitting the sequence by ligated sites found,
removes the ligation site, and put back the original RE site.
"""
skip = kwargs.get('skip', False)
## define local functions to process reads and sequences
def _get_fastq_read_heavy(rlines):
"""
returns header and sequence of 1 FASTQ entry
Note: header also contains the sequence
"""
rlines = rlines.rstrip('\n').split()[0][1:]
seq = next(fhandler)
_ = next(fhandler) # lose qualities header but not needed
qal = next(fhandler) # lose qualities but not needed
# header now also contains original read
return (rlines.split('/',1)[0].split('~',1)[0] + ' ' + seq.strip() + ' ' + qal.strip(),
seq.strip(), qal.strip())
def _get_fastq_read_light(rlines):
"""
returns header and sequence of 1 FASTQ entry
Note: header also contains the sequence
"""
rlines = rlines.rstrip('\n').split()[0][1:]
seq = next(fhandler)
_ = next(fhandler) # lose qualities header but not needed
qal = next(fhandler) # lose qualities but not needed
return (rlines.split('/',1)[0].split('~',1)[0], seq.strip(), qal.strip())
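    # For a FASTQ entry '@SRR001/1' / 'ACGT' / '+' / 'HHHH' (hypothetical
    # read), the heavy reader returns ('SRR001 ACGT HHHH', 'ACGT', 'HHHH')
    # while the light reader returns ('SRR001', 'ACGT', 'HHHH').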
def _get_map_read_heavy(line):
header = line.split('\t', 1)[0]
seq, qal = header.rsplit(' ', 2)[-2:]
return header, seq, qal
def _get_map_read_light(line):
header, seq, qal, _ = line.split('\t', 3)
return header, seq, qal
    def _inverse_find(seq, pat):
        # position of the first match of pat in seq, or infinity if absent
        try:
            return pat.search(seq).start()
        except AttributeError:  # pat.search returned None
            return float('inf')
def find_patterns(seq, patterns):
pos, pat = min((_inverse_find(seq, patterns[p]), p) for p in patterns)
return int(pos), pat
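    # find_patterns returns the leftmost ligation-site hit among all enzyme
    # pairs; e.g. with a (hypothetical) pattern GATCGATC compiled for
    # ('MboI', 'MboI'), find_patterns('aaGATCGATCtt', patterns) would return
    # (2, ('MboI', 'MboI')). When nothing matches, int(float('inf')) raises
    # OverflowError, which callers use as the "no ligation site" signal.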
def _split_read_re(seq, qal, patterns, site, max_seq_len=None, cnt=0):
"""
Recursive generator that splits reads according to the
predefined restriction enzyme.
RE fragments yielded are followed and preceded by the RE site if a
ligation site was found after the fragment.
EXAMPLE:
seq = '-------oGATCo========oGATCGATCo_____________oGATCGATCo~~~~~~~~~~~~'
qal = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
should yield these fragments:
-------oGATCo========oGATC
xxxxxxxxxxxxxxxxxxxxxxHHHH
GATCo_____________oGATC
HHHHxxxxxxxxxxxxxxxHHHH
GATCo~~~~~~~~~~~~
HHHHxxxxxxxxxxxxx
:param seq: sequence of the read fragment
:param qal: quality of the sequence of the read fragment
:param patterns: list of patterns of the ligated cut sites
        :param None max_seq_len: to control that all reads are below this
           length
        :param '' site: non-ligated cut site to replace the ligation site
        :param 0 cnt: to count the number of fragments
:yields: seq fragments, their qualities and their count, or index
(higher than 0 if ligation sites are found)
"""
cnt += 1
try:
pos, (r_enz1, r_enz2) = find_patterns(seq, patterns)
except OverflowError:
if len(seq) == max_seq_len:
raise ValueError
if len(seq) > min_seq_len:
yield seq, qal, cnt
return
        # dummy quality values ('H') filling the space occupied by the
        # re-inserted cut-sites
        xqal1 = ('H' * len(site[r_enz1]))
        xqal2 = ('H' * len(site[r_enz2]))
        if pos >= min_seq_len:
            yield seq[:pos] + site[r_enz1], qal[:pos] + xqal1, cnt
        # (fragments shorter than min_seq_len are discarded; the rest of the
        # read is still processed by the recursive call below)
new_pos = pos + len_relgs[(r_enz1, r_enz2)]
for sseq, sqal, cnt in split_read(site[r_enz2] + seq[new_pos:],
xqal2 + qal[new_pos:], patterns,
site, max_seq_len, cnt=cnt):
yield sseq, sqal, cnt
# Define function for stripping lines according to focus
if isinstance(trim, tuple):
beg, end = trim
beg -= 1
strip_line = lambda x: x[beg:end]
else:
strip_line = lambda x: x
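    # e.g. trim=(1, 50) keeps read[0:50] (1-based, inclusive coordinates)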
# define function to split reads according to restriction enzyme sites
if isinstance(r_enz, basestring):
r_enzs = [r_enz]
elif isinstance(r_enz, list):
r_enzs = r_enz
else:
r_enzs = None
if r_enzs:
enzymes = {}
enz_patterns = {}
for r_enz in r_enzs:
enzymes[r_enz] = RESTRICTION_ENZYMES[r_enz].replace('|', '')
enz_patterns = religateds(r_enzs)
sub_enz_patterns = {}
len_relgs = {}
for r_enz1, r_enz2 in enz_patterns:
sub_enz_patterns[(r_enz1, r_enz2)] = (
enz_patterns[(r_enz1, r_enz2)][:len(enz_patterns[(r_enz1, r_enz2)])
// 2])
len_relgs[(r_enz1, r_enz2)] = len(enz_patterns[(r_enz1, r_enz2)])
print(' - splitting into restriction enzyme (RE) fragments using ligation sites')
print(' - ligation sites are replaced by RE sites to match the reference genome')
for r_enz1 in r_enzs:
for r_enz2 in r_enzs:
print(' * enzymes: %s & %s, ligation site: %s, RE site: %s & %s' % (
r_enz1, r_enz2, enz_patterns[(r_enz1, r_enz2)],
enzymes[r_enz1], enzymes[r_enz2]))
# replace pattern with regex to support IUPAC annotation
for ezp in enz_patterns:
enz_patterns[ezp] = re.compile(iupac2regex(enz_patterns[ezp]))
for ezp in sub_enz_patterns:
sub_enz_patterns[ezp] = iupac2regex(sub_enz_patterns[ezp])
split_read = _split_read_re
    else:
        enzymes = ''
        enz_patterns = ''
        sub_enz_patterns = ''
        def split_read(seq, qal, _patterns, _site, _max_seq_len, cnt=0):
            # no restriction enzyme given: yield the full read as a single
            # fragment (same call signature as _split_read_re)
            yield seq, qal, 1
# function to yield reads from input file
if light_storage:
get_seq = _get_fastq_read_light if fastq else _get_map_read_light
insert_mark = insert_mark_light
else:
get_seq = _get_fastq_read_heavy if fastq else _get_map_read_heavy
insert_mark = insert_mark_heavy
## Start processing the input file
if verbose:
print('Preparing %s file' % ('FASTQ' if fastq else 'MAP'))
if fastq:
print(' - conversion to MAP format')
if trim:
print(' - trimming reads %d-%d' % tuple(trim))
counter = 0
if skip:
if fastq:
print(' ... skipping, only counting lines')
counter = sum(1 for _ in magic_open(fastq_path,
cpus=kwargs.get('nthreads')))
        counter //= 4 if fastq else 1  # each FASTQ entry spans 4 lines
print(' ' + fastq_path, counter, fastq)
return out_fastq, counter
# open input file
fhandler = magic_open(fastq_path, cpus=kwargs.get('nthreads'))
# create output file
out_name = out_fastq
out = open(out_fastq, 'w')
# iterate over reads and strip them
no_site = dict([(r_enz, '') for r_enz in enzymes])
site = enzymes if add_site else no_site
for header in fhandler:
header, seq, qal = get_seq(header)
counter += 1
# trim on wanted region of the read
seq = strip_line(seq)
qal = strip_line(qal)
# get the generator of restriction enzyme fragments
iter_frags = split_read(seq, qal, enz_patterns, site, len(seq))
# the first fragment should not be preceded by the RE site
try:
seq, qal, cnt = next(iter_frags)
except StopIteration:
            # read full of ligation events: no fragment reaches the minimum
            # length
continue
except ValueError:
            # no ligation site was found: retry with half a ligation site in
            # case a sequencing error truncated it (half a ligation site is
            # an RE site, or nearly so, and should not be found anyway)
iter_frags = split_read(seq, qal, sub_enz_patterns, no_site, len(seq))
try:
seq, qal, cnt = next(iter_frags)
except ValueError:
continue
except StopIteration:
continue
out.write(_map2fastq('\t'.join((insert_mark(header, cnt),
seq, qal, '0', '-\n'))))
# the next fragments should be preceded by the RE site
for seq, qal, cnt in iter_frags:
out.write(_map2fastq('\t'.join((insert_mark(header, cnt),
seq, qal, '0', '-\n'))))
out.close()
return out_name, counter
def insert_mark_heavy(header, num):
    if num == 1:
        return header
    h, s, q = header.rsplit(' ', 2)
    return '%s~%d~ %s %s' % (h, num, s, q)
def insert_mark_light(header, num):
    if num == 1:
        return header
    return '%s~%d~' % (header, num)
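# e.g. insert_mark_light('SRR001', 2) -> 'SRR001~2~'; the '~n~' mark records
# which RE fragment of the original read a mapped line comes from.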
def _map2fastq(read):
return '@{0}\n{1}\n+\n{2}\n'.format(*read.split('\t', 3)[:-1])
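# _map2fastq turns one tab-separated MAP line back into a FASTQ record, e.g.
# _map2fastq('read1\tACGT\tHHHH\t0\t-\n') -> '@read1\nACGT\n+\nHHHH\n'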
def _sam_filter(fnam, fastq_path, unmap_out, map_out):
"""
Divides reads in a map file in two categories: uniquely mapped, and not.
Writes them in two files
"""
try:
fhandler = Samfile(fnam)
except IOError:
raise Exception('ERROR: file "%s" not found' % fnam)
    # get chromosome names (indexed by tid, via getrname)
i = 0
crm_dict = {}
while True:
try:
crm_dict[i] = fhandler.getrname(i)
i += 1
except ValueError:
break
# iteration over reads
unmap_out = open(unmap_out, 'w')
map_out = open(map_out, 'w')
fastq_in = open(fastq_path , 'r')
for line in fhandler:
line_in = fastq_in.readline()
        if line.is_unmapped or line.mapq < 4:  # unmapped or low-confidence/non-unique
read = '%s\t%s\t%s\t%s\t%s\n' % (
line_in.split('\t', 1)[0].rstrip('\n')[1:],
line.seq, line.qual, '-', '-'
)
unmap_out.write(read)
else:
read = '%s\t%s\t%s\t%s\t%s:%s:%d:%d\n' % (
line.qname, line.seq, line.qual, '1',
crm_dict[line.tid],
'-' if line.is_reverse else '+', line.pos + 1, len(line.seq))
map_out.write(read)
for _ in range(3):
fastq_in.readline()
unmap_out.close()
map_out.close()
fastq_in.close()
def _gem_filter(fnam, unmap_out, map_out):
"""
Divides reads in a map file in two categories: uniquely mapped, and not.
Writes them in two files
Notes:
- GEM unique-maps can not be used as it gets rid of reads like 1:0:0:5
- not feasible with gt.filter
"""
fhandler = magic_open(fnam) if isinstance(fnam, basestring) else fnam
unmap_out = open(unmap_out, 'w')
map_out = open(map_out , 'w')
def _strip_read_name(line):
"""
remove original sequence from read name when read is mapped uniquely
"""
header, line = line.split('\t', 1)
return '\t'.join((header.rsplit(' ', 2)[0], line))
    for line in fhandler:
        matches = line.rsplit('\t', 2)[1]
        bad = False
        if matches != '1':
            # check the first non-zero match stratum: '1' means the read
            # mapped uniquely at that stratum
            for m in matches.replace('+', ':').split(':'):
                if m == '0':
                    continue
                if m != '1':
                    bad = True
                    unmap_out.write(line)
                break
            else:
                # all strata are '0': the read did not map at all
                bad = True
                unmap_out.write(line)
        if not bad:
            map_out.write(_strip_read_name(line))
unmap_out.close()
map_out.close()
def _bowtie2_mapping(bowtie2_index_path, fastq_path1, out_map_path, fastq_path2=None,
                     bowtie2_binary='bowtie2', bowtie2_params=None, **kwargs):
    """
    Maps FASTQ reads with bowtie2 (or a compatible mapper such as hisat2)
    and writes the alignments in SAM format to out_map_path.
    """
bowtie2_index_path= os.path.abspath(os.path.expanduser(bowtie2_index_path))
fastq_path1 = os.path.abspath(os.path.expanduser(fastq_path1))
paired_map = False
if fastq_path2:
fastq_path2 = os.path.abspath(os.path.expanduser(fastq_path2))
paired_map = True
out_map_path = os.path.abspath(os.path.expanduser(out_map_path))
nthreads = kwargs.get('nthreads' , 8)
    # check that we have the bowtie2 binary:
    binary_name = bowtie2_binary
    bowtie2_binary = which(bowtie2_binary)
    if not bowtie2_binary:
        raise Exception('\n\nERROR: %s binary not found' % binary_name)
# mapping
print('TO %s'%bowtie2_binary, fastq_path1, fastq_path2)
bowtie2_cmd = [
bowtie2_binary, '-x', bowtie2_index_path,
'-p', str(nthreads), '--reorder','-k','1','-S',
out_map_path]
if paired_map:
bowtie2_cmd += ['-1',fastq_path1,'-2',fastq_path2]
else:
bowtie2_cmd += ['-U', fastq_path1]
if bowtie2_params:
if isinstance(bowtie2_params, dict):
for bow_param in bowtie2_params:
bowtie2_cmd.append('-'+bow_param)
if bowtie2_params[bow_param]:
bowtie2_cmd.append(bowtie2_params[bow_param])
elif isinstance(bowtie2_params, list):
bowtie2_cmd += bowtie2_params
    elif os.path.basename(bowtie2_binary) == 'bowtie2':
bowtie2_cmd.append('--very-sensitive')
print(' '.join(bowtie2_cmd))
    map_proc = Popen(bowtie2_cmd, stdout=PIPE, stderr=PIPE,
                     universal_newlines=True)
    out, err = map_proc.communicate()
    if map_proc.returncode:  # Popen does not raise on non-zero exit status
        print(out)
        print(err)
        raise Exception('ERROR: bowtie2 mapping failed:\n%s' % err)
def _gem_mapping(gem_index_path, fastq_path, out_map_path, fastq_path2 = None,
r_enz=None, gem_binary='gem-mapper', gem_version=2, compress=False,
gem_params=None, **kwargs):
"""
:param None focus: trims the sequence in the input FASTQ file according to a
(start, end) position, or the name of a restriction enzyme. By default it
uses the full sequence.
:param 33 quality: set it to 'ignore' in order to speed-up the mapping
"""
gem_index_path = os.path.abspath(os.path.expanduser(gem_index_path))
fastq_path = os.path.abspath(os.path.expanduser(fastq_path))
out_map_path = os.path.abspath(os.path.expanduser(out_map_path))
nthreads = kwargs.get('nthreads' , 8)
max_edit_distance = kwargs.get('max_edit_distance' , 0.04)
mismatches = kwargs.get('mismatches' , 0.04)
# mapping
print('TO GEM', gem_version, fastq_path)
kgt = kwargs.get
if gem_version == 2:
gem_cmd = [
gem_binary, '-I', gem_index_path,
'-q' , kgt('q', 'offset-33' ),
'-m' , kgt('m', str(max_edit_distance ) ),
'-s' , kgt('s', kgt('strata-after-best', '0') ),
'--allow-incomplete-strata' , kgt('allow-incomplete-strata', '0.00' ),
'--granularity' , kgt('granularity', '10000' ),
'--max-decoded-matches' , kgt('max-decoded-matches', kgt('d', '1')),
'--min-decoded-strata' , kgt('min-decoded-strata', kgt('D', '0') ),
'--min-insert-size' , kgt('min-insert-size', '0' ),
'--max-insert-size' , kgt('max-insert-size', '0' ),
'--min-matched-bases' , kgt('min-matched-bases', '0.8' ),
'--gem-quality-threshold' , kgt('gem-quality-threshold', '26' ),
'--max-big-indel-length' , kgt('max-big-indel-length', '15' ),
'--mismatch-alphabet' , kgt('mismatch-alphabet', 'ACGT' ),
'-E' , kgt('E', '0.30' ),
'--max-extendable-matches' , kgt('max-extendable-matches', '20' ),
'--max-extensions-per-match', kgt('max-extensions-per-match', '1' ),
'-e' , kgt('e', str(mismatches) ),
'-T' , str(nthreads),
'-i' , fastq_path,
'-o', out_map_path.replace('.map', '')]
if 'paired-end-alignment' in kwargs or 'p' in kwargs:
gem_cmd.append('--paired-end-alignment')
if 'map-both-ends' in kwargs or 'b' in kwargs:
gem_cmd.append('--map-both-ends')
if 'fast-mapping' in kwargs:
gem_cmd.append('--fast-mapping')
if 'unique-mapping' in kwargs:
gem_cmd.append('--unique-mapping')
if 'unique-pairing' in kwargs:
gem_cmd.append('--unique-pairing')
# check kwargs
for kw in kwargs:
            if kw not in ['nthreads', 'max_edit_distance',
'mismatches', 'max_reads_per_chunk',
'out_files', 'temp_dir', 'skip', 'q', 'm', 's',
'strata-after-best', 'allow-incomplete-strata',
'granularity', 'max-decoded-matches',
'min-decoded-strata', 'min-insert-size',
'max-insert-size', 'min-matched-bases',
'gem-quality-threshold', 'max-big-indel-length',
'mismatch-alphabet', 'E', 'max-extendable-matches',
'max-extensions-per-match', 'e', 'paired-end-alignment',
'p', 'map-both-ends', 'fast-mapping', 'unique-mapping',
'unique-pairing', 'suffix']:
warn('WARNING: %s not in usual keywords, misspelled?' % kw)
else:
gem_cmd = [
gem_binary, '-I', gem_index_path,
'-t' , str(nthreads),
'-F' , 'SAM',
'-o', out_map_path]
if compress:
gem_cmd += ['--gzip-output']
if fastq_path2:
if not r_enz:
raise Exception('ERROR: need enzyme name to fragment.')
print('Using GEM', gem_version, 'with 3c mapping')
gem_cmd += ['--i1', fastq_path, '--i2', fastq_path2, '--3c']
if isinstance(r_enz, basestring):
gem_cmd += ['--restriction-enzyme', r_enz]
elif isinstance(r_enz, list):
for r_z in r_enz:
gem_cmd += ['--restriction-enzyme', r_z]
else:
gem_cmd += ['-i', fastq_path]
if gem_params:
if isinstance(gem_params, dict):
for gem_param in gem_params:
gem_cmd.append('-'+gem_param)
if gem_params[gem_param]:
gem_cmd.append(gem_params[gem_param])
elif isinstance(gem_params, list):
gem_cmd += gem_params
print(' '.join(gem_cmd))
    map_proc = Popen(gem_cmd, stdout=PIPE, stderr=PIPE,
                     universal_newlines=True)
    out, err = map_proc.communicate()
    if map_proc.returncode:  # Popen does not raise on non-zero exit status
        print(out)
        print(err)
        raise Exception('ERROR: GEM mapping failed:\n%s' % err)
def full_mapping(mapper_index_path, fastq_path, out_map_dir, mapper='gem',
r_enz=None, frag_map=True, min_seq_len=15, windows=None,
add_site=True, clean=False, get_nread=False,
mapper_binary=None, mapper_params=None, **kwargs):
"""
    Maps FASTQ reads to an indexed reference genome. Mapping can be done
    either without knowledge of the restriction enzyme used, as for
    experiments performed without one like Micro-C (iterative mapping), or
    using the ligation sites created from the digested ends (fragment-based
    mapping).
:param mapper_index_path: path to index file created from a reference genome
using gem-index tool or bowtie2-build
:param fastq_path: PATH to FASTQ file, either compressed or not.
    :param out_map_dir: path to a directory where to store mapped reads in
       MAP format.
    :param None r_enz: name of the restriction enzyme used in the experiment
       e.g. HindIII. This is optional if the frag_map option is False
    :param True frag_map: two-step mapping: first the full-length reads are
       mapped, then the remaining unmapped reads are divided into
       restriction-enzyme fragments and each is mapped.
    :param True add_site: when splitting the sequence at the ligation sites
       found, removes the ligation site and puts back the original RE site.
    :param 15 min_seq_len: minimum size of a fragment to map
:param None windows: tuple of ranges for beginning and end of the
mapping. This parameter allows to do classical iterative mapping, e.g.
windows=((1,25),(1,30),(1,35),(1,40),(1,45),(1,50))
A unique window can also be passed, for trimming, like this:
windows=((1,101),)
:param False clean: remove intermediate files created in temp_dir
:param 4 nthreads: number of threads to use for mapping (number of CPUs)
:param 0.04 max_edit_distance: The maximum number of edit operations allowed
while verifying candidate matches by dynamic programming.
:param 0.04 mismatches: The maximum number of nucleotide substitutions
allowed while mapping each k-mer. It is always guaranteed that, however
other options are chosen, all the matches up to the specified number of
substitutions will be found by the program.
    :param /tmp temp_dir: important to change. Intermediate FASTQ files will
       be written there.
    :param False get_nread: returns a list of tuples where each element
       contains a path and the number of reads processed
:param gem-mapper mapper_binary: path to the binary mapper
:param None mapper_params: extra parameters for the mapper
:returns: a list of paths to generated outfiles. To be passed to
:func:`pytadbit.parsers.map_parser.parse_map`
"""
skip = kwargs.get('skip', False)
suffix = kwargs.get('suffix', '')
suffix = ('_' * (suffix != '')) + suffix
nthreads = kwargs.get('nthreads', 8)
outfiles = []
temp_dir = os.path.abspath(os.path.expanduser(
kwargs.get('temp_dir', gettempdir())))
if mapper == 'gem':
gem_version = None
# check that we have the GEM binary:
gem_binary = mapper_binary or 'gem-mapper'
gem_binary = which(gem_binary)
if not gem_binary:
raise Exception('\n\nERROR: GEM binary not found, install it from:'
'\nhttps://sourceforge.net/projects/gemlibrary/files/gem-library/Binary%20pre-release%202/'
                            '\n - Download the GEM-binaries-Linux-x86_64-core_i3 if '
                            'you have a recent computer, the '
'GEM-binaries-Linux-x86_64-core_2 otherwise\n - '
'Uncompress with "tar xjvf GEM-binaries-xxx.tbz2"\n - '
'Copy the binary gem-mapper to /usr/local/bin/ for '
'example (somewhere in your PATH).\n\nNOTE: GEM does '
'not provide any binary for MAC-OS.')
        try:
            out, err = Popen([gem_binary, '--version'], stdout=PIPE, stderr=STDOUT,
                             universal_newlines=True).communicate()
            gem_version = int(out[1])  # second character of the version string
        except (ValueError, IndexError):
            gem_version = 2
            print('Falling back to GEM v2')
if mapper_params and isinstance(mapper_params, dict):
kwargs.update(mapper_params)
# create directories
for rep in [temp_dir, out_map_dir]:
mkdir(rep)
# check space
fspace = int(get_free_space_mb(temp_dir, div=3))
if fspace < 200:
warn('WARNING: only %d Gb left on tmp_dir: %s\n' % (fspace, temp_dir))
# iterative mapping
base_name = os.path.split(fastq_path)[-1].replace('.gz', '')
base_name = '.'.join(base_name.split('.')[:-1])
input_reads = fastq_path
if windows is None:
light_storage = True
windows = (None, )
elif isinstance(windows[0], int):
# if windows starts at zero we do not need to store all the sequence
# otherwise we need it because sequence can be trimmed two times
# in fragment based mapping
        light_storage = not windows[0]
windows = [tuple(windows)]
else:
# ensure that each element is a tuple, not a list
windows = [tuple(win) for win in windows]
# in this case we will need to keep the information about original
# sequence at any point, light storage is thus not possible.
light_storage = False
for win in windows:
# Prepare the FASTQ file and iterate over them
curr_map, counter = transform_fastq(
input_reads, mkstemp(prefix=base_name + '_', dir=temp_dir)[1],
fastq=is_fastq(input_reads),
min_seq_len=min_seq_len, trim=win, skip=skip, nthreads=nthreads,
light_storage=light_storage)
# clean
if input_reads != fastq_path and clean:
print(' x removing original input %s' % input_reads)
os.system('rm -f %s' % (input_reads))
# First mapping, full length
if not win:
beg, end = 1, 'end'
else:
beg, end = win
out_map_path = curr_map + '_full_%s-%s%s.map' % (beg, end, suffix)
        if win:
            print('Mapping reads in window %s-%s%s...' % (beg, end, suffix))
        else:
            print('Mapping full reads...', curr_map)
if not skip:
if mapper == 'gem':
_gem_mapping(mapper_index_path, curr_map, out_map_path,
gem_binary=gem_binary, gem_version=gem_version,
gem_params=mapper_params, **kwargs)
# parse map file to extract not uniquely mapped reads
print('Parsing result...')
if gem_version >= 3:
_sam_filter(out_map_path, curr_map,
curr_map + '_filt_%s-%s%s.map' % (beg, end, suffix),
os.path.join(out_map_dir,
base_name + '_full_%s-%s%s.map' % (beg, end, suffix)))
else:
_gem_filter(out_map_path,
curr_map + '_filt_%s-%s%s.map' % (beg, end, suffix),
os.path.join(out_map_dir,
base_name + '_full_%s-%s%s.map' % (
beg, end, suffix)))
            elif mapper in ('bowtie2', 'hisat2'):
_bowtie2_mapping(mapper_index_path, curr_map, out_map_path,
bowtie2_binary=(mapper_binary if mapper_binary else mapper),
bowtie2_params=mapper_params, **kwargs)
# parse map file to extract not uniquely mapped reads
print('Parsing result...')
_sam_filter(out_map_path, curr_map,
curr_map + '_filt_%s-%s%s.map' % (beg, end, suffix),
os.path.join(out_map_dir,
base_name + '_full_%s-%s%s.map' % (beg, end, suffix)))
else:
raise Exception('ERROR: unknown mapper.')
# clean
if clean:
print(' x removing %s input %s' % (mapper.upper(),curr_map))
os.system('rm -f %s' % (curr_map))
print(' x removing map %s' % out_map_path)
os.system('rm -f %s' % (out_map_path))
# for next round, we will use remaining unmapped reads
input_reads = curr_map + '_filt_%s-%s%s.map' % (beg, end, suffix)
outfiles.append(
(os.path.join(out_map_dir,
base_name + '_full_%s-%s%s.map' % (beg, end, suffix)),
counter))
# map again splitting unmapped reads into RE fragments
# (no need to trim this time)
if frag_map:
if not r_enz:
raise Exception('ERROR: need enzyme name to fragment.')
frag_map, counter = transform_fastq(
input_reads, mkstemp(prefix=base_name + '_', dir=temp_dir)[1],
min_seq_len=min_seq_len, trim=win, fastq=False, r_enz=r_enz,
add_site=add_site, skip=skip, nthreads=nthreads,
light_storage=light_storage)
# clean
if clean:
print(' x removing pre-%s input %s' % (mapper.upper(),input_reads))
os.system('rm -f %s' % (input_reads))
if not win:
beg, end = 1, 'end'
else:
beg, end = win
out_map_path = frag_map + '_frag_%s-%s%s.map' % (beg, end, suffix)
if not skip:
if mapper == 'gem':
print('Mapping fragments of remaining reads...')
_gem_mapping(mapper_index_path, frag_map, out_map_path,
gem_binary=gem_binary, gem_version=gem_version,
**kwargs)
print('Parsing result...')
# check if output is sam format for gem3
if gem_version >= 3:
_sam_filter(out_map_path, frag_map,
curr_map + '_fail%s.map' % (suffix),
os.path.join(out_map_dir,
base_name + '_frag_%s-%s%s.map' % (beg, end, suffix)))
else:
_gem_filter(out_map_path, curr_map + '_fail%s.map' % (suffix),
os.path.join(out_map_dir,
base_name + '_frag_%s-%s%s.map' % (beg, end, suffix)))
                elif mapper in ('bowtie2', 'hisat2'):
print('Mapping fragments of remaining reads...')
_bowtie2_mapping(mapper_index_path, frag_map, out_map_path,
bowtie2_binary=(mapper_binary if mapper_binary else mapper),
bowtie2_params=mapper_params, **kwargs)
print('Parsing result...')
_sam_filter(out_map_path, frag_map,
curr_map + '_fail%s.map' % (suffix),
os.path.join(out_map_dir,
base_name + '_frag_%s-%s%s.map' % (beg, end, suffix)))
else:
raise Exception('ERROR: unknown mapper.')
# clean
if clean:
print(' x removing %s input %s' % (mapper.upper(),frag_map))
os.system('rm -f %s' % (frag_map))
print(' x removing failed to map ' + curr_map + '_fail%s.map' % (suffix))
os.system('rm -f %s' % (curr_map + '_fail%s.map' % (suffix)))
print(' x removing tmp mapped %s' % out_map_path)
os.system('rm -f %s' % (out_map_path))
outfiles.append((os.path.join(out_map_dir,
base_name + '_frag_%s-%s%s.map' % (beg, end, suffix)),
counter))
if clean:
os.system('rm -rf %s' % (temp_dir))
if get_nread:
return outfiles
return [out for out, _ in outfiles]
def fast_fragment_mapping(mapper_index_path, fastq_path1, fastq_path2, r_enz,
genome_seq, out_map, clean=True, get_nread=False,
mapper_binary=None, mapper_params=None,
samtools = 'samtools', **kwargs):
"""
Maps FASTQ reads to an indexed reference genome with the knowledge of
the restriction enzyme used (fragment-based mapping).
:param mapper_index_path: path to index file created from a reference genome
using gem-index tool, bowtie2-build or hisat2-build
:param fastq_path1: PATH to FASTQ file of read 1, either compressed or not.
:param fastq_path2: PATH to FASTQ file of read 2, either compressed or not.
    :param out_map: path to the output file: a tab-separated file containing
       the mapped read information.
    :param r_enz: name of the restriction enzyme used in the experiment e.g.
       HindIII.
    :param genome_seq: a dictionary generated by
       :func:`pytadbit.parsers.genome_parser.parse_fasta`, containing the
       genomic sequence
    :param True clean: remove intermediate files created in temp_dir
:param False get_nread: returns a list of lists where each element contains
a path and the number of reads processed
:param 4 nthreads: number of threads to use for mapping (number of CPUs)
:param /tmp temp_dir: important to change. Intermediate FASTQ files will be
written there.
:param gem-mapper mapper_binary: path to the binary mapper
:param None mapper_params: extra parameters for the mapper
:param samtools samtools: path to samtools binary.
:returns: outfile with the intersected read pairs
"""
suffix = kwargs.get('suffix', '')
suffix = ('_' * (suffix != '')) + suffix
nthreads = kwargs.get('nthreads', 8)
samtools = which(samtools)
# check out folder
if not os.path.isdir(os.path.dirname(os.path.abspath(out_map))):
raise Exception('\n\nERROR: Path to store the output does not exist.\n')
temp_dir = os.path.abspath(os.path.expanduser(
kwargs.get('temp_dir', gettempdir())))
gem_version = None
# check that we have the GEM binary:
gem_binary = mapper_binary or 'gem-mapper'
gem_binary = which(gem_binary)
if not gem_binary:
        raise Exception('\n\nERROR: GEM v3 binary not found, install it from:'
                        '\nhttps://github.com/smarco/gem3-mapper\n - '
                        'Copy the binary gem-mapper to /usr/local/bin/ for '
                        'example (somewhere in your PATH).\n')
    try:
        out, err = Popen([gem_binary, '--version'], stdout=PIPE, stderr=STDOUT,
                         universal_newlines=True).communicate()
        gem_version = int(out[1])  # second character of the version string
    except (ValueError, IndexError):
        gem_version = 2
        print('Falling back to GEM v2')
    if gem_version < 3:
        raise Exception('\n\nERROR: GEM v3 is required, install it from:'
                        '\nhttps://github.com/smarco/gem3-mapper\n - '
                        'Copy the binary gem-mapper to /usr/local/bin/ for '
                        'example (somewhere in your PATH).\n')
if mapper_params:
kwargs.update(mapper_params)
# create directories
for rep in [temp_dir]:
mkdir(rep)
# check space
fspace = int(get_free_space_mb(temp_dir, div=3))
if fspace < 200:
warn('WARNING: only %d Gb left on tmp_dir: %s\n' % (fspace, temp_dir))
    # prepare input files
base_name1 = os.path.split(fastq_path1)[-1].replace('.gz', '')
base_name1 = '.'.join(base_name1.split('.')[:-1])
curr_map1, _ = transform_fastq(
fastq_path1, mkstemp(prefix=base_name1 + '_', dir=temp_dir)[1],
fastq=is_fastq(fastq_path1), nthreads=nthreads, light_storage=True)
base_name2 = os.path.split(fastq_path2)[-1].replace('.gz', '')
base_name2 = '.'.join(base_name2.split('.')[:-1])
    curr_map2, count_fastq = transform_fastq(
        fastq_path2, mkstemp(prefix=base_name2 + '_', dir=temp_dir)[1],
        fastq=is_fastq(fastq_path2), nthreads=nthreads, light_storage=True)
out_map_path = curr_map1 + '_frag%s.map' % (suffix)
print('Mapping fragments of remaining reads...')
_gem_mapping(mapper_index_path, curr_map1, out_map_path,fastq_path2=curr_map2,
r_enz=r_enz, gem_binary=gem_binary, gem_version=gem_version, **kwargs)
# clean
if clean:
print(' x removing GEM 3 input %s' % (curr_map1))
os.system('rm -f %s' % (curr_map1))
print(' x removing GEM 3 input %s' % (curr_map2))
os.system('rm -f %s' % (curr_map2))
    # sort the SAM file by read name (samtools sort -n) so mates are adjacent
os.system(samtools + ' sort -n -O SAM -@ %d -T %s -o %s %s'
% (nthreads, out_map_path, out_map_path, out_map_path))
genome_lengths = dict((crm, len(genome_seq[crm])) for crm in genome_seq)
frag_chunk = kwargs.get('frag_chunk', 100000)
frags = map_re_sites(r_enz, genome_seq, frag_chunk=frag_chunk)
if samtools and nthreads > 1:
print('Splitting sam file')
        # copy the SAM header into each of the nthreads chunk files
for i in range(nthreads):
os.system(samtools + ' view -H -O SAM %s > "%s_%d"'
% (out_map_path, out_map_path, (i+1)))
        chunk_lines = int((count_fastq * 2.3) / nthreads)  # ~2.3 SAM lines expected per read (read plus its fragments)
os.system(samtools + ''' view -O SAM %s | awk -v n=%d -v FS="\\t" '
BEGIN { part=0; line=n }
{ if( line>=n && $1!=last_read ) {part++; line=1; print $0 >> "%s_"part }
else { print $0 >> "%s_"part; line++; }
last_read = $1;
}'
''' % (out_map_path, chunk_lines, out_map_path, out_map_path))
if clean:
print(' x removing tmp mapped %s' % out_map_path)
os.system('rm -f %s' % (out_map_path))
print('Parsing results...')
kwargs['nthreads'] = 1
procs = []
pool = mu.Pool(nthreads)
for i in range(nthreads):
frags_shared = copy.deepcopy(frags)
procs.append(pool.apply_async(
parse_gem_3c, args=('%s_%d' % (out_map_path,(i+1)),
'%s_parsed_%d' % (out_map_path,(i+1)),
copy.deepcopy(genome_lengths), frags_shared,
False, True), kwds=kwargs))
pool.close()
pool.join()
results = [proc.get() for proc in procs if proc.get()]
if clean:
for i in range(nthreads):
print(' x removing tmp mapped %s_%d' % (out_map_path,(i+1)))
os.system('rm -f %s_%d' % (out_map_path,(i+1)))
    # final sort and merge: pair up the parsed chunk files until a single
    # sorted file remains
    nround = 0
    while len(results) > 1:
        nround += 1
        num_procs = min(nthreads, len(results) // 2)
pool = mu.Pool(num_procs)
procs = [pool.apply_async(
merge_sort,
(results.pop(0), results.pop(0), out_map_path+'_%d' % nround, i, True)
) for i in range(num_procs)]
pool.close()
pool.join()
results = [proc.get() for proc in procs if proc.get()]
map_out = open(out_map, 'w')
tmp_reads_fh = open(results[0],'r')
for crm in genome_seq:
map_out.write('# CRM %s\t%d\n' % (crm, len(genome_seq[crm])))
for read_line in tmp_reads_fh:
read = read_line.split('\t')
map_out.write('\t'.join([read[0]]+read[2:8]+read[9:]))
map_out.close()
if clean:
print(' x removing tmp mapped %s' % results[0])
os.system('rm -f %s' % (results[0]))
else:
print('Parsing result...')
parse_gem_3c(out_map_path, out_map, genome_lengths, frags, verbose=False,
tmp_format=False, **kwargs)
# clean
if clean:
print(' x removing tmp mapped %s' % out_map_path)
os.system('rm -f %s' % (out_map_path))
if get_nread:
return [(out_map,count_fastq)]
return out_map