#ifndef __monitor_cc #define __monitor_cc #include "lapi_comm.h" #include "monitor.h" #include swclock_handle_t mytime; #define TIME swclockReadSec(mytime) /******************************* The Titanium monitor model designates a home process for each monitor, which is the process that manages enter and exit requests. On the sp2clump backend, we allow any thread on the home process' node to perform this duty. Pseudocodish overview: monitor_enter() [Client wants to enter a monitor served by some node.] if (monitor home is on the local node) monitor_enter_local() else send message to monitor home asking for access spin until we get access monitor_enter_local() [Client wants to enter a monitor served locally] if (monitor is free) get it and return else put ourselves on the waiting list spin until we get access monitor_enter_ack_handler() [Home node gets request for monitor access] if (monitor is free) tell the requestor to go ahead else if (the requestor already holds the lock) [must be a recursive call; let the requestor get the monitor again.] track the recursive nesting depth tell the requestor to go ahead else put the requestor on the waiting list monitor_exit [Client wants to exit a monitor] if (monitor is homed locally) if (there is someone waiting) pull that someone off the waiting list wake him/her up. else send a message to the monitor's home asking to exit. wait until the message is handled. monitor_exit_req_handler() [Home node gets request for exit] if (there is someone waiting) pull that someone off the waiting list wake him/her up. Intra-node race conditions in monitor updates are prevented by the use of mutexes. Waiting is performed by a spinloop with message queue polls. */ char **mon_enter_req_table, **mon_enter_ack_table, **mon_exit_req_table; /* Holds all the spinlocks for each thread on a single node. A thread can only be waiting for one monitor at a time, so this is more efficient than having local-scope spin counters and passing them around. */ volatile static int *monitorSpins; void *monitor_enter_req_handler(lapi_handle_t *lapi_hndl_p, void *uhdr, int *uhdr_len, int *msg_len, compl_hndlr_t **comp_h, void **user_info); void *monitor_enter_ack_handler(lapi_handle_t *lapi_hndl_p, void *uhdr, int *uhdr_len, int *msg_len, compl_hndlr_t **comp_h, void **user_info); void *monitor_exit_req_handler(lapi_handle_t *lapi_hndl_p, void *uhdr, int *uhdr_len, int *msg_len, compl_hndlr_t **comp_h, void **user_info); void monitor_enter_local(titanium_monitor_t *mon); static pthread_mutex_t monlock; #define MONLOCK while (pthread_mutex_trylock(&monlock)) {} #define MONUNLOCK pthread_mutex_unlock(&monlock); /* Sets up the LAPI AM handlers. Do this before spawning threads.*/ void monitor_system_init() { int i; mon_enter_req_table = (char **)malloc(sizeof(char *) * NODES); mon_enter_ack_table = (char **)malloc(sizeof(char *) * NODES); mon_exit_req_table = (char **)malloc(sizeof(char *) * NODES); /* Perform exchanges to get the handler addresses on each proc. */ LAPI_SAFE(LAPI_Address_init(lapi_hndl, (void *)&monitor_enter_req_handler, (void **)mon_enter_req_table)); LAPI_SAFE(LAPI_Address_init(lapi_hndl, (void *)&monitor_enter_ack_handler, (void **)mon_enter_ack_table)); LAPI_SAFE(LAPI_Address_init(lapi_hndl, (void *)&monitor_exit_req_handler, (void **)mon_exit_req_table)); monitorSpins = (int *)ti_malloc(sizeof(int) * THREADS_PER_NODE); for (i = 0; i < THREADS_PER_NODE; i++) { monitorSpins[i] = 0; } pthread_mutex_init(&monlock, NULL); mytime = swclockInit(); } void monitor_init(titanium_monitor_t *mon) { mon->who = -1; mon->nesting = 0; mon->sleepingProcs = NULL; mon->waitingProcs = NULL; /* no processes waiting to enter the monitor */ pthread_mutex_init(&mon->lock, NULL); } void monitor_destroy(titanium_monitor_t * monitor) { waiting_proc *temp; if (monitor != NULL) { /* Hopefully, the monitor will never be deallocated before */ /* replies to all processes waiting to enter the monitor are sent. */ /* However, this is done just in case this situation arises. */ while (monitor->waitingProcs != NULL) { temp = monitor->waitingProcs; monitor->waitingProcs = monitor->waitingProcs->next; temp->next = NULL; ti_free(temp); } while (monitor->sleepingProcs != NULL) { temp = monitor->sleepingProcs; monitor->sleepingProcs = monitor->sleepingProcs->next; temp->next = NULL; ti_free(temp); } } } /* A thread calls monitor_enter when it wants to lock a monitor, and passes in a pointer to a titanium_monitor_t and the TPID (Ti Process ID) of the home of the monitor - a global pointer, in other words. If the home is on the local node, the thread attempts to lock the monitor itself; otherwise it contacts the home node. */ void monitor_enter(jGPointer pMon) { titanium_monitor_t *mon; int proc; lapi_cntr_t lapi_cntr; mon_req_msg msg; proc = TO_PROC(pMon); mon = (titanium_monitor_t *)TO_LOCAL(pMon); /* Local? */ if (NODE_MAP(proc) == MYNODE) { monitor_enter_local(mon); return; } /* No? Send a message to the monitor's home asking politely for access. */ msg.monitor = mon; msg.req_proc = MYPROC; MONLOCK; monitorSpins[MYTHREAD] = 0; LAPI_Amsend(lapi_hndl, NODE_MAP(proc), mon_enter_req_table[NODE_MAP(proc)], &msg, sizeof(msg), NULL, NULL, NULL, NULL, NULL); MONUNLOCK; LAPI_WAIT(monitorSpins[MYTHREAD]); } /* Attempt to enter a monitor whose home is on the local node. */ void monitor_enter_local(titanium_monitor_t *mon) { monitorSpins[MYTHREAD] = 0; while (pthread_mutex_trylock(&mon->lock)) {} /* If the monitor is free, let's get it. */ if (mon->nesting == 0) { mon->who = MYPROC; mon->nesting = 1; pthread_mutex_unlock(&mon->lock); return; } else { /* Allow ourselves to recursively enter the lock. */ if (mon->who == MYPROC) { mon->nesting++; pthread_mutex_unlock(&mon->lock); return; } else { /* Put ourselves on the waiting list. */ waiting_proc *proc = (waiting_proc *)ti_malloc(sizeof(waiting_proc)); proc->proc = MYPROC; proc->next = NULL; if (mon->waitingProcs == NULL) { mon->waitingProcs = proc; } else { waiting_proc *temp = mon->waitingProcs; while (temp->next != NULL) { temp = temp->next; } temp->next = proc; } pthread_mutex_unlock(&mon->lock); LAPI_WAIT(monitorSpins[MYTHREAD]); return; } } } /* Completion handler for monitor_enter_req_handler. This is called if the monitor that the requestor wanted is currently free. The routine sends a reply back telling the requestor to go ahead. */ void monitor_send_go_ahead(lapi_handle_t *lapi_hndl_p, void *params) { mon_req_msg *reqMsg = (mon_req_msg *)params; /* If the requestor was local we don't need to send a message, of course. */ if (NODE_MAP(reqMsg->req_proc) == MYNODE) { monitorSpins[TO_THREAD(reqMsg->req_proc)] = 1; } else { MONLOCK; LAPI_Amsend(*lapi_hndl_p, NODE_MAP(reqMsg->req_proc), mon_enter_ack_table[NODE_MAP(reqMsg->req_proc)], reqMsg, sizeof(mon_req_msg), NULL, 0, NULL, NULL, NULL); MONUNLOCK; } } /* One of the threads on this node (replyMsg->req_proc) can go ahead. Wake it up. */ void *monitor_enter_ack_handler(lapi_handle_t *lapi_hndl_p, void *uhdr, int *uhdr_len, int *msg_len, compl_hndlr_t **comp_h, void **user_info) { mon_req_msg *replyMsg = (mon_req_msg *)uhdr; monitorSpins[TO_THREAD(replyMsg->req_proc)] = 1; *comp_h = NULL; return NULL; } void *monitor_enter_req_handler(lapi_handle_t *lapi_hndl_p, void *uhdr, int *uhdr_len, int *msg_len, compl_hndlr_t **comp_h, void **user_info) { mon_req_msg *reqMsg = (mon_req_msg *)uhdr; titanium_monitor_t *mon = reqMsg->monitor; /* We're locking inside of a message handler. This lock controls monitor updates, not monitor accesses, so the waiting time should be quick. Replace with a spinlock if this holds up message progress. */ while (pthread_mutex_trylock(&mon->lock)) {} if (mon->nesting == 0) { mon->who = reqMsg->req_proc; mon->nesting = 1; /* If the monitor is free right now, tell the requestor to go ahead. */ /* We transfer control to the completion handler by filling in the appropriate values in the header handler's reference parms. Note that we need to explicitly save the header information so that the comp handler gets it; the header data we have is tossed after this function returns. This disconnect between the header and completion handlers is one of the more annoying "features" of the LAPI design... */ *comp_h = &monitor_send_go_ahead; mon->compBuf = *reqMsg; *user_info = (void *)&(mon->compBuf); pthread_mutex_unlock(&mon->lock); return NULL; } else { if (mon->who == reqMsg->req_proc) { /* If the requestor already holds the lock, allow it to get it again. (recursive nesting semantics) */ mon->nesting++; *comp_h = &monitor_send_go_ahead; mon->compBuf = *reqMsg; *user_info = (void *)&(mon->compBuf); pthread_mutex_unlock(&mon->lock); return NULL; } else { /* Else put the requestor on the list of waiters. */ waiting_proc *proc = (waiting_proc *)ti_malloc(sizeof(waiting_proc)); proc->proc = reqMsg->req_proc; proc->next = NULL; if (mon->waitingProcs == NULL) { mon->waitingProcs = proc; } else { waiting_proc *temp = mon->waitingProcs; while (temp->next != NULL) { temp = temp->next; } temp->next = proc; } *comp_h = NULL; *user_info = NULL; pthread_mutex_unlock(&mon->lock); return NULL; } } } /* Called when exiting a monitor. */ void monitor_exit(jGPointer pMon) { mon_req_msg msg; lapi_cntr_t lapi_cntr; titanium_monitor_t *mon; int proc; proc = TO_PROC(pMon); mon = (titanium_monitor_t *)TO_LOCAL(pMon); /* If the home of the monitor is local, update the monitor and wake up the next waiter. */ if (NODE_MAP(proc) == MYNODE) { while (pthread_mutex_trylock(&mon->lock)) {} assert((mon->who == MYPROC) && (mon->nesting > 0)); /* we hold mon and it is active */ mon->nesting--; if (mon->nesting == 0) { if (mon->waitingProcs != NULL) { /* Pull the next waiter off the list. */ waiting_proc *temp = mon->waitingProcs; mon->who = temp->proc; mon->waitingProcs = mon->waitingProcs->next; temp->next = NULL; ti_free(temp); if (mon->nesting < 0) { /* Used for timed locks, which aren't implemented. */ printf("%f\t%d: Illegal monitor nesting level.\n", TIME, MYPROC); } else { /* Wake up the next waiter. */ assert(mon->who != MYPROC); /* Should be nested, not waiting. */ mon->nesting = 1; /* The home locks the monitor, not the requestor. */ if (NODE_MAP(mon->who) == MYNODE) { pthread_mutex_unlock(&mon->lock); monitorSpins[TO_THREAD(mon->who)] = 1; } else { msg.monitor = mon; msg.req_proc = mon->who; pthread_mutex_unlock(&mon->lock); MONLOCK; LAPI_Amsend(lapi_hndl, NODE_MAP(mon->who), mon_enter_ack_table[NODE_MAP(mon->who)], &msg, sizeof(msg), NULL, 0, NULL, NULL, NULL); MONUNLOCK; } } } else { mon->who = -1; pthread_mutex_unlock(&mon->lock); } } else { pthread_mutex_unlock(&mon->lock); } return; } msg.monitor = mon; msg.req_proc = MYPROC; LAPI_Setcntr(lapi_hndl, &lapi_cntr, 0); MONLOCK; LAPI_Amsend(lapi_hndl, NODE_MAP(proc), mon_exit_req_table[NODE_MAP(proc)], &msg, sizeof(msg), NULL, 0, NULL, NULL, &lapi_cntr); MONUNLOCK; LAPI_WAIT(lapi_cntr); } /* A process in a monitor that we own wants to exit. */ void *monitor_exit_req_handler(lapi_handle_t *lapi_hndl_p, void *uhdr, int *uhdr_len, int *msg_len, compl_hndlr_t **comp_h, void **user_info) { mon_req_msg *reqMsg = (mon_req_msg *)uhdr; int reqProc = reqMsg->req_proc; titanium_monitor_t *mon = reqMsg->monitor; while (pthread_mutex_trylock(&mon->lock)) {} assert((mon->who == reqProc) && (mon->nesting > 0)); mon->nesting--; /* mon->nesting > 0 here if the monitor entry was recursive - we need to exit out all the way before waking anyone else up. */ if (mon->nesting == 0) { if (mon->waitingProcs != NULL) { /* Pull the next waiter off the list. */ waiting_proc *temp = mon->waitingProcs; mon->who = temp->proc; mon->waitingProcs = mon->waitingProcs->next; mon->nesting = 1; /* The home locks the monitor, not the requestor. */ temp->next = NULL; ti_free(temp); /* Call the completion handler to send the wake-up msg to the next waiter. */ reqMsg->req_proc = mon->who; *comp_h = &monitor_send_go_ahead; mon->compBuf = *reqMsg; *user_info = (void *)&(mon->compBuf); pthread_mutex_unlock(&mon->lock); return NULL; } else { mon->who = -1; } } *comp_h = NULL; *user_info = NULL; pthread_mutex_unlock(&mon->lock); return NULL; } /* doesn't use waitTime for now */ void monitor_wait(titanium_monitor_t *monitor, jlong waitTime, int proc) { } void monitor_wait_request_handler(void *token, int obj, int reqProc) { } void monitor_wait_reply_handler(void *token, int value, int handler, int proc) { } void monitor_wait_succeeded_handler(void *token) { } void monitor_notify(titanium_monitor_t *monitor, int proc) { } void monitor_notify_request_handler(void *token, int obj, int reqProc) { } void monitor_notify_reply_handler(void *token, int value) { } void monitor_notify_all(titanium_monitor_t *monitor, int proc) { } void monitor_notify_all_request_handler(void *token, int obj, int reqProc) { } void monitor_notify_all_reply_handler(void *token, int value) { } #endif