Next              Up                Back                   Contents

Επόμενο: Ασκήσεις Πάνω:Κεφάλαιο 6o : Σύγχρονος παραλληλισμός Πίσω:Αναφορές


 

ΠΡΟΓΡΑΜΜΑΤΙΣΤΙΚΕΣ ΕΡΓΑΣΙΕΣ

 

1. Διχρωματικός αλγόριθμος Jacobi

 

Στον αλγόριθμο Jacobi για την επίλυση της εξίσωσης του Laplace σε κάθε επανάληψη υπολογίζονται οι νέες τιμές για όλα τα στοιχεία του πίνακα χρησιμοποιώντας τις παλιές τιμές της προηγούμενης επανάληψης. Υπάρχουν όμως και όλες μορφές αλγορίθμων που επιλύουν την εξίσωση αυτή χρησιμοποιώντας νέες τιμές περισσότερο γρήγορα καθώς δημιουργούνται. Με τον τρόπο αυτό οι αλλαγές που συμβαίνουν στο ένα μέρος του πίνακα μπορούν να διαδοθούν πιο γρήγορα στα άλλα μέρη, επιτυγχάνοντας έτσι γρηγορότερη σύγκλιση.

Μια τέτοια τεχνική ονομάζεται Διχρωματικός αλγόριθμος Jacobi. Ο διδιάστατος πίνακας δεδομένων είναι σαν μια σκακιέρα με διαδοχικά κόκκινα και μαύρα τετράγωνα και κάθε στοιχείο δεδομένων είναι ένα τετράγωνο της σκακιέρας. Κάθε αλγόριθμος Jacobi υπολογίζει πάντα την τιμή ενός νέου στοιχείου από τις τιμές των τεσσάρων άμεσων γειτονικών του: πάνω, κάτω, αριστερά και δεξιά. Στην σκακιέρα κάθε κόκκινο στοιχείο έχει μόνο μαύρους γείτονες και κάθε μαύρο στοιχείο έχει μόνο κόκκινους γείτονες. Στον διχρωματικό αλγόριθμο ο υπολογισμός των νέων τιμών γίνεται εναλλάξ για τα μαύρα και κόκκινα στοιχεία. Πρώτα, υπολογίζονται οι νέες τιμές όλων των κόκκινων στοιχείων, χρησιμοποιώντας τις τελευταίες τιμές των γειτονικών μαύρων σημείων. Στην συνέχεια υπολογίζονται οι νέες τιμές των μαύρων σημείων χρησιμοποιώντας τις προσφάτως υπολογισμένες τιμές των κόκκινων σημείων. Μετά τα κόκκινα στοιχεία υπολογίζονται ξανά χρησιμοποιώντας τις νέες τιμές των μαύρων στοιχείων.

Στον Διχρωματικό αλγόριθμο κάθε επανάληψη έχει δυο φάσεις: την κόκκινη φάση και την μαύρη φάση. Κατά τη διάρκεια της κόκκινης φάσης υπολογίζονται οι νέες τιμές για όλα τα κόκκινα στοιχεία και όμοια στην μαύρη φάση. Είναι απαραίτητο η κόκκινη φάση να τελειώσει πριν να αρχίσει η μαύρη φάση. Όμοια, η μαύρη φάση πρέπει να ολοκληρωθεί πριν να αρχίσει η κόκκινη φάση στην επόμενη επανάληψη. Για αυτό το λόγο στην παράλληλη μορφή αυτού του αλγορίθμου είναι απαραίτητος ο συγχρονισμός των διεργασιών μετά την κόκκινη φάση και ξανά μετά την μαύρη φάση.

Η εργασία αφορά το γράψιμο ενός προγράμματος Multi-Pascal που υλοποιεί τον Διχρωματικό αλγόριθμο Jacobi σε ένα πίνακα δυο διαστάσεων με στοιχεία πραγματικούς αριθμούς, όπως αυτοί που χρησιμοποιήθηκαν στα προγράμματα του αλγορίθμου Jacobi αυτού του κεφαλαίου. Δημιουργείστε μια διεργασία για γραμμή του πίνακα, όπως έγινε και στα παραδείγματα των προγραμμάτων Jacobi του κεφαλαίου.

Πρέπει να γράψετε τρεις εκδόσεις του διχρωματικού αλγορίθμου και να συγκρίνετε τους χρόνους εκτέλεσής τους. Οι εκδόσεις 1 και 2 θα πραγματοποιούν ένα προκαθορισμένο αριθμό επαναλήψεων που θα ορίζεται από μια σταθερά μέσα στο πρόγραμμα. Η έκδοση 1 θα χρησιμοποιεί μια τεχνική καθολικού φράγματος για το συγχρονισμό των διεργασιών. Η έκδοση 2 θα χρησιμοποιεί τοπική τεχνική φράγματος. Η έκδοση 3 θα πραγματοποιεί ένα μεταβλητό αριθμό επαναλήψεων που θα καθορίζεται από έναν έλεγχο σύγκλισης και θα χρησιμοποιεί τοπικό φράγμα για το συγχρονισμό σε συνδυασμό με γενική συνάρτηση ομαδοποίησης για την πραγματοποίηση του ελέγχου σύγκλισης.

Κατά τη διάρκεια του προγράμματος, χρησιμοποιήστε ένα μικρό πίνακα δεδομένων για να μειώσετε το χρόνο εκτέλεσης του προγράμματος. Όταν ελέγχετε την απόδοση του προγράμματος θέστε ένα σημείο παύσης που διακόπτει το πρόγραμμα μετά από κάθε επανάληψη. Όταν συγκρίνετε το χρόνο εκτέλεσης των τριών εκδόσεων συγκρίνετε το χρόνο για μια μόνο επανάληψη. Επικεντρώνοντας σε μια μόνο επανάληψη δεν χρειάζεται να αφήσετε το πρόγραμμα να εκτελείται μέχρι να τελειώσει καταναλώνοντας έτσι πολύ χρόνο ιδιαίτερα για ένα μεγάλο πίνακα δεδομένων.

 

2. Απαλοιφή Gauss

 

Σε ένα μεγάλο φάσμα επιστημονικών και μηχανικών εφαρμογών είναι απαραίτητη η επίλυση ενός συστήματος γραμμικών εξισώσεων. Μια από τις πιο δημοφιλείς τεχνικές ονομάζεται Απαλοιφή Gauss. Για λόγους απλότητας, υποθέτουμε ότι έχουμε n εξισώσεις με n αγνώστους και συνεπώς οι συντελεστές κάθε εξίσωσης δημιουργούν έναν nxn πίνακα πραγματικών αριθμών. Το πιο σημαντικό βήμα στην απαλοιφή Gauss από υπολογιστική άποψη είναι η μετατροπή του πίνακα σε μια άνω τριγωνική μορφή, που σημαίνει ότι όλες οι θέσεις από τα αριστερά της κυρίας διαγωνίου θα είναι 0. Για παράδειγμα, ο παρακάτω πίνακας βρίσκεται στην άνω τριγωνική μορφή :

                                                                                    3      5     2        30      21

                                                                                    0      1    17      -3      -49

                                                                                    0      0    33      -6         2

                                                                                    0      0     0         9      12

                                                                                    0      0     0         0      13

 

Από τη στιγμή που ο πίνακας βρίσκεται στην άνω τριγωνική του μορφή, μπορεί να λυθεί με την Προς τα Πίσω Αντικατάσταση που παρουσιάστηκε στο 4ο κεφάλαιο. Το πρώτο βήμα στη μετατροπή ενός πίνακα Α σε άνω τριγωνική μορφή είναι να σαρώσουμε όλα τα στοιχεία της 1ης στήλης και να βρούμε το μεγαλύτερο. Έστω ότι το μεγαλύτερο στοιχείο βρίσκεται στη στήλη i, το επόμενο βήμα του αλγορίθμου είναι η ανταλλαγή όλης της γραμμής i με την 1η. Το στοιχείο Α[1,1] που βρίσκεται στην πάνω αριστερή γωνία του πίνακα ονομάζεται στοιχείο περιστροφής. Τα ακόλουθα δύο βήματα πραγματοποιούνται για κάθε γραμμή του πίνακα, εκτός από την 1η :

 

(1) Υπολογίστε έναν συντελεστή διαιρώντας την αρνητική τιμή του στοιχείου περιστροφής με την τιμή της θέσης 1 της i γραμμής: - Α[1.1] / Α[1,i].

(2) Πολλαπλασιάστε κάθε στοιχείο της στήλης j με τον συντελεστή της γραμμής και προσθέστε το στοιχείο που προκύπτει στο αντίστοιχο στοιχείο της j στήλης της 1η γραμμής.

Μετά την επανάληψη των παραπάνω βημάτων για κάθε γραμμή του πίνακα (εκτός από την 1η), ο πίνακας έχει όλα τα στοιχεία της 1ης στήλης ίσα με 0, εκτός ίσως από αυτό της πάνω αριστερής γωνίας. Τώρα η σειρά 1 και η στήλη 1 του πίνακα βρίσκονται στην κατάλληλη μορφή και μπορούμε να τις αγνοήσουμε εντελώς και να τις αφήσουμε ανέγγιχτες στις επόμενες μετατροπές. Το επόμενο βήμα του αλγορίθμου απαιτεί να επικεντρώσουμε την προσοχή μας στον υποπίνακα που δημιουργήθηκε από τη στήλη 2 ως n και από τη γραμμή 2 ως n. Χρησιμοποιώντας αυτόν τον τετραγωνικό υποπίνακα επαναλαμβάνεται η ίδια ακριβώς διαδικασία με την παραπάνω, μόνο που τώρα το ενδιαφέρον μας βρίσκεται στην 2η γραμμή και στήλη. Με τον τρόπο αυτό θα έχουμε την γραμμή 2 και τη στήλη 2 στην κατάλληλη μορφή. Στην συνέχεια ο αλγόριθμος συνεχίζει με τον τετραγωνικό υποπίνακα που έχει την άνω αριστερή γωνία στη θέση (3,3). Η διαδικασία αυτή συνεχίζεται μέχρι όλος ο πίνακας να έρθει στην άνω τριγωνική μορφή.

Γράψτε ένα παράλληλο πρόγραμμα σε Multi-Pascal που θα μετατρέπει ένα πίνακα πραγματικών αριθμών στην άνω τριγωνική μορφή. Προσπαθήστε να ελαχιστοποιήσετε το χρόνο εκτέλεσης και να μεγιστοποιήσετε την επιτάχυνση του προγράμματος, περιορίζοντας την ακολουθιακότητα του προγράμματος εφαρμόζοντας όσο το δυνατόν περισσότερο παραλληλισμό. Προτείνεται να χρησιμοποιήσετε την τεχνική καταστροφής των διεργασιών για τον συγχρονισμό: χρησιμοποιήστε έναν επαναληπτικό εξωτερικό βρόχο με μια σειρά FORALL δηλώσεων μέσα για τη δημιουργία ομάδων παράλληλων διεργασιών για κάθε φάση υπολογισμού.

Ελέγξτε την ορθότητα του προγράμματος στο σύστημα της Multi-Pascal χρησιμοποιώντας ένα μικρό πίνακα, 3x3 ίσως. Στη συνέχεια προσδιορίστε την επιτάχυνση του προγράμματός σας για τα ακόλουθα μεγέθη πινάκων : 20x20, 30x30 και 40x40. Για έναν πίνακα nxn μια ικανοποιητική έκδοση του αλγορίθμου πρέπει να έχει επιτάχυνση της τάξης του Ο(3n). Αρχικοποιήστε τις τιμές του πίνακα χρησιμοποιώντας τις παρακάτω εντολές :


For i := 1 to n do
  
    For j := 1 to n do
     
	    A[i,j] := i+j;

ΠΡΟΤΕΙΝΟΜΕΝΕΣ ΛΥΣΕΙΣ

 

1.

1. Με καθολικό φράγμα


program jacobi_1;


    Const 


        n=14;


        numiter=5;                    αριθμός επαναλήψεων


    Var 


        a,b:array[0..n+1,0..n+1] of real;


        i,j,count:integer;


        arrival,departure:spinlock;
 

procedure barrier;


    begin


        lock(arrival);                                 φάση άφιξης-άθροιση των διεργασιών


        count:=count+1;                             καθώς εισέρχονται



        if count‹n then 

            unlock(arrival)                             συνέχιση της φάσης άφιξης

        else 

            unlock(departure);                      τέλος της φάσης άφιξης

            lock(departure);                           φάση αναχώρησης-άθροιση των διεργασιών

            count:=count-1;                             καθώς εξέρχονται

            if count>0 then 

                unlock(departure)                     συνέχιση της φάσης αναχώρησης

            else 

                unlock(arrival);                         τέλος της φάσης αναχώρησης

    end;
 
procedure redblack(i:integer;rb:integer);

    var 

        j:integer;

    begin

        if (i mod 2)=(0+rb) then                 έλεγχος για κόκκινη / μαύρη φάση

        begin

            j:=1;

            while j<=n do                            υπολογισμός του μέσου όρου των τεσσάρων 

            begin                                        γειτονικών διεργασιών

                b[i,j]:=(a[i-1,j]+a[i+1,j]+a[i,j-1]+a[i,j+1])/4;

                j:=j+2;

            end;

        end

        else

        begin

            j:=2;

            while j<=n do

            begin

                b[i,j]:=(a[i-1,j]+a[i+1,j]+a[i,j-1]+a[i,j+1])/4;

                j:=j+2;

            end;

        end;

    end;

Begin

    for i:=0 to n+1 do                          Αρχικοποίηση

    begin

        a[0,i]:=5;

        a[n+1,i]:=10;

        a[i,0]:=5;

        a[i,n+1]:=10;

    end;

    for i:=1 to n do 

        for j:=1 to n do 

            a[i,j]:=0;

    count:=0;

    unlock(arrival);

    lock(departure);

    b:=a;

    forall i:=1 to n do                               δημιουργία διεργασίας για κάθε γραμμή

    var k:integer;

    begin

        for k:=1 to numiter do

        begin

            redblack(i,0);                             κόκκινη φάση

            barrier;

            a[i]:=b[i]; 

            barrier;

            redblack(i,1);                            μαύρη φάση

            barrier;

            a[i]:=b[i];

            barrier;

        end;

    end;

    for i:=1 to n do                                   Εμφάνιση αποτελεσμάτων

    begin

        writeln;

        for j:=1 to n do 

            write(a[i,j]:5:2);

    end;

End. 

2. Με τοπικό φράγμα


program jacobi_2;


    Const 


        n=14;


        numiter=5;                          αριθμός επαναλήψεων


    Var 


        a,b:array[0..n+1,0..n+1] of real;


        i,j:integer;


        higher,lower:array [1..n] of channel of integer;
 
procedure localbarrier(i:integer);


    var                                                          κάθε διεργασία συγχρονίζεται μόνο 


        dummy:integer;                                      με τους 2 άμεσους γείτονές της


    begin                                                  τα 'higher' και 'lower' δηλώνουν την πηγή του μηνύματος


        if i>1 then 

            higher[i-1]:=1;                              αποστολή μηνύματος στη διεργασία i-1

        if i‹n then 

        begin

            lower[i+1]:=1;                              αποστολή μηνύματος στη διεργασία i+1

            dummy:=higher[i];                          λήψη μηνύματος από τη διεργασία i+1

        end;

        if i>1 then 

            dummy:=lower[i];                          λήψη μηνύματος από τη διεργασία i-1

end;
 
procedure redblack(i:integer;rb:integer);

    var 

        j:integer;

    begin

        if (i mod 2)=(0+rb) then                      έλεγχος για κόκκινη / μαύρη φάση

        begin

            j:=1;

            while j<=n do                                  υπολογισμός του μέσου όρου των τεσσάρων

            begin                                              γειτονικών διεργασιών

                b[i,j]:=(a[i-1,j]+a[i+1,j]+a[i,j-1]+a[i,j+1])/4;

                j:=j+2;

            end;

        end

        else

        begin

            j:=2;

            while j<=n do

            begin

                b[i,j]:=(a[i-1,j]+a[i+1,j]+a[i,j-1]+a[i,j+1])/4;

                j:=j+2;

            end;

        end;

end;
 
Begin

    for i:=0 to n+1 do                                  Αρχικοποίηση

    begin

        a[0,i]:=5;a[n+1,i]:=10;

        a[i,0]:=5;a[i,n+1]:=10;

    end;

    for i:=1 to n do 

        for j:=1 to n do 

            a[i,j]:=0;

    b:=a;

    forall i:=1 to n do                                   δημιουργία διεργασίας για κάθε γραμμή

    var k:integer;

    begin

        for k:=1 to numiter do

        begin

            redblack(i,0);

            localbarrier(i);

            a[i]:=b[i]; 

            localbarrier(i);

            redblack(i,1);

            localbarrier(i);

            a[i]:=b[i];

            localbarrier(i);

        end;

    end;

    for i:=1 to n do                                          Εμφάνιση αποτελεσμάτων

    begin

        writeln;

        for j:=1 to n do 

            write(a[i,j]:5:2);

        end;

End. 
 

3. Με έλεγχο σύγκλισης


program jacobi_3;


    Const 


        n=14;


        tolerance=0.5;                          έλεγχος σύγκλισης


    Var 


        a,b:array[0..n+1,0..n+1] of real;


        i,j,count,tc:integer;


        arrival,departure,tcl:spinlock;


        globaldone:boolean;


        higher,lower:array [1..n] of channel of integer;
    
procedure localbarrier(i:integer);


    var                                                  κάθε διεργασία συγχρονίζεται μόνο με τους


        dummy:integer;                                          2 άμεσους γείτονές της


    begin                                          τα 'higher' και 'lower' δηλώνουν την πηγή του μηνύματος


        if i>1 then 

            higher[i-1]:=1;                                          αποστολή μηνύματος στη διεργασία i-1

        if i‹n then 

        begin

            lower[i+1]:=1;                              αποστολή μηνύματος στη διεργασία i+1

            dummy:=higher[i];                          λήψη μηνύματος από τη διεργασία i+1

        end;

        if i>1 then 

            dummy:=lower[i];                              λήψη μηνύματος από τη διεργασία i-1

   end;

function aggregate(mydone:boolean):boolean;

    begin

        lock(arrival);                                                      φάση άφιξης

        count:=count+1;

        globaldone:=globaldone and mydone;                  βήμα σύγκλισης

        if count‹n then 

            unlock(arrival)                                                  συνέχιση της φάσης άφιξης

        else 

            unlock(departure);                                              τερματισμός της φάσης άφιξης

        lock(departure);                                                      φάση αναχώρησης

        count:=count-1;

        aggregate:=globaldone;                                              επιστροφή της σημαίας flag στη διεργασία

        if count>0 then 

            unlock(departure)                                                   συνέχιση της φάσης αναχώρησης

        else 

        begin                                                                              αλλαγή της τιμής της μεταβλητής

            globaldone:=true;                                                      για την επόμενη επανάληψη

            unlock(arrival);                                                          τερματισμός της φάσης αναχώρησης

        end;

    end;

procedure redblack(i:integer;rb:integer;var maxchange:real);

    var 

        j:integer;

        change:real;
 
    begin

         if (i mod 2)=(0+rb) then                                              έλεγχος για κόκκινη / μαύρη φάση

           begin

                j:=1;

                while j<=n do                          υπολογισμών του μέσου όρου των τεσσάρων

                begin                          γειτονικών διεργασιών

                    b[i,j]:=(a[i-1,j]+a[i+1,j]+a[i,j-1]+a[i,j+1])/4;

                    change:=abs(b[i,j]-a[i,j]);

                    if change>maxchange then 

                        maxchange:=change;

                    j:=j+2;

                end;

        end

        else

        begin

            j:=2;

            while j<=n do

            begin

                b[i,j]:=(a[i-1,j]+a[i+1,j]+a[i,j-1]+a[i,j+1])/4;

                change:=abs(b[i,j]-a[i,j]);

                if change>maxchange then 

                    maxchange:=change;

                    j:=j+2;

                end;

            end; 

        end;
 
 
 
Begin
 
        for i:=0 to n+1 do Αρχικοποίηση

        begin

            a[0,i]:=5;a[n+1,i]:=10;

            a[i,0]:=5;a[i,n+1]:=10;

        end;

        for i:=1 to n do 

            for j:=1 to n do 

                a[i,j]:=0;

        tc:=0;

        count:=0;

        unlock(arrival);

        lock(departure);

        globaldone:=true; 

        b:=a;
    
        forall i:=1 to n do δημιουργία διεργασίας για κάθε γραμμή

        var j:integer;

            maxchange:real;

            done:boolean;

        begin

            repeat

                maxchange:=0;

                redblack(i,0,maxchange);

                localbarrier(i);

            a[i]:=b[i]; 

            localbarrier(i);

            redblack(i,1,maxchange);

            localbarrier(i);

            a[i]:=b[i];

            done:=aggregate(maxchange‹tolerance);

           tc:=tc+1; 

        until done;

    end;

    for i:=1 to n do Εμφάνιση αποτελεσμάτων

    begin

        writeln;

        for j:=1 to n do 

            write(a[i,j]:5:2);

  end;

writeln;

writeln; 

write('xreiasthkan');

writeln(tc,' epanalhpseis'); οι επαναλήψεις

End. 

 

2.


program gauss;


    Const 


        n=10;


    Var 


        i,j,line:integer;


        A:array[1..n,1..n]of real;


    procedure maxcol(i:integer;var line:integer); 


        var 


            j:integer;max:real;


        begin


            max:=0;


            line:=0;


            for j:=i to n do


                if abs(a[j,i])>=max then 

                begin

                    max:=abs(a[j,i]); (*βρίσκει το μέγιστο της στήλης*)

                    line:=j;

                end;

            end;

procedure change_ln(i,line:integer);

    var 

        j:integer;
     
    begin

        forall j:=i to n grouping 10 do

        var tmp:real;

        begin (*αλλαγή των γραμμών*)

            tmp:=a[i,j]; 

            a[i,j]:=a[line,j];

            a[line,j]:=tmp;

        end;

    end;

procedure change_col(i:integer;var tmp:real);

    var 

        j,sthlh:integer;
 
    begin

        sthlh:=i;

        tmp:=1;

        if sthlh‹n then 

            repeat

                sthlh:=sthlh+1;

           until (sthlh=n) or (a[i,sthlh]‹>0);

            if a[i,sthlh]‹>0 then

            begin

                forall j:=1 to n grouping 10 do (*αλλαγή στήλης*)

                var t:real;

                begin

                        t:=a[j,i];

                        a[j,i]:=a[j,sthlh]; 

                        a[j,sthlh]:=t;

                end;

                tmp:=a[i,i];

            end;

        end;

procedure g(i:integer);

    var 

        j:integer;

        tmp:real;
    
    begin

        tmp:=a[i,i];

        if tmp=0 then 

            change_col(i,tmp);

   
        forall j:=i to n grouping 10 do (*διαίρεση της γραμμής με το συντελεστή*)

            a[i,j]:=a[i,j]/tmp; (* του στοιχείου περιστροφής*)

        forall j:=i+1 to n grouping 10 do

        var t:real;k:integer;

        begin (*μετατροπή των υπόλοιπων γραμμών*)

            t:=-a[j,i];

            forall k:=i to n grouping 10 do 

                a[j,k]:=a[j,k]+a[i,k]*t;

        end;

    end;
 
Begin

    for i:=1 to n do (* Αρχικοποίηση*)

        for j:=1 to n do 

            a[i,j]:=i+j;

    for i:=1 to n do

    begin

        maxcol(i,line);

        change_ln(i,line);

        g(i);

   end;

    for i:=1 to n do (*Εμφάνιση αποτελεσμάτων*)

    begin

        writeln;

        for j:=1 to n do 

            write(a[i,j]:5:1,' ');

    end;

End.

 


     Next              Up                Back                   Contents

Επόμενο: Ασκήσεις Πάνω:Κεφάλαιο 6o : Σύγχρονος παραλληλισμός Πίσω:Αναφορές