|
|||
require 'net/http' pages = %w( www.rubycentral.com www.awl.com www.pragmaticprogrammer.com ) threads = [] for page in pages threads << Thread.new(page) { |myPage| h = Net::HTTP.new(myPage, 80) puts "Fetching: #{myPage}" resp, data = h.get('/', nil ) puts "Got #{myPage}: #{resp.message}" } end threads.each { |aThread| aThread.join } |
Fetching: www.rubycentral.com Fetching: www.awl.com Fetching: www.pragmaticprogrammer.com Got www.rubycentral.com: OK Got www.pragmaticprogrammer.com: OK Got www.awl.com: OK |
Thread.new
-Aufruf erzeugt. Der bekommt einen Block mit, der den Code enthält, der in diesem neuen Thread laufen soll. In unserem Fall benutzt der Block die net/http
-Bibliothek, um sich die erste Seite jeder gewünschten Site zu holen. Das Tracing zeigt ganz klar, dass dieses Holen parallel abläuft.
Wenn wir den Thread erzeugen, übergeben wir die gewünschte HTML-Seite als Parameter. Dieser Parameter wird dem Block als myPage
übergeben. Warum machen wir das so, warum benutzen wir nicht einfach den Wert der Variablen page
innerhalb des Blocks?
Threads teilen alle globalen, Instanz- und lokalen Variablen, die zum Zeitpunkt ihrer Entstehung existierten. Wer einen kleinen Bruder hat, weiß, dass Teilen nicht immer so schön ist. In diesem Fall würden sich alle drei Threads die Variable page
teilen. Der erste Thread wird gestartet und page
wird auf http://www.rubycentral.com gesetzt. Währenddessen läuft die Schleife, die die Threads erzeugt, weiter. Beim zweiten Durchgang wird page
auf http://www.awl.com gesetzt. Wenn der erste Thread noch nicht fertig ist mit der Variablen page
, dann wird er jetzt deren neuen Wert benutzen. Solche Fehler sind nur sehr schwer nachzuvollziehen.
Allerdings sind lokale Variablen, die innerhalb eines Block erzeugt werden, auch wirklich lokal für diesen Thread --- jeder Thread arbeitet mit einer eigenen Kopie dieser Variablen. In unserem Fall wird die Variable myPage
gesetzt, wenn der Thread erzeugt wird, und jeder Thread besitzt seine eigene Kopie der Web-Adresse.
join
für jeden erzeugten Thread auf?
Wenn ein Ruby-Programm beendet wird, werden alle noch laufenden Threads abgeschossen, egal wie ihr Status ist. Man kann allerdings auf das Beenden eines speziellen Thread warten, wenn man die Thread#join
-Methode dieses Threads aufruft. Der aufrufende Thread wird so lange blockiert, bis der gewünschte Thread beendet ist. Durch den Aufruf von join
für jeden der gewünschten Threads geht man sicher, dass alle drei Threads beendet werden, bevor man das Hauptprogramm beendet.
Darüber hinaus gibt es noch ein paar andere nützliche Routinen, mit denen man Threads manipulieren kann. Zunächst einmal kann man den aktuellen Thread ermitteln mit Thread.current
. Man erhält eine Liste mit allen Threads mit Thread.list
, das liefert eine Liste von allen Thread
-Objekten, die lauffähig oder gerade gestoppt sind. Um den Status eines bestimmten Threads zu ermitteln, kann man Thread#status
und
Thread#alive?
benutzen.
Außerdem kann man die Priorität eines Thread mit
Thread#priority=
setzen. Threads mit höherer Priorität laufen vor solchen mit niedrigerer. Wir werden gleich über die Laufzeitaufteilung von Threads sprechen und darüber, wie man sie startet und stoppt.
Thread
besitzt dafür eine Eigenschaft, mit der man thread-eigene Variablen erzeugen und über den Namen ansprechen kann. Man behandelt das Thread-Objekt einfach, als wäre es ein
Hash
, und schreib in Elemente mit []=
und liest sie mit []
. In diesem Beipiel hält jeder Thread den aktuellen Wert der Variable count
in einer thread-lokalen Variablen unter dem Schlüssel mycount
. (Es gibt da ein kleines Problem mit Zugriffskollisionen in diesem Code, aber bis jetzt haben wir noch nicht über Synchronisation gesprochen, also ignorieren wir das erstmal.)
count = 0 arr = [] 10.times do |i| arr[i] = Thread.new { sleep(rand(0)/10.0) Thread.current["mycount"] = count count += 1 } end arr.each {|t| t.join; print t["mycount"], ", " } puts "count = #{count}" |
8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10 |
count
aus. Um das ein wenig interessanter zu machen, lassen wir jeden Thread eine zufällige Zeitspanne warten, bis er den Wert zurückliefert.
abort_on_exception
false
ist, (das ist der Default-Zustand) dann schießt eine unbehandelte Exception einfach nur den aktuellen Thread ab --- der ganze Rest läuft weiter. Im folgenden Beispiel wird Thread Nummer 3 hochgehen und keinerlei Output erzeugen. Trotzdem kann man immer noch die Ergebnisse der anderen Threads sehen.
threads = [] 6.times { |i| threads << Thread.new(i) { raise "Boom!" if i == 3 puts i } } threads.each {|t| t.join } |
01 2 4 5 prog.rb:4: Boom! (RuntimeError) from prog.rb:8:in `join' from prog.rb:8 from prog.rb:8:in `each' from prog.rb:8 |
abort_on_exception
auf true
setzt, dann killt eine nicht behandelte Exception alle anderen Threads. Wenn jetzt Thread 3 stirbt, wird kein Output mehr erzeugt.
Thread.abort_on_exception = true threads = [] 6.times { |i| threads << Thread.new(i) { raise "Boom!" if i == 3 puts i } } threads.each {|t| t.join } |
01 2 prog.rb:5: Boom! (RuntimeError) from prog.rb:7:in `initialize' from prog.rb:7:in `new' from prog.rb:7 from prog.rb:3:in `times' from prog.rb:3 |
Thread
besitzt eine Anzahl von Methoden, um den Thread-Scheduler zu kontrollieren. Der Aufruf von Thread.stop
stoppt den aktuellen Thread, während
Thread#run
dafür sorgt, dass ein bestimmter Thread läuft. Thread.pass
unterbricht den aktuellen Thread, damit andere auch mal drankommen, und Thread#join
und Thread#value
hält den aufrufenden Thread solange an, bis der angegebene Thread beendet ist.
Wir demonstrieren diese Eigenschaften mit folgendem völlig bedeutungslosen Programm.
t = Thread.new { sleep .1; Thread.pass; Thread.stop; } |
||
t.status |
» | "sleep" |
t.run |
||
t.status |
» | "run" |
t.run |
||
t.status |
» | false |
true
gesetzt ist (mit der Thread.critical=
-Methode), dann wird der Scheduler keinem anderen existierenden Thread die Kontralle zuteilen.
Allerdings blockiert das nicht die Erzeugung und den Ablauf von neuen Threads.
Bestimmte Thread-Operationen (wie etwa Stoppen oder Killen eines Threads, Schlafenlegen des aktuellen Threads oder das Auslösen einer Exception) können einen Thread zur Ausführung bringen, auch wenn man gerade in einer Critical Section ist.
Es ist natürlich möglich, Thread.critical=
direkt zu benutzen, aber es ist nicht so besonders überzeugend. Glücklicherweise gibt es in Ruby zusätzlich noch verschiedene Alternativen. Davon sind zwei der besten die Klasse Mutex
und die Klasse ConditionVariable
aus dem thread
-Bibliotheks-Modul; siehe die Dokumantation ab Seite 462.
Mutex
benutzt eine einfache Semaphore-Blockierung für den gegenseitigen Ausschluss des Zugriffs auf gemeinsame Daten. Das bedeutet, dass immer nur ein Thread zur Zeit die Blockierung besitzt. Andere Threads können sich entscheiden, zu warten, bis die Blockierung für sie verfügbar ist oder sie melden einfach den Fehler, dass die Blockierung nicht verfügbar ist.
Ein Mutex wird oft benutzt, wenn ein Update auf gemeinsam benutzte Daten nur vollständig ablaufen darf. Sagen wir, das Update soll im Rahmen einer Transaktion zwei Variablen ändern. Wir können das in einem einfachen Programm simulieren, indem wir zwei Zähler hochzählen. Die Änderungen sollten simultan laufen --- von außen sollte man niemals verschiedene Werte sehen. Ohne irgendeine Art von Mutex-Kontrolle klappt das gar nicht.
count1 = count2 = 0 |
||
difference = 0 |
||
counter = Thread.new do |
||
loop do |
||
count1 += 1 |
||
count2 += 1 |
||
end |
||
end |
||
spy = Thread.new do |
||
loop do |
||
difference += (count1 - count2).abs |
||
end |
||
end |
||
sleep 1 |
||
Thread.critical = 1 |
||
count1 |
» | 187651 |
count2 |
» | 187650 |
difference |
» | 77442 |
count1
und count2
vorgefunden hat.
Glücklicherweise können wir das mit einem Mutex bereinigen.
require 'thread' mutex = Mutex.new count1 = count2 = 0 difference = 0 counter = Thread.new do loop do mutex.synchronize do count1 += 1 count2 += 1 end end end spy = Thread.new do loop do mutex.synchronize do difference += (count1 - count2).abs end end end |
sleep 1 |
||
mutex.lock |
||
count1 |
» | 21636 |
count2 |
» | 21636 |
difference |
» | 0 |
require 'thread' mutex = Mutex.new cv = ConditionVariable.new a = Thread.new { mutex.synchronize { puts "A: I have critical section, but will wait for cv" cv.wait(mutex) puts "A: I have critical section again! I rule!" } } puts "(Later, back at the ranch...)" b = Thread.new { mutex.synchronize { puts "B: Now I am critical, but am done with cv" cv.signal puts "B: I am still critical, finishing up" } } a.join b.join |
A: I have critical section, but will wait for cv(Later, back at the ranch...) B: Now I am critical, but am done with cv B: I am still critical, finishing up A: I have critical section again! I rule! |
monitor.rb
und sync.rb
im lib
-Unterverzeichnis der Distribution.
system
- und Backquote-Methoden.
system("tar xzf test.tgz") |
» | tar (child): Cannot open archive test.tgz: No such file or directory\ntar (child): Error is not recoverable: exiting now\ntar: Child returned status 2\ntar: Error exit delayed from previous errors\nfalse |
result = `date` |
||
result |
» | "Sun Mar 4 23:24:12 CST 2001\n" |
Kernel::system
führt das gegebene Kommando in einem Subprozess aus; sie gibt true
zurück, wenn das Kommando gefunden und erfolgreich ausgeführt wurde, sonst false
. Im Falle eines Fehlschlags findet man den Rückgabecode in der globalen Variablen
$?
.
Ein Problem mit system
ist, dass das Kommando einfach seinen Output auf das selbe Ziel schreibt, wie das Programm, meistens möchte man das nicht. Um den Standard-Output eines Subprozesses abzufangen, kann man Backquotes benutzen wie bei `date`
im vorherigen Beispiel. Man denke aber daran, das man eventuell
String#chomp
braucht, um etwaige Zeilenende-Zeichen vom Ergebnis abzuschneiden.
Okay, für einfache Fälle ist das ja ganz schön --- wir lassen einige ander Prozesse laufen und warten auf ihre Rückgabe. Aber oft braucht man ein bisschen mehr Kontrolle. Wir wollen uns mit dem Subprozess unterhalten, vielleicht ein paar Daten schicken oder auch zurückerhalten.
Die Methode IO.popen
macht genau dies. Die popen
-Methode lässt ein Kommando als Subprozess laufen und verbindet dessen Standard-Input und Standard-Output mit einem Ruby-IO
-Objekt. Man schreibt in dieses IO
-Objekt und der Subprozess kann das über seinen Standard-Input lesen. Und was der Subprozess schreibt ist im Ruby-Programm lesbar als Output des IO
-Objekts.
Als Beispiel zeigen wir hier eins unserer nützlicheren Utilities,
pig
, ein Programm, das Wörter vom Standard-Input liest und sie in Pig-Latin (oder igPay,atinLay) wieder ausgibt. Wir benutzen das, wenn unsere Ruby-Programme Sachen ausgeben sollen, die unsere 5-jährigen Kinder nicht lesen sollen.
pig = IO.popen("pig", "w+") pig.puts "ice cream after they go to bed" pig.close_write puts pig.gets |
iceway eamcray afterway eythay ogay otay edbay |
pig
-Programm den von ihm geschriebenen Output nicht weiterleitet. Unser erster Versuch mit diesem Beispiel, mit pig.puts
gefolgt von pig.gets
, blieb einfach für immer hängen. Das pig
-Programm bearbeitete den Input, aber das Ergebnis wurde nie in die Pipe geschrieben. Wir mussten pig.close_write
einfügen. Das schickte ein end-of-file an den Standard-Input von pig
und der Output, auf den wir gewartet hatten, wurde weitergeleitet, sobald pig
beendet wurde.
Es gibt da eine weitere Eigenheit bei popen
. Wenn das übergebene Kommando ein einzelnes Minuszeichen ist (``--''), so wird popen
in einen neuen Ruby-Interpreter aufspalten. Beide, dieser und der Original-Interpreter, werden weiterlaufen, wenn popen
beendet wird. Der Original-Prozess erhält ein IO
-Objekt zurück, während der Kind-Prozess ein nil
erhält.
pipe = IO.popen("-","w+") if pipe pipe.puts "Get a job!" $stderr.puts "Child says '#{pipe.gets.chomp}'" else $stderr.puts "Dad says '#{gets.chomp}'" puts "OK" end |
Dad says 'Get a job!' Child says 'OK' |
popen
sind die üblichen Unix-Aufrufe Kernel::fork
, Kernel::exec
und IO.pipe
verfügbar, wenn die Plattform das unterstützt. Genauso produziert die Dateinamenskonvention von vielen IO
-Methoden und Kernel::open
Subprozesse, wenn man ein ``|
'' vor den Dateinamen setzt (ausführlicher ist das in der Einleitung zur IO
-Klasse auf Seite 329 beschrieben). Beachte, dass man mit File.new
keine Pipes erzeugen kann; das ist nur für Dateien.
exec("sort testfile > output.txt") if fork == nil # The sort is now running in a child process # carry on processing in the main program # then wait for the sort to finish Process.wait |
Kernel::fork
gibt dem Elternteil eine Prozess-Id zurück und dem Kind nil
, so dass der Kindprozess den
Kernel::exec
-Aufruf ausführt und die Sortierung startet. Irgendwann später setzen wir einen
Process::wait
-Aufruf ab, der auf das Ende der Sortierung wartet (und ihre Prozess-Id zurückgibt).
Wenn man lieber benachrichtigt werden will, wenn der Kindprozess fertig ist (statt nur herumzuwarten), kann man einen Signal-Hnadler aufstellen mit
Kernel::trap
(beschrieben auf Seite 431). Hier fangen wir SIGCLD
ab, das Signal für ``death of child
process'' (Tod des Kindprozesses).
trap("CLD") { pid = Process.wait puts "Child pid #{pid}: terminated" exit } exec("sort testfile > output.txt") if fork == nil # do other stuff... |
Child pid 14481: terminated |
IO.popen
arbeitet mit einem Block in etwa genauso wie File.open
.
Man gibt popen
ein Kommando mit, etwa date
, und der Block wird an das IO
-Objekt als Parameter weitergereicht.
IO.popen ("date") { |f| puts "Date is #{f.gets}" } |
Date is Sun Mar 4 23:24:12 CST 2001 |
IO
-Objekt wird automatisch geschlossen, wenn der Block beendet wird, genauso wie bei File.open
.
Wenn man einen Block einem Kernel::fork
zuordnet, wird der Code-Block in einem Ruby-Unterprozess ausgeführt, und der Elternteil arbeitet nach dem Block weiter.
fork do puts "In child, pid = #$$" exit 99 end pid = Process.wait puts "Child terminated, pid = #{pid}, exit code = #{$? >> 8}" |
In child, pid = 14488 Child terminated, pid = 14488, exit code = 99 |
$?
vor der Anzeige 8 Bits nach rechts geschoben? Das ist eine ``Eigenheit'' von Posix-Systemen: die unteren 8 Bits des Exit-Codes enthalten den Grund für das Programmende, während die oberen 8 Bits den tatsächlichen Exit-Code enthalten.