import dircache;
import threading;
import subprocess;
import os;
from os import path;
import sys;
import pdb;
import time;
import cPickle;
import itertools;
import re;
from numpy import array, zeros, any, all;
from optparse import OptionParser;
# When True, BayesCall passes "-annealing 1" to the basecall binary.
annealing = True;
# Paths to the external basecalling executables (relative to the CWD).
BASECALL_PATH = "./basecall";
SUFFSTAT_PATH = "./suffstat";
NAIVEBASECALL_PATH = "./naivebasecall";
# Per-run file names / directory; rebound inside runBayesCall/runNaiveBayesCall.
DATA_FNAME = None;
DATA_DIRECTORY = None;
RESULT_FNAME = None;
# Maps a base character to its column index in a basecaller score line.
ACGTMap = {'A':0, 'C':1, 'G':2, 'T':3};
def execCmd(cmd):
    """Run *cmd* through the shell and return its captured stdout.

    Process creation is retried until it succeeds: with many worker
    threads spawning subprocesses at once, Popen can fail transiently
    (e.g. EAGAIN / resource exhaustion).  stderr is captured but
    discarded.  Note: the original caught *every* exception with a bare
    ``except:``; only OSError/ValueError (the errors Popen raises) are
    retried now, so Ctrl-C and real bugs propagate.
    """
    while True:
        try:
            proc = subprocess.Popen(cmd, bufsize=10240, shell=True,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate()
            return stdout
        except (OSError, ValueError):
            # Back off briefly instead of busy-spinning on persistent failure.
            time.sleep(0.1)
# basecall function
def BayesCall(fname, C, sigma, pPrephasing, pPhasing, pDeath, concentration_sigma, nPrephasing, nPhasing, alpha, beta, nBurnIn, nSample):
cmd = "nice -n 20 " + BASECALL_PATH + ' ';
if pPhasing!=None:
cmd += "-pPhasing %s "%pPhasing;
if pPrephasing!=None:
cmd += "-pPrephasing %s "%pPrephasing;
if pDeath!=None:
cmd += "-pDeath %s "%pDeath;
if C!=None:
cmd += "-C %s "%C;
if sigma!=None:
cmd += "-sigma %s "%sigma;
if alpha!=None:
cmd += "-alpha %s "%alpha;
if beta!=None:
cmd += "-beta %s "%beta;
if concentration_sigma!=None:
cmd += "-concentration_sigma %s "%concentration_sigma;
if nPrephasing!=None:
cmd += "-nPrephasing %s "%nPrephasing;
if nPhasing!=None:
cmd += "-nPhasing %s "%nPhasing;
if nBurnIn!=None:
cmd += "-nBurnIn %s "%nBurnIn;
if nSample!=None:
cmd += "-nSample %s "%nSample;
#cmd += "-nThin 5 ";
if annealing:
cmd += "-annealing 1 ";
else:
cmd += "-annealing 0 ";
# ##############
# # initialize sequence by Bustard sequence
# fseq = open(fname + "seq.txt", "r");
# (SolexaSeq, AnsSeq) = tuple(fseq.readline().split());
# fseq.close();
# cmd += "-initseq %s "%SolexaSeq;
# ##############
cmd += fname + "_int.mat";
output = execCmd(cmd);
ResultFileLock.acquire();
os.remove(fname + "_int.mat");
# sys.stdout.write("=== %s ===\n"%fname);
# ResultFile.write("=== %s ===\n"%fname);
# for line in output:
# sys.stdout.write(line);
# ResultFile.write(line);
# sys.stdout.flush();
output = output.split('\n');
if len(output)>2 and len(output)==len(output[1].strip())+3:
ResultFile.write("@_%s\n"%fname);
ResultFile.write("%s\n"%output[1].strip());
ResultFile.write("+_%s\n"%fname);
for (base, scores) in itertools.izip(output[1].strip(), output[2:]):
ResultFile.write("%s "%scores.split()[ACGTMap[base]]);
ResultFile.write("\n");
ResultFileLock.release();
nCurThread.release();
def runBayesCall(DataFileNames, OutputFileNames, options):
    """Drive BayesCall over each (input, output) file pair.

    For every read line in an input intensity file, writes a per-read
    ``<x>_<y>_<lane>_<tile>_int.mat`` temp file and starts a BayesCall
    worker thread; at most options.ncpu workers run at once.  Workers
    for one input file are joined (and its ResultFile closed) before the
    next pair is processed, because the workers write through the module
    globals ResultFile/ResultFileLock/nCurThread.
    """
    global ResultFile, ResultFileLock, nCurThread
    # parse parameters
    (crosstalk_path, sigma_path, alpha, beta, pPrephasing, pPhasing, pDeath,
     concentration_sigma, nPrephasing, nPhasing,
     nBurnIn, nSample) = parseParameter(options.param_fname)
    for DATA_FNAME, RESULT_FNAME in itertools.izip(DataFileNames, OutputFileNames):
        if options.data_directory is not None:
            DATA_DIRECTORY = options.data_directory
        else:
            DATA_DIRECTORY = path.join(path.dirname(options.param_fname), "tmp/")
        # create data directory; ignore "already exists" only
        try:
            os.mkdir(DATA_DIRECTORY)
        except OSError:
            pass
        ResultFile = open(RESULT_FNAME, "w")
        ResultFileLock = threading.Lock()
        nCurThread = threading.Semaphore(options.ncpu)
        allThread = []
        fdata = open(DATA_FNAME, "r")
        try:
            for line in fdata:
                # parse data: 4 coordinate ints, then A C G T intensity
                # quadruples, one per cycle
                token = line.split()
                coor = tuple([int(c) for c in token[:4]])
                token = token[4:]
                intensity = [[float(a), float(c), float(g), float(t)]
                             for (a, c, g, t) in itertools.izip(*[iter(token)] * 4)]
                intensity = intensity[:options.seqLen]
                if len(intensity) <= 0:
                    continue
                # skip reads where some cycle has all four channels <= 0.1
                if any(all(array(intensity) <= 0.1, 1)):
                    continue
                # save to a tmp file
                fname = "%d_%d_%d_%d" % coor
                fout = open(DATA_DIRECTORY + fname + "_int.mat", "w")
                try:
                    for i in intensity:
                        fout.write("%f %f %f %f\n" % tuple(i))
                finally:
                    fout.close()
                # run!  acquire a slot; the worker releases it when done
                nCurThread.acquire()
                t = threading.Thread(target=BayesCall,
                                     kwargs={'fname': DATA_DIRECTORY + fname,
                                             'pPrephasing': pPrephasing,
                                             'pPhasing': pPhasing,
                                             'pDeath': pDeath,
                                             'nPrephasing': nPrephasing,
                                             'nPhasing': nPhasing,
                                             'C': crosstalk_path,
                                             'sigma': sigma_path,
                                             'alpha': alpha,
                                             'beta': beta,
                                             'concentration_sigma': concentration_sigma,
                                             'nBurnIn': nBurnIn,
                                             'nSample': nSample})
                allThread.append(t)
                t.start()
        finally:
            fdata.close()
        # make sure that all threads are finished before the global
        # ResultFile is rebound/closed for the next input file
        for t in allThread:
            t.join()
        ResultFile.close()
def naiveBayesCall(input_fname, output_fname, C, sigma, pPrephasing, pPhasing,
                   pDeath, concentration_sigma, nPrephasing, nPhasing,
                   alpha, beta, seqLen=None):
    """Run the naive basecall binary on *input_fname*, writing results to
    *output_fname*.  Parameters that are None are omitted from the
    command line.  Intended to run as a worker thread holding one
    nCurThread semaphore slot, which is always released on exit.

    seqLen: number of cycles passed as -seqlen.  Defaults to the global
    ``options.seqLen`` for backward compatibility (the original read the
    __main__-level ``options`` directly, which breaks if this module is
    imported rather than run as a script).
    """
    cmd = "nice -n 20 " + NAIVEBASECALL_PATH + ' '
    # Forward only the options actually supplied, in the original order.
    for flag, value in (("pPhasing", pPhasing), ("pPrephasing", pPrephasing),
                        ("pDeath", pDeath), ("C", C), ("sigma", sigma),
                        ("alpha", alpha), ("beta", beta),
                        ("concentration_sigma", concentration_sigma),
                        ("nPrephasing", nPrephasing), ("nPhasing", nPhasing),
                        ("output", output_fname)):
        if value is not None:
            cmd += "-%s %s " % (flag, value)
    if seqLen is None:
        # NOTE(review): falls back to the script-level `options` global,
        # as the original code did -- only valid when run as __main__.
        seqLen = options.seqLen
    cmd += "-seqlen %s " % seqLen
    cmd += input_fname
    print(cmd)
    try:
        execCmd(cmd)
    finally:
        # Always return the semaphore slot, even if execCmd raises.
        nCurThread.release()
def runNaiveBayesCall(DataFileNames, OutputFileNames, options):
    """Drive naiveBayesCall over each (input, output) file pair, one
    worker thread per pair, at most options.ncpu running concurrently.

    The semaphore and thread list are created once, before the loop: the
    original re-created them on every iteration, which both defeated the
    ncpu limit and dropped references to already-started threads (so the
    final join could not wait on them).
    """
    global nCurThread
    # parse parameters
    (crosstalk_path, sigma_path, alpha, beta, pPrephasing, pPhasing, pDeath,
     concentration_sigma, nPrephasing, nPhasing,
     nBurnIn, nSample) = parseParameter(options.param_fname)
    nCurThread = threading.Semaphore(options.ncpu)
    allThread = []
    for DATA_FNAME, RESULT_FNAME in itertools.izip(DataFileNames, OutputFileNames):
        # run!  acquire a slot; the worker releases it when done
        nCurThread.acquire()
        t = threading.Thread(target=naiveBayesCall,
                             kwargs={'input_fname': DATA_FNAME,
                                     'output_fname': RESULT_FNAME,
                                     'pPrephasing': pPrephasing,
                                     'pPhasing': pPhasing,
                                     'pDeath': pDeath,
                                     'nPrephasing': nPrephasing,
                                     'nPhasing': nPhasing,
                                     'C': crosstalk_path,
                                     'sigma': sigma_path,
                                     'alpha': alpha,
                                     'beta': beta,
                                     'concentration_sigma': concentration_sigma})
        allThread.append(t)
        t.start()
    # make sure that all threads are finished
    for t in allThread:
        t.join()
# parse parameters
# parse parameters
def parseParameter(fname):
    """Parse a run.param file of ``key value`` lines.

    Path-valued entries (pDeath, concentration_sigma, crosstalk_path,
    sigma_path, alpha, beta) are resolved relative to the parameter
    file's directory.  Entries absent from the file are returned as None
    (the original raised NameError at the return statement instead);
    blank lines are skipped (the original raised IndexError).

    Returns the 12-tuple (crosstalk_path, sigma_path, alpha, beta,
    pPrephasing, pPhasing, pDeath, concentration_sigma, nPrephasing,
    nPhasing, nBurnIn, nSample).
    """
    nPrephasing = nPhasing = nBurnIn = nSample = None
    pPrephasing = pPhasing = None
    crosstalk_path = sigma_path = alpha = beta = None
    pDeath = concentration_sigma = None
    base_dir = path.dirname(fname)
    fparam = open(fname, 'r')
    try:
        for line in fparam:
            token = line.split()
            if not token:
                continue
            key = token[0]
            if key == 'nPrephasing':
                nPrephasing = int(token[1])
            elif key == 'nPhasing':
                nPhasing = int(token[1])
            elif key == 'pPrephasing':
                pPrephasing = float(token[1])
            elif key == 'pPhasing':
                pPhasing = float(token[1])
            elif key == 'pDeath':
                # a file path, not a float, despite the name
                pDeath = path.join(base_dir, token[1])
            elif key == 'concentration_sigma':
                # a file path, not a float, despite the name
                concentration_sigma = path.join(base_dir, token[1])
            elif key == 'crosstalk_path':
                crosstalk_path = path.join(base_dir, token[1])
            elif key == 'sigma_path':
                sigma_path = path.join(base_dir, token[1])
            elif key == 'alpha':
                # a file path, not a float, despite the name
                alpha = path.join(base_dir, token[1])
            elif key == 'beta':
                # a file path, not a float, despite the name
                beta = path.join(base_dir, token[1])
            elif key == 'nburnin':
                nBurnIn = int(token[1])
            elif key == 'nsample':
                nSample = int(token[1])
    finally:
        fparam.close()
    return (crosstalk_path, sigma_path, alpha, beta, pPrephasing, pPhasing,
            pDeath, concentration_sigma, nPrephasing, nPhasing, nBurnIn, nSample)
# Main function
if __name__ == "__main__":
# parse commandline options
parser = OptionParser(usage = "usage: %prog [options] <intensity_file_name>");
parser.add_option("-o", "--output", action="store", dest="output_fname", type="string", help="basecall output file", default=None);
parser.add_option("-p", "--param", action="store", dest="param_fname", type="string", help="parameter file", default="run.param");
parser.add_option("--DD", "--tmp_dir", action="store", dest="data_directory", type="string", help="temporary data directory", default=None);
parser.add_option("-l", "--seq_len", action="store", dest="seqLen", type="int", help="number of cycle", default=76);
parser.add_option("-u", "--ncpu", action="store", dest="ncpu", type="int", help="number of processes used in training", default=10);
parser.add_option("--naive", action="store_true", dest="naive", help="Use naive method (default)", default=True);
parser.add_option("--orig", "--original", action="store_false", dest="naive", help="Use original method", default=True);
(options, args) = parser.parse_args();
assert len(args)>=1;
ST_TIME = time.time();
# prepare output file names
OutputFileNames = [];
if options.output_fname is not None:
OutputFileNames = options.output_fname.split(',');
for id, input_fname in itertools.izip(itertools.count(), args):
if len(OutputFileNames)<=id:
OutputFileNames.append(re.match(r"(.*)\..*", input_fname).group(1)+".fastq");
# Start basecall
if options.naive:
runNaiveBayesCall(args, OutputFileNames, options);
else:
runBayesCall(args, OutputFileNames, options);
ED_TIME = time.time();