/*
 * DECthreads example program
 *   - a monitor computing the sum of a vector of doubles with a pool of
 *     worker threads,
 *   - a main program using the monitor services.
 *
 * Locking overview:
 *   command_lock  protects op_command and the shutdown test of `initialized`;
 *                 the operation data (p_data, data_length, ...) is published
 *                 to the workers while it is held.
 *   data_lock     protects next_index, free_work and op_result while an
 *                 operation is running.
 *   barrier_lock  protects barrier_cnt and barrier_gen.
 *   ready_lock    protects result_ready.
 * Lock nesting is only ever barrier_lock -> command_lock and
 * barrier_lock -> ready_lock, so the locks cannot deadlock against each other.
 */

#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* function declaration */
void *worker_run(void *arg);

/* constants */
#define N_MAX 10   /* max. num of workers */
#define N     4    /* num. of workers */
#define M     1000 /* size of vector */

/* monitor status codes (negative; pthread calls report positive errno values) */
enum {
    MON_NOT_STARTED = -110, /* err_state before the first successful sum_ini */
    MON_ERR_INIT    = -111, /* sum_ini rejected the call */
    MON_ERR_STOP    = -113, /* sum_stop called on a monitor that is not running */
    MON_ERR_OP      = -114  /* sum_op rejected the call */
};

/* monitor data - simple type items */
int n;                    /* number of workers */
double *p_data;           /* pointer to the vector to work with */
long int data_length;     /* length of the vector to work with */
long int next_index;      /* initial index of the first free part */
double op_result;         /* result of the sum operation */
long int step;            /* the size of vector part */
int free_work;            /* flag - there is still a free part of vector */
int result_ready;         /* flag - set by the worker which completed the barrier */
int op_command;           /* flag - reset by the worker which released the barrier */
int barrier_cnt;          /* counter of workers waiting at sync. barrier */
unsigned long barrier_gen; /* barrier round number; changes each time the barrier opens */
int initialized = 0;      /* state variable - monitor initialized */
int err_state = MON_NOT_STARTED; /* code of the error state */

/* monitor data - pthread built-in types items */
pthread_mutex_t data_lock = PTHREAD_MUTEX_INITIALIZER;    /* Mutex for shared data (next_index, op_result) change */
pthread_cond_t cond_command = PTHREAD_COND_INITIALIZER;   /* Condition variable for start of workers */
pthread_mutex_t command_lock = PTHREAD_MUTEX_INITIALIZER; /* Mutex used for cond_command */
pthread_cond_t cond_ready = PTHREAD_COND_INITIALIZER;     /* Condition var for end of main program waiting */
pthread_mutex_t ready_lock = PTHREAD_MUTEX_INITIALIZER;   /* Mutex used for cond_ready */
pthread_cond_t cond_barrier = PTHREAD_COND_INITIALIZER;   /* Cond.var. for workers sync. at the barrier - end of operation */
pthread_mutex_t barrier_lock = PTHREAD_MUTEX_INITIALIZER; /* Mutex used for cond_barrier */
pthread_t workers[N_MAX];                                 /* Array of worker threads */

/* private helpers */

/* Packs an integer status into the void* return value used by the monitor. */
static void *status_ptr(int status)
{
    return (void *)(intptr_t)status;
}

/* Text for a status: monitor codes get their own text, anything else goes to strerror. */
static const char *status_text(int status)
{
    switch (status) {
    case MON_ERR_INIT:
        return "monitor already used or bad init parameters";
    case MON_ERR_STOP:
        return "monitor is not running";
    case MON_ERR_OP:
        return "monitor is not running or bad operation parameters";
    default:
        return strerror(status);
    }
}

/*
 * Reports a rejected call (bad arguments / wrong state).  Unlike monitor_fail
 * it leaves the monitor state untouched, so a wrong call cannot wedge a
 * running monitor and strand its worker threads.
 */
static void *monitor_reject(int status, const char *what)
{
    fprintf(stderr, "%s status %d: %s\n", what, status, status_text(status));
    return status_ptr(status);
}

/*
 * Records a failed pthread call: stores the code in errno and err_state,
 * marks the monitor as not initialized, prints a diagnostic and returns the
 * status packed as void*.
 *
 * NOTE: this is a best-effort error path.  It writes `initialized` without a
 * lock, and a caller that fails while holding a mutex returns with that mutex
 * still held, so the remaining threads may block.
 */
static void *monitor_fail(int status, const char *what)
{
    errno = status;
    err_state = status;
    initialized = 0;
    fprintf(stderr, "%s status %d: %s\n", what, status, status_text(status));
    return status_ptr(status);
}

/*
 * macro implementing an exception: leaves the enclosing function (which must
 * return void*) through monitor_fail when status is non-zero.
 */
#define check(status, string)                          \
    do {                                               \
        if ((status) != 0) {                           \
            return monitor_fail((status), (string));   \
        }                                              \
    } while (0)

/*
 * Tells every worker to terminate.  The flag is changed and the broadcast is
 * issued while command_lock is held, so a worker cannot test the flag and
 * then miss the wakeup.  Returns 0 or the first failing pthread status.
 */
static int request_shutdown(void)
{
    int status;
    int unlock_status;

    status = pthread_mutex_lock(&command_lock);
    if (status != 0) {
        return status;
    }
    initialized = 0;
    status = pthread_cond_broadcast(&cond_command);
    unlock_status = pthread_mutex_unlock(&command_lock);
    return (status != 0) ? status : unlock_status;
}

/* monitor functions */

/*
 * public function to initialize the monitor
 *
 * par_n    - number of worker threads to start, 1..N_MAX
 * par_step - number of vector items a worker takes at once, must be > 0
 *
 * Returns 0 (as void*) on success, MON_ERR_INIT when the monitor was already
 * started or a parameter is out of range, or the pthread_create error code.
 * When a thread cannot be created, the threads started so far are stopped
 * and joined before returning.
 */
void *sum_ini(int par_n, long int par_step)
{
    int i;
    int status;

    if (initialized || err_state != MON_NOT_STARTED ||
        par_n < 1 || par_n > N_MAX || par_step < 1) {
        return monitor_reject(MON_ERR_INIT, "Monitor_init");
    }

    /* everything below is written before the first pthread_create, so the
       workers see it without further locking */
    initialized = 1;
    n = par_n;
    step = par_step;
    barrier_cnt = 0;
    barrier_gen = 0;
    op_command = 0;

    for (i = 0; i < n; i++) {
        status = pthread_create(&workers[i], NULL, worker_run, status_ptr(i));
        if (status != 0) {
            /* do not leave the already running workers behind */
            (void)request_shutdown();
            while (i-- > 0) {
                (void)pthread_join(workers[i], NULL);
            }
            return monitor_fail(status, "Pthread_create");
        }
        printf("thread %d created\n", i);
    }

    err_state = 0;
    return status_ptr(0);
}

/*
 * public function to stop the monitor
 *
 * Asks all workers to terminate and joins them.  Must not be called while
 * sum_op is in progress.  Returns 0 (as void*) on success, MON_ERR_STOP when
 * the monitor is not running, or a pthread error code.
 */
void *sum_stop(void)
{
    int i;
    int status;
    void *exit_value;

    if (!initialized) {
        return monitor_reject(MON_ERR_STOP, "Monitor_stop");
    }

    /* change of workers loop predicate + wakeup of the waiting workers */
    status = request_shutdown();
    check(status, "stop_cond_command_broadcast");

    /* waiting for disposing of workers */
    for (i = 0; i < n; i++) {
        status = pthread_join(workers[i], &exit_value);
        check(status, "stop_pthread_join");
        /* a worker returns its own number when it ends normally */
        if (exit_value == status_ptr(i)) {
            printf("Thread %d terminated normally\n", i);
        }
    }
    return status_ptr(0);
}

/*
 * public function - to change the "piece of work"
 *
 * Sets the number of items a worker takes at once and returns the previous
 * value.  A non-positive par_step is ignored (the workers could never finish
 * with it).  Must not be called while sum_op is in progress.
 */
long int change_step(long int par_step)
{
    long int tmp_step = step;

    if (par_step > 0) {
        step = par_step;
    }
    return tmp_step;
}

/*
 * public function - the sum operation
 *
 * par_p_data - vector to sum, must not be NULL
 * par_length - number of items in the vector, must be > 0
 * p_result   - receives the sum, must not be NULL
 *
 * Blocks until the workers have processed the whole vector.  Returns 0 (as
 * void*) on success, MON_ERR_OP when the monitor is not running or an
 * argument is invalid, or a pthread error code.
 */
void *sum_op(double *par_p_data, long int par_length, double *p_result)
{
    int status;
    int unlock_status;

    if (!initialized || par_p_data == NULL || par_length <= 0 || p_result == NULL) {
        return monitor_reject(MON_ERR_OP, "Monitor_sum_op");
    }

    /* data setup + start of workers; done under command_lock so that no
       worker can miss the command between testing op_command and waiting */
    status = pthread_mutex_lock(&command_lock);
    check(status, "sum_command_lock");
    p_data = par_p_data;
    data_length = par_length;
    next_index = 0;
    result_ready = 0;
    op_result = 0.0;
    free_work = 1;
    op_command = 1;
    status = pthread_cond_broadcast(&cond_command);
    unlock_status = pthread_mutex_unlock(&command_lock);
    check(status, "sum_cond_command_broadcast");
    check(unlock_status, "sum_command_unlock");

    /* waiting for end of operation */
    status = pthread_mutex_lock(&ready_lock);
    check(status, "sum_ready_lock");
    while (!result_ready) {
        status = pthread_cond_wait(&cond_ready, &ready_lock);
        if (status != 0) {
            (void)pthread_mutex_unlock(&ready_lock);
            return monitor_fail(status, "sum_cond_ready");
        }
    }
    status = pthread_mutex_unlock(&ready_lock);
    check(status, "sum_ready_unlock");

    *p_result = op_result;
    return status_ptr(0);
}

/*
 * Worker thread program routine
 *
 * arg carries the worker number.  Each round the worker waits for a command,
 * takes parts of the vector until none is left, adds its partial sums to
 * op_result and meets the other workers at the barrier.  The last worker to
 * arrive resets the command, opens the barrier and wakes the thread blocked
 * in sum_op.  Returns arg on normal termination or a pthread error code.
 */
void *worker_run(void *arg)
{
    int my_number = (int)(intptr_t)arg; /* Worker thread identifier */
    int status;                         /* Hold status from pthread calls */
    int running;                        /* snapshot of `initialized` taken under command_lock */
    int has_work;                       /* a part of the vector was obtained */
    long int ini_index;                 /* initial index of a part */
    long int end_index;                 /* first index behind the part */
    double loc_result;                  /* hold sum of the vector part */
    long int j, k;                      /* loop counters */
    unsigned long my_gen;               /* barrier round this worker waits in */

    for (;;) {
        /* wait for a command or for the shutdown request; both are tested
           under command_lock, in a loop because of spurious wakeups */
        status = pthread_mutex_lock(&command_lock);
        check(status, "Worker_command_lock");
        while (initialized && !op_command) {
            status = pthread_cond_wait(&cond_command, &command_lock);
            check(status, "Worker_cond_command");
        }
        running = initialized;
        status = pthread_mutex_unlock(&command_lock);
        check(status, "Worker_command_unlock");
        if (!running) {
            break;
        }

        /* one operation */
        for (;;) {
            /* one piece of work */
            /* critical section - change of next_free index */
            status = pthread_mutex_lock(&data_lock);
            check(status, "worker_data_lock");
            if (!free_work || next_index >= data_length) {
                free_work = 0;
                has_work = 0;
                ini_index = 0;
                end_index = 0;
            } else {
                has_work = 1;
                ini_index = next_index;
                /* clamp to the end of the vector without computing
                   ini_index + step first, which could overflow */
                if (step < data_length - ini_index) {
                    end_index = ini_index + step;
                } else {
                    end_index = data_length; /* last part of the vector */
                    free_work = 0;
                }
                next_index = end_index;
            }
            status = pthread_mutex_unlock(&data_lock);
            check(status, "worker_data_unlock");
            if (!has_work) {
                break; /* no work */
            }

            /* sum operation - intentionally time consuming */
            loc_result = 0.0;
            for (k = 0; k < 10000; k++) {
                loc_result = 0.0;
                for (j = ini_index; j < end_index; j++) {
                    loc_result += p_data[j];
                }
            }

            /* critical section - change of global result value */
            status = pthread_mutex_lock(&data_lock);
            check(status, "Worker_data_lock");
            op_result += loc_result;
            status = pthread_mutex_unlock(&data_lock);
            check(status, "Worker_data_unlock");

            pthread_testcancel();

            /* debug message */
            printf("%d %f\n\r", my_number, loc_result);
        }

        /* workers synchronization at the end of operation */
        status = pthread_mutex_lock(&barrier_lock);
        check(status, "Barrier_lock");
        barrier_cnt++;
        if (barrier_cnt < n) {
            /* execute all but the last: wait until the round number changes;
               the loop makes the wait immune to spurious wakeups */
            my_gen = barrier_gen;
            while (my_gen == barrier_gen) {
                status = pthread_cond_wait(&cond_barrier, &barrier_lock);
                check(status, "Worker_cond_barrier_wait");
            }
        } else {
            /* executes only the last worker approaching barrier */

            /* reset the command first, so the next sum_op cannot have its
               command overwritten */
            status = pthread_mutex_lock(&command_lock);
            check(status, "Worker_command_lock");
            op_command = 0;
            status = pthread_mutex_unlock(&command_lock);
            check(status, "Worker_command_unlock");

            /* open the barrier for this round and prepare the next one */
            barrier_cnt = 0;
            barrier_gen++;
            status = pthread_cond_broadcast(&cond_barrier);
            check(status, "Worker_cond_barrier_broadcast");

            /* start of waiting main program thread */
            status = pthread_mutex_lock(&ready_lock);
            check(status, "Worker_ready_lock");
            result_ready = 1;
            status = pthread_cond_signal(&cond_ready);
            check(status, "Worker_cond_ready_signal");
            status = pthread_mutex_unlock(&ready_lock);
            check(status, "Worker_ready_unlock");
        }
        /* all workers have to release the lock */
        status = pthread_mutex_unlock(&barrier_lock);
        check(status, "Barrier_unlock");
    }

    return arg;
}

/*
 * main program
 *   - it is located in the same module as the monitor (for simplicity)
 *   - in a more realistic application it should be written as a separate
 *     module
 *
 * Sums the vector 0, 1, ..., M-1 twice (with two different step sizes) and
 * stops the monitor.  The exit code reflects the status of sum_stop.
 */
int main(void)
{
    int i;
    int status;
    double vector[M];
    double result = 0.0; /* defined even when sum_op fails before storing a result */

    for (i = 0; i < M; i++) {
        vector[i] = (double)i;
    }

    status = (int)(intptr_t)sum_ini(N, 111);
    printf("monitor_init_status = %d\n\r", status);

    status = (int)(intptr_t)sum_op(vector, M, &result);
    printf("monitor_op_status = %d\n\r", status);
    printf("sum_result = %f\n\r", result);

    change_step(77);

    status = (int)(intptr_t)sum_op(vector, M, &result);
    printf("monitor_op_status = %d\n\r", status);
    printf("sum_result = %f\n\r", result);

    status = (int)(intptr_t)sum_stop();
    printf("monitor_stop_status = %d\n\r", status);

    return (status == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}