Source code for lipyd.mgf

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
#  This file is part of the `lipyd` python module
#
#  Copyright (c) 2015-2018 - EMBL
#
#  File author(s): Dénes Türei (turei.denes@gmail.com)
#
#  Distributed under the GNU GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      http://www.gnu.org/licenses/gpl-3.0.html
#
#  Website: http://www.ebi.ac.uk/~denes
#

import os
import sys
import re
import imp
import numpy as np

import lipyd.lookup as lookup
import lipyd.session as session
import lipyd.settings as settings


[docs]class MgfReader(object): stRrtinseconds = 'RTINSECONDS' stRtitle = 'TITLE' stRbe = 'BE' stRen = 'EN' stRch = 'CH' stRpepmass = 'PEPMASS' stRempty = '' stRcharge = 'CHARGE' reln = re.compile(r'^([A-Z]+).*=([\d\.]+)[\s]?([\d\.]*)["]?$') def __init__( self, fname, label = None, charge = 1, rt_tolerance = 1.0, drift = 1.0, tolerance = None ): """ Provides methods for looking up MS2 scans from an MGF file. """ self.fname = fname self.label = label self.charge = charge self.rt_tolerance = rt_tolerance self.drift = drift self.index() self.log = session.get_log() self.ms2_within_range = settings.get('ms2_within_range') self.tolerance = ( tolerance or settings.get('precursor_match_tolerance') )
[docs] def reload(self): modname = self.__class__.__module__ mod = __import__(modname, fromlist=[modname.split('.')[0]]) imp.reload(mod) new = getattr(mod, self.__class__.__name__) setattr(self, '__class__', new)
[docs] def index(self): """ Indexing offsets in one MS2 MGF file. Columns: -- pepmass -- intensity -- retention time -- scan num -- offset in file -- fraction num """ features = [] offset = 0 cap_next = False with open(self.fname, 'rb', 8192) as fp: for l in fp: l = l.decode('ascii') if ( l.strip() and not l[0].isdigit() and not l[:2] == self.stRbe and not l[:2] == self.stRen ): if not l[:2] == self.stRch: try: m = self.reln.match(l.strip()).groups() except: sys.stdout.write( 'Line in MGF file `%s`' 'could not be processed: ' '\n\t`%s`\n' % (self.fname, l) ) continue if m[0] == self.stRtitle: scan = float(m[1]) if m[0] == self.stRrtinseconds: rtime = float(m[1]) / 60.0 if m[0] == self.stRpepmass: pepmass = float(m[1]) intensity = ( 0.0 if m[2] == self.stRempty else float(m[2]) ) if self.charge is None: cap_next = True else: _charge = int(l[7]) if len(l) >= 8 else None if self.charge is None or _charge == self.charge: cap_next = True elif cap_next: features.append([ pepmass, # precursor ion mass intensity, # intensity rtime, # retention time scan, # scan ID offset, # byte offset in file self.label # fraction ID ]) # reset all values scan = None rtime = None intensity = None pepmass = None _charge = None cap_next = False offset += len(l) # sorted by precursor mass self.mgfindex = np.array( sorted(features, key = lambda x: x[0]), dtype = np.object ) self.scan_index = dict(zip( self.mgfindex[:,3].astype(int), # scan indices range(len(self)) # row numbers ))
[docs] def lookup(self, mz, rt = None, tolerance = None): """ Looks up an MS1 m/z and returns the indices of MS2 scans in the MGF file. Returns 2 numpy arrays of the same length: first one with the indices, second one with the RT differences. """ if self.log.verbosity > 4: self.log.msg( 'Recalibrated m/z: %.08f; drift = %.08f; ' 'measured m/z: %.08f' % (mz, self.drift, mz / self.drift) ) rt = rt or np.nan mz_uncorr = mz / self.drift idx = np.array(lookup.findall( self.mgfindex[:,0], mz_uncorr, tolerance or self.tolerance )) rtdiff = np.array([self.mgfindex[i,2] - rt for i in idx]) if self.log.verbosity > 4: self.log.msg( 'Looking up MS1 m/z %.08f. ' 'MS2 scans with matching precursor mass: %u' % ( mz_uncorr, len(idx), ) ) if self.ms2_within_range: if np.isnan(rt): self.log.msg( 'No MS1 RT provided, could not check RT ' 'difference of MS2 scans.' ) idx = idx[ np.logical_or( np.isnan(rtdiff), np.abs(rtdiff) < self.rt_tolerance ) ] if self.log.verbosity > 4: self.log.msg( 'RT range: %.03f--%.03f; ' 'Matching MS2 scans within this range: %u' % ( rtlower, rtupper, len(idx), ) ) elif self.log.verbosity > 4: self.log.msg('Not checking RT.') return idx, rtdiff
[docs] def lookup_scan_ids(self, mz, rt = None, tolerance = None): """ Same as `lookup` but returns scan ids instead of indices. """ idx, rtdiff = self.lookup(mz, rt, tolerance) ids = np.array([self.mgfindex[i, 3] for i in idx], dtype = int) return ids, rtdiff
[docs] def get_scan(self, i): """ Reads MS2 fragment peaks from one scan. Returns m/z's and intensities in 2 columns array. """ scan = [] self.get_file() # jumping to offset self.fp.seek(int(self.mgfindex[i, 4]), 0) # zero means no clue about charge charge = 0 for l in self.fp: if l[:6] == self.stRcharge: # one chance to obtain the charge charge = int(l.strip()[-2]) continue if not l[0].isdigit(): # end of scan break else: # reading fragment masses mi = l.strip().split() if len(mi) == 1: continue intensity = float(mi[1]) if intensity > 0.0: scan.append([ float(mi[0]), # mass intensity # intensity ]) if self.log.verbosity > 4: self.log.msg( 'Read scan #%u from file `%s`;' '%u peaks retrieved.' % ( self.mgfindex[i, 3], self.fname, len(scan), ) ) return np.array(scan)
[docs] def get_scans(self, mz, rt = None): """ Looks up all scans for one precursor mass and RT, yields 2 column arrays of MS2 m/z's and intensities and delta RTs. Calls `get_scan` for all indices returned by `lookup`. """ idx, rtdiff = self.lookup(mz, rt = rt) for i, r in zip(idx, rtdiff): yield self.get_scan(i), r
[docs] def i_by_id(self, scan_id): """ Returns the row number for one scan ID. """ return self.scan_index.get(int(scan_id), None)
[docs] def precursor_by_id(self, scan_id): """ Returns the precursor ion mass by scan ID. """ i = self.i_by_id(scan_id) return self.mgfindex[i,0] if i is not None else None
[docs] def scan_by_id(self, scan_id): """ Retrieves a scan by its ID as used in the MGF file. Scan ID is an integer number, the number of the scan in the sequence of the whole experiment. Returns the scan as 2 columns array by `get_scan` or `None` if the scan ID could not be found. """ i = self.i_by_id(scan_id) return self.get_scan(i) if i is not None else None
[docs] def get_file(self): """ Returns the file pointer, opens the file if necessary. """ if not hasattr(self, 'fp') or self.fp.closed: self.fp = open(self.fname, 'r')
def __len__(self): return self.mgfindex.shape[0] def __del__(self): if hasattr(self, 'fp') and not self.fp.closed: self.fp.close()