Source code for pytadbit.chromosome

"""
26 Nov 2012

"""
from __future__ import print_function

from future import standard_library
standard_library.install_aliases()
from string                            import ascii_lowercase as letters
from copy                              import deepcopy as copy
from pickle                            import load, dump
from random                            import random
from math                              import sqrt
from sys                               import stderr
from os.path                           import exists
from pytadbit.boundary_aligner.aligner import align
from pytadbit                          import tadbit
from pytadbit.utils.extraviews         import tadbit_savefig
from pytadbit.utils.extraviews         import _tad_density_plot
from pytadbit.experiment               import Experiment
from pytadbit.alignment                import Alignment, randomization_test
from functools import reduce

try:
    import matplotlib.pyplot as plt
except ImportError:
    stderr.write('matplotlib not found\n')

try:
    basestring
except NameError:
    basestring = str

[docs]def load_chromosome(in_f, fast=2): """ Load a Chromosome object from a file. A Chromosome object can be saved with the :func:`Chromosome.save_chromosome` function. :param in_f: path to a saved Chromosome object file :param 2 fast: if fast=2 do not load the Hi-C data (in the case that they were saved in a separate file see :func:`Chromosome.save_chromosome`). If fast is equal to 1, the weights will be skipped from load to save memory. Finally if fast=0, both the weights and Hi-C data will be loaded :returns: a Chromosome object TODO: remove first try/except type error... this is loading old experiments """ with open(in_f,'rb') as f_in_f: dico = load(f_in_f) name = '' crm = Chromosome(dico['name']) try: exp_order = dico['experiment_order'] except KeyError: exp_order = list(dico['experiments'].keys()) for name in exp_order: xpr = Experiment(name, dico['experiments'][name]['resolution'], no_warn=True) xpr.tads = dico['experiments'][name]['tads'] xpr.norm = dico['experiments'][name]['wght'] xpr.hic_data = dico['experiments'][name]['hi-c'] xpr.conditions = dico['experiments'][name]['cond'] xpr.size = dico['experiments'][name]['size'] xpr._zeros = dico['experiments'][name].get('zero', {}) try: # new in version post-CSDM13 xpr.identifier = dico['experiments'][name]['iden'] xpr.cell_type = dico['experiments'][name]['cell'] xpr.exp_type = dico['experiments'][name]['expt'] xpr.enzyme = dico['experiments'][name]['enzy'] xpr.description = dico['experiments'][name]['desc'] except KeyError: xpr.identifier = None xpr.cell_type = None xpr.exp_type = None xpr.enzyme = None xpr.description = {} try: crm.experiments.append(xpr) except TypeError: continue crm.size = dico['size'] crm.r_size = dico['r_size'] crm.max_tad_size = dico['max_tad_size'] crm.forbidden = dico['forbidden'] crm._centromere = dico['_centromere'] try: # new in version post-CSDM13 crm.species = dico['species'] crm.assembly = dico['assembly'] crm.description = dico['description'] except KeyError: crm.species = None crm.assembly = None crm.description = {} if isinstance(dico['experiments'][name]['hi-c'], basestring) or fast != int(2): try: dicp = load(open(in_f + '_hic','rb')) for name in dico['experiments']: crm.get_experiment(name).hic_data = dicp[name]['hi-c'] if fast != 1: crm.get_experiment(name).norm = dicp[name]['wght'] except IOError: try: for name in dico['experiments']: crm.get_experiment(name).hic_data = dico['experiments'][name]['hi-c'] if fast != 1: crm.get_experiment(name).norm = dico['experiments'][name]['wght'] except KeyError: raise Exception('ERROR: file %s not found\n' % ( dico['experiments'][name]['hi-c'])) elif not fast: stderr.write('WARNING: data not saved correctly for fast loading.\n') return crm
[docs]class Chromosome(object): """ A Chromosome object designed to deal with Topologically Associating Domains predictions from different experiments, in different cell types for a given chromosome of DNA, and to compare them. :param name: name of the chromosome (might be a chromosome name for example) :param None species: species name :param None assembly: version number of the genomic assembly used :param None resolutions: list of resolutions corresponding to a list of experiments passed. :param None experiment_hic_data: :py:func:`list` of paths to files containing the Hi-C count matrices corresponding to different experiments :param None experiment_tads: :py:func:`list` of paths to files containing the definition of TADs corresponding to different experiments :param None experiment_names: :py:func:`list` of the names of each experiment :param infinite max_tad_size: maximum TAD size allowed. TADs longer than this value will not be considered, and size of the corresponding chromosome size will be reduced accordingly :param 0 chr_len: size of the DNA chromosome in bp. By default it will be inferred from the distribution of TADs :param None parser: a parser function that returns a tuple of lists representing the data matrix and the length of a row/column. With the file example.tsv: :: chrT_001 chrT_002 chrT_003 chrT_004 chrT_001 629 164 88 105 chrT_002 164 612 175 110 chrT_003 88 175 437 100 chrT_004 105 110 100 278 the output of parser('example.tsv') would be be: ``[([629, 164, 88, 105, 164, 612, 175, 110, 88, 175, 437, 100, 105, 110, 100, 278]), 4]`` :param None kw_descr: any other argument passed would be stored as complementary descriptive field. For example:: crm = Chromosome('19', species='mus musculus', subspecies='musculus musculus', skin_color='black') print crm # Chromosome 19: # 0 experiment loaded: # 0 alignment loaded: # species : mus musculus # assembly version: UNKNOWN # subspecies : musculus musculus # skin_color : black *note that these fields may appear in the header of generated out files* :return: Chromosome object """ def __init__(self, name, species=None, assembly=None, experiment_resolutions=None, experiment_tads=None, experiment_hic_data=None, experiment_norm_data=None, experiment_names=None, max_tad_size=float('inf'), chr_len=0, parser=None, centromere_search=False, silent=False, **kw_descr): self.name = name self.size = self._given_size = self.r_size = chr_len self.max_tad_size = max_tad_size self.r_size = self.size self.forbidden = {} # only used for TAD alignment randomization self.experiments = ExperimentList([], self) self._centromere = None self.alignment = AlignmentDict() self.description = kw_descr self.species = species self.assembly = assembly self._search_centromere = centromere_search if experiment_tads: for i, handler in enumerate(experiment_tads or []): name = experiment_names[i] if experiment_names else None self.add_experiment(name, experiment_resolutions[i], tad_def=handler, parser=parser) if experiment_hic_data: for i, handler in enumerate(experiment_hic_data or []): name = experiment_names[i] if experiment_names else None try: xpr = self.get_experiment(name) xpr.load_hic_data(handler, silent=silent) continue except: pass if isinstance(handler, Experiment): handler.name = name or handler.name self.experiments.append(handler) else: self.add_experiment(name, experiment_resolutions[i], hic_data=handler, parser=parser, silent=silent) if experiment_norm_data: for i, handler in enumerate(experiment_norm_data or []): name = experiment_names[i] if experiment_names else None try: xpr = self.get_experiment(name) xpr.load_norm_data(handler, silent=silent) continue except: pass if isinstance(handler, Experiment): handler.name = name or handler.name self.experiments.append(handler) else: self.add_experiment(name, experiment_resolutions[i], norm_data=handler, parser=parser, silent=silent) def __repr__(self): outstr = 'Chromosome %s:\n' % self.name outstr += (' %-2s experiment%s loaded: ' % ( len(self.experiments), 's' * (len(self.experiments) > 0)) + ', '.join([e.name for e in self.experiments]) + '\n') outstr += (' %-2s alignment%s loaded: ' % ( len(self.alignment), 's' * (len(self.alignment) > 0)) + ', '.join([a.name for a in self.alignment]) + '\n') try: # new in version post-CSDM13 outstr += ' species : %s\n' % (self.species or 'UNKNOWN') outstr += ' assembly version: %s\n' % (self.assembly or 'UNKNOWN') for desc in self.description: outstr += ' %-16s: %s\n' % (desc, self.description[desc]) except AttributeError: pass return outstr def _get_forbidden_region(self, xpr, resized=False): """ Find the regions for which there is no information in any of the experiments. This is used to infer the relative chromosome size. """ if not xpr.tads: return forbidden = [] for pos in xpr.tads: start = float(xpr.tads[pos]['start']) end = float(xpr.tads[pos]['end']) diff = end - start if diff * xpr.resolution > self.max_tad_size: forbidden += list(range(int(start), int(end+1))) xpr.tads[pos]['score'] = -abs(xpr.tads[pos]['score']) else: xpr.tads[pos]['score'] = abs(xpr.tads[pos]['score']) if not self.forbidden: self.forbidden = dict([(f, None) for f in forbidden]) else: self.forbidden = dict([(f, None) for f in set(forbidden).intersection(self.forbidden)]) # search for centromere: if self._search_centromere: self._find_centromere(xpr) # add centromere as forbidden region: if self._centromere: for pos in range(int(self._centromere[0]), int(self._centromere[1])): self.forbidden[pos] = 'Centromere' if not resized: self.__update_size(xpr)
[docs] def get_experiment(self, name): """ Fetch an Experiment according to its name. This can also be done directly with Chromosome.experiments[name]. :param name: name of the experiment to select :returns: :class:`pytadbit.Experiment` """ for exp in self.experiments: if exp.name == name: return exp raise Exception(('ERROR: experiment ' + '%s not found\n') % (name))
[docs] def save_chromosome(self, out_f, fast=True, divide=True, force=False): """ Save a Chromosome object to a file (it uses :py:func:`pickle.load` from the :py:mod:`pickle`). Once saved, the object can be loaded with :func:`load_chromosome`. :param out_f: path to the file where to store the :py:mod:`pickle` object :param True fast: if True, skip Hi-C data and weights :param True divide: if True writes two pickles, one with what would result by using the fast option, and the second with the Hi-C and weights data. The second file name will be extended by '_hic' (ie: with out_f='chromosome12.pik' we would obtain chromosome12.pik and chromosome12.pik_hic). When loaded :func:`load_chromosome` will automatically search for both files :param False force: overwrite the existing file """ while exists(out_f) and not force: out_f += '_' dico = {'experiments': {}, 'experiment_order': [xpr.name for xpr in self.experiments]} if divide: dicp = {} for xpr in self.experiments: dico['experiments'][xpr.name] = { 'size' : xpr.size, 'cond' : xpr.conditions, 'tads' : xpr.tads, 'resolution': xpr.resolution, 'hi-c' : None, 'wght' : None, 'iden' : xpr.identifier, 'cell' : xpr.cell_type, 'expt' : xpr.exp_type, 'enzy' : xpr.enzyme, 'desc' : xpr.description, 'zero' : xpr._zeros } if fast: continue if divide: dicp[xpr.name] = { 'wght': xpr.norm, 'hi-c': xpr.hic_data} dico['experiments'][xpr.name]['wght'] = None dico['experiments'][xpr.name]['hi-c'] = None else: dico['experiments'][xpr.name]['wght'] = xpr.norm dico['experiments'][xpr.name]['hi-c'] = xpr.hic_data dico['name'] = self.name dico['size'] = self.size dico['r_size'] = self.r_size dico['max_tad_size'] = self.max_tad_size dico['forbidden'] = self.forbidden dico['_centromere'] = self._centromere dico['species'] = self.species dico['assembly'] = self.assembly dico['description'] = self.description out = open(out_f, 'wb') dump(dico, out) out.close() if not fast and divide: out = open(out_f + '_hic', 'wb') dump(dicp, out) out.close()
[docs] def align_experiments(self, names=None, verbose=False, randomize=False, rnd_method='interpolate', rnd_num=1000, get_score=False, **kwargs): """ Align the predicted boundaries of two different experiments. The resulting alignment will be stored in the self.experiment list. :param None names: list of names of the experiments to align. If None, align all :param experiment1: name of the first experiment to align :param experiment2: name of the second experiment to align :param -0.1 penalty: penalty for inserting a gap in the alignment :param 100000 max_dist: maximum distance between two boundaries allowing match (100Kb seems fair with HUMAN chromosomes) :param False verbose: if True, print some information about the alignments :param False randomize: check the alignment quality by comparing randomized boundaries over Chromosomes of the same size. This will return a extra value, the p-value of accepting that the observed alignment is not better than a random alignment :param False get_score: returns alignemnt object, alignment score and percentage of identity from one side and from the other :param interpolate rnd_method: by default uses the interpolation of TAD distribution. The alternative method is 'shuffle', where TADs are simply shuffled :param 1000 rnd_num: number of randomizations to do :param reciprocal method: if global, Needleman-Wunsch is used to align (see :func:`pytadbit.boundary_aligner.globally.needleman_wunsch`); if reciprocal, a method based on reciprocal closest boundaries is used (see :func:`pytadbit.boundary_aligner.reciprocally.reciprocal`) :returns: an alignment object or, if the randomizattion was invoked, an alignment object, and a list of statistics that are, the alignment score, the probability that observed alignment performs better than randoms, the proportion of borders from the first experiment found aligned in the second experiment and the proportion of borders from the second experiment found aligned in the first experiment. Returned calues can be catched like this: ali = crm.align_experiments() or, with randomization test: ali, (score, pval, prop1, prop2) = crm.align_experiments(randomize=True) """ if names: xpers = ExperimentList([self.get_experiment(n) for n in names], self) else: xpers = self.experiments tads = [] for xpr in xpers: if not xpr.tads: raise Exception('No TADs defined, use find_tad function.\n') tads.append([xpr.tads[x]['brk'] * xpr.resolution for x in xpr.tads if xpr.tads[x]['score'] >= 0]) (aligneds, score, perc1, perc2), consensus = align(tads, verbose=verbose, **kwargs) name = tuple(sorted([x.name for x in xpers])) ali = Alignment(name, aligneds, xpers, consensus, score=score) self.alignment[name] = ali if verbose: print(self.alignment[name]) if not randomize: if get_score: return ali, score, perc1, perc2 else: return ali p_value = randomization_test(xpers, score=score, rnd_method=rnd_method, verbose=verbose, r_size=self.r_size, num=rnd_num, **kwargs) return ali, (score, p_value, perc1, perc2)
[docs] def add_experiment(self, name, resolution=None, tad_def=None, hic_data=None, norm_data=None, replace=False, parser=None, conditions=None, **kwargs): """ Add a Hi-C experiment to Chromosome :param name: name of the experiment or of the Experiment object :param resolution: resolution of the experiment (needed if name is not an Experiment object) :param None hic_data: whether a file or a list of lists corresponding to the Hi-C data :param None tad_def: a file or a dict with precomputed TADs for this experiment :param False replace: overwrite the experiments loaded under the same name :param None parser: a parser function that returns a tuple of lists representing the data matrix and the length of a row/column. With a file example.tsv containing: :: chrT_001 chrT_002 chrT_003 chrT_004 chrT_001 629 164 88 105 chrT_002 164 612 175 110 chrT_003 88 175 437 100 chrT_004 105 110 100 278 the output of parser('example.tsv') would be: ``[([629, 164, 88, 105, 164, 612, 175, 110, 88, 175, 437, 100, 105, 110, 100, 278]), 4]`` """ if not name: name = ''.join([letters[int(random() * len(letters))] \ for _ in range(5)]) stderr.write('WARNING: No name provided, random name ' + 'generated: %s\n' % (name)) if name in self.experiments: if 'hi-c' in self.get_experiment(name) and not replace: stderr.write( '''WARNING: Hi-C data already loaded under the name: %s. This experiment will be kept under %s.\n''' % (name, name + '_')) name += '_' if isinstance(name, Experiment): self.experiments.append(name) elif resolution: self.experiments.append(Experiment( name, resolution, hic_data=hic_data, norm_data=norm_data, tad_def=tad_def, parser=parser, conditions=conditions, **kwargs)) else: raise Exception('resolution param is needed\n')
[docs] def find_tad(self, experiments, name=None, n_cpus=1, verbose=True, max_tad_size="max", heuristic=True, batch_mode=False, **kwargs): """ Call the :func:`pytadbit.tadbit.tadbit` function to calculate the position of Topologically Associated Domain boundaries :param experiment: A square matrix of interaction counts of Hi-C data or a list of such matrices for replicated experiments. The counts must be evenly sampled and not normalized. 'experiment' can be either a list of lists, a path to a file or a file handler :param True normalized: if False simple normalization will be computed, as well as a simple column filtering will be applied (remove columns where value at the diagonal is null) :param 1 n_cpus: The number of CPUs to allocate to TADbit. If n_cpus='max' the total number of CPUs will be used :param max max_tad_size: an integer defining the maximum size of a TAD. Default (auto) defines it as the number of rows/columns :param True heuristic: whether to use or not some heuristics :param False batch_mode: if True, all the experiments will be concatenated into one for the search of TADs. The resulting TADs found are stored under the name 'batch' plus a concatenation of the experiment names passed (e.g.: if experiments=['exp1', 'exp2'], the name would be: 'batch_exp1_exp2'). """ experiments = experiments or self.experiments if not isinstance(experiments, list): experiments = [experiments] xprs = [] for xpr in experiments: if not isinstance(xpr, Experiment): xpr = self.get_experiment(xpr) xprs.append(xpr) # if normalized and (not xpr._zeros or not xpr._normalization): # raise Exception('ERROR: Experiments should be normalized, and' + # ' filtered first') if len(xprs) <= 1 and batch_mode: raise Exception('ERROR: batch_mode implies that more than one ' + 'experiment is passed') if batch_mode: matrix = [] if not name: name = 'batch' resolution = xprs[0].resolution for xpr in sorted(xprs, key=lambda x: x.name): if xpr.resolution != resolution: raise Exception('All Experiments must have the same ' + 'resolution\n') matrix.append(xpr.hic_data[0]) if name.startswith('batch'): name += '_' + xpr.name siz = xprs[0].size tmp = reduce(lambda x, y: x.__add__(y, silent=True), xprs) tmp.filter_columns(silent=kwargs.get('silent', False)) remove = tuple([1 if i in tmp._zeros else 0 for i in range(siz)]) result = tadbit(matrix, remove=remove, n_cpus=n_cpus, verbose=verbose, max_tad_size=max_tad_size, no_heuristic=not heuristic, **kwargs) xpr = Experiment(name, resolution, hic_data=matrix, tad_def=result, **kwargs) xpr._zeros = xprs[0]._zeros for other in xprs[1:]: xpr._zeros = dict([(k, None) for k in set(xpr._zeros.keys()).intersection( list(other._zeros.keys()))]) self.add_experiment(xpr) return for xpr in xprs: result = tadbit( xpr.hic_data, remove=tuple([1 if i in xpr._zeros else 0 for i in range(xpr.size)]), n_cpus=n_cpus, verbose=verbose, max_tad_size=max_tad_size, no_heuristic=not heuristic, **kwargs) xpr.load_tad_def(result) self._get_forbidden_region(xpr)
def __update_size(self, xpr): """ Update the chromosome size and relative size after loading new Hi-C experiments, unless the Chromosome size was defined by hand. """ if not self._given_size: self.size = max(xpr.tads[max(xpr.tads)]['end'] * xpr.resolution, self.size) self._get_forbidden_region(xpr, resized=True) self.r_size = self.size - len(self.forbidden) * xpr.resolution
[docs] def tad_density_plot(self, name, axe=None, focus=None, extras=None, normalized=True, savefig=None, shape='ellipse'): """ Draw an summary of the TAD found in a given experiment and their density in terms of relative Hi-C interaction count. :param name: name of the experiment to visualize :param None focus: can pass a tuple (bin_start, bin_stop) to display the alignment between these genomic bins :param None extras: list of coordinates (genomic bin) where to draw a red cross :param None ymax: limit the y axis up to a given value :param ('grey', ): successive colors for alignment :param True normalized: normalized Hi-C count are plotted instead of raw data. :param 'ellipse' shape: which kind of shape to use as schematic representation of TADs. Implemented: 'ellipse', 'rectangle', 'triangle' :param None savefig: path to a file where to save the image generated; if None, the image will be shown using matplotlib GUI (the extension of the file name will determine the desired format). """ if not self.experiments[name].tads: raise Exception("TAD borders not found\n") _tad_density_plot(self.experiments[name], axe=axe, focus=focus, extras=extras, normalized=normalized, savefig=savefig, shape=shape)
[docs] def visualize(self, names=None, tad=None, focus=None, paint_tads=False, axe=None, show=True, logarithm=True, normalized=False, relative=True, decorate=True, savefig=None, clim=None, scale=(8, 6), cmap='jet'): """ Visualize the matrix of Hi-C interactions of a given experiment :param None names: name of the experiment to visualize, or list of experiment names. If None, all experiments will be shown :param None tad: a given TAD in the form: :: {'start': start, 'end' : end, 'brk' : end, 'score': score} **Alternatively** a list of the TADs can be passed (all the TADs between the first and last one passed will be showed. Thus, passing more than two TADs might be superfluous) :param None focus: a tuple with the start and end positions of the region to visualize :param False paint_tads: draw a box around the TADs defined for this experiment :param None axe: an axe object from matplotlib can be passed in order to customize the picture :param True show: either to pop-up matplotlib image or not :param True logarithm: show the logarithm values :param True normalized: show the normalized data (weights might have been calculated previously). *Note: white rows/columns may appear in the matrix displayed; these rows correspond to filtered rows (see* :func:`pytadbit.utils.hic_filtering.hic_filtering_for_modelling` *)* :param True relative: color scale is relative to the whole matrix of data, not only to the region displayed :param True decorate: draws color bar, title and axes labels :param None savefig: path to a file where to save the image generated; if None, the image will be shown using matplotlib GUI (the extension of the file name will determine the desired format). :param None clim: tuple with minimum and maximum value range for color scale. I.e. clim=(-4, 10) :param 'jet' cmap: color map from matplotlib. Can also be a preconfigured cmap object. """ if names == None: names = [xpr.name for xpr in self.experiments] if not isinstance(names, list) and not isinstance(names, tuple): names = [names] cols = 1 rows = 1 else: sqrtxpr = sqrt(len(names)) cols = int(round(sqrtxpr + (0.0 if int(sqrtxpr)==sqrtxpr else .5))) rows = int(sqrtxpr+.5) notaxe = axe == None if not scale: scale = (8, 6) if notaxe and len(names) != 1: fig = plt.figure(figsize=(scale[0] * cols, scale[1] * rows)) for i in range(rows): for j in range(cols): if i * cols + j >= len(names): break if notaxe and len(names) != 1: axe = fig.add_subplot( rows, cols, i * cols + j + 1) if (isinstance(names[i * cols + j], tuple) or isinstance(names[i * cols + j], list)): if not axe: fig = plt.figure(figsize=(scale[0] * cols, scale[1] * rows)) axe = fig.add_subplot( rows, cols, i * cols + j + 1) xpr1 = self.get_experiment(names[i * cols + j][0]) xpr2 = self.get_experiment(names[i * cols + j][1]) img = xpr1.view(tad=tad, focus=focus, paint_tads=paint_tads, axe=axe, show=False, logarithm=logarithm, normalized=normalized, relative=relative, decorate=decorate, savefig=False, where='up', clim=clim, cmap=cmap) img = xpr2.view(tad=tad, focus=focus, paint_tads=paint_tads, axe=axe, show=False, logarithm=logarithm, normalized=normalized, relative=relative, decorate=False, savefig=False, where='down', clim=clim or img.get_clim(), cmap=cmap) #axe = axe.twinx() #axe.set_aspect('equal',adjustable='box-forced',anchor='NE') if decorate: plt.text(1.01, .5, 'Chromosome %s experiment %s' % ( self.name, xpr2.name), rotation=-90, va='center', size='large', ha='left', transform=axe.transAxes) else: xper = self.get_experiment(names[i * cols + j]) if not xper.hic_data and not xper.norm: continue xper.view(tad=tad, focus=focus, paint_tads=paint_tads, axe=axe, show=False, logarithm=logarithm, normalized=normalized, relative=relative, decorate=decorate, savefig=False, clim=clim, cmap=cmap) if savefig: tadbit_savefig(savefig) if show: plt.show()
[docs] def get_tad_hic(self, tad, x_name, normed=True, matrix_num=0): """ Retrieve the Hi-C data matrix corresponding to a given TAD. :param tad: a given TAD (:py:class:`dict`) :param x_name: name of the experiment :param True normed: if True, normalize the Hi-C data :returns: Hi-C data matrix for the given TAD """ beg, end = int(tad['start']), int(tad['end']) xpr = self.get_experiment(x_name) size = xpr.size matrix = [[0 for _ in range(beg, end)]\ for _ in range(beg, end)] for i, tadi in enumerate(range(beg, end)): tadi = tadi * size for j, tadj in enumerate(range(beg, end)): if normed: matrix[j][i] = xpr.norm[0][tadi + tadj] else: matrix[j][i] = xpr.hic_data[matrix_num][tadi + tadj] return matrix
[docs] def iter_tads(self, x_name, normed=True): """ Iterate over the TADs corresponding to a given experiment. :param x_name: name of the experiment :param True normed: normalize Hi-C data returned :yields: Hi-C data corresponding to each TAD """ if not self.get_experiment(x_name).hic_data: raise Exception('No Hi-c data for %s experiment\n' % (x_name)) for name, ref in self.get_experiment(x_name).tads.items(): yield name, self.get_tad_hic(ref, x_name, normed=normed)
[docs] def set_max_tad_size(self, value): """ Change the maximum size allowed for TADs. It also applies to the computed experiments. :param value: an int value (default is 5000000) """ self.max_tad_size = value for xpr in self.experiments: for tad in xpr.tads: xpr.tads[tad]['brk'] = xpr.tads[tad]['end'] if ((xpr.tads[tad]['end'] - xpr.tads[tad]['start']) * xpr.resolution) > self.max_tad_size: xpr.tads[tad]['score'] = -abs(xpr.tads[tad]['score'])
def _find_centromere(self, xpr): """ Search for the centromere in a chromosome, assuming that :class:`Chromosome` corresponds to a real chromosome. Add a boundary to all the experiments where the centromere is. * A centromere is defined as the largest area where the rows/columns of the Hi-C matrix are empty. """ beg = end = 0 size = xpr.size try: hic = xpr.hic_data[0] except TypeError: return # search for largest empty region of the chromosome best = (0, 0, 0) pos = 0 for pos, raw in enumerate(range(0, size * size, size)): if sum([hic[i] for i in range(raw, raw + size)]) == 0 and not beg: beg = float(pos) if sum([hic[i] for i in range(raw, raw + size)]) != 0 and beg: end = float(pos) if (end - beg) > best[0]: best = ((end - beg), beg, end) beg = end = 0 # this is for weared cases where centromere is at the end of Hi-C data if beg and not end: end = float(pos) if (end - beg) > best[0]: best = ((end - beg), beg, end) beg, end = best[1:] if not beg or not end: return tads = xpr.tads # if we already have a centromere defined, check if it can be reduced if self._centromere: if beg > self._centromere[0]: # readjust TADs that have been split around the centromere for tad in tads: if tads[tad]['end'] == self._centromere[0]: tads[tad]['end'] = beg self._centromere[0] = beg if end < self._centromere[1]: # readjust TADs that have been split around the centromere for tad in tads: if tads[tad]['start'] == self._centromere[1]: tads[tad]['start'] = end self._centromere[1] = end else: self._centromere = [beg, end] # split TADs overlapping with the centromere if [True for t in list(tads.values()) \ if t['start'] < beg < t['end'] \ and t['start'] < end < t['end']]: tad = len(tads) + 1 plus = 0 while tad + plus > 1: start = tads[tad - 1 + plus]['start'] final = tads[tad - 1 + plus]['end'] # centromere found? if start < beg < final and start < end < final: tads[tad] = copy(tads[tad - 1]) tads[tad]['start'] = end tads[tad]['score'] = abs(tads[tad]['score']) if (tads[tad]['end'] - tads[tad]['start']) \ * xpr.resolution > self.max_tad_size: xpr.tads[tad]['score'] = -abs(xpr.tads[tad]['score']) tads[tad]['brk'] = tads[tad]['end'] tad -= 1 tads[tad] = copy(tads[tad]) tads[tad]['score'] = abs(tads[tad]['score']) tads[tad]['end'] = beg if (tads[tad]['end'] - tads[tad]['start']) \ * xpr.resolution > self.max_tad_size: xpr.tads[tad]['score'] = -abs(xpr.tads[tad]['score']) tads[tad]['brk'] = tads[tad]['end'] plus = 1 else: tads[tad] = copy(tads[tad - 1 + plus]) tad -= 1 # if tad includes centromere but starts in the same point elif [True for t in list(tads.values()) \ if t['start'] == beg and end < t['end']]: tad = len(tads) + 1 while tad > 1: start = tads[tad - 1]['start'] final = tads[tad - 1]['end'] # centromere found? if start == beg: tads[tad] = copy(tads[tad - 1]) tads[tad]['start'] = end tads[tad]['score'] = abs(tads[tad]['score']) if (tads[tad]['end'] - tads[tad]['start']) \ * xpr.resolution > self.max_tad_size: xpr.tads[tad]['score'] = -abs(xpr.tads[tad]['score']) else: tads[tad] = copy(tads[tad - 1]) tad -= 1 # if tad includes centromere but ends in the same point elif [True for t in list(tads.values()) \ if t['end'] == end and beg > t['start']]: tad = len(tads) + 1 plus = 0 while tad + plus > 1: start = tads[tad - 1 + plus]['start'] final = tads[tad - 1 + plus]['end'] # centromere found? if final == end: tads[tad] = copy(tads[tad - 1]) tads[tad]['start'] = beg tads[tad]['score'] = abs(tads[tad]['score']) if (tads[tad]['end'] - tads[tad]['start']) \ * xpr.resolution > self.max_tad_size: xpr.tads[tad]['score'] = -abs(xpr.tads[tad]['score']) tads[tad]['brk'] = tads[tad]['end'] tad -= 1 tads[tad] = copy(tads[tad]) tads[tad]['score'] = abs(tads[tad]['score']) tads[tad]['end'] = beg if (tads[tad]['end'] - tads[tad]['start']) \ * xpr.resolution > self.max_tad_size: xpr.tads[tad]['score'] = -abs(xpr.tads[tad]['score']) tads[tad]['brk'] = tads[tad]['end'] plus = 1 else: tads[tad] = copy(tads[tad - 1 + plus]) tad -= 1
[docs]class ExperimentList(list): """ Inherited from python built in :py:func:`list`, modified for TADbit :class:`pytadbit.Experiment`. Mainly, `getitem`, `setitem`, and `append` were modified in order to be able to search for experiments by index or by name, and to add experiments simply using Chromosome.experiments.append(Experiment). The whole ExperimentList object is linked to a Chromosome instance (:class:`pytadbit.Chromosome`). """ def __init__(self, thing, crm): super(ExperimentList, self).__init__(thing) self.crm = crm def __getitem__(self, i): try: return super(ExperimentList, self).__getitem__(i) except TypeError: for nam in self: if nam.name == i: return nam raise KeyError('Experiment %s not found\n' % (i)) def __setitem__(self, i, exp): try: super(ExperimentList, self).__setitem__(i, exp) exp.crm = self.crm self.crm._get_forbidden_region(exp) except TypeError: for j, nam in enumerate(self): if nam.name == i: exp.crm = self.crm self[j] = exp self.crm._get_forbidden_region(exp) break else: exp.crm = self.crm self.append(exp) self.crm._get_forbidden_region(exp) def __delitem__(self, i): try: super(ExperimentList, self).__delitem__(i) except TypeError: for j, nam in enumerate(self): if nam.name == i: exp = self.pop(j) del(exp) break else: raise KeyError('Experiment %s not found\n' % (i))
[docs] def append(self, exp): if exp.name in [e.name for e in self]: self[exp.name] = exp self.crm._get_forbidden_region(exp) else: super(ExperimentList, self).append(exp) self.crm._get_forbidden_region(exp) exp.crm = self.crm
class AlignmentDict(dict): """ :py:func:`dict` of :class:`pytadbit.Alignment` Modified getitem, setitem, and append in order to be able to search alignments by index or by name. linked to a :class:`pytadbit.Chromosome` """ def __getitem__(self, nam): try: return super(AlignmentDict, self).__getitem__(tuple(sorted(nam))) except KeyError: for i, key in enumerate(self): if nam == i: return self[key] raise TypeError('Alignment %s not found\n' % i)