Source code for pytadbit.parsers.genome_parser
"""
17 nov. 2014
convert a bunch of fasta files, or a single multi fasta file, into a dictionary
"""
from __future__ import print_function
from collections import OrderedDict
import multiprocessing as mu
from os import path
import re
from pytadbit.utils.file_handling import magic_open
from functools import reduce
try:
basestring
except NameError:
basestring = str
[docs]def parse_fasta(f_names, chr_names=None, chr_filter=None, chr_regexp=None,
verbose=True, save_cache=True, reload_cache=False, only_length=False):
"""
Parse a list of fasta files, or just one fasta.
WARNING: The order is important
:param f_names: list of pathes to files, or just a single path
:param None chr_names: pass list of chromosome names, or just one. If None
are passed, then chromosome names will be inferred from fasta headers
:param None chr_filter: use only chromosome in the input list
:param None chr_regexp: use only chromosome matching
:param True save_cache: save a cached version of this file for faster
loadings (~4 times faster)
:param False reload_cache: reload cached genome
:param False only_length: returns dictionary with length of genome,not sequence
:returns: a sorted dictionary with chromosome names as keys, and sequences
as values (sequence in upper case)
"""
if isinstance(f_names, basestring):
f_names = [f_names]
if len(f_names) == 1:
fname = f_names[0] + '_genome.TADbit'
else:
fname = path.join(path.commonprefix(f_names), 'genome.TADbit')
if path.exists(fname) and not reload_cache:
if verbose:
print('Loading cached genome')
genome_seq = OrderedDict()
with open(fname) as f_open:
for line in f_open:
if line.startswith('>'):
c = line[1:].strip()
else:
if only_length:
genome_seq[c] = len(line.strip())
else:
genome_seq[c] = line.strip()
return genome_seq
if isinstance(chr_names, basestring):
chr_names = [chr_names]
if chr_filter:
bad_chrom = lambda x: not x in chr_filter
else:
bad_chrom = lambda x: False
if chr_regexp:
chr_regexp = re.compile(chr_regexp)
else:
chr_regexp = re.compile('.*')
genome_seq = OrderedDict()
if len(f_names) == 1:
header = None
seq = []
with magic_open(f_names[0]) as fhandler:
for line in fhandler:
if line.startswith('>'):
if header:
genome_seq[header] = ''.join(seq).upper()
header = line[1:].split()[0]
if bad_chrom(header) or not chr_regexp.match(header):
header = 'UNWANTED'
elif not chr_names:
if verbose:
print('Parsing %s' % (header))
else:
header = chr_names.pop(0)
if verbose:
print('Parsing %s as %s' % (line[1:].rstrip(),
header))
seq = []
continue
seq.append(line.rstrip())
if only_length:
genome_seq[header] = len(seq)
else:
genome_seq[header] = ''.join(seq).upper()
if 'UNWANTED' in genome_seq:
del(genome_seq['UNWANTED'])
else:
for fnam in f_names:
with magic_open(f_nam) as fhandler:
try:
while True:
if not chr_names:
header = next(fhandler)
if header.startswith('>'):
header = header[1:].split()[0]
if bad_chrom(header) or not chr_regexp.match(header):
header = 'UNWANTED'
genome_seq[header] = ''
break
else:
_ = next(fhandler)
header = chr_names.pop(0)
if bad_chrom(header):
header = 'UNWANTED'
genome_seq[header] = ''
break
except StopIteration:
raise Exception('No crocodiles found, is it fasta?')
if only_length:
genome_seq[header] = sum(len(l.rstrip()) for l in fhandler)
else:
genome_seq[header] = ''.join([l.rstrip() for l in fhandler]).upper()
if 'UNWANTED' in genome_seq:
del(genome_seq['UNWANTED'])
if save_cache and not only_length:
if verbose:
print('saving genome in cache')
if len(f_names) == 1:
fname = f_names[0] + '_genome.TADbit'
else:
fname = path.join(path.commonprefix(f_names), 'genome.TADbit')
out = open(fname, 'w')
for c in genome_seq:
out.write('>%s\n%s\n' % (c, genome_seq[c]))
out.close()
return genome_seq
def get_gc_content(genome, resolution, chromosomes=None, n_cpus=None, by_chrom=False):
"""
Get GC content by bins of a given size. Ns are nottaken into account in the
calculation, only the number of Gs and Cs over As, Ts, Gs and Cs
:param genome: a TADbit parsed genome object
:param resolution:
:param None chromosomes: GC content only calculated over these chromosomes
:param None n_cpus: parallelize (can't parallelize more than the number of
chromosomes)
:param False by_chrom: if False returns a unique list for the full genome
"""
chromosomes = chromosomes if chromosomes else list(genome.keys())
if not n_cpus:
n_cpus = mu.cpu_count()
pool = mu.Pool(n_cpus)
get_chr_gc = _get_chr_gc_dico if by_chrom else _get_chr_gc_list
jobs = {}
for crm in chromosomes:
jobs[crm] = pool.apply_async(get_chr_gc, args=(genome[crm], resolution))
pool.close()
pool.join()
if by_chrom:
gc_content = dict((crm, jobs[crm].get()) for crm in chromosomes)
else:
gc_content = reduce(lambda x,y: x + y, (jobs[crm].get() for crm in chromosomes))
return gc_content
def _get_chr_gc_list(chrom, resolution):
gc_content = []
for pos in range(0, len(chrom), resolution):
seq = chrom[pos:pos + resolution]
try:
gc_content.append(float(seq.count('G') + seq.count('C')) /
(len(seq) - seq.count('N')))
except ZeroDivisionError:
gc_content.append(float('nan'))
return gc_content
def _get_chr_gc_dico(chrom, reso):
gc_content = {}
for pos in range(0, len(chrom), reso):
seq = chrom[pos:pos + reso]
try:
gc_content[pos /reso] = (float(seq.count('G') + seq.count('C')) /
(len(seq) - seq.count('N')))
except ZeroDivisionError:
gc_content[pos /reso] = float('nan')
return gc_content