/* Private class Trn_work implements a thread, that processes */ /* one transaction */ /* Public class Trn_serv implements the control thread of */ /* transaction server - it creates limited (N = num. of */ /* processors) threads that process the transactions. */ class Trn_work extends Thread { // thread private int id_num; // number of transaction private int key_from; // key - source private int key_to; // key - destination private int trans_value; // how many private Table tab; // which table public Trn_work (Table par_tab, Message msg, int par_num) { key_from = msg.info[0]; key_to = msg.info[1]; trans_value = msg.info[2]; tab = par_tab; id_num = par_num; tab.inc_state(0); } private void print_trns (int mode) { switch (mode) { case 0: System.out.print("Trns " + id_num + " starts: "); break; case 1: System.out.print("Trns " + id_num + " done: "); break; case 2: System.out.print("Trns " + id_num + " aborted:"); break; } System.out.println(" " + key_from + " " + key_to + " " + trans_value); } public void run () { // run of transaction print_trns(0); yield(); if ( (key_from < 0) || (key_from >= tab.get_n()) || (key_to < 0) || (key_to >= tab.get_n()) || (key_from == key_to) ) { tab.dec_state(0); tab.inc_state(2); print_trns(2); return; } int key_min; int key_max; if (key_from < key_to) { key_min = key_from; key_max = key_to; } else { key_min = key_to; key_max = key_from; } int row1 = tab.find(key_min); if (row1 >= 0) yield(); else { tab.dec_state(0); tab.inc_state(2); print_trns(2); return; } while (tab.test_and_set_lock(row1)) yield(); // active waiting int row2 = tab.find(key_max); if (row2 >= 0) yield(); else { tab.release_lock(row1); tab.dec_state(0); tab.inc_state(2); print_trns(2); return; } while (tab.test_and_set_lock(row2)) yield(); // active waiting int row_sub; int row_add; if (key_from < key_to) { row_sub = row1; row_add = row2; } else { row_sub = row2; row_add = row1; } if (tab.sub(row_sub, trans_value)) yield(); else { // abort tab.release_lock(row2); yield(); tab.release_lock(row1); tab.dec_state(0); tab.inc_state(2); print_trns(2); return; } tab.add(row_add, trans_value); tab.release_lock(row2); yield(); tab.release_lock(row1); tab.dec_state(0); tab.inc_state(1); print_trns(1); } } public class Trn_serv extends Thread { // thread private Table tab; // table reference private Msg_port port; // port reference private final int n; // num of processors private int id_counter; // counter of created transactions public Trn_serv (Table par_tab, Msg_port par_port, int n_par) { tab = par_tab; port = par_port; n=n_par; id_counter=1; } public void run () { // server run Message request; Trn_work worker; int[] state; while (true) { state = tab.get_state(); if (state[0] >= n) { yield(); continue; } request = port.read(); // blocking call if (request.info[0] < 0) break; worker = new Trn_work (tab, request, id_counter); worker.start(); id_counter++; yield(); } while (true) { // waiting for finishing all transactions state = tab.get_state(); if (state[0] > 0) yield(); else break; } } }