Source code for pytadbit.utils.file_handling
"""
22 Jan 2014
"""
from __future__ import print_function
from builtins import next
from io import open, TextIOWrapper, BufferedReader
import ctypes
import os
import errno
import platform
import bz2
import gzip
import zipfile
import tarfile
from subprocess import Popen, PIPE
from multiprocessing import cpu_count
try:
basestring
except NameError:
basestring = str
def check_pik(path):
with open(path, "rt") as f:
f.seek (0, 2) # Seek @ EOF
fsize = f.tell() # Get Size
f.seek (max (fsize-2, 0), 0) # Set pos @ last n chars
key = f.read() # Read to end
return key == 's.'
[docs]def magic_open(filename, verbose=False, cpus=None):
"""
To read uncompressed zip gzip bzip2 or tar.xx files
:param filename: either a path to a file, or a file handler
:returns: opened file ready to be iterated
"""
textchars = bytearray({7,8,9,10,12,13,27} | set(range(0x20, 0x100)) - {0x7f})
is_binary_string = lambda bytes: bool(bytes.translate(None, textchars))
if isinstance(filename, basestring) or isinstance(filename, basestring):
fhandler = open(filename, 'rb')
inputpath = True
if tarfile.is_tarfile(filename):
print('tar')
thandler = tarfile.open(filename)
if len(thandler.members) != 1:
raise NotImplementedError(
'Not exactly one file in this tar archieve.')
return magic_open(thandler.extractfile(thandler.getnames()[0]))
else:
fhandler = filename
filename = fhandler.name
inputpath = False
start_of_file = ''
if filename.endswith('.dsrc'):
dsrc_binary = which('dsrc')
if not dsrc_binary:
raise Exception('\n\nERROR: DSRC binary not found, install it from:'
'\nhttps://github.com/lrog/dsrc/releases')
proc = Popen([dsrc_binary, 'd', '-t%d' % (cpus or cpu_count()),
'-s', filename], stdout=PIPE, universal_newlines=True)
return proc.stdout
if inputpath:
start_of_file = fhandler.read(1024)
fhandler.seek(0)
if is_binary_string(start_of_file):
if start_of_file.startswith(b'\x50\x4b\x03\x04'):
if verbose:
print('zip')
zhandler = TextIOWrapper(zipfile.ZipFile(fhandler))
if len(zhandler.NameToInfo) != 1:
raise NotImplementedError(
'Not exactly one file in this zip archieve.')
return TextIOWrapper(BufferedReader(zhandler.open(list(zhandler.NameToInfo.keys())[0])))
if is_binary_string(start_of_file) and start_of_file.startswith(b'\x42\x5a\x68'):
if verbose:
print('bz2')
fhandler.close()
return TextIOWrapper(BufferedReader(bz2.BZ2File(filename)))
if is_binary_string(start_of_file) and start_of_file.startswith(b'\x1f\x8b\x08'):
if verbose:
print('gz')
return TextIOWrapper(BufferedReader(gzip.GzipFile(fileobj=fhandler)))
else:
if verbose:
print('text')
fhandler.close()
fhandler = open(filename, 'r')
return fhandler
[docs]def get_free_space_mb(folder, div=2):
"""
Return folder/drive free space (in bytes)
Based on stackoverflow answer: http://stackoverflow.com/questions/51658/cross-platform-space-remaining-on-volume-using-python
:param folder: folder/drive to measure
:param 2 div: divide the size by (div*1024) to get the size in Mb (3 is Gb...)
"""
if platform.system() == 'Windows':
free_bytes = ctypes.c_ulonglong(0)
ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(folder), None, None, ctypes.pointer(free_bytes))
return free_bytes.value/1024/1024
else:
st = os.statvfs(folder)
return st.f_bavail * st.f_frsize/(1024**div)
def is_fastq(thing):
"""
Check if a given file is in fastq format
"""
if isinstance(thing, basestring):
fh = magic_open(thing)
else:
fh = thing
for _ in range(100):
if (next(fh).startswith('@') and next(fh)[0].upper() in 'ATGCN' and
next(fh).startswith('+') and next(fh)):
continue
return False
return True
def wc(fnam):
"""
Pythonic way to count lines
"""
return sum(1 for _ in open(fnam,'rt'))
def mkdir(dnam):
try:
os.mkdir(dnam)
except OSError as exc:
if exc.errno == errno.EEXIST and os.path.isdir(dnam):
pass
else:
raise
def which(program):
"""
stackoverflow: http://stackoverflow.com/questions/377017/test-if-executable-exists-in-python
"""
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
fpath, _ = os.path.split(program)
if fpath:
if is_exe(program):
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
path = path.strip('"')
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return exe_file
return None