Python Multithread


airtime

Mitglied
Hi,

hat jemand Erfahrung mit python und multithreads? Ich nutze ein python script, das ich nicht selbst geschrieben habe (und der Entwickler antwortet nicht :( ).

Das Problem ist das ich multi cores als parameter übergeben kann,
- wenn ich mehrere cores angebe bricht das Programm unspezifisch ab,
- wenn ich keine cores angebe versucht das script automatisch alle cores einzubinden und das Programm bricht unspezifisch ab.
- wenn ich genau ein core angebe läuft das Programm komplett durch, ist aber sehr langsam.

Ich bin mir sehr sicher das es am multithreading von python liegen müsste, habe aber zu wenig Erfahrungen.

Gruß Air
 

saftmeister

Nutze den Saft!
Premium-User
Kannst du mit einem Debugger umgehen? Dann versuch Python (+Script) doch mal mit dem GDB oder etwas vergleichbaren zu debuggen. Um welches OS gehts eigentlich?
 

airtime

Mitglied
Sorry wieder mal zu wenig infos.

Also ich nutze das script auf Ubuntu 11.10. Das mit dem debuggen hat mich bisher noch nicht überzeugt, kann es aber mal suf eclipse probieren.

Desweiteren habe ich im code rum geschaut und das threading wird mit semaphore durch geführt.
Als ich dann den code geändert hab und mit bounded semaphore getestet hab kam ein error das zu viele threadings gestartet werden. Irgendwie muss ich die limitierung von den cores (also in meinem Beispiel 4) beim threading angeben können oder.

Ich hatte mir schon ein paar tutorials und examples zu threading angeschaut, aber ich hab den Dreh noch nicht raus. Falls jemand mir noch Tipps zum Threading geben kann, dann nur her damit.

Gruß Air
 

airtime

Mitglied
Hallo,
sorry das ich mich erst jetzt melde war einige tage im bett gelegen. Nun gut der Code ist hier. Ich weiß nicht genau welche Methoden wichtig sind, deshalb habe ich das gesamte script angegeben. Ich hoffe es ist noch übersichtlich:
Code:
import dircache;
import threading;
import subprocess;
import os;
from os import path;
import sys;
import pdb;
import time;
import cPickle;
import itertools;
import re;
from numpy import array, zeros, any, all;
from optparse import OptionParser;


annealing = True;
BASECALL_PATH = "./basecall";
SUFFSTAT_PATH = "./suffstat";
NAIVEBASECALL_PATH = "./naivebasecall";

DATA_FNAME = None; 
DATA_DIRECTORY = None; 
RESULT_FNAME = None;

ACGTMap = {'A':0, 'C':1, 'G':2, 'T':3};

def execCmd(cmd):
    finished = False;
    while not finished:
        try:
            proc = subprocess.Popen(cmd, bufsize=10240, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE);
            (stdout, stderr) = proc.communicate();
            finished = True;
        except:
            pass;
    #return itertools.chain(stdout, stderr);
    #print stderr;
    return stdout;
    

# basecall function
def BayesCall(fname, C, sigma, pPrephasing, pPhasing, pDeath, concentration_sigma, nPrephasing, nPhasing, alpha, beta, nBurnIn, nSample):
    cmd = "nice -n 20 " + BASECALL_PATH + ' ';
    if pPhasing!=None:
        cmd += "-pPhasing %s "%pPhasing;
    if pPrephasing!=None:
        cmd += "-pPrephasing %s "%pPrephasing;
    if pDeath!=None:
        cmd += "-pDeath %s "%pDeath;
    if C!=None:
        cmd += "-C %s "%C;
    if sigma!=None:
        cmd += "-sigma %s "%sigma;
    if alpha!=None:
        cmd += "-alpha %s "%alpha;
    if beta!=None:
        cmd += "-beta %s "%beta;
    if concentration_sigma!=None:
        cmd += "-concentration_sigma %s "%concentration_sigma;
    if nPrephasing!=None:
        cmd += "-nPrephasing %s "%nPrephasing;
    if nPhasing!=None:
        cmd += "-nPhasing %s "%nPhasing;
    if nBurnIn!=None:
        cmd += "-nBurnIn %s "%nBurnIn;
    if nSample!=None:
        cmd += "-nSample %s "%nSample;
    #cmd += "-nThin 5 ";
    if annealing:
        cmd += "-annealing 1 ";
    else:
        cmd += "-annealing 0 ";
        

#     ##############
#     # initialize sequence by Bustard sequence
#     fseq = open(fname + "seq.txt", "r");
#     (SolexaSeq, AnsSeq) = tuple(fseq.readline().split());
#     fseq.close();
#     cmd += "-initseq %s "%SolexaSeq;
#     ##############
        
    cmd += fname + "_int.mat";

    output = execCmd(cmd);
    
    ResultFileLock.acquire();
    os.remove(fname + "_int.mat");
#     sys.stdout.write("=== %s ===\n"%fname);
#     ResultFile.write("=== %s ===\n"%fname);
#     for line in output:
#         sys.stdout.write(line);
#         ResultFile.write(line);
#     sys.stdout.flush();
    output = output.split('\n');
    if len(output)>2 and len(output)==len(output[1].strip())+3:
        ResultFile.write("@_%s\n"%fname);
        ResultFile.write("%s\n"%output[1].strip());
        ResultFile.write("+_%s\n"%fname);
        for (base, scores) in itertools.izip(output[1].strip(), output[2:]):
            ResultFile.write("%s "%scores.split()[ACGTMap[base]]);
        ResultFile.write("\n");
    ResultFileLock.release();

    nCurThread.release();

def runBayesCall(DataFileNames, OutputFileNames, options):
    # parse parameters
    crosstalk_path, sigma_path, alpha, beta, pPrephasing, pPhasing, pDeath, concentration_sigma, nPrephasing, nPhasing, nBurnIn, nSample = parseParameter(options.param_fname);
    
    for DATA_FNAME, RESULT_FNAME in itertools.izip(DataFileNames, OutputFileNames):
        if options.data_directory is not None:
            DATA_DIRECTORY = options.data_directory;
        else:
            DATA_DIRECTORY = path.join(path.dirname(options.param_fname), "tmp/");

        # create data directory
        try:
            os.mkdir(DATA_DIRECTORY);
        except:
            pass;


        # Run through all data
        global ResultFile, ResultFileLock;
        ResultFile = open(RESULT_FNAME, "w");
        ResultFileLock = threading.Lock();

        global nCurThread;
        maxThread = options.ncpu;
        nCurThread = threading.Semaphore(maxThread);
        allThread = [];
        fdata = open(DATA_FNAME, "r");
        for line in fdata:
            # parse data
            token = line.split();
            coor = tuple([ int(c) for c in token[:4]]);
            token = token[4:];
            intensity = [ [ float(a), float(c), float(g), float(t) ] for (a,c,g,t) in itertools.izip(*[iter(token)]*4)];
            intensity = intensity[:options.seqLen];

            if len(intensity)<=0:
                continue;

            # run on sequence that has better intensity profile
            if any(all(array(intensity)<=0.1, 1)):
                continue;

            # save to a tmp file
            fname = "%d_%d_%d_%d"%coor;
            fout = open(DATA_DIRECTORY + fname + "_int.mat", "w");
            for i in intensity:
                fout.write("%f %f %f %f\n"%tuple(i));
            fout.close();    

            # run!
            nCurThread.acquire();
            t = threading.Thread(target = BayesCall, 
                                 kwargs = {'fname':DATA_DIRECTORY + fname, 
                                           'pPrephasing':pPrephasing, 
                                           'pPhasing':pPhasing, 
                                           'pDeath':pDeath,
                                           'nPrephasing':nPrephasing,
                                           'nPhasing':nPhasing,
                                           'C':crosstalk_path,
                                           'sigma':sigma_path,
                                           'alpha':alpha,
                                           'beta': beta,
                                           'concentration_sigma':concentration_sigma,
                                           'nBurnIn':nBurnIn,
                                           'nSample':nSample});
            allThread.append(t);
            t.start();

        # make sure that all threads are finished

        for t in allThread:
            t.join();
    
def naiveBayesCall(input_fname, output_fname, C, sigma, pPrephasing, pPhasing, pDeath, concentration_sigma, nPrephasing, nPhasing, alpha, beta):
    cmd = "nice -n 20 " + NAIVEBASECALL_PATH + ' ';
    if pPhasing!=None:
        cmd += "-pPhasing %s "%pPhasing;
    if pPrephasing!=None:
        cmd += "-pPrephasing %s "%pPrephasing;
    if pDeath!=None:
        cmd += "-pDeath %s "%pDeath;
    if C!=None:
        cmd += "-C %s "%C;
    if sigma!=None:
        cmd += "-sigma %s "%sigma;
    if alpha!=None:
        cmd += "-alpha %s "%alpha;
    if beta!=None:
        cmd += "-beta %s "%beta;
    if concentration_sigma!=None:
        cmd += "-concentration_sigma %s "%concentration_sigma;
    if nPrephasing!=None:
        cmd += "-nPrephasing %s "%nPrephasing;
    if nPhasing!=None:
        cmd += "-nPhasing %s "%nPhasing;
    if output_fname!=None:
        cmd += "-output %s "%output_fname;

    cmd += "-seqlen %s "%options.seqLen;
    cmd += input_fname;
    print cmd;
        
    output = execCmd(cmd);
    nCurThread.release();

def runNaiveBayesCall(DataFileNames, OutputFileNames, options):
    # parse parameters
    crosstalk_path, sigma_path, alpha, beta, pPrephasing, pPhasing, pDeath, concentration_sigma, nPrephasing, nPhasing, nBurnIn, nSample = parseParameter(options.param_fname);
    
    for DATA_FNAME, RESULT_FNAME in itertools.izip(DataFileNames, OutputFileNames):
        # Run through all data
        global nCurThread;
        maxThread = options.ncpu;
        nCurThread = threading.Semaphore(maxThread);
        allThread = [];
        # run!
        nCurThread.acquire();
        t = threading.Thread(target = naiveBayesCall, 
                             kwargs = {'input_fname': DATA_FNAME,
                                       'output_fname': RESULT_FNAME, 
                                       'pPrephasing':pPrephasing, 
                                       'pPhasing':pPhasing, 
                                       'pDeath':pDeath,
                                       'nPrephasing':nPrephasing,
                                       'nPhasing':nPhasing,
                                       'C':crosstalk_path,
                                       'sigma':sigma_path,
                                       'alpha':alpha,
                                       'beta': beta,
                                       'concentration_sigma':concentration_sigma});
        allThread.append(t);
        t.start();

    # make sure that all threads are finished
    for t in allThread:
        t.join();
    
    
# parse parameters
def parseParameter(fname):
    fparam = open(fname, 'r');
    base_dir = path.dirname(fname);
    for line in fparam.readlines():
        token = line.split();
        if token[0]=='nPrephasing':
            nPrephasing = int(token[1]);
        elif token[0]=='nPhasing':
            nPhasing = int(token[1]);
        elif token[0]=='pPrephasing':
            pPrephasing = float(token[1]);
        elif token[0]=='pPhasing':
            pPhasing = float(token[1]);
        elif token[0]=='pDeath':
            #pDeath = float(token[1]);
            pDeath = path.join(base_dir, token[1]);
        elif token[0]=='concentration_sigma':
            #concentration_sigma = float(token[1]);
            concentration_sigma = path.join(base_dir,token[1]);
        elif token[0]=='crosstalk_path':
            crosstalk_path = path.join(base_dir, token[1]);
        elif token[0]=='sigma_path':
            sigma_path = path.join(base_dir, token[1]);
        elif token[0]=='alpha':
            #alpha = float(token[1]);
            alpha = path.join(base_dir, token[1]);
        elif token[0]=='beta':
            #beta = float(token[1]);
            beta = path.join(base_dir, token[1]);
        elif token[0]=='nburnin':
            nBurnIn = int(token[1]);
        elif token[0]=='nsample':
            nSample = int(token[1]);
    fparam.close();

    return crosstalk_path, sigma_path, alpha, beta, pPrephasing, pPhasing, pDeath, concentration_sigma, nPrephasing, nPhasing, nBurnIn, nSample;

    
# Main function
if __name__ == "__main__":
    # parse commandline options
    parser = OptionParser(usage = "usage: %prog [options] <intensity_file_name>");
    parser.add_option("-o", "--output", action="store", dest="output_fname", type="string", help="basecall output file", default=None);
    parser.add_option("-p", "--param", action="store", dest="param_fname", type="string", help="parameter file", default="run.param");
    parser.add_option("--DD", "--tmp_dir", action="store", dest="data_directory", type="string", help="temporary data directory", default=None);
    parser.add_option("-l", "--seq_len", action="store", dest="seqLen", type="int", help="number of cycle", default=76);
    parser.add_option("-u", "--ncpu", action="store", dest="ncpu", type="int", help="number of processes used in training", default=10);
    parser.add_option("--naive", action="store_true", dest="naive", help="Use naive method (default)", default=True);
    parser.add_option("--orig", "--original", action="store_false", dest="naive", help="Use original method", default=True);    
    
    (options, args) = parser.parse_args();

    assert len(args)>=1;

    ST_TIME = time.time();

    # prepare output file names
    OutputFileNames = [];
    if options.output_fname is not None:
        OutputFileNames = options.output_fname.split(',');

    for id, input_fname in itertools.izip(itertools.count(), args):
        if len(OutputFileNames)<=id:
            OutputFileNames.append(re.match(r"(.*)\..*", input_fname).group(1)+".fastq");

    # Start basecall
    if options.naive:
        runNaiveBayesCall(args, OutputFileNames, options);
    else:
        runBayesCall(args, OutputFileNames, options);
            
    ED_TIME = time.time();
Gruß Air
 

airtime

Mitglied
Hi,

also ich bin mir ziemlich sicher das in dieser Methode das threading abläuft,
Code:
def runNaiveBayesCall(DataFileNames, OutputFileNames, options):
    # parse parameters
    crosstalk_path, sigma_path, alpha, beta, pPrephasing, pPhasing, pDeath, concentration_sigma, nPrephasing, nPhasing, nBurnIn, nSample = parseParameter(options.param_fname);
    
    for DATA_FNAME, RESULT_FNAME in itertools.izip(DataFileNames, OutputFileNames):
        # Run through all data
        global nCurThread;
        maxThread = options.ncpu;
        nCurThread = threading.Semaphore(maxThread);
        allThread = [];
        # run!
        nCurThread.acquire();
        t = threading.Thread(target = naiveBayesCall, 
                             kwargs = {'input_fname': DATA_FNAME,
                                       'output_fname': RESULT_FNAME, 
                                       'pPrephasing':pPrephasing, 
                                       'pPhasing':pPhasing, 
                                       'pDeath':pDeath,
                                       'nPrephasing':nPrephasing,
                                       'nPhasing':nPhasing,
                                       'C':crosstalk_path,
                                       'sigma':sigma_path,
                                       'alpha':alpha,
                                       'beta': beta,
                                       'concentration_sigma':concentration_sigma});
        allThread.append(t);
        t.start();
 
    # make sure that all threads are finished
    for t in allThread:
        t.join();
aber das mit dem maxThread funktioniert nicht. Da ich 4 angebe aber sobald mehrere dateien mit übergeben werden läuft alles weiter und es werden nicht die ersten vier bearbeitet und so weiter.

Besonders die Zeile 9 sollte doch festlegen das nur eine bestimmte Anzahl an thread genutzt wird?

Gruß Air
 

deepthroat

Erfahrenes Mitglied
Hi.

Was heißt denn "es bricht unspezifisch ab"? Exceptions? Welche? Exit code?

Was meinst du mit "Da ich 4 angebe aber sobald mehrere dateien mit übergeben werden läuft alles weiter und es werden nicht die ersten vier bearbeitet und so weiter"?

Werden mehr Threads gestartet als angegeben?

Was passiert im Debugger? Backtrace?

Welches Betriebssystem? Welche Python Version? Python Version die zum Testen des Skripts (beim Entwickler) verwendet wurde?

Gruß
 
Zuletzt bearbeitet:

airtime

Mitglied
Hi,

also ich übergebe per command line die input files. Sobald die Dateien eingelesen werden, wird auf der console der gesamte befehl mit der einzelenen Datei angegeben.

dh.:
Code:
befehl -cores 4 -input file_1 file_2 file_3 file_4
Angabe:
Code:
befehl file_1
befehl file_2
befehl file_3
befehl file_4
wenn ich aber mehr Dateien als angegebene cores übergebe, also:
Code:
befehl -cores 4 -input file_ file_2 file_3 file_4 file_5 file_6 file_7 file_8 file_9 file_10
dann ist die Ausgabe
Code:
befehl file_1
befehl file_2
befehl file_3
befehl file_4
befehl file_5
befehl file_6
befehl file_7
befehl file_8
befehl file_9
befehl file_10
und es werden nur die letzten 4 Dateien bearbeitet (es hat den Anschein das nicht gewartet wird bis die ersten 4 threads fertig sind).
Der Entwickle4r hat Python 2.7 genutzt, aber es sollte bei folgenden Versionen gehen. Ich hab sowohl 2.7 als auch 3.0 probiert (genau das Gleiche). Das Betriebssystem ist Ubuntu 11.
Zu dem unspezifischen Abbrechen: Es gibt ein script zur Vorberechnung bestimmter parameter, dabei wird auch das threading genutzt. Wenn ich nur ein core nutzte läuft alles perfekt (benötigt aber über 50h). Sobald ich meherere threads nutzte läuft das programm, aber nach einer unspezifischen Zeit hängt es sich auf. Die threads schlafen und warten auf etwas (keine Ahnung was). Aber das ist nicht so wichtig die Vorberechnung ist schon erledigt.

Gruß Air
 

deepthroat

Erfahrenes Mitglied
Hi.

In der Schleife wird jedesmal eine neue Semaphore erzeugt, die natürlich nie voll ist, d.h. es wird immer ein Thread kreiert und nie blockiert.

Die Initialisierung muss vor die Schleife.
Python:
def runXCall(...):
  global global nCurThread; # wozu global?
  maxThread = options.ncpu;
  nCurThread = threading.Semaphore(maxThread); # überschreibt die globale Variable, könnte auch ein Fehler sein.
  allThread = [];

  for ...:
        nCurThread.acquire();
        t = threading.Thread(...);
        allThread.append(t);
        t.start();
Gruß
 

airtime

Mitglied
Hi,

vielen Dank für deine Hilfe. Ich werde es testen sobald ich wieder freie Laufzeit auf dem Rechner habe.
Melde mich dann, ob das Problem gelöst ist.

Gruß Air