[WikiDyd] [TitleIndex] [WordIndex

Wątki

Programowanie wielowątkowe

Mimo rozwoju komputerów równoległych i rozproszonych (sieciowych) systemów obliczeniowych ciągle jeszcze większość programów działa w środowiskach sekwencyjnych, gdzie procesor wykonuje ciąg instrukcji jedna po drugiej. Wiele programów wymaga jednak "równoległego" wykonywania więcej niż jednego ciągu operacji. Typowym przykładem takiego zadania jest nadzór procesu technologicznego, w którym trzeba równolegle obserwować szereg parametrów (np. temperatura, wilgotność powietrza, ciśnienie) i, w razie potrzeby, uruchamiać urządzenia korygujące, które też moga wymagać nadzoru. Innym przykładem może być program sterujący elektronicznym systemem ochrony, który musi dzielić czas pomiędzy wyświetlanie obrazów z kilku kamer wideo i reagować na ewentualne akcje operatora systemu (poruszanie lub przełączanie kamer, powiadamianie strażników,...).

Jeżeli piszemy takie programy np. w C, to zmuszeni jesteśmy symulować równoległość przez przełączanie sterowania pomiędzy różne procesy, np.:

   1 while( 1 ) {
   2    t1= getTemperature( sensor1 );
   3    t2= getTemperature( sensor2 );
   4    t3= getTemperature( sensor3 );
   5    p1= getPressure( sensor5 );
   6    p2= getPressure( sensor6 );
   7    if( t1 > TMAX || t2 > TMAX )
   8       stopHeater( h1 );
   9    if( t1 < t2 )
  10       switchHeater( h2 );
  11    if( t3 > TMAX && p1 > PMAX )
  12       openValve( );
  13    /*
  14      itd.
  15    */
  16 }

   1 while( n_nodes > 4 ) {
   2    check_sharp_edges( &n_nodes,  nodes, x, y, np, nop, ne );
   3    if( n_nodes < 5 )
   4       break;
   5    if( userInput() )  /* sprawdzamy, czy uzytkownik sie nie znudzil */
   6       break;
   7    find_edge_seq( &n_nodes, nodes, x, y, np, nop, ne, &l_seq, &ptr );
   8    if( userInput() )  /* sprawdzamy, czy uzytkownik sie nie znudzil */
   9       break;
  10    if( l_seq > 1 )
  11       close_edge_seq( &n_nodes, nodes, x, y, np, nop, ne, &l_seq, &ptr );
  12    else
  13       catalizer( &n_nodes, nodes, x, y, np, nop, ne );
  14    if( n_nodes < 5 )
  15       break;
  16    if( userInput() )  /* sprawdzamy, czy uzytkownik sie nie znudzil */
  17       break;
  18 }
  19 if( userInput() ) /*  uzytkownik sie znudzil */
  20    processUserInput();

W obu tych przypadkach potrzeba mieszania kodu wykonującego różne czynności powoduje zaciemnienie programu, utrudnia jego konserwację i zwiększa możliwość popełnienia błędu.

Składnia Javy umożliwia zdefiniowanie i uruchomienie w ramach jednego programu szeregu tak zwanych "wątków", które działają "równolegle", to znaczy współdzielą (w czasie) zasoby JVM.

Każdy wątek może opisywać tylko jedną sekwencję czynności (np. interakcję z operatorem, wyświetlanie obrazu z określonej kamery, monitoring czujnika podczerwieni). Wątki mogą się ze sobą komunikować i dzielić wspólne obiekty w taki sposób, aby sobie nie "przeszkadzać".

Tworzenie wątków

Przeanalizujmy najpierw dwa przykłady podobne do programów PingPong prezentowanych w książce [Java]. Pierwszy przykład jest analogiczny do programu z rozdziału 9.1: tworzymy klasę pochodną od Thread:

   1 class W1 extends Thread {
   2    String name;
   3    long delay;
   4    long nRepet;
   5 
   6    W1( String w, long dt, long n ) {
   7       name= w;
   8       delay= dt;
   9       nRepet= n;
  10    }
  11 
  12    public void run() {
  13       try {
  14          for( int i= 0; i < nRepet; i++) {
  15             System.out.print( name + " " );
  16             sleep( delay );
  17          }
  18       } catch( InterruptedException e ) {
  19          System.out.println( "Exiting " + name + "\n" );
  20          return;
  21       }
  22    }
  23 
  24    public static void main( String args[] ) 
  25       throws InterruptedException
  26    {
  27 
  28       if( args.length == 0 || args.length % 3 != 0 ) {
  29          System.err.print( "Przyklad potrzebuje podzielnej przez 3 liczby argumentow.\n" +
  30                            "W kazdej trojce pierwszy argument to nazwa wątku,\n" +
  31                            "drugi to opoznienie w milisekundach\n" +
  32                            "trzeci to liczba powtorzen.\n" +
  33                            "Kazdy watek wypisuje swoja nazwe odpowiednia liczbe razy,\n" +
  34                            "co zadane opoznienie.\n" );
  35       } else {
  36          for( int i= 0; i < args.length; i += 3 )
  37             new W1( args[i], 
  38                     Long.valueOf( args[i+1] ).longValue(), 
  39                     Long.valueOf( args[i+2] ).longValue()  
  40                   ).start();
  41       }
  42    }
  43 }

Każda klasa pochodna od Thread powinna implementować metodę run, zastępując pustą taką metodę definiowanę przez Thread. Uruchomienie tej metody następuje przez wywołanie metody start. Wątek kończy działanie po zakończeniu run. Program kończy działanie po zakończeniu wszystkich swoich wątków (także tego, który reprezentuje metodę main).

Inny sposób tworzenia wątków to definicja klasy, która implementuje interfejs Runnable, który deklaruje metodę run. Pozwala to uczynić klasę wątku pochodną od innej klasy, niż Thread.

Przepiszemy teraz klasę W1 w takiej właśnie wersji, tworząc W2:

   1 class W2 implements Runnable {
   2    String name;
   3    long delay;
   4    long nRepet;
   5 
   6    W2( String w, long dt, long n ) {
   7       name= w;
   8       delay= dt;
   9       nRepet= n;
  10    }
  11 
  12    public void run() {
  13       try {
  14          for( int i= 0; i < nRepet; i++) {
  15             System.out.print( name + " " );
  16             Thread.sleep( delay );
  17          }
  18       } catch( InterruptedException e ) {
  19          System.out.println( "Exiting " + name + "\n" );
  20          return;
  21       }
  22    }
  23 
  24    public static void main( String args[] ) 
  25       throws InterruptedException
  26    {
  27 
  28       if( args.length == 0 || args.length % 3 != 0 ) {
  29          System.err.print( "Przyklad potrzebuje podzielnej przez 3 liczby argumentow.\n" +
  30                            "W kazdej trojce pierwszy argument to nazwa wątku,\n" +
  31                            "drugi to opoznienie w milisekundach\n" +
  32                            "trzeci to liczba powtorzen.\n" +
  33                            "Kazdy watek wypisuje swoja nazwe odpowiednia liczbe razy,\n" +
  34                            "co zadane opoznienie.\n" );
  35       } else {
  36          for( int i= 0; i < args.length; i += 3 ) {
  37             Runnable r= new W2( args[i], 
  38                     Long.valueOf( args[i+1] ).longValue(), 
  39                     Long.valueOf( args[i+2] ).longValue()  
  40                   );
  41             new Thread( r ).start();
  42          }
  43       }
  44    }
  45 }

Zatrzymywanie wątków

Ponieważ wątek kończy działanie przez zakończenie swojej metody run, a więc chcąc zatrzymać wątek możemy przekazać jego metodzie run informację o potrzebie zatrzymania - np. przez specjalną zmienną-flagę, którą metoda run okresowo sprawdza.

class Wx extends Thread {    |     class WxUser {
   public boolean canRun;    |        // jakis tam kod
   // inne pola              |
                             |        public user( ) {
   public void start() {     |           Wx t= new Wx();
      canRun= true;          |
      super.start();         |           t.start();
   }                         |           // cos tam
                             |           if( trzebaPrzerwac_t )
   public void run() {       |              t.canRun= false;
   // inicjalizacja          |           //
                             |        }
      while( canRun ) {      |     }
         // cos tam robimy   |
      }                      |
   }                         |
                             |
   // inne metody            |
}                            |

Wątek może być też zatrzymany przez wywołanie jego metody stop. Ten sposób nie jest jednak zalecany, w Javie 1.1 został zastąpiony przez mechanizm przerwania, o którym napiszemy dalej. Przeanalizujmy najpierw klasę W1, w której do metody main dodamy kod zatrzymujący uruchomione wątki po odczekaniu pięciu sekund. Przy okazji obejrzymy też działanie metod suspend i resume, pozwalających zatrzymywać wątek na pewien czas i wznawiać jego działanie.

   1 class W3 extends Thread {
   2    String name;
   3    long delay;
   4    long nRepet;
   5 
   6    W3( String w, long dt, long n ) {
   7       name= w;
   8       delay= dt;
   9       nRepet= n;
  10    }
  11 
  12    public void run() {
  13       try {
  14          for( int i= 0; i < nRepet; i++) {
  15             System.out.print( name + " " );
  16             sleep( delay );
  17          }
  18       } catch( InterruptedException e ) {
  19          System.out.println( "Exiting " + name + "\n" );
  20          return;
  21       }
  22       System.out.print( "\nEnd of " + name + ".\n" );
  23    }
  24 
  25    public static void main( String args[] ) 
  26       throws InterruptedException
  27    {
  28 
  29       if( args.length == 0 || args.length % 3 != 0 ) {
  30          System.err.print( "Przyklad potrzebuje podzielnej przez 3 liczby argumentow.\n" +
  31                            "W kazdej trojce pierwszy argument to nazwa watku,\n" +
  32                            "drugi to opoznienie w milisekundach\n" +
  33                            "trzeci to liczba powtorzen.\n" +
  34                            "Kazdy watek wypisuje swoja nazwe odpowiednia liczbe razy,\n" +
  35                            "co zadane opoznienie.\n" +
  36                            "Dodatkowo dziala watek, który 50 razy, co 0.1 s\n" +
  37                            "wypisuje znak nowej linii." +
  38                            "Ten watek jest zawieszany po jednej sekundzie na 2 s.,\n" +
  39                            "a potem wznawiany." +
  40                            "Po dalszych 2 s. zatrzymywane sa wszystkie watki uzytkownika." );
  41       } else {
  42          // start "backgroud" thread
  43          W3 tlo= new W3( "\n", 100, 50 );
  44          tlo.start();
  45          W3 [] all= new W3[ args.length / 3 ];
  46          // start "user" threads
  47          for( int i= 0; i < args.length; i += 3 ) {
  48             all[ i / 3 ] = new W3( args[i], 
  49                     Long.valueOf( args[i+1] ).longValue(), 
  50                     Long.valueOf( args[i+2] ).longValue()  
  51                   );
  52             all[ i / 3].start();
  53          }
  54          sleep( 1000 );  // wait 1 second
  55          tlo.suspend();  // suspend background
  56          sleep( 2000 );  // wait 2 second
  57          tlo.resume();   // resume background
  58          sleep( 2000 );  // wait again
  59          // stop user threads
  60          for( int i= 0; i < args.length; i += 3 )
  61             all[ i / 3].stop();
  62       }
  63    }
  64 }

Po uruchomieniu tego programu, np.

java W3 Ala 100 10 ma 100 10 Asa. 100 10

możemy sie przekonać, że jedynie wątek wypisujący znak nowej linii kończy swoje działanie (metodę run) naturalnie. Inne wątki są przerywane nagle, bez możliwości "posprzątania po sobie". Dzieje się tak dlatego, że metoda stop zgłasza wyjątek ThreadDeath, który jest podklasą klasy Error. Metoda run nie powinna go przechwytywać (a jeśli go przechwyci, to powinna go znowu zgłosić) - więcej o tym można poczytać w rozdziale 9.9 [Java].

Mechanizm przerwań umożliwia gładkie zakończenie wątku, z wykonaniem ewentualnych czynności końcowych. Działanie tego mechanizmu pokazuje zmodyfikowana metoda main w klasie W3:

   1    /* to co przedtem */
   2    public static void main( String args[] ) 
   3       throws InterruptedException
   4    {
   5       /* to co przedtem */
   6          for( int i= 0; i < args.length; i += 3 )
   7             all[ i / 3].interrupt();
   8       }
   9    }
  10 }

Wywołanie interrupt powoduje zgłoszenie wyjątku InterruptedException w niektórych metodach - w naszym przykładzie jest to sleep. Metoda run może przechwycić to przerwanie, albo, jeżeli nie wywołuje żadnej metody zgłaszającej InterruptedException, sprawdzić, przez wywołanie Thread.interrupted(), czy bieżący wątek otrzymał przerwanie.

Zewnętrznie można sprawdzić, czy dany wątek otrzymał przerwanie, wywołując dla niego metodę isInterupted().

Jeżeli chcemy, aby wątek zaczekał na zakończenie innego wątku, to możemy wykorzystać metodę join tego innego wątku. W przedstawionym niżej fragmencie programu metoda generation uruchamia wątki IndividualFitness, a następnie czeka na ich zakończenie przed wywołaniem metody populationStatistics:

   1 public class IndividualFitness extends Thread {
   2 // tu definicja klasy
   3 }
   4 
   5 public class GA {
   6    // ...
   7    public void generation() {
   8       oldPopulation= newPopulation;
   9       IndividualFitness [] FitPool= new IndividualFitness[ populationSize ];
  10    
  11       for( int i= 0; i < populationSize; i++ ) {
  12          newPopulation[i]= new Individual( oldPopulation );
  13          FitPool[i]= new individualFitness( newPopulation[i] );
  14          FitPool[i].start();
  15       }
  16       for( int i= 0; i < populationSize; i++ ) {
  17          FitPool[i].join();
  18          newPopulation[i].setFitness( FitPool.result() );
  19       }
  20       populationStatistics( newPopulation );
  21    }
  22    // ..
  23 }

Wątki-demony

Po utworzeniu, ale przed uruchomieniem wątku możemy go zadeklarować jako demona (przez wywołanie jego metody setDaemon z argumentem true). Wątek zadeklarowany jako demon jest przerywany po zakończeniu wszystkich wątków nie-demonów. Wątki pochodne od demona są także demonami o ile nie zmienimy tego przez wywołanie setDaemon.

Stanu wątku nie można zmienić po jego uruchomieniu, można się jednak dowiedzieć, czy wątek jest demonem przy pomocy metody getDaemon.

Hierarchia wątków i klasa ThreadGroup

Wątki podzielone są na grupy, które tworzą hierarchie (to znaczy, że wątek dziedziczy grupę od rodzica, ale może tworzyć nowe podgrupy w ramach swojej grupy).

Do tworzenia hierarchii wątków służy klasa ThreadGroup. Obok regulacji bezpieczeństwa klasa ta umożliwia grupowe sterowanie wątkami i ich priorytetami (patrz następny rozdział).

Przykład - metoda kończąca "gładko" wszystkie wątki potomne, która może być wywołana przed zakończeniem danego wątku:

   1 public static void stopThreads( ) {
   2    Thread tenWatek= Tread.currentThread(); // pobierz biezacy watek
   3    ThreadGroup taGrupa= tenWatek.getThreadGroup(); // i jego grupe
   4    Thread[] watki = new Thread[ taGrupa.activeCount() + 50 ];
   5    // activeCount tylko estymuje (cos moglo umrzec i cos sie urodzic)
   6    // dlatego + 50  - na zapas
   7 
   8    taGrupa.enumerate( watki, true ); // pakujemy watki do tablicy
   9                                      // true jest nadmiarowe, bo jest tez
  10                   // przeciazona, domyslnie rekurencyjna wersja enumerate
  11 
  12    // przerywamy wszystkie z wyjatkiem biezacego
  13    for( int i= 0; i < watki.length; i++ )
  14       if( watki[i] != null && watki[i] != tenWatek )
  15          watki[i].interrupt();
  16 
  17    // czekamy az stana
  18    for( int i= 0; i < watki.length; i++ )
  19       if( watki[i] != null && watki[i] != tenWatek ) {
  20          try {
  21             watki[i].join(); // join zglasza InterruptedException
  22          } catch( InterruptedException e ) {
  23             // zlekcewaz - czekamy na "potomstwo"
  24          }
  25       }
  26    // zostal tylko tenWatek
  27 }

Priorytety, szeregowanie wątków

Na początku powiedzieliśmy, że mechanizm wątków służy bardziej czytelności programów niż przystosowaniu Javy do wykonywania obliczeń równoległych, czy rozproszonych (choć bez wątpienia należy stwierdzić, że wątki ułatwiają "zrównoleglanie" kodu). Program wielowątkowy uruchamiany w środowisku, gdzie nie ma możliwości przydzielenia każdemu wątkowi osobnego procesora wymaga procedury symulującej równoległość - przydzielającej poszczególnym wątkom czas procesora w postaci przeplecionych "kwantów".

Każdy wątek ma przyporządkowany określony priorytet - liczbę całkowitą z zakresu od Thread.MIN_PRIORITY do Thread.MAX_PRIORITY i standardową wartością Thread.NORM_PRIORITY. Początkowo wątek dziedziczy priorytet od swojego rodzica, można odczytać tę wartość przy pomocy metody getPriority i zmienić w dowolnej chwili przy pomocy setPriority.

Przydział czasu procesora zależy od wartości priorytetu. Wątek, który wykonuje powtarzalną czynność w tle (np. wyświetlanie danych, odświeżanie ekranu, długotrwałe obliczenia) powinien ustawić sobie (lub otrzymać) możliwie niski (bliski MIN_PRIORITY) priorytet. Wątek obsługujący dialog z użytkownikiem powinien mieć priorytet wysoki, aby mógł odpowiednio szybko zareagować na wprowadzanie danych.

Procedura szeregowania wątków gwarantuje, że zawsze wykonywane będą wątki o najwyższym priorytecie, oraz że wątki o niższych priorytetach będą wykonywane, gdy te o wyższych będą zablokowane (np. uśpione). Gdy wykonywany wątek zostaje zablokowany, Java wybiera gotowy do wykonania wątek sposród tych o najwyższym priorytecie. Może się też zdarzyć, że wątki o niższych priorytetach będą wykonywane równolegle z wątkami o wyższych priorytetach.

Wątek zostaje zablokowany przez wykonanie funkcji sleep (na określony czas) lub może przekazać sterowanie do procedury szeregowania wątków przez wywołanie funkcji yield. Procedura szeregowania może ponownie przekazać sterowanie do tego samego wątku.

Przykład - funkcja, która przydziela wszystkim wątkom potomnym priorytet o jeden mniejszy od priorytetu wątku bieżącego:

   1 public static void setThreadsPriority( ) {
   2    Thread tenWatek= Thread.currentThread(); // pobierz biezacy watek
   3    ThreadGroup taGrupa= tenWatek.getThreadGroup(); // i jego grupe
   4    Thread[] watki = new Thread[ taGrupa.activeCount() + 50 ];
   5    // activeCount tylko estymuje (cos moglo umrzec i cos sie urodzic)
   6    // dlatego + 50  - na zapas
   7 
   8    taGrupa.enumerate( watki ); // pakujemy watki do tablicy
   9    
  10    // bieżący dostaje max priorytet
  11    tenWatek.setPriority( Thread.MAX_PRIORITY );
  12 
  13    // mogl nie dostac MAX_PRIORITY, bo ograniczenia dla grupy 
  14    // to uniemozliwialy
  15    int otherPriority= tenWatek.getPriority() - 1;
  16 
  17    // ustawiamy innym mniejszy
  18    for( int i= 0; i < watki.length; i++ )
  19       if( watki[i] != null && watki[i] != tenWatek )
  20          watki[i].setPriority( otherPriority );
  21    // ale potem cos innego mogloby zwiekszyc priorytet
  22    // ktoregos z tych watkow
  23 
  24    //Mozna skuteczniej:
  25    taGrupa.setMaxPriority( otherPriority );
  26 
  27 }

Synchronizacja

Jeżeli kilka wątków (np. 2) korzysta z tego samego obiektu, to próba "równoczesnego" dostępu do danych może powodować problemy: klasyczny przykład to dwa wątki modyfikujace stan konta:

1. Wątek A pobiera starą wartość 2. Wątek B pobiera starą wartość 3. Wątek A ustawia nową wartość na "stara + X" 4. Wątek B ustawia nową wartość na "stara + Y"

w efekcie której stan konta zwiększa sie tylko o Y a nie o X+Y.

Zadeklarowanie określonej metody jako synchronizowanej powoduje, że obiekt jest blokowany, gdy jakiś wątek wykonuje synchronizowaną metodę. Obiekt zablokowany jest niedostępny dla innych wątków.

   1 class SP {
   2    private double x,y;
   3 
   4    SP( ) {
   5       x= 0;
   6       y= 0;
   7    }
   8 
   9    public synchronized double x() {
  10       return x;
  11    }
  12 
  13    public synchronized double y() {
  14       return y;
  15    }
  16 
  17    public synchronized void move( double dx, double dy ) {
  18       x += dx;
  19       y += dy;
  20    }
  21 
  22    public synchronized void newX( double x ) {
  23       this.x = x;
  24    }
  25 
  26    public synchronized void newY( double y ) {
  27       this.y = y;
  28    }
  29 
  30    public synchronized void moveToward( SP other, double a ) {
  31       this.x += ( other.x() - this.x ) * a;
  32       this.y += ( other.y() - this.y ) * a;
  33    }
  34 }

Jeżeli kilka wątków używa jednego obiektu klasy SP, to obiekt ten jest blokowany tak, że tylko jeden wątek może w danym momencie wykonywać dowolną metodę poza konstruktorem. Konstruktor nie wymaga synchronizacji, gdyż jest on zawsze wykonywany lokalnie w którymś z wątków. Synchronizacja zapewnia, że dwa wątki nie będą równocześnie wykonywać np. metody move, co mogłoby doprowadzić do utraty jednego z przesunięć. Synchronizacja nie chroni jednak przed wykonaniem czegoś takiego:

----- wątek A ---------------------wątek B -------
                        |
   double a= p.x();     |
                        |        double b= p.x();
   a += 10.;            |
                        |        b += 20.;
   p.newX( a );         |
                        |        p.newX( b )
                        |
------- p.x zwiększyło się o 20 a nie o 30 ! -----

Zamiast używać metod synchronizowanych można niekiedy stosować instrukcję synchronized, która ma następującą postać:

synchronized ( <obiekt> ) 
   <kod>

Np.:

/** Move polygon by [ dx, dy ] */
public static void move( Point[] polygon, double dx, double dy ) {
   synchronized( polygon ) {
      for( int i= 0; i < polygon.length; i++ )
         polygon[i].move( dx, dy );
   }
}

Zaprojektowanie klasy z metodami synchronizowanymi jest jednak zdecydowanie bezpieczniejsze, gdyż przy częstym stosowaniu instrukcji synchronized spore jest prawdopodobieństwo, że gdzieś o niej zapomnimy. Wykrycie błędów niesynchronizowanej modyfikacji danych jest zawsze trudne, gdyż efekty tych błędów mogą być niepowtarzalne.

Kolejnym problemem, który nie jest wykrywany przez Javę, i z którym musi poradzić sobie programista jest tak zwane zakleszczenie. Polega ono na tym, że kilka (najczęściej dwa, ale może być więcej) wątków wzajemnie się blokuje. Załóżmy następującą sytuację:

----- wątek A ---------------------wątek B ----------------------
  a= p1.moveToward( p2, 0.25 );  |  b= p2.moveToward( p1, 0.25 );
---------------------------------+-------------------------------
  blokada p1                     |
                                 |  blokada p2
  oczekiwanie na p2              |  
                                 |  oczekiwanie na p1
                                 |
-------------------------- zakleszczenie: -----------------------

wątek A czeka na p2 i nie zwalnia p1, a wątek B czeka na p1 i nie zwalnia p2 => A i B wzajemnie się blokują.

Komunikacja pomiędzy wątkami

Wyobraźmy sobie program, który przyjmuje zlecenia do obsługi, umieszcza je w kolejce LIFO i wykonuje w miarę wolnych zasobów. Można to elegancko zaprojektować w postaci wielowątkowej: wybrany wątek bedzie pobierał zlecenia z synchronizowanej kolejki LIFO, która będzie wypełniana przez inny wątek. W takiej implementacji pojawia się problem komunikacji pomiedzy wątkami: po umieszczeniu zlecenia w kolejce należy poinformować wątek pobierający i umożliwić mu podjęcie sterowania.

Komunikację pomiędzy wątkami realizuje się przez odpowiednią implementację klasy synchronizującej:

   1 import java.util.Stack;  // o tej klasie powiemy troszke 
   2                          // na kolejnym wykladzie
   3 
   4 public class SQueue {
   5    private Stack q;
   6 
   7    public SQueue() {
   8       q = new Stack();
   9    }
  10 
  11    public synchronized void insert( Object o ) {
  12       q.push( o );
  13       notify();    // powiadom watek pobierajacy
  14    }
  15 
  16    public synchronized Object get() 
  17       throws InterruptedException  // z wait()
  18    {
  19       while( q.empty() )
  20          wait(); // czekaj az element sie pojawi
  21 
  22       return q.pop();
  23    }
  24 }

Napiszmy teraz program testujący SQueue, w którym zdefiniujemy wątek wstawiający elementy (SQFeeder) i wątek pobierający elementy (SQEater). Całość może wyglądać tak:

   1 class SQFeeder extends Thread {
   2    int start;
   3    int delay;
   4    int nRepet;
   5    SQueue sq;
   6 
   7    SQFeeder( int s, int n, int dt, SQueue q ) {
   8       start= s;
   9       delay= dt;
  10       nRepet= n;
  11       sq= q;
  12    }
  13 
  14    public void run() {
  15       try {
  16          for( int i= start; i < start+nRepet; i++) {
  17             sq.insert( new Integer( i ) );
  18             sleep( delay );
  19          }
  20       } catch( InterruptedException e ) {
  21          System.out.println( "SQFeeder interrupted\n" );
  22          return;
  23       }
  24    }
  25 }
  26 
  27 class SQEater extends Thread {
  28    SQueue sq;
  29 
  30    SQEater( SQueue q ) {
  31       sq= q;
  32    }
  33 
  34    public void run() {
  35       try {
  36          while( true )
  37             System.out.println( sq.get() );
  38       } catch( InterruptedException e ) {
  39          System.out.println( "SQEater interrupted\n" );
  40          return;
  41       }
  42    }
  43 }
  44 
  45 public class SQTester {
  46    public static void main( String [] args )
  47       throws InterruptedException  // ze sleep()
  48    {
  49       SQueue q= new SQueue();
  50       SQFeeder f= new SQFeeder( 0,  20, 250, q );
  51       SQEater e= new SQEater( q );
  52 
  53       f.start();  // start feeder
  54 
  55       Thread.sleep( 4000 );  // wait 4 s.
  56 
  57       e.start(); // start eater - it should print 15-0 immediately,
  58                  // then 16-19 with 250 ms. pauses
  59 
  60       f.join();  // wait for feeder to finish
  61 
  62       e.interrupt(); // finish eater
  63       e.join();      // let him stop
  64    }
  65 }

Metody wait (jest kilka przeciążonych wersji - zobacz [Java], rodział 9.4), notify i notifyAll (też zobacz [Java], 9.4) są zdefiniowane w klasie Object, a więc można ich używać we wszystkich klasach.

wait powinna być zawsze wywoływana w metodzie synchronizowanej, w pętli (jak w metodzie SQueue.get). Jej wywołanie zwalnia blokadę obiektu, z którego została wywołana i JEDNOCZEŚNIE wstrzymuje wykonanie bieżącego wątku.

notify powiadamia jeden (ten czekający najdłużej) wątek czekający na zwolnienie blokady obiektu, że należy sprawdzić stan obiektu. Jeżeli na zmianę stanu czeka więcej niż jeden wątek, to lepiej (a raczej "trzeba") użyć metody notifyAll. Metody te także należy wywoływać z kodu synchronizowanego.

Implementacja mechanizmu powiadamiania polega na tym, że wątki czekające na spełnienie określonego warunku (związanego ze stanem synchronizowanego obiektu X) wykonują w pętli metody wait, cyklicznie zwalniając obiekt. Jeżeli wątek wykonujący wait otrzyma powiadomienie przez metodę X.notify(), lub X.notifyAll(), to wychodzi z wait i ponawia sprawdzanie warunku. Jeśli warunek nie jest spełniony, to wątek ponownie wywołuje wait().

Każdy wątek, który może zmienić stan warunku ma szansę to zrobić, gdyż metody wait odblokowują synchronizowany obiekt X i przekazują sterowanie innym wątkom.

Obiekty volatile

Czasem występują sytuacje, gdy kilka wątków współdzieli obiekt niesynchronizowany i chcemy poinformować kompilator o tym, że jakiś obiekt może być zmieniony przez inny kod. Na przykład w kodzie

//...
percentDone= 0;
while( true ) {
   g.drawString( Double( percentDone ).toString() + "%", 50,25 );
   Thread.sleep( 500 );
}
//...

kompilator mógłby założyć, że wartość zmiennej percentDone nie ulega zmianie i zastapić wyrażenie będące pierwszym argumentem metody drawString przez napis "0%", co może nie być właściwe, jeżeli wartość percentDone może być zmieniana przez inne wątki. Zadeklarowanie zmiennej percentDone jako volatile zapobiega takiej optymalizacji kodu pośredniego.

Diagnostyka

Uruchamianie programów wielowątkowych może być przyczyną wielu frustracji, a nawet chorób nerwowych. Klasy Thread i ThreadGroup zawierają kilka metod, ułatwiających diagnostykę takich programów. Wyliczenie tych metod zawiera rozdział 9.14 [Java].

Obejrzyjmy w przykładowym działaniu metody Thread.toString i Thread.dumpStack:

   1 /** SQTester - wersja nieco zmodyfikowana,
   2     z demo Thread.toString i Thread.dumpStack
   3 */
   4 class SQFeeder extends Thread {
   5    int start;
   6    int delay;
   7    int nRepet;
   8    SQueue sq;
   9 
  10    SQFeeder( int s, int n, int dt, SQueue q ) {
  11       start= s;
  12       delay= dt;
  13       nRepet= n;
  14       sq= q;
  15    }
  16 
  17    public void run() {
  18       try {
  19          for( int i= start; i < start+nRepet; i++) {
  20             sq.insert( new Integer( i ) );                                 
  21             sleep( delay );
  22             if( i == start ) {
  23                System.out.println( "About to call dumpStack in " + this );
  24                Thread.dumpStack();
  25             }
  26          }
  27       } catch( InterruptedException e ) {
  28          System.out.println( "SQFeeder interrupted\n" );
  29          return;
  30       }
  31    }
  32 }
  33 
  34 class SQEater extends Thread {
  35    SQueue sq;
  36 
  37    SQEater( SQueue q ) {
  38       sq= q;
  39    }
  40 
  41    public void run() {
  42       try {
  43          while( true )
  44             System.out.println( sq.get() );
  45       } catch( InterruptedException e ) {
  46          System.out.println( "SQEater: <" + this + "> interrupted\n" );
  47          return;
  48       }
  49    }
  50 }
  51 
  52 public class SQTester {
  53    public static void main( String [] args )
  54       throws InterruptedException  // ze sleep()
  55    {
  56       SQueue q= new SQueue();
  57       SQFeeder f= new SQFeeder( 0,  20, 250, q );
  58       SQEater e= new SQEater( q );
  59 
  60       f.start();  // start feeder
  61 
  62       Thread.sleep( 4000 );  // wait 4 s.
  63 
  64 
  65       e.start(); // start eater - it should print 15-0 immediately,
  66                  // then 16-19 with 250 ms. pauses
  67 
  68       // comment out the line below to see a mixture of output
  69       // from the eater and debug code
  70       synchronized( q )
  71       {
  72          System.out.println( "About to call dumpStack in " + Thread.currentThread() );
  73          Thread.dumpStack();
  74       }
  75 
  76       f.join();  // wait for feeder to finish
  77 
  78       e.interrupt(); // finish eater
  79       e.join();      // let him stop
  80    }
  81 }

2015-09-23 06:44