/* Firehose-remote algorithm by: Dan Bonachea, Christian Bell and Paul Hargrove for deadlock-free, thread-safe management of firehose-remote resources using optimistic concurrency control $Revision: 1.3.1.1 $ Known bugs/issues ----------------- * In a few places (where marked) we increment bucket ref counts without checking whether we need to remove it from the FIFO * Race condition: We call destroy_bucket() on victims pulled from the FIFO before we actually send the AM to unpin them, and we release the firehose lock before sending that AM. This means another thread could miss on that same bucket, race ahead of us and send the pin request for the remote page, which would arrive before our unpin request for that page. Once we commit, we need a way to prevent other threads from hitting OR missing on our selected FIFO victims until we've been guaranteed our unpin request has been serviced (or will be before any subsequent pin request) - this probably means blocking access to those buckets until the AMReply comes back for the unpin request - GASNet does not guarantee point-to-point ordering of AM's, and by definition, no conduit with concurrent handler execution will provide it. Perhaps a "pending unpin" state for the FIFO victims? Needs to be worked into all the cases where we might encounter one... - FIX: Need REMOTE_PENDING_UNPIN state (distinct from REMOTE_PENDING). This state must only be created at COMMIT time. If we need to acquire such a bucket we must UPYL until it is gone. */ #define FHI_AVAIL(node) \ ( \ (fhc_RemoteBucketsM - fhc_RemoteBucketsUsed[node]) /* Free energy */ \ + fhc_RemoteVictimFifoBuckets[node] /* FIFO */ \ ) #define UPYL \ do { \ FH_UNLOCK; \ gasnet_AMPoll(); \ gasneti_yield(); /* Shoulds this be GASNET_WAITHOOK? */ \ FH_LOCK; \ } while (0) firehose_bucket_t *consume_one_bucket(int node, fhi_RegionPool_t *unpin_p) { fh_fifoq_t *fifo_head = &fh_RemoteNodeFifo[node]; firehose_bucket_t *bd; if (fhc_RemoteBucketsUsed[node] < fhc_RemoteBucketsM) { /* consume Free Energy */ ++fhc_RemoteBucketsUsed[node]; return; } /* We remove buckets from the FIFO and increment their ref counts, even though they are not for the page we actually want. This is to claim them and avoid double pinning of their page (as could happen if we just removed them from the hash table entirely. We will check later if our ref count of 1 became 2, and release the bucket if it did. */ gasneti_assert(fhc_RemoteVictimFifoBuckets[node] > 0); bd = FH_TAILQ_FIRST(fifo_head); FH_TAILQ_REMOVE(fifo_head, bd); ++(bd->Rrefc); Add_to_rpool(FH_ADDR(bd), unpin_p); fhc_RemoteVictimFifoBuckets[node]--; return bd; } int *da; /* dynamically-allocated array of gasnet_nodes() elements */ /* return count of the number of misses to resolve, or -1 if deadlock detected Note that we can call this from the deadlock resolution case, in which case my_da is 1 on entry. It is 0 otherwise. */ int fhi_EstimateRemoteRequest(int *my_da, int *da_count, node, start, end, ...) { int count; inner_again: count = 0; foreach bucket in [start, end] { bd = lookup_bucket(node, bucket_addr); if (bd == NULL) { /* Miss, count it */ count++; } else if (FH_IS_PENDING(bd) && !FH_IS_COMMITED(bd)) { ++(*da_count); if (*my_da) { /* Nothing to do, we "own" da[node] */ gasneti_assert(da[node]); } else if (da[node]) { /* Somebody else "owns" da[node], so give up */ return -1; } else if (*da_count >= SOME_LIMIT) { *my_da = da[node] = 1; gasneti_membar(); } UPYL; goto inner_again; /* start over */ } else if (FH_IS_REMOTE_FIFO(bd)) { /* Must reclaim from FIFO, so it counts against our limit */ count++; } } return count; } /* easy case: we know sufficient resources are available */ void fhi_PinNoLog(start, end, ...) { int first_pending = 1; foreach bucket in [start, end] { bd = lookup_bucket(node, bucket_addr); if (bd == NULL) { /* Miss */ create_new_bucket(node, bucket_addr, Rrefc=0, Lrefc=Pending/Committed); Add_to_rpool(pin_p, bucket_addr); } else if (IS_PENDING(bd)) { /* hit on PENDING bucket */ gasneti_assert(IS_COMMITTED(bd)); ++(bd->Rrefc); if (first_pending) { hang_on(bd); first_pending = 0; } } else { /* hit on non-pending bucket */ ++(bd->Rrefc); /* XXX: need to check if bucket is currently in FIFO and remove it if so */ } } /* now account for the resources we need: */ /* first claim "Free energy" */ tmp = MIN(n_buckets, fhc_RemoteBucketsM - fhc_RemoteBucketsUsed[node]); n_buckets -= tmp; fhc_RemoteBucketsUsed[node] += tmp; /* second claim needed FIFO buckets * This must come after we increment ref counts to avoid freeing our own hits */ if (n_buckets > 0) { gasneti_assert(n_buckets <= fhc_RemoteVictimFifoBuckets[node]); unpin_p->n_regions = fhi_FreeVictimRemote(node, n_buckets, &(unpin_p->regions)); fhc_RemoteVictimFifoBuckets[node] -= n_buckets; } } /* Get as much as we can, knowing it won't be enough: */ /* returns address of first "unresolved" page */ /* When we return the FIFO is either empty, OR it contains * only buckets that we need to hit on. */ uintptr_t fhi_PinWithLog(node, n_avail, start, end...) { int i; foreach bucket in [start, end] { bd = lookup_bucket(node, bucket_addr); if (bd) { /* hit */ if (IS_PENDING(bd)) { /* must be committed because we EstimateRemoteRequests ensures we never see pending uncommitted buckets on the (only) call to fhi_PinWithLog */ gasneti_assert(IS_COMMITTED(bd)); } ++(bd->Rrefc); /* XXX: need to check if bucket is currently in FIFO and remove it if so */ } else if (n_avail) { /* miss w/ resources still available */ try_this_again: bd = consume_one_bucket(node, unpin_p); if (bd != NULL && FH_BADDR(bd) > bucket_addr && FH_BADDR(bd) < end) { /* Oops, we just reclaimed a bucket that we need to hit on later */ unpin_p->num_buckets--; /* take it off the unpin list */ fhi_release_bucket(bd) /* put it back on the FIFO (at the far end) */ --n_avail; /* Reduce the amount of space we think we have */ if (!n_avail) return bucket_addr; goto try_this_again; } create_new_bucket(node, bucket_addr, Rrefc=0, Lrefc=Pending/UNcommitted); Add_to_rpool(pin_p, bucket_addr); --n_avail; } else { /* We've gone as far as we can w/ available resources */ return bucket_addr; } } gasneti_fatalerror("Reached unreachable code"); } /* Get as much "more" as we can, not knowing it will be enough: */ /* returns address of first "unresolved" page */ /* On success the returned page will be ROUNUP(start+len,PAGESIZE) */ /* Note that we've revalidated our status before entering this function */ /* When we return the FIFO is either empty, OR it contains * only buckets that we need to hit on. */ uintptr_t fhi_PinSomeMore(node, n_avail, saved_addr, end, ...) { int i; foreach bucket in [saved_addr, end] { bd = lookup_bucket(node, bucket_addr); if (bd) { if (IS_PENDING(bd) && !IS_COMMITTED(bd)) { return bucket_addr; } ++(bd->Rrefc); /* XXX: need to check if bucket is currently in FIFO and remove it if so */ } else if (n_avail) { /* miss w/ resources still available */ try_this_again: bd = consume_one_bucket(node, unpin_p); if (FH_BADDR(bd) > bucket_addr && FH_BADDR(bd) < end) { /* Oops, we just reclaimed a bucket that we need to hit on later */ unpin_p->num_buckets--; /* take it off the unpin list */ fhi_release_bucket(bd) /* put it back on the FIFO (at the far end) */ --n_avail; /* Reduce the amount of space we think we have */ if (!n_avail) return bucket_addr; goto try_this_again; } create_new_bucket(node, bucket_addr, Rrefc=0, Lrefc=Pending/UNcommitted); Add_to_rpool(pin_p, bucket_addr); --n_avail; } else { /* We've gone as far as we can w/ available resources */ return bucket_addr; } } /* If we get here we've got enough resources to finish! */ /* Indicate success */ return end + 1; } /* return zero if any of the FIFO buckets we claimed have been claimed by others as well. (We planned to unpin them, but can't if another thread needs them). */ int fhi_RevalidateResources(node, unpin_p) { foreach bucket in unpin_p { bd = lookup_bucket(node, bucket_addr); gasneti_assert(bd != NULL); gasneti_assert(bd->Rrefc > 0); if (bd->Rrefc != 1) { return 0; } } return 1; } /* XXX: Not yet discussed as a group */ void fhi_rollback(node, start, saved_addr, pin_p, unpin_p, ...) { int pin_count, unpin_count; for each bucket in [start, saved_addr] { bd = lookup_bucket(node, bucket_addr); gasneti_assert(bd != NULL); if (IS_PENDING(bd) && !IS_COMMITTED(bd)) { /* we must have created this pending uncommitted bucket, because it falls within the "handled" range */ destroy_bucket(bd); } else { /* decrement Rrefc and perhaps send to FIFO */ bucket_release_remote(bd); } } /* Restore "free energy" */ fhc_RemoteBucketsUsed[node] += (pin_p->n_buckets - unpin_p->n_buckets); } /* XXX: Not yet discussed as a group */ void fhi_commit(node, start, end, pin_p, unpin_p,...) { /* * Commit the pending buckets we created and hang on the * first pending bucket in the range */ int first_pending = 1; foreach bucket in [start, end] { bd = lookup_bucket(node, bucket_addr); gasneti_assert(bd != NULL); if (!IS_COMMITED(bd)) { SET_PENDING_COMMITTED(bd); } if (IS_PENDING(bd) && first_pending) { hang_on(bd); first_pending = 0; } } /* Destroy the FIFO buckets we were holding on to */ foreach bucket in unpin_p { bd = bucket_lookup(node, bucket_addr); gasneti_assert(bd != NULL); gasneti_assert(bd->Rrefc == 1); destroy_bucket(bd); /* DOB: there is a race condition here (see above) */ } } void fhi_SomePartOfRemotePin(...) { int da_count = 0; int my_da = 0; int n_buckets, n_avail; uintptr_t end = start + (len - 1); uintptr_t saved_addr; FH_LOCK; outer_again: while(da[node]) { UPYL; } /* stall for deadlock to end */ n_buckets = fhi_EstimateRemoteRequest(&my_da, &da_count, node, start, end,...); if (my_da) { gasneti_assert(da[node]); goto won_da; } /* this thread asserted da[node] */ if (n_buckets < 0) goto outer_again; /* other thread asserted da[node] */ n_avail = FHI_AVAIL(node); if (n_buckets <= n_avail) { fhi_PinNoLog(start, end, ...); goto done; /* ready to pin */ } else { saved_addr = fhi_PinWithLog(node, n_avail, start, end, ...); gasneti_assert(saved_addr < end); while (da_count < SOME_LIMIT) { da_count++; if_pf(da[node]) { gasneti_assert(!my_da); fhi_rollback(node, start, saved_addr, pin_p, unpin_p...); /* Did we want to reset da_count to 0? */ goto outer_again; } /* hope more resources become available */ UPYL; n_avail = FHI_AVAIL(node); /* Check that we didn't lose anything */ if (!fhi_RevalidateResources(node, unpin_p)) { /* XXX: room for optimization here where we could try to replace lost buckets with some from n_avail */ fhi_rollback(node, start, saved_addr, pin_p, unpin_p...); goto outer_again; } saved_addr = fhi_PinSomeMore(node, n_avail, saved_addr, end, ...); if (saved_addr == (end+1)) { /* note "> end" is not overflow-safe */ fhi_commit(node, start, saved_addr, pin_p, unpin_p...); goto done; } } /* da_count is too high now */ fhi_rollback(node, start, saved_addr, pin_p, unpin_p...); if (!da[node]) { /* WIN */ my_da = da[node] = 1; won_da: /* Wait for sufficient resources: */ do { UPYL; } while (fhi_EstimateRemoteRequest(&my_da, &da_count, node, start, end,...) > FHI_AVAIL(node)); fhi_PinNoLog(start, end, ...); da[node] = 0; goto done; } else { /* LOST */ FH_UNLOCK; goto outer_again; } } done: FH_UNLOCK; if (there were any misses) { SendAMRequest(...); } else { run request completion handler now.. } return; }