/* Firehose-local algorithm by: Dan Bonachea, Christian Bell and Paul Hargrove for deadlock-free, thread-safe management of firehose-local resources using optimistic concurrency control $Revision: 1.3.1.1 $ Known bugs/issues ----------------- 1) To avoid race between unlock-move-lock (keeping table and conduit consistent), we need a TRANSIT state (used for both "in bound" and "out bound" buckets). If the local (non-AM) code hits such a bucket then it must UPYL until it is either pinned or unpinned. In the case that the AM handler encounters one we need a new queue to deffer the the handler's actual work. This queue is serviced soon after the table is updated (except in the handler). WE NEED TO WRITE THE HANDLER CODE EXAMPLE TOO! 2) We need to worry about the E->D state transition since it requires a ++LOnly. To keep FHC_MAXVICTIM_BUCKETS_AVAIL > 0 we may need to UPYL even if we hit, if the bucket we hit is in state E (refc_r > 0, refc_l == 0) and FHC_MAXVICTIM_BUCKETS_AVAIL == 0. I think this is another thing to check for when making our first pass over the bucket table to estimate how many "new" buckets we will need. Note that we already got WaitLocalBucketsToPin correct with respect to the B->E and D->E state transitions which will raise FHC_MAXVICTIM_BUCKETS_AVAIL. */ int fhi_CheckLocalBucketsToPin(int b_num, fhi_RegionPool_t *unpin_p) { int b_remain, b_avail; FH_TABLE_ASSERT_LOCKED; assert(FHC_MAXVICTIM_BUCKETS_AVAIL >= 0); b_avail = MIN(b_num, FHC_MAXVICTIM_BUCKETS_AVAIL); fhc_LocalOnlyBucketsPinned += b_avail; b_remain = b_num - b_avail; if (b_remain == 0) return 0; GASNETI_TRACE_PRINTF(C, ("Firehose Polls Local pinned needs to recover" " %d buckets from FIFO (currently %d buckets)", b_remain, fhc_LocalVictimFifoBuckets)); b_avail = MIN(b_remain, fhc_LocalVictimFifoBuckets); if (b_avail > 0) { int r_freed; firehose_region_t *reg; /* Append to unpin_p */ reg = &(unpin_p->region[unpin_p->regions_num]); /* Adjusts LocalVictimFifoBuckets count */ r_freed = fhi_FreeVictimLocal(b_avail, reg); fhc_LocalVictimFifoBuckets -= b_avail; b_remain -= b_avail; unpin_p->regions_num += r_freed; } assert(FHC_MAXVICTIM_BUCKETS_AVAIL >= 0); return b_remain; } void fhi_WaitLocalBucketsToPin(int b_num, fhi_RegionPool_t *unpin_p) { int b_remain; b_remain = b_num; while ((b_remain = fhi_CheckLocalBucketsToPin(b_remain, unpin_p))) { FH_TABLE_UNLOCK; gasnet_AMPoll(); FH_TABLE_LOCK; } assert(FHC_MAXVICTIM_BUCKETS_AVAIL >= 0); } int fhi_local_da = 0; pthread_cond_t fhi_local_da_cv = PTHREAD_COND_INITIALIZER; void local_pin_foo(uintptr_t addr, size_t len) { /* Yes, I really do mean static here (not useful otherwise): */ static pthread_cond_t fhi_transit_cv = PTHREAD_COND_INITIALIZER; int my_da = 0; int outer_count = 0; int b_total, b_new; int outer_limit, inner_limit; fhi_RegionPool_t *pin_p, *unpin_p; b_total = FH_NUM_BUCKETS(addr, len); assert(b_total <= fhc_MaxVictimBuckets); outer_limit = (gasnete_approx_num_threads() - 1); inner_limit = (b_total / 10); /* ??? */ #if CALLER_HOLDS_TABLE_LOCK FH_TABLE_ASSERT_LOCKED; #else FH_TABLE_LOCK; #endif pin_p = fhi_AllocRegionPool(FH_MIN_REGIONS_FOR_BUCKETS(b_total)); unpin_p = NULL; again: FH_TABLE_ASSERT_LOCKED; outer_count++; if_pf (my_da) { /* We already "own" fhi_local_da, no checks needed */ assert(fhi_local_da); } else if_pf (fhi_local_da) { /* Somebody else "owns" fhi_local_da, wait for them to finish */ do { pthread_cond_wait(&fhi_local_da_cv, &fh_table_lock); } while (fhi_local_da); /* loop until we win the race */ /* Do NOT reset outer_count here - * If 3 threads were contending before fhi_local_da was set, then * we still have 2 threads now and deadlock might still * be a problem. By reseting outer_count at this point * we could delay detection of any remaining deadlock. * Note there might be threads stalled by the da condition that are * not involved in the starvation condition (e.g. pure hits * on remotely-pinned buckets) - but once they leave the above * loop they're guaranteed to grab their hits without rechecking * fhi_local_da, and therefore won't get caught in a second da * stall if one occurs. */ } else if_pf (outer_count > outer_limit) { /* take ownership of fhi_local_da */ my_da = fhi_local_da = 1; /* give others a chance to release resources */ FH_TABLE_UNLOCK; gasnet_AMPoll(); gasneti_sched_yield(); gasnet_AMPoll(); FH_TABLE_LOCK; } /* fhi_LocalTryToAcquire() = update table for all the buckets we need return <0 if we hit one or more TRANSIT buckets otherwise return the number of new buckets needed (that we set to TRANSIT) */ b_new = fhi_LocalTryToAcquire(addr, len, pin_p); if_pt (b_new >= 0) { /* We saw no TRANSITs, and have put b_new into TRANSIT. We may need to acquire as many as b_new firehoses to unpin while still respecting the limit (LocalOnlyPinned <= MaxVictimBuckets). That is done by looping over calls to fhi_CheckLocalBucketsToPin(). */ int inner_count = 0; int b_remain; unpin_p = fhi_AllocRegionPool(b_new); /* fhi_CheckLocalBucketsToPin() - try to find b_new buckets (FIFO victims or unused buckets during startup), fill in unpin list for selected victims, return how many more buckets are still needed. */ b_remain = fhi_CheckLocalBucketsToPin(b_new, unpin_p); if_pf (b_remain) { do { FH_TABLE_UNLOCK; gasnet_AMPoll(); gasneti_sched_yield(); FH_TABLE_LOCK; inner_count++; if_pf (my_da) { /* We already "own" fhi_local_da, no checks needed */ assert(fhi_local_da); } else if_pf (fhi_local_da) { /* Somebody else "owns" fhi_local_da. We must back off and start from scratch. */ UNWIND(addr, len, pin_p, unpin_p, b_remain); fhi_FreeRegionPool(unpin_p); unpin_p = NULL; goto again; /* will recheck for (fhi_local_da != 0) */ } else if_pf (inner_count > inner_limit) { /* take ownership of fhi_local_da */ my_da = fhi_local_da = 1; } } while (b_remain = fhi_CheckLocalBucketsToPin(b_remain, unpin_p)); } /* We have everything we need... commit */ FH_TABLE_UNLOCK; assert(pin_p != NULL); assert(unpin_p != NULL); firehose_move_callback(fh_mynode, unpin_p->regions, unpin_p->regions_num, pin_p->regions, pin_p->regions_num); FH_TABLE_LOCK; fhi_FreeRegionPool(unpin_p); fhi_FreeRegionPool(pin_p); /* Cleanup so others get a chance to make progress too */ Clear_TRANSIT_status(pin_p); pthread_cond_broadcast(&fhi_transit_cv); if_pf (my_da) { assert(fhi_local_da); FH_TABLE_UNLOCK; gasnet_AMPoll(); FH_TABLE_LOCK; fhi_local_da = 0; pthread_cond_broadcast(&fhi_local_da_cv); } Service_AM_Transit_Queue(); } else { /* We saw one of more TRANSIT buckets. We unwind, wait * for them to be completed and then start over at the * begining (for a maximum of outer_limit times before * asserting fhi_local_da. */ UNWIND(addr, len, pin_p, NULL, 0); pthread_cond_wait(&fhi_transit_cv, &fh_table_lock); goto again; /* will recheck for (fhi_local_da != 0) */ } FH_TABLE_ASSERT_LOCKED; #if !CALLER_HOLDS_TABLE_LOCK FH_TABLE_UNLOCK; #endif }