Source code for genrisk.pipeline

# -*- coding: utf-8 -*-
import os
import subprocess
from datetime import datetime

import numpy as np
import pandas as pd
from statsmodels.stats.multitest import multipletests

from .association_analysis import run_mannwhitneyu, run_ttest, get_pvals_logit, get_pvals_linear
from .gene_scoring import get_gene_info, plink_process, combine_scores
from .prediction_models import regression_model, classification_model, test_classifier, test_regressor
from .utils import gene_length_normalize

PATH = os.path.abspath(os.path.join((__file__), os.pardir, os.pardir, os.pardir))
BETAREG_SHELL = os.path.join(PATH, 'scripts', 'betareg_shell.R')
association_functions = {
    'mannwhitneyu': run_mannwhitneyu,
    'ttest_ind': run_ttest,
    'logit': get_pvals_logit,
    'linear': get_pvals_linear,
}


[docs] def scoring_process( *, logger, annotation_file, temp_dir, beta_param, weight_func, del_col, maf_threshold, gene_col, variant_col, af_col, alt_col, bfiles, plink, output_file, vcf ): """ Calculate gene-based scores. This is calculated by a weighted sum of the variants in a gene. Parameters ---------- logger an object that logs function outputs. annotation_file : str an annotation file containing variant IDs, alt, and another info. temp_dir : str a temporary directory to save temporary files before merging. beta_param : tuple the parameters from beta weight function. ignore if log10 function is chosen. weight_func : str the weighting function used in score calculation. del_col : str the column containing deleteriousness score or functional annotation. maf_threshold : float the threshold for minor allele frequency. between [0.0-1.0] gene_col : str the column containing gene names. variant_col : str the column containing variant IDs. af_col : str the column containing allele frequency. alt_col : str the column containing alternate allele. bfiles : str the binary files for plink process. if a vcf is provided no binary files are needed. plink : str the directory of plink, if not set in environment output_file : str the path to save the final output scores matrix. vcf : str the vcf file for plink process. if binary files are provided no vcf is needed. Returns ------- DataFrame final scores matrix """ try: logger.info('getting information from annotation file') genes_folder = get_gene_info(annotation_file=annotation_file, variant_col=variant_col, af_col=af_col, alt_col=alt_col, del_col=del_col, output_dir=temp_dir, genes_col=gene_col, maf_threshold=maf_threshold, beta_param=beta_param, weight_func=weight_func) except Exception as arg: logger.exception(arg) raise logger.info('calculating gene scores ...') try: plink_process(genes_folder=genes_folder, plink=plink, bfiles=bfiles, vcf=vcf) except Exception as arg: logger.exception(arg) raise logger.info('combining score files ...') try: if 'plink2' in plink: plink_extension = 'sscore' else: plink_extension = 'profile' df = combine_scores(input_path=temp_dir, output_path=output_file, plink_extension=plink_extension) except Exception as arg: logger.exception(arg) raise logger.info('process is complete.') return df
[docs] def find_pvalue( *, scores_file, info_file, genes=None, phenotype, samples_column, test='mannwhitneyu', covariates=None, cases=None, controls=None, processes=1, logger, adj_pval=None, ): """ Calculate the significance of a gene in a population using different statistical analyses [mannwhitneyu, logit, linear, ttest_ind]. Parameters ---------- adj_pval : str the method used to adjust the p-values. scores_file : str dataframe containing the scores of genes across samples. info_file : str a file containing the information of the sample. this includes target phenotype and covariates. genes : list a list of the genes to calculate the significance. if None will calculate for all genes. phenotype : str the name of the column containing phenotype information. samples_column : str the name of the column contining samples IDs. test : str the type of statistical test to use, choices are: ttest_ind, mannwhitenyu, linear, logit. covariates : str the list of covariates used in the calculation. cases : str the cases category. if binary phenotype. controls : str the controls category. if binary phenotype. processes : int number of processes used, for parallel computing. logger an object that logs function outputs. Returns ------- DataFrame dataframe with genes and their p_values """ logger.info("Reading scores file...") if genes: scores_df = pd.read_csv(scores_file, sep='\t', usecols=lambda x: x in genes+[samples_column], on_bad_lines='warn') else: scores_df = pd.read_csv(scores_file, sep='\t', on_bad_lines='warn') scores_df.set_index(samples_column, inplace=True) scores_df.replace([np.inf, -np.inf], 0, inplace=True) scores_df.fillna(0, inplace=True) scores_df = scores_df.loc[:, scores_df.var() != 0.0].reset_index() logger.info("Reading info file...") phenotypes_col = phenotype.split(',') if covariates: covariates = covariates.split(',') genotype_df = pd.read_csv(info_file, sep='\t', usecols=covariates+phenotypes_col+[samples_column], on_bad_lines='warn') else: genotype_df = pd.read_csv(info_file, sep='\t', on_bad_lines='warn') logger.info("Processing files...") merged_df = pd.merge(scores_df, genotype_df, how='inner', on=samples_column) merged_df.replace([np.inf, -np.inf], 0.0, inplace=True) genes = scores_df.columns.tolist()[1:] del scores_df args = { 'processes': processes, 'cases': cases, 'controls': controls, 'covariates': covariates, 'logger': logger, } for pheno in phenotypes_col: logger.info("Calculating p_values using the following test: " + test + ' for the phenotype: ' + pheno) pheno_df = merged_df.dropna(subset=[pheno]) try: p_values_df = association_functions.get(test)(df=pheno_df, genes=genes, cases_column=pheno, **args) p_values_df.dropna(subset=['p_value'], inplace=True) p_values_df['p_value'].replace(0.0, 7.1429666685167604e-293, inplace=True) p_values_df['p_value'].replace(0, 7.1429666685167604e-293, inplace=True) if adj_pval: logger.info("Calculating the adjusted p_values...") adjusted = multipletests(list(p_values_df['p_value']), method=adj_pval) p_values_df[adj_pval + '_adj_pval'] = list(adjusted)[1] now = datetime.now().strftime("%Y%m%d") cohort = scores_file.split(r"/")[-1].split(".")[0] output = now + "_" + test + "_" + cohort + "_" + pheno + ".tsv" p_values_df.to_csv(output, sep='\t', index=False) except Exception as arg: logger.exception(arg) raise return
[docs] def betareg_pvalues( *, scores_file, pheno_file, samples_col, cases_col, output_path, covariates, processes, genes, logger, ): """ Calculate association significance using betareg. This function runs in Rscript. Parameters ---------- scores_file : str the path to the scores file. pheno_file : str the path to the phenotypes and covariates file. samples_col : str column containing samples ids. cases_col : str column containing the phenotype. output_path : str a path to save the summary statistics. covariates : str the list of covariates used in the calculation. processes : int number of processes used, for parallel computing. genes : str a list of the genes to calculate the significance. if None will calculate for all genes. logger an object that logs function outputs. Returns ------- """ logger.info("The p_values will be calculated using beta regression.") if not genes: genes = "" try: p = subprocess.call( ["Rscript", BETAREG_SHELL, "-s", scores_file, "--phenofile", pheno_file, "--samplescol", samples_col, "--casescol", cases_col, "-o", output_path, "--covariates", covariates, "--processes", str(processes), "--genes", genes] ) except Exception as arg: logger.exception(arg) raise logger.info("Process is complete. The association analysis results have been saved.")
[docs] def create_prediction_model( *, model_name='final_model', model_type='regressor', y_col, imbalanced=True, normalize=True, folds=10, training_set, testing_set=pd.DataFrame(), metric=None, seed, include_models, normalize_method, feature_selection ): """ Create a prediction model (classifier or regressor) using the provided dataset. Parameters ---------- model_name : str the name of the prediction model. model_type : str type of model [regressor|classifier] y_col : str the column containing the target (qualitative or quantitative). imbalanced : bool True means data is imbalanced. normalize : bool True if data needs normalization. folds : int how many folds for cross-validation. training_set : pd.DataFrame the training set for the model. testing_set : pd.DataFrame if exists an extra evaluation step will be done using the testing set. test_size : float the size to split the training set for cross-validation. metric : str the metric to evaluate the best model. seed : int the intilization state random number include_models : list a list of models that the user wants to test. if None all models will be used. normalize_method : str the method to normalize the data. Choices [zscore, minmax, maxabs, robust] Returns ------- Final model """ try: model_func = {'classifier': classification_model, 'regressor': regression_model} final_model = model_func.get(model_type)( y_col=y_col, training_set=training_set, normalize=normalize, folds=folds, metric=metric, model_name=model_name, testing_set=testing_set, imbalanced=imbalanced, seed=seed, include_models=include_models, normalize_method=normalize_method, feature_selection=feature_selection ) except Exception: raise Exception('Model requested is not available. Please choose regressor or classifier.') return final_model
def model_testing( *, model_path, input_file, samples_col, label_col, model_type, output ): """ Load a prediction model and use it to predict label values in an independent dataset. Parameters ---------- model_path : str the path to saved model. input_file : str the file with the test dataset. samples_col : str the name of samples column. label_col : str the name of the target column model_type : regressor or classifier Returns ------- DataFrame the results of predictions. """ testing_df = pd.read_csv(input_file, sep='\t', index_col=samples_col, on_bad_lines='warn') testing_df = testing_df.dropna(subset=[label_col]) testing_df.replace([np.inf, -np.inf, np.nan], 0.0, inplace=True) x_set = testing_df.drop(columns=label_col) model_func = {'classifier': test_classifier, 'regressor': test_regressor} unseen_predictions = model_func.get(model_type)( y_col=testing_df[label_col], output=output, model_path=model_path, x_set=x_set ) return unseen_predictions
[docs] def normalize_data( *, method='gene_length', genes_info=None, genes_col='HGNC symbol', length_col='gene_length', data_file, samples_col, ): """ Normalize dataset using gene_length, minmax, maxabs, zscore or robust Parameters ---------- method : str the normalization method. [zscore, gene_length, minmax, maxabs, robust] genes_info : str file containing the genes and their lengths. if gene_length method chosen with no file, info will be retrieved from ensembl database. genes_col : str the column containing genes (if genes_info file is provided) length_col : str the columns containing genes length (if genes_info file is provided) data_file : str file containing dataset to be normalized. samples_col : str the column containing samples ids. Returns ------- DataFrame a df with the normalized dataset. """ scores_df = pd.read_csv(data_file, sep=r'\s+', on_bad_lines='warn') if method == 'gene_length': scores_df = gene_length_normalize( genes_info=genes_info, genes_col=genes_col, length_col=length_col, scores_df=scores_df, samples_col=samples_col ) elif method == 'maxabs': for col in scores_df.columns: if col == samples_col: continue scores_df[col] = scores_df[col]/scores_df[col].abs().max() elif method == 'minmax': for col in scores_df.columns: if col == samples_col: continue scores_df[col] = (scores_df[col] - scores_df[col].min()) / (scores_df[col].max() - scores_df[col].min()) elif method == 'zscore': scores_df.std(ddof=0) for col in scores_df.columns: if col == samples_col: continue scores_df[col] = (scores_df[col] - scores_df[col].mean()) / scores_df[col].std() elif method == 'robust': for col in scores_df.columns: if col == samples_col: continue scores_df[col] = (scores_df[col] - scores_df[col].median()) / (scores_df[col].quantile(0.75) - scores_df[col].quantile(0.25)) else: raise Exception( 'This function does not support the normalization method you selected. Methods: [zscore, gene_length, ' 'minmax, maxabs, robust] ' ) scores_df.replace([np.inf, -np.inf], 0, inplace=True) return scores_df