Source code for tfm_utils.tfmp

#!/usr/bin/env python
"""
tfm_utils contains Python wrappers for the TFM-Pvalue program. It allows for
accurate calculation of p-values associated with scores for a given motif PWM.
Naturally, it can also be used to calculate the score corresponding to a given
p-value for a transcription factor motif PWM.

The original program can be downloaded at:
http://bioinfo.lifl.fr/tfm-pvalue/tfm-pvalue.php

Additionally, see this paper for how the p-values and thresholds are calculated:
Efficient and accurate P-value computation for Position Weight Matrices
H. Touzet and J.S. Varre
Algorithms for Molecular Biology 2007, 2:15

Copyright (C) 2017  Jared Andrews

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from __future__ import absolute_import, division, print_function, unicode_literals
import tfm_utils.pytfmpval as tfm
import psutil
from math import ceil

import os, sys
import pandas as pd
import numpy as np
import ctypes


[docs]def create_matrix(matrix_file, bg=[0.25, 0.25, 0.25, 0.25], mat_type="counts", log_type="nat"): """ From a JASPAR formatted motif matrix count file, create a Matrix object. This function also converts it to a log-odds (position weight) matrix if necessary. Args: matrix_file (str): White-space delimited string of row-concatenated motif matrix. bg (list of floats): Background nucleotide frequencies for [A, C, G, T]. mat_type (str): Type of motif matrix provided. Options are: "counts", "pfm", "pwm". "counts" is for raw count matrices for each base at each position. "pfm" is for position frequency matrices (frequencies already calculated. "pwm" is for position weight matrices (also referred to as position-specific scoring matrices.) log_type (str): Base to use for log. Default is to use the natural log. "log2" is the other option. This will affect the scores and p-values. Returns: m (tfm_utils Matrix): Matrix in pwm format. """ a, c, g, t = bg[0], bg[1], bg[2], bg[3] m = tfm.Matrix(a, c, g, t) if not os.path.exists(matrix_file): raise FileNotFoundError() m.readJasparMatrix(matrix_file) if mat_type.upper() == "COUNTS": if log_type.upper() == "NAT": m.toLogOddRatio() elif log_type.upper() == "LOG2": m.toLog2OddRatio() else: print("Improper log type argument, using natural log.") m.toLogOddRatio() return m
[docs]def toPWM(df: pd.DataFrame): """ Converts DFs in PPM (position probability matrix) or PCM (position count matrix) and forces then to PWM (position weight matrix format. .. math:: \sum_{i=1}^{\\infty} x_{i} Args: df (pd.DataFrame): Takes a dataframe with appropriate columns or rows in any of the following forms Returns: """ # rule 1: if all values can be casts to integers, then its almost defiantly a count matrix try: np.array(df.values).astype(int, casting="safe") df = orient_df(df) df = df / df.sum(axis=0) return np.log2(df) except TypeError: pass # rule 2: if any of the values are negative its likely already a PWM if (np.array(df) < 0).any(): return df # if we reach this point we assume that we are using frequency values else: # make them like log neg2 or something return np.log2(df)
[docs]def read_matrix(matrix, bg=[0.25, 0.25, 0.25, 0.25], mat_type="counts", log_type="nat"): """ From a string of space-delimited counts create a Matrix object. Break the string into 4 rows corresponding to A, C, G, and T. This function also converts it to a log-odds (position weight) matrix if necessary. Args: matrix_file (str): White-space delimited string of row-concatenated motif matrix. bg (list of floats): Background nucleotide frequencies for [A, C, G, T]. mat_type (str): Type of motif matrix provided. Options are: "counts", "pfm", "pwm". "counts" is for raw count matrices for each base at each position. "pfm" is for position frequency matrices (frequencies already calculated). "pwm" is for position weight matrices (also referred to as position-specific scoring matrices.) log_type (str): Base to use for log. Default is to use the natural log. "log2" is the other option. This will affect the scores and p-values. Returns: m (tfm_utils Matrix): Matrix in pwm format. """ try: a, c, g, t = bg[0], bg[1], bg[2], bg[3] if (len(matrix.split()) % 4) != 0: raise ValueError("Uneven rows in motif matrix. Ensure rows of equal length in input.") m = tfm.Matrix(a, c, g, t) m.readMatrix(matrix) if mat_type.upper() == "COUNTS": if log_type.upper() == "NAT": m.toLogOddRatio() elif log_type.upper() == "LOG2": m.toLog2OddRatio() else: print("Improper log type argument, using natural log.") m.toLogOddRatio() return m except ValueError as error: print(repr(error))
[docs]def orient_df(df): column_names =[x.upper() for x in df.columns if type(x) == str] rows = [x.upper() for x in df.index if type(x) == str] if "A" in column_names and "C" in column_names and "G" in column_names and "T" in column_names: return df.traspose() elif "A" in rows and "C" in rows and "G" in rows and "T" in rows: return df else: raise ValueError("Columns or rows of df must include all of 'A', 'C', 'G', 'T'.")
[docs]def df_to_matrix(df, bg = [.25, .25, .25, .25]): """ Converts a dataframe into the space delimited form required by the rest of the library. Args: df (pandas.DataFrame): A dataframe with the PWM values in it. Must have columns with names "A'", "C", "T", and "G" or rows with the same name Returns: values (str): All values of the df in the correct order and space delimited """ column_names =[x.upper() for x in df.columns if type(x) == str] rows = [x.upper() for x in df.index if type(x) == str] if "A" in column_names and "C" in column_names and "G" in column_names and "T" in column_names: values = np.array([df[x].values for x in ["A", "C", "G", "T"]]).flatten() elif "A" in rows and "C" in rows and "G" in rows and "T" in rows: values = np.array([df.loc[x].values for x in ["A", "C", "G", "T"]]).flatten() else: raise ValueError("DataFrame must have all of ['A', 'C' 'T' 'G'] in the column names or row index") string = ' '.join([str(x) for x in values]) return read_matrix(string, bg)
[docs]def score2pval(matrix, req_score, mem_thresh=2.0): """ Determine the p-value for a given score for a specific motif PWM. Args: matrix (tfm_utils Matrix): Matrix in pwm format. req_score (float): Requested score for which to determine the p-value. mem_thresh (float): Memory in GBs to remain free to system. Once passed, the closest p-val approximation will be returned instead of the exact p-val. Should only occur rarely with very long and degenerate motifs. Used to help ensure the system won't run out of memory due to these outliers. This is only calculated after each pass, each of which is more time and memory intensive than the last, so changing this value isn't recommended unless accuracy out to the 8th decimal place is really necessary. Returns: pv (float): The calculated p-value corresponding to the score. """ if isinstance(matrix, pd.DataFrame): matrix = toPWM(matrix) matrix = df_to_matrix(matrix) mem_thresh = mem_thresh * 1000 * 1024 * 1024 granularity = 0.1 max_granularity = 1e-10 decrgr = 10 # Factor to increase granularity by after each iteration. pv = tfm.doublep() ppv = tfm.doublep() pv.assign(.001) ppv.assign(.001) while granularity > max_granularity: matrix.computesIntegerMatrix(granularity) if np.isnan(matrix.errorMax): matrix.errorMax = 0.01 #check if this makes any sense max_s = int(req_score * matrix.granularity + matrix.offset + matrix.errorMax + 1) min_s = int(req_score * matrix.granularity + matrix.offset - matrix.errorMax - 1) score = int(req_score * matrix.granularity + matrix.offset) matrix.lookForPvalue(score, min_s, max_s, ppv, pv) mem = psutil.virtual_memory() if mem.available <= mem_thresh: print("Memory usage threshold passed, returning closest approximation.") return pv.value() if ppv.value() == pv.value(): return pv.value() granularity = granularity / decrgr print("Max granularity exceeded. Returning closest approximation.") return pv.value()
[docs]def pval2score(matrix, pval, mem_thresh=2.0): """ Determine the score for a given p-value for a specific motif PWM. Args: matrix (tfm_utils Matrix): Matrix in pwm format. pval (float): p-value for which to determine the score. mem_thresh (float): Memory in GBs to remain free to system. Once passed, the closest p-val approximation will be returned instead of the exact p-val. Should only occur rarely with very long and degenerate motifs. Used to help ensure the system won't run out of memory due to these outliers. This is only calculated after each pass, each of which is more time and memory intensive than the last, so changing this value isn't recommended unless accuracy out to the 8th decimal place is really necessary. Returns: score (float): The calculated score corresponding to the p-value. """ if isinstance(matrix, pd.DataFrame): matrix = toPWM(matrix) matrix = df_to_matrix(matrix) mem_thresh = mem_thresh * 1000 * 1024 * 1024 init_granularity = 0.1 max_granularity = 1e-10 decrgr = 10 # Factor to increase granularity by after each iteration. pv = tfm.doublep() # Initialize as a c++ double. ppv = tfm.doublep() pv.assign(.001) ppv.assign(.001) matrix.computesIntegerMatrix(init_granularity) if np.isnan(matrix.errorMax): matrix.errorMax = .001 max_s = int(matrix.maxScore + ceil(matrix.errorMax + 0.5)) min_s = int(matrix.minScore) granularity = init_granularity while granularity > max_granularity: matrix.computesIntegerMatrix(granularity) if np.isnan(matrix.errorMax): matrix.errorMax = .001 score = matrix.lookForScore(min_s, max_s, pval, pv, ppv) min_s = int((score - ceil(matrix.errorMax + 0.5)) * decrgr) max_s = int((score + ceil(matrix.errorMax + 0.5)) * decrgr) if ppv.value() == pv.value(): break mem = psutil.virtual_memory() if mem.available <= mem_thresh: print("Memory usage threshold passed, returning closest score approximation.") break granularity = granularity / decrgr if granularity <= max_granularity: print("Max granularity exceeded. Returning closest score approximation.") final_score = (score - matrix.offset) / matrix.granularity return final_score