"""
14 May 2013
"""
from __future__ import print_function
from pytadbit.utils.extraviews import colorize, tadbit_savefig
from pytadbit.utils.extraviews import _tad_density_plot
from random import random, shuffle
from sys import stdout
from pytadbit.boundary_aligner.aligner import align
try:
from scipy.interpolate import interp1d
except ImportError:
from pytadbit.utils.tadmaths import Interpolate as interp1d
[docs]class Alignment(object):
"""
Alignment of TAD borders
"""
def __init__(self, name, alignment, experiments, consensus, score=None):
self.name = name
self.__experiments = experiments
self.__values = []
self.__keys = []
self.__len = None
self.consensus = consensus
for seq, exp in zip(alignment, experiments):
self.add_aligned_exp(exp.name, seq)
self.score = score
def __len__(self):
return self.__len
def __str__(self):
"""
calls Alignment.print_alignment
"""
return self.write_alignment(string=True)
def __repr__(self):
"""
Alignment description
"""
return ('Alignment of boundaries (length: %s, ' +
'number of experiments: %s)') % (len(self), len(self.__keys))
def __getitem__(self, i):
try:
return self.__values[i]
except TypeError:
try:
i = self.__keys.index(i)
return self[i]
except ValueError:
raise ValueError('ERROR: %s not in alignment' % (i))
def __setitem__(self, i, j):
if i in self.__keys:
self.__values[self.__keys[i]] = j
else:
self.__values.append(j)
self.__keys.append(i)
def __delitem__(self, i):
try:
self.__values.pop(i)
self.__keys.pop(i)
except TypeError:
try:
i = self.__keys.index(i)
self.__values.pop(i)
self.__keys.pop(i)
except ValueError:
raise ValueError('ERROR: %s not in alignment' % (i))
def __next__(self):
return next(self.__keys)
def __iter__(self):
for key in self.__keys:
yield key
[docs] def iteritems(self):
"""
Iterate over experiment names and aligned boundaries
"""
for i in range(len(self)):
yield (self.__keys[i], self.__values[i])
[docs] def itervalues(self):
"""
Iterate over experiment names and aligned boundaries
"""
for i in range(len(self)):
yield self.__values[i]
[docs] def itercolumns(self):
"""
Iterate over columns in the alignment
"""
for pos in range(len(self)):
col = []
for exp in range(len(self.__keys)):
col.append(self[exp][pos])
yield col
[docs] def write_alignment(self, names=None, xpers=None, string=False,
title='', ftype='ansi'):
"""
Print alignment of TAD boundaries between different experiments.
Alignments are displayed with colors according to the TADbit
confidence score for each boundary.
:param None names: if None print all experiments
:param None xpers: if None print all experiments
:param False string: return string instead of printing
:param ansi ftype: display colors in 'ansi' or 'html' format
"""
if xpers:
xpers = [self.__experiments[n.name] for n in xpers]
else:
xpers = self.__experiments
if not names:
names = [n.name for n in xpers]
length = max([len(n) for n in names])
if ftype == 'html':
out = '''<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<!-- This file was created with the aha Ansi HTML Adapter. http://ziz.delphigl.com/tool_aha.php -->
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="application/xml+xhtml; charset=UTF-8" />
<title>stdin</title>
</head>
<h1>%s</h1>
<body>
<pre>''' % (title)
elif ftype == 'ansi':
out = title + '\n' if title else ''
else:
raise NotImplementedError('Only ansi and html ftype implemented.\n')
out += 'Alignment shown in %s Kb (%s experiments) (' % (
int(xpers[0].resolution // 1000), len(xpers))
out += 'scores: %s)\n' % (' '.join(
[colorize(x, x, ftype) for x in range(11)]))
for i, xpr in enumerate(xpers):
if not xpr.name in self.__keys:
continue
# res = xpr.resolution / 1000
out += ('%' + str(length) + 's') % (names[i])
out += ':'
for x in self[xpr.name]:
if x['end'] == 0.0:
out += '| ' + '-' * 4 + ' '
continue
cell = str(int(x['end']) + 1) # * res # TODO: +1
out += ('|' + ' ' * (6 - len(cell)) +
colorize(cell, x['score'], ftype))
out += '\n'
if ftype == 'html':
out += '</pre></body></html>'
if string:
return out
print(out)
[docs] def get_column(self, cond1, cond2=None, min_num=None):
"""
Get a list of column responding to a given characteristic.
:param cond1: can be either a column number or a function to search for
a given condition that may fulfill boundaries.
:param None cond2: a second function to search for a given condition
that may fulfill boundaries.
:param None min_num: Number of boundaries in a given column that have to
pass the defined conditions (all boundaries may pass them by default).
:returns: a list of column, each column represented by a tuple which
first element is the column number in the alignment and second
element is the list of boundaries in this column.
**Example**
::
ali = Alignment('ALL', crm.get_alignment('ALL'),
crm.experiments, score)
ali.get_column(3)
# [(3, [>55<, ,>56<, >55<, >58<])]
# now we want boundaries with high scores
cond1 = lambda x: x['score'] > 5
# and we want boundaries to be detected in Experiments exp1 and exp2
cond2=lambda x: x['exp'] in ['exp1', 'exp2']
ali.get_column(cond1=cond1, cond2=cond2, min_num=2)
# [(33, [>268<, >192<, >-<, >-<]),
# (46, [>324<, >323<, >335<, >329<]),
# (51, [>348<, >335<, >357<, >347<]),
# (56, [>374<, >358<, >383<, >-<]),
# (64, [>397<, >396<, >407<, >-<]),
# (77, [>444<, >442<, >456<, >-<])]
"""
if isinstance(cond1, int):
val = cond1 - 1
cond1 = lambda x: x['pos'] == val
elif isinstance(cond1, list):
val = [v - 1 for v in cond1]
cond1 = lambda x: x['pos'] in val
if not cond2:
cond2 = lambda x: True
cond = lambda x: cond1(x) and cond2(x)
min_num = min_num or len(self.__keys)
column = []
for pos in range(len(self)):
col = []
cnt = 0
for exp in range(len(self.__keys)):
col.append(self.__values[exp][pos])
if cond(self.__values[exp][pos]):
cnt += 1
if cnt >= min_num:
column.append((pos, col))
return column
def add_aligned_exp(self, name, seq):
p = 1
scores = []
exp = self.__experiments[name]
for i, pos in enumerate(seq):
if pos == '-':
scores.append(TAD((('brk', None),
('end', 0.0),
('score', 0.0),
('start', 0.0)), i,
self.__experiments[name]))
continue
try:
while exp.tads[p]['brk'] < 0:
p += 1
except KeyError:
continue
scr = abs(exp.tads[p]['score'])
scores.append(TAD(exp.tads[p], i, self.__experiments[name]))
scores[-1]['score'] = scr
p += 1
# print name, len(scores)
if not self.__len:
self.__len = len(scores)
elif self.__len != len(scores):
raise AssertionError('ERROR: alignments of different lengths ' +
'(%s != %s)\n' % (self.__len, len(scores)))
self.__values.append(scores)
self.__keys.append(name)
[docs] def draw(self, focus=None, extras=None, ymax=None, ali_colors=('grey',),
normalized=True, savefig=None, shape='ellipse'):
"""
Draw alignments as a plot.
:param None focus: can pass a tuple (bin_start, bin_stop) to display the
alignment between these genomic bins
:param None extras: list of coordinates (genomic bin) where to draw a
red cross
:param None ymax: limit the y axis up to a given value
:param ('grey', ): successive colors for alignment
:param True normalized: normalized Hi-C count are plotted instead of raw
data.
:param 'ellipse' shape: which kind of shape to use as schematic
representation of TADs. Implemented: 'ellipse', 'rectangle',
'triangle'
:param None savefig: path to a file where to save the image generated;
if None, the image will be shown using matplotlib GUI (the extension
of the file name will determine the desired format).
"""
from matplotlib.cm import jet
from matplotlib import pyplot as plt
experiments = self.__experiments
maxres = max([e.resolution for e in experiments])
facts = [maxres // e.resolution for e in experiments]
siz = experiments[0].size
if focus:
figsiz = 4 + (focus[1] - focus[0]) // 30
else:
figsiz = 4 + siz // 30
fig, axes = plt.subplots(nrows=len(experiments),
sharex=True, sharey=True,
figsize=(figsiz, 1 + len(experiments) * 1.8))
fig.subplots_adjust(hspace=0)
maxys = []
for iex, xpr in enumerate(experiments):
if not xpr.name in self:
continue
_tad_density_plot(xpr, maxys=maxys, normalized=normalized,
fact_res=facts[iex], axe=axes[iex],
extras=extras, shape=shape, focus=focus)
# draw alignment columns
start = focus[0] if focus else 1
end = focus[1] if focus else xpr.tads[max(xpr.tads)]['end']
try:
maxy = (ymax or max(maxys)) + 0.4
maxxs = []
for iex in range(len(experiments)):
starting = focus[0] if focus else 1
ending = (focus[1] if focus
else list(experiments[iex].tads.values())[-1]['end'])
axes[iex].hlines(1, 1, end, 'k', lw=1.5)
axes[iex].set_ylim((0, maxy))
maxxs.append(ending / facts[iex])
axes[iex].text(starting + 1, float(maxy) / 20,
experiments[iex].name, {'ha': 'left',
'va': 'bottom'})
axes[iex].set_yticks([float(i) / 2
for i in range(1, int(maxy + .5) * 2)])
if ymax:
axes[iex].set_ylim((0, ymax))
axes[iex].set_xlim(xmin=starting, xmax=max(maxxs))
except ValueError:
pass
pos = {'ha': 'center', 'va': 'bottom'}
for i, col in enumerate(self.itercolumns()):
ends = sorted([(t['end'], j) for j, t in enumerate(col) if t['end']])
beg = (ends[0][0] + 0.9) / facts[ends[0][1]]
end = (ends[-1][0] + 1.1) / facts[ends[-1][1]]
if focus:
if beg < focus[0] or end > focus[1]:
continue
axes[0].text(beg + float(end - beg) / 2, maxy + float(maxy) / 20,
str(i + 1), pos,
rotation=90, size='small')
for iex, tad in enumerate(col):
if not tad['end']:
continue
axes[iex].axvspan(beg-.2, end+.2, alpha=0.2,
color=ali_colors[i%(len(ali_colors))])
axes[iex].set_xlabel('Genomic bin')
tit1 = fig.suptitle("TAD borders' alignment", size='x-large')
tit2 = axes[0].set_title("Alignment column number")
tit2.set_y(1.3)
plt.subplots_adjust(top=0.76)
# This was for color bar instead of legend
# ax1 = fig.add_axes([0.9 + 0.3/figsiz, 0.05, 0.2/figsiz, 0.9])
# cb1 = colorbar.ColorbarBase(ax1, cmap=jet,
# norm=colors.Normalize(vmin=0., vmax=1.))
# cb1.set_label('Border prediction score')
# cb1.ax.set_yticklabels([str(i)for i in range(1, 11)])
fig.set_facecolor('white')
plots = []
for scr in range(1, 11):
plots += plt.plot((100,),(100,), marker=6, ms=9,
color=jet(float(scr) / 10), mec='none')
try:
axes[-1].legend(plots,
[str(scr) for scr in range(1, 11)],
numpoints=1, title='Border scores',
fontsize='small', loc='lower left',
bbox_to_anchor=(1, 0.5))
except TypeError:
axes[-1].legend(plots,
[str(scr) for scr in range(1, 11)],
numpoints=1, title='Border scores',
loc='lower left',
bbox_to_anchor=(1, 0.5))
if savefig:
tadbit_savefig(savefig)
else:
plt.show()
[docs]class TAD(dict):
"""
Specific class of TADs, used only within Alignment objects.
It is directly inheriting from python dict.
a TAD these keys:
- 'start': position of the TAD
- 'end': position of the TAD
- 'score': of the prediction of boundary
- 'brk': same as 'end'
- 'pos': in the alignment (column number)
- 'exp': Experiment this TAD belongs to
- 'index': of this TAD within all TADs in the Experiment
"""
def __init__(self, thing, i, exp):
super(TAD, self).__init__(thing)
idx = [t for t in exp.tads if exp.tads[t]['start']==self['start']][0]
self.update(dict((('pos', i),('exp', exp), ('index', idx))))
def __repr__(self):
return '>' + (str(int(self['end']) * self['exp'].resolution // 1000) \
if int(self['end']) else '-') + '<'
def _interpolation(experiments):
"""
Calculate the distribution of TAD lengths, and interpolate it using
interp1d function from scipy.
:param experiments: names of experiments included in the
distribution
:return: function to interpolate a given TAD length according to a
probability value
"""
# get all TAD lengths and multiply it by bin size of the experiment
norm_tads = []
for tad in experiments:
for brk in list(tad.tads.values()):
if not brk['brk']:
continue
norm_tads.append((brk['end'] - brk['start']) * tad.resolution)
win = [0.0]
cnt = [0.0]
max_d = max(norm_tads)
# we ask here for a mean number of 20 values per bin
bin_s = int(max_d / (float(len(norm_tads)) / 20))
for bee in range(0, int(max_d) + bin_s, bin_s):
win.append(len([i for i in norm_tads
if bee < i <= bee + bin_s]) + win[-1])
cnt.append(bee + float(bin_s))
win = [float(v) / max(win) for v in win]
## to see the distribution and its interpolation
#distr = interp1d(x, y)
#from matplotlib import pyplot as plt
#plt.plot([distr(float(i)/1000) for i in xrange(1000)],
# [float(i)/1000 for i in xrange(1000)])
#plt.hist(norm_tads, normed=True, bins=20, cumulative=True)
#plt.show()
return interp1d(win, cnt)
[docs]def randomization_test(xpers, score=None, num=1000, verbose=False, max_dist=100000,
rnd_method='interpolate', r_size=None, method='reciprocal'):
"""
Return the probability that original alignment is better than an
alignment of randomized boundaries.
:param tads: original TADs of each experiment to align
:param distr: the function to interpolate TAD lengths from probability
:param None score: just to print it when verbose
:param 1000 num: number of random alignment to generate for comparison
:param False verbose: to print something nice
:param interpolate method: how to generate random tads (alternative is
'shuffle'). 'interpolate' will calculate the distribution of TAD lengths,
and generate a random set of TADs according to this distribution (see
:func:`pytadbit.alignment.generate_rnd_tads`). In contrast, the 'shuffle'
method uses directly the set of observed TADs and shuffle them (see
:func:`pytadbit.alignment.generate_shuffle_tads`).
"""
if not rnd_method in ['interpolate', 'shuffle']:
raise Exception('method should be either "interpolate" or ' +
'"shuffle"\n')
if rnd_method == 'interpolate' and not r_size:
raise Exception('should provide Chromosome r_size if interpolate\n')
tads = []
for xpr in xpers:
if not xpr.tads:
raise Exception('No TADs defined, use find_tad function.\n')
tads.append([(t['end'] - t['start']) * \
xpr.resolution for t in list(xpr.tads.values())])
rnd_distr = []
# rnd_len = []
distr = _interpolation(xpers) if rnd_method is 'interpolate' else None
rnd_exp = lambda : tads[int(random() * len(tads))]
for val in range(num):
if verbose:
val = float(val)
if not val / num * 100 % 5:
stdout.write('\r' + ' ' * 10 +
' randomizing: '
'%.2f completed' % (100 * val/num))
stdout.flush()
if rnd_method is 'interpolate':
rnd_tads = [generate_rnd_tads(r_size, distr)
for _ in range(len(tads))]
# rnd_len.append(float(sum([len(r) for r in rnd_tads]))
# / len(rnd_tads))
else:
rnd_tads = [generate_shuffle_tads(rnd_exp())
for _ in range(len(tads))]
# rnd_len.append(len(tads))
rnd_distr.append(align(rnd_tads, verbose=False, method=method,
max_dist=max_dist)[0][1])
# aligns, sc = align(rnd_tads, verbose=False)
# rnd_distr.append(sc)
# for xpr in aligns:
# print sc, '|'.join(['%5s' % (str(x/1000)[:-2] \
# if x!='-' else '-' * 4)\
# for x in xpr])
# print ''
pval = float(len([n for n in rnd_distr if n > score])) / len(rnd_distr)
if verbose:
stdout.write('\n %s randomizations finished.' % (num))
stdout.flush()
print(' Observed alignment score: %s' % (score))
# print ' Mean number of boundaries: {}; observed: {}'.format (
# sum(rnd_len)/len(rnd_len),
# str([len(self.experiments[e].brks)
# for e in self.experiments]))
print('Randomized scores between %s and %s; observed: %s' % (
min(rnd_distr), max(rnd_distr), score))
print('p-value: %s' % (pval if pval else '<%s' % (1./num)))
return pval
[docs]def generate_rnd_tads(chromosome_len, distr, start=0):
"""
Generates random TADs over a chromosome of a given size according to a given
distribution of lengths of TADs.
:param chromosome_len: length of the chromosome
:param distr: function that returns a TAD length depending on a p value
:param bin_size: size of the bin of the Hi-C experiment
:param 0 start: starting position in the chromosome
:returns: list of TADs
"""
pos = start
tads = []
while True:
pos += distr(random())
if pos > chromosome_len:
break
tads.append(float(pos))
return tads
[docs]def generate_shuffle_tads(tads):
"""
Returns a shuffle version of a given list of TADs
:param tads: list of TADs
:returns: list of shuffled TADs
"""
rnd_tads = tads[:]
shuffle(rnd_tads)
tads = []
for tad in rnd_tads:
if tads:
tad += tads[-1]
tads.append(tad)
return tads