Threading und Multiprocessing von Modulen in Python

Ich versuche, mehrere Dateien zu erstellen, die mit einem eigenständigen Programm als Teil einer Hochdurchsatzanalyse analysiert werden, die in Python geschrieben wurde.

for foo in X: write foo_file os.system(run_program foo_file) 

Für 15.000 verschiedene Einzeldateien läuft das schneller, wenn ich sie auf mehrere Kerne ausführen kann, aber ich möchte nicht meinen Server sumfen. Wie richte ich mehrere Threads ein, um in os laufen zu lassen, aber ein Maximum auf die Anzahl der Threads, die sofort geöffnet sind? Ich mache mir keine Sorgen um die Geschwindigkeit der Laichprozesse, da die Laufzeit durch einen externen Programmstandard in meinem Bereich definiert ist.

Ich habe mir die Dokumentation zum Threading und Multiprocessing angesehen und mich überwältigt.

  • Was ist Ruby's Äquivalent zu Pythons Multiprocessing-Modul?
  • Tasks parallel in Python ausführen
  • Lösen von Formeln parallel zu z3
  • Effizientes Multiprocessing von massiven, Brute-Kraft-Maximierung in Python 3
  • Wie man Python benutzt, um Datenbank parallel zu verfassen
  • Vermeidung von Rennbedingungen in Python 3 Multiprocessing Queues
  • Python-Threading unerwartet langsamer
  • So führen Sie parallele Instanzen einer Luigi-Pipeline aus: Pid-Set läuft bereits
  • 2 Solutions collect form web for “Threading und Multiprocessing von Modulen in Python”

    Eine einfache Möglichkeit, die Gesamtzahl der erzeugten Prozesse zu begrenzen, besteht darin, einen Multiprocessing-Pool zu verwenden .

    Ein einfaches Beispiel, das einen Multiprocessing-Pool demonstriert, ist:

    Test.py

     from multiprocessing.pool import Pool # @NOTE: The two imports below are for demo purposes and won't be necessary in # your final program import random import time def writeOut(index): """ A function which prints a start message, delays for a random interval and then prints a finish message """ delay = random.randint(1,5) print("Starting process #{0}".format(index)) time.sleep(delay) print("Finished process #{0} which delayed for {1}s.".format(index, delay)) # Create a process pool with a maximum of 10 worker processes pool = Pool(processes=10) # Map our function to a data set - number 1 through 20 pool.map(writeOut, range(20)) 

    Was sollte Ihnen eine Ausgabe ähnlich wie:

     [mike@tester ~]$ python test.py Starting process #0 Starting process #2 Starting process #3 Starting process #1 Starting process #4 Starting process #5 Starting process #6 Starting process #7 Starting process #8 Starting process #9 Finished process #2 which delayed for 1s. Starting process #10 Finished process #7 which delayed for 1s. Finished process #6 which delayed for 1s. Starting process #11 Starting process #12 Finished process #9 which delayed for 2s. Finished process #12 which delayed for 1s. Starting process #13 Starting process #14 Finished process #1 which delayed for 3s. Finished process #5 which delayed for 3s. Starting process #15 Starting process #16 Finished process #8 which delayed for 3s. Starting process #17 Finished process #4 which delayed for 4s. Starting process #18 Finished process #10 which delayed for 3s. Finished process #13 which delayed for 2s. Starting process #19 Finished process #0 which delayed for 5s. Finished process #3 which delayed for 5s. Finished process #11 which delayed for 4s. Finished process #15 which delayed for 2s. Finished process #16 which delayed for 2s. Finished process #18 which delayed for 2s. Finished process #14 which delayed for 4s. Finished process #17 which delayed for 5s. Finished process #19 which delayed for 5s. 

    Wie Sie sehen können, starten die ersten zehn Prozesse und dann beginnt jeder nachfolgende Prozess nur sobald ein weiterer Prozess-Pool-Worker fertig ist (wird verfügbar). Die Verwendung mehrerer Prozesse (im Gegensatz zu mehreren Threads) umgeht die globale Interpretersperre (GIL) .

    Um diesen Beispielcode zu erhalten, um mit deiner Aufgabe zu arbeiten, musst du eine Dateiausgabefunktion schreiben und sie übergeben und die pool.map() von writeOut an pool.map() anstelle von writeOut und range(20) schreiben.

    Versuche dies:

     class ThreadWriteFile(threading.Thread): def __init__(self, queue_to_write, queue_to_run): threading.Thread.__init__(self) self.queue_to_write = queue_to_write self.queue_to_run = queue_to_run def run(self): while True: foo_file = self.queue_to_write.get() write foo_file self.queue_to_run.put(foo_file) self.queue_to_write.task_done() class ThreadRunProgram(threading.Thread): def __init__(self, queue_to_run): threading.Thread.__init__(self) self.queue_to_run = queue_to_run def run(self): while True: foo_file = self.queue_to_run.get() os.system(run_program foo_file) self.queue_to_run.task_done() queue_to_write = Queue.Queue() queue_to_run = Queue.Queue() for foo in X: twf = ThreadWriteFile(queue_to_write, queue_to_run) twf.daemon = True twf.start() queue_to_write.put(foo) trf = ThreadRunProgram(queue_to_run) trf.daemon = True trf.start() queue_to_write.join() queue_to_run.join() 
    Python ist die beste Programmiersprache der Welt.