// Monitor module that can repeatedly compute the sum of a given vector
// of double values using a fixed pool of worker threads.

/**
 * Internal descriptor of a "piece of work": the half-open index range
 * [beg_of_part, end_of_part) of a data vector.
 */
class Cl_part_work {
    public double[] data;    // reference to the data vector
    public int beg_of_part;  // index of the first element of the part
    public int end_of_part;  // first index behind the part

    public Cl_part_work (double[] par_data, int par_beg, int par_end) {
        data = par_data;
        beg_of_part = par_beg;
        end_of_part = par_end;
    }

    /** Returns true when the begin and end indices coincide, i.e. the part holds no elements. */
    public boolean empty () {
        return beg_of_part == end_of_part;
    }
}

/**
 * Monitor that sums a vector of doubles with a fixed set of worker threads.
 *
 * The workers are started in the constructor and block until {@link #op}
 * publishes a vector. Each worker then repeatedly fetches parts of at most
 * {@code step} elements, accumulates a local sum, and finally adds it to the
 * shared result inside a barrier. {@link #op} returns once every worker has
 * passed that barrier.
 *
 * Only one {@link #op} call may be in progress at a time; concurrent callers
 * would overwrite each other's state. Calling {@link #stop} while an
 * operation is still in progress is not supported either.
 */
public class Cl_parop {

    /** Worker thread: waits for work, sums parts, reports its local result. */
    private class Cl_worker extends Thread {
        private Cl_parop monitor_ref;  // reference to the monitor instance
        private int my_id;             // number of this worker
        private double loc_result;     // local (per-operation) result

        public Cl_worker (Cl_parop par_monitor_ref, int id) {
            monitor_ref = par_monitor_ref;
            my_id = id;
        }

        public void run () {
            Cl_part_work part;
            while (monitor_ref.wait_for_work()) {  // blocks until work or stop
                loc_result = 0;
                do {
                    // request the next piece of work; an empty part means
                    // the whole vector has already been handed out
                    part = monitor_ref.get_work();
                    if (!part.empty()) {
                        for (int i = part.beg_of_part; i < part.end_of_part; i++) {
                            loc_result += part.data[i];
                        }
                        System.out.println(my_id + " " + loc_result + " "
                                + (part.end_of_part - part.beg_of_part));
                        // Must be qualified: a bare yield() call is rejected
                        // by Java 14+ (restricted identifier).
                        Thread.yield();
                    }
                } while (!part.empty());
                // add the local result to the global one and wait at the barrier
                monitor_ref.inc(loc_result);
            }
            System.out.println("worker " + my_id + " terminated");
        }
    }

    // monitor's data members (all guarded by the monitor lock on "this")
    private double[] data;         // reference to the data vector
    private int n;                 // number of workers
    private int barrier_cnt;       // workers that reached the barrier in this round
    private int generation;        // barrier round number, bumped when a round completes
    private int step;              // maximum length of one vector part
    private double op_result;      // result of the operation
    private int next_index;        // first index of the first free part
    private boolean is_work;       // flag - there is still something to do
    private boolean stop_workers;  // flag telling the workers to finish
    private Cl_worker[] worker;    // references to the worker threads

    // monitor's public methods - called from clients

    /**
     * Creates the monitor and starts its worker threads.
     *
     * @param n_proc   number of worker threads, must be positive
     * @param par_step maximum number of elements per piece of work, must be positive
     * @throws IllegalArgumentException if either argument is not positive
     */
    public Cl_parop (int n_proc, int par_step) {
        // With no workers op() could never finish; with a non-positive step
        // the workers would either see no work at all or never terminate.
        if (n_proc <= 0) {
            throw new IllegalArgumentException("n_proc must be positive: " + n_proc);
        }
        if (par_step <= 0) {
            throw new IllegalArgumentException("par_step must be positive: " + par_step);
        }
        n = n_proc;
        step = par_step;
        is_work = false;
        stop_workers = false;
        barrier_cnt = 0;
        generation = 0;
        worker = new Cl_worker[n];
        for (int i = 0; i < n; i++) {
            worker[i] = new Cl_worker (this, i);
            worker[i].start();
        }
    }

    /**
     * Sums all elements of {@code v} using the worker threads and blocks
     * until the result is complete.
     *
     * If the calling thread is interrupted while waiting, the method keeps
     * waiting for the result and re-asserts the interrupt status on return.
     *
     * @param v the vector to sum, must not be null
     * @return the sum of the elements of {@code v}
     * @throws NullPointerException  if {@code v} is null
     * @throws IllegalStateException if {@link #stop} has already been called
     */
    public synchronized double op (double[] v) {
        if (v == null) {
            throw new NullPointerException("v must not be null");
        }
        if (stop_workers) {
            // the workers are gone (or going), nobody would ever finish the work
            throw new IllegalStateException("workers have been stopped");
        }
        data = v;  // data setup for the operation
        op_result = 0;
        next_index = 0;
        barrier_cnt = 0;
        is_work = true;
        notifyAll();  // wake the waiting workers

        boolean interrupted = false;
        while (is_work) {  // wait for the final result
            try {
                wait ();
            } catch (InterruptedException e) {
                interrupted = true;  // remember it, but still deliver the result
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return op_result;
    }

    /**
     * Tells the workers to terminate. Use it carefully: the monitor cannot
     * be used for further operations afterwards.
     *
     * @return always 0
     */
    public synchronized int stop () {
        stop_workers = true;
        notifyAll();
        return 0;
    }

    // private methods - called from workers

    /**
     * Blocks the calling worker until some work is ready or the workers
     * are asked to stop.
     *
     * @return false if the worker should leave its working loop,
     *         true if there is something to do
     */
    private synchronized boolean wait_for_work() {
        boolean interrupted = false;
        while (!is_work && !stop_workers) {
            try {
                wait ();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return !stop_workers;
    }

    /**
     * Returns a descriptor of the next vector part to be processed.
     * The part is empty once the whole vector has been handed out.
     */
    private synchronized Cl_part_work get_work () {
        // Clamp against the remaining length instead of computing
        // next_index + step directly, which overflows for large steps.
        int remaining = data.length - next_index;
        int end_of_part = next_index + Math.min(step, remaining);
        Cl_part_work part = new Cl_part_work (data, next_index, end_of_part);
        next_index = end_of_part;
        return part;
    }

    /**
     * Adds a worker's local result to the global result and blocks the
     * worker until every worker has done the same for the current operation.
     * The last worker to arrive clears the work flag and releases everybody.
     */
    private synchronized void inc (double par_result) {
        op_result += par_result;
        barrier_cnt++;
        if (barrier_cnt == n) {
            // last arrival: the operation is complete
            is_work = false;  // no work left
            generation++;     // release the waiters of this round
            notifyAll();      // wake everybody to check the status
            return;
        }
        // Wait on the round number rather than on barrier_cnt: the client may
        // start the next op() (resetting barrier_cnt) before this thread is
        // rescheduled, and it must still be able to leave the old barrier.
        int my_generation = generation;
        boolean interrupted = false;
        while (my_generation == generation) {
            try {
                wait ();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}