/* comm_utils.c - miscellaneous utilities */ /* see copyright.txt for usage terms */ #define _LARGEFILE64_SOURCE 1 #define _LARGEFILE_SOURCE 1 #define TI_NO_SRCPOS #include #include #include #include #include #include #include #include #include /* get the sysconf constant to retrieve number of processors */ #if defined(__sgi) #define _SC_NPROCESSORS_ONLN _SC_NPROC_ONLN #elif defined(CRAYX1) #define _SC_NPROCESSORS_ONLN _SC_NPROC_ONLN #elif defined(_CRAY) #define _SC_NPROCESSORS_ONLN _SC_CRAY_MAXPES #elif defined(__APPLE__) #include #include #endif #ifdef REXEC #include /* gethostbyname() */ #include #include #include #endif #include static int init = 0; int COMM_init_box_const_complete = 0; static int uniqueID = 0; julong gpid = 0; static boxInfo *boxes = NULL; static int numBoxes = 0; static boxInfo *myBox = NULL; static processInfo *processes = NULL; static int numProcesses = 0; extern int usingAMSPMD; /* This returns how much memory is needed for the box array */ int COMM_Init_One() { #if defined(COMM_AMLAPI) /* Init the LAPI system and get some process-local values. */ AMLAPI_LAPI_Init (); do { lapi_handle_t *lapi_hndl = AMLAPI_GetLAPIHandle (); /* Get My LAPI endpoint */ LAPI_Qenv(*lapi_hndl, NUM_TASKS, &numBoxes); if (!numBoxes) { /* Qenv(NUM_TASKS) acts funny. */ numBoxes = 1; } #if 0 /* Turn off error checking for performance. */ LAPI_Senv(*lapi_hndl, ERROR_CHK, 0); /* Use polling instead of interrupts. 
*/ LAPI_Senv(*lapi_hndl, INTERRUPT_SET, 0); #endif } while (FALSE); #else /* !COMM_AMLAPI */ if (usingAMSPMD) { #if defined(COMM_AMUDP) numBoxes = AMUDP_SPMDNumProcs(); #elif defined(COMM_AMMPI) numBoxes = AMMPI_SPMDNumProcs(); #else abort(); #endif } else { char *variable = NULL; #if defined(COMM_GASNET) numBoxes = gasnet_nodes(); #elif defined(REXEC) variable = getenv("REXEC_PAR_DEGREE"); if (variable == NULL) { printf("Environment variable REXEC_PAR_DEGREE not defined.\nThis program must be run with rexec and the Titanium run script!\n"); exit(1); } numBoxes = atol(variable); #elif defined(GLUNIX) numBoxes = Glib_GetParallelDegree(); if (numBoxes < 0) { perror("Glib_GetParallelDegree"); exit(1); } #else numBoxes = 1; #endif } #endif /* COMM_AMLAPI */ init = 1; return numBoxes * sizeof(boxInfo); } /* copy the hostname and shorten it, if appropriate */ static void shortenHostname(const char *hostname, char *shorthostname) { strcpy(shorthostname, hostname); if (strchr(shorthostname,'.') && !isdigit(shorthostname[strlen(shorthostname)-1])) *strchr(shorthostname,'.') = '\0'; } /* This returns how much memory is needed for the processor array */ int COMM_Init_Two(char *memory) { int i = 0, j; char hostname[MAXHOSTNAMELEN]; char shorthostname[MAXHOSTNAMELEN]; struct hostent *he = NULL; char *processesPerProcessors; boxes = (boxInfo *)memory; #if defined(REXEC) if (!usingAMSPMD) { char *envhosts = getenv("REXEC_SVRS"); if (envhosts == NULL) { printf("Environment variable REXEC_SVRS not defined.\nThis program must be run with rexec and the Titanium run script!\n"); exit(1); } { /* make a copy so we can strtok it */ char *temp = (char *)ti_malloc(strlen(envhosts)+2); strcpy(temp, envhosts); envhosts = temp; } boxes[0].DNSname = strtok(envhosts, " \t\n"); for(i = 1; i < numBoxes && (boxes[i].DNSname = strtok(NULL, " \t\n")); i++) ; } #endif #if defined(PTHREAD) || defined(THREAD) processesPerProcessors = (char *)getenvMaster("TI_THREADS"); if (processesPerProcessors == 
NULL) /* support deprecated usage */ processesPerProcessors = (char *)getenvMaster("TI_PFORP"); else if (getenvMaster("TI_PFORP")) fprintf(stderr, "Warning: TI_PFORP ignored in favor of TI_THREADS\n"); if (processesPerProcessors == NULL) { printf("Environment variable TI_THREADS not defined.\nThis program should be run with the Titanium run script!\n"); exit(1); } { /* make a copy so we can strtok it */ char *temp = (char *)ti_malloc(strlen(processesPerProcessors)+2); strcpy(temp, processesPerProcessors); processesPerProcessors = temp; } #endif /* REXEC */ for (i = 0; i < numBoxes; i++) { boxes[i].boxNumber = i; boxes[i].processesPerProcessors = 0; } /* Get each processes/processor tag and associate it with each box */ #if defined(PTHREAD) || defined(THREAD) boxes[0].processesPerProcessors = strtok(processesPerProcessors, " \t\n,"); for(i=1; i < numBoxes && (boxes[i].processesPerProcessors = strtok(NULL, " \t\n,")); i++); { int gotsome = 0; /* check for missing ones */ for(i=0; i < numBoxes; i++) { if (boxes[i].processesPerProcessors == NULL) { char *newstring = ti_malloc(4); strcpy(newstring, "1/1"); boxes[i].processesPerProcessors = newstring; gotsome = 1; } } if (gotsome) { fprintf(stderr, "Tic: TI_THREADS too short - assuming 1 thread for unlisted processors\n"); fflush(stderr); } } #else /* no threads, but may still have multiple boxes */ for (i = 0; i < numBoxes; i++) { char *newstring = ti_malloc(4); strcpy(newstring, "1/1"); boxes[i].processesPerProcessors = newstring; } #endif gethostname(hostname, MAXHOSTNAMELEN); shortenHostname(hostname, shorthostname); #if defined(REXEC) if (!usingAMSPMD) he = gethostbyname(hostname); #endif for(i = 0; i < numBoxes; i++) { boxInfo *box = &boxes[i]; int thisIsMyBox = 0; #if defined(COMM_AMUDP) if (usingAMSPMD && AMUDP_SPMDMyProc() == i) thisIsMyBox = 1; #elif defined(COMM_AMMPI) if (usingAMSPMD && AMMPI_SPMDMyProc() == i) thisIsMyBox = 1; #endif if (!usingAMSPMD) { #if defined(REXEC) /* rexec could potentially put any 
hostname or IP address in REXEC_SVRS * check everything that could possibly specify this box */ #if 0 printf("box->DNSname=%s\n",box->DNSname); printf("trying: %s, %s\n", shorthostname, hostname); #endif thisIsMyBox = !strcasecmp(box->DNSname, shorthostname) || !strcasecmp(box->DNSname, hostname); if (!thisIsMyBox) { char temp[MAXHOSTNAMELEN]; int i; shortenHostname(he->h_name, temp); thisIsMyBox |= !strcasecmp(box->DNSname, he->h_name) || !strcasecmp(box->DNSname, temp); for (i=0; he->h_aliases[i]; i++) { shortenHostname(he->h_aliases[i], temp); #if 0 printf("trying: %s, %s\n", he->h_aliases[i], temp); #endif thisIsMyBox |= !strcasecmp(box->DNSname, he->h_aliases[i]) || !strcasecmp(box->DNSname, temp); } for (i=0; he->h_addr_list[i]; i++) { strcpy(temp, inet_ntoa(*(struct in_addr*)(he->h_addr_list[i]))); #if 0 printf("trying: %s\n", temp); #endif thisIsMyBox |= !strcmp(box->DNSname, temp); } } #elif defined(GLUNIX) thisIsMyBox = (Glib_GetMyVnn() == i); #elif defined(COMM_AMLAPI) int my_task_id; lapi_handle_t *lapi_hndl = AMLAPI_GetLAPIHandle (); /* Get My LAPI endpoint */ LAPI_Qenv(*lapi_hndl, TASK_ID, &my_task_id); if (my_task_id == i) thisIsMyBox = 1; #elif defined(COMM_GASNET) thisIsMyBox = (gasnet_mynode() == i); #else assert(numBoxes == 1); thisIsMyBox = 1; #endif } /* Set settings only for my box */ if (thisIsMyBox) { box->pid = getpid(); #if defined(__APPLE__) { int mib[2]; int hwprocs = 0; size_t len; mib[0] = CTL_HW; mib[1] = HW_NCPU; len = sizeof(hwprocs); if (sysctl(mib, 2, &hwprocs, &len, NULL, 0)) { perror("sysctl"); abort(); } if (hwprocs < 1) hwprocs = 1; box->totalProcessors = hwprocs; } #elif defined(HPUX) || defined(SUPERUX) || defined(__MTA__) box->totalProcessors = 1; /* appears to be no way to query CPU count on these */ #else { int hwprocs = 0; hwprocs = sysconf(_SC_NPROCESSORS_ONLN); if (hwprocs < 1) hwprocs = 1; /* catch failures on Solaris/Cygwin */ box->totalProcessors = hwprocs; } #endif myBox = box; } { char *p; char tmp[10]; 
strcpy(tmp,box->processesPerProcessors); p = strchr(tmp, '/'); if (p) { *p = '\0'; box->countProcesses = atol(tmp); box->countProcessors = atol(p+1); } else { box->countProcesses = atol(tmp); box->countProcessors = box->countProcesses; } if (box->countProcessors <= 0 || box->countProcesses <= 0 || box->countProcessors > box->countProcesses) { fprintf(stderr, "Tic: Error - malformed TI_THREADS environment variable\n"); exit(1); } else if (box->countProcesses > MAX_BOX_PROCS) { fprintf(stderr, "Tic: thread count %i specified for box %i exceeds per-box thread limit of %i\n", box->countProcesses, i, MAX_BOX_PROCS); exit(1); } numProcesses += box->countProcesses; } /* Check for too many processors */ if (thisIsMyBox) { if (box->countProcessors > box->totalProcessors) { box->countProcessors = box->totalProcessors; } /* set politeness (only polite if we're low on physical CPU's) */ if (box->countProcesses > box->countProcessors) { politep = 1; } } } if (myBox == NULL) { printf("Failed to determine myBox number during startup.\nThis program must be run with the Titanium run script!\n"); exit(1); } #ifndef NDEBUG if (!getenvMaster("TI_BACKEND_SILENT") && myBox->boxNumber == 0) { fprintf(stderr, "-----------------------------------------------------------------------\n" " WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING\n" "\n" " This application was built from a Titanium compiler installation \n" " configured with global, forced debugging mode.\n" " This usually has a SERIOUS impact on performance, so you should NOT\n" " trust any performance numbers obtained from this program run!!!\n" "\n" " WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING\n" "-----------------------------------------------------------------------\n" ); fflush(stderr); } #endif if (getenv("TI_POLITE_SYNC") != NULL) { if (!getenv("TI_BACKEND_SILENT")) { fprintf(stderr,"Box %i enabling \"polite\", low-performance synchronization algorithms\n", myBox->boxNumber); fflush(stderr); 
} politep = 1; #ifdef COMM_GASNET gasnet_set_waitmode(GASNET_WAIT_BLOCK); #endif } else if (politep) { if (!getenv("TI_BACKEND_SILENT")) { fprintf(stderr,"WARNING: Box %i running more threads (%i) than there are physical CPU's (%i)\n" " enabling \"polite\", low-performance synchronization algorithms\n", myBox->boxNumber, myBox->countProcesses, myBox->countProcessors); fflush(stderr); } #ifdef COMM_GASNET gasnet_set_waitmode(GASNET_WAIT_BLOCK); #endif } else { #ifdef COMM_GASNET gasnet_set_waitmode(GASNET_WAIT_SPIN); #endif } init = 2; return numProcesses * sizeof(processInfo); } void COMM_Init_Three(char *memory) { int i = 0, j = 0; processes = (processInfo *)memory; for(i = 0; i < numBoxes; i++) { boxInfo *box = &boxes[i]; box->processes = (processInfo *)memory; memory += box->countProcesses * sizeof(processInfo); for(j = 0; j < box->countProcesses; j++) { box->processes[j].processNumber = uniqueID++; box->processes[j].boxProcessNumber = j; box->processes[j].box = box; } } init = 3; } static int delay_gpid = 0; void COMM_Init_Gpid() { if (delay_gpid) { if (MYBOX != 0) { /* get gpid from zero */ jGPointer pgpid_zero; TO_GLOBALB_STATIC(pgpid_zero, 0, &gpid); DEREF_GLOBAL_jlong(gpid, pgpid_zero); } delay_gpid = 0; } } void COMM_Init_Four(int argc, char **argv) { julong randbytes = 0; int i; for (i=0; i < sizeof(julong); i++) randbytes = (randbytes << 8) ^ ((julong)rand()); if (usingAMSPMD) { /* use AllGather to generate a unique global pid */ julong *tmparr = ti_malloc(sizeof(julong)*numBoxes); randbytes = randbytes ^ getpid(); #if defined(COMM_AMUDP) AMUDP_SPMDAllGather(&randbytes, tmparr, sizeof(julong)); #elif defined(COMM_AMMPI) AMMPI_SPMDAllGather(&randbytes, tmparr, sizeof(julong)); #else abort(); /* bad AMSPMD? 
*/ #endif gpid = 0; for (i = 0; i < numBoxes; i++) { gpid = gpid ^ tmparr[i]; } } else { #if defined(REXEC) char *pLLGuid = getenv("REXEC_GPID"); if (pLLGuid == NULL) { printf("Environment variable REXEC_GPID not defined.\nThis program must be run with rexec and the Titanium run script!\n"); exit(1); } for(; *pLLGuid != 0; pLLGuid++) { gpid = gpid * 10 + (*pLLGuid - '0'); } #elif defined(GLUNIX) gpid = Glib_GetMyNpid(); #else if (COMM_GetMyBoxNumber() == 0) gpid = randbytes ^ getpid(); if (numBoxes != 1) delay_gpid = 1; /* delay it until later */ #endif } myBox->argc = argc; myBox->argv = argv; init = 4; } processInfo * COMM_GetHisBoxProcess(Process p) { return &myBox->processes[p]; } processInfo * COMM_GetHisProcess(Process p) { return &processes[p]; } juint COMM_GetMyGuid() { assert(!delay_gpid); return gpid & 0xFFFFFFFF; } julong COMM_GetMyGuid64() { assert(!delay_gpid); return gpid; } Box COMM_GetMyBoxNumber() { assert(!COMM_init_box_const_complete); return myBox->boxNumber; } Box COMM_GetBoxNumberForProcNumber(Process processNumber) { return processes[processNumber].box->boxNumber; } Box COMM_GetHisBoxProcNumber(Process processNumber) { return processes[processNumber].boxProcessNumber; } Box COMM_GetMyBParallelDegree() { assert(!COMM_init_box_const_complete); return myBox->countProcesses; } float COMM_GetMyProcsPerProc() { return (float)(myBox->countProcesses) / (float)(myBox->countProcessors); } Box COMM_GetBParallelDegree() { assert(!COMM_init_box_const_complete); return numBoxes; } Process COMM_GetParallelDegree() { assert(!COMM_init_box_const_complete); return numProcesses; } Process COMM_GetProxyProcNumberForBoxNumber(Box boxNumber) { return boxes[boxNumber].processes[0].processNumber; } #ifndef COMM_AM2 void COMM_thread_cons_monitor(processInfo *pi, jGPointer monitor) { process_monitor_table *newpmt = pi->pmt_freelist; if (newpmt) pi->pmt_freelist = newpmt->cdr; newpmt = (process_monitor_table *)ti_malloc(sizeof(process_monitor_table)); newpmt->monitor = 
monitor; newpmt->nesting = 1; newpmt->cdr = pi->locked_monitors; pi->locked_monitors = newpmt; } void COMM_thread_remove_monitor(processInfo *pi, jGPointer monitor) { process_monitor_table *prev, *current; prev = pi->locked_monitors; if (prev == NULL) return; if (___eq_ptr_check(prev->monitor, monitor)) { pi->locked_monitors = prev->cdr; /* don't drop the end of the list! */ prev->cdr = pi->pmt_freelist; pi->pmt_freelist = prev; return; } current = prev->cdr; while(current != NULL) { if (___eq_ptr_check(current->monitor, monitor)) { prev->cdr = current->cdr; current->cdr = pi->pmt_freelist; pi->pmt_freelist = current; return; } prev = current; current = current->cdr; } } #endif /* COMM_AM2 */ #ifdef MISALIGNED_CSTATIC_DATA intptr_t *_tic_cstaticaddr_translationtable = NULL; static void _tic_set_fnstatics(); /* sample static data declared in many different ways */ static long _tic_staticdata_dummyvar0; static long _tic_staticdata_dummyvar1 = 0; static long _tic_staticdata_dummyvar2 = -1; long _tic_staticdata_dummyvar3; long _tic_staticdata_dummyvar4 = 0; long _tic_staticdata_dummyvar5 = -1; void *_tic_staticdata_addrs[] = { &_tic_staticdata_dummyvar0, &_tic_staticdata_dummyvar1, &_tic_staticdata_dummyvar2, &_tic_staticdata_dummyvar3, &_tic_staticdata_dummyvar4, &_tic_staticdata_dummyvar5, NULL, NULL, NULL, /* sample some static data declared elsewhere */ &TIC_MYBOX, &politep, &tic_thread_key, /* sample some code section entries as well */ &abort, &tic_init_staticdata_translation, &_tic_set_fnstatics }; static void _tic_set_fnstatics() { static long _tic_staticdata_dummyvar6; static long _tic_staticdata_dummyvar7 = 0; static long _tic_staticdata_dummyvar8 = -1; _tic_staticdata_addrs[6] = &_tic_staticdata_dummyvar6; _tic_staticdata_addrs[7] = &_tic_staticdata_dummyvar7; _tic_staticdata_addrs[8] = &_tic_staticdata_dummyvar8; } int _tic_staticdata_tablegather = 0; long **_tic_staticdata_table = NULL; ti_hsl_t _tic_staticdata_tablelock = ti_hsl_decl_initializer; 
/*
 * Handler that collects one box's static-data sample addresses.
 *
 * _addrs/addr_sz carry the sender's copy of _tic_staticdata_addrs and srcid
 * is the sending box.  Under _tic_staticdata_tablelock this lazily allocates
 * _tic_staticdata_table (one slot per box), asserts that the sender's
 * addresses have the same relative layout as the local ones, records the
 * sender's first address in slot srcid and bumps the gather counter.
 */
TI_INLINE(staticdata_tablegather)
void staticdata_tablegather(tic_amtoken_t token, void *_addrs, size_t addr_sz, tic_handlerarg_t srcid) {
  void **addrs = _addrs;
  int i;

  ti_hsl_lock(&_tic_staticdata_tablelock);
  assert(addr_sz == sizeof(_tic_staticdata_addrs));
  if (_tic_staticdata_table == NULL) {
    _tic_staticdata_table = ti_malloc_handlersafe(sizeof(void *)*BOXES);
    _tic_set_fnstatics();
  }
  /* make sure the linker isn't doing anything sneaky on us, and that
     all static data images look identical
     (although they may start at different base addresses) */
  for (i=1; i < addr_sz/sizeof(void*); i++) {
    assert(((intptr_t)addrs[i]) - ((intptr_t)addrs[0]) ==
           ((intptr_t)_tic_staticdata_addrs[i]) - ((intptr_t)_tic_staticdata_addrs[0]));
  }
  _tic_staticdata_table[srcid] = addrs[0];
  /* write barrier between storing the table entry and incrementing the counter */
  tic_local_wmb();
  _tic_staticdata_tablegather++;
  ti_hsl_unlock(&_tic_staticdata_tablelock);
}
/* NOTE(review): presumably declares the handler above as a medium active
   message taking one argument - confirm against the TIC_AMMEDIUM definition */
TIC_AMMEDIUM(staticdata_tablegather, 1, 1,
             (token,addr,nbytes, a0),
             (token,addr,nbytes, a0));

/*
 * Handler that receives the gathered per-box base addresses.
 *
 * _addrs holds one address per box.  Builds a fresh table whose entry i is
 * box i's base address minus this box's own base address
 * (_tic_staticdata_addrs[0]) and then publishes it through
 * _tic_cstaticaddr_translationtable, which must still be NULL on entry.
 */
TI_INLINE(staticdata_tablebcast)
void staticdata_tablebcast(tic_amtoken_t token, void *_addrs, size_t addr_sz) {
  long **addrs = _addrs;
  int i;
  intptr_t *table = ti_malloc_handlersafe(sizeof(intptr_t)*BOXES);

  assert(addr_sz == sizeof(void *)*BOXES);
  assert(_tic_cstaticaddr_translationtable == NULL);
  /* calculate table */
  for (i=0; i < BOXES; i++) {
    table[i] = ((intptr_t)addrs[i]) - ((intptr_t)_tic_staticdata_addrs[0]);
  }
  /* write barrier between filling the table and publishing its pointer */
  tic_local_wmb();
  _tic_cstaticaddr_translationtable = table;
}
/* NOTE(review): presumably declares the handler above as a medium active
   message taking no arguments - confirm against the TIC_AMMEDIUM definition */
TIC_AMMEDIUM(staticdata_tablebcast, 0, 0,
             (token,addr,nbytes),
             (token,addr,nbytes));

/* called early at startup by a single thread */
/*
 * Builds _tic_cstaticaddr_translationtable.  With a single box the two
 * handlers are invoked directly; otherwise every box sends its sample
 * addresses to box 0, box 0 waits until all BOXES contributions have arrived
 * and sends the gathered table to every box, and each box polls until its
 * translation table has been published.  The entry for this box itself is
 * asserted to be 0.
 */
extern void tic_init_staticdata_translation() {
  _tic_set_fnstatics();
  if (BOXES == 1) {
    /* if there's only one box then AM may not be initialized - run handlers synchronously */
    staticdata_tablegather(TIC_AM_DUMMYTOKEN, _tic_staticdata_addrs, sizeof(_tic_staticdata_addrs), MYBOX);
    assert(_tic_staticdata_tablegather == BOXES);
    staticdata_tablebcast(TIC_AM_DUMMYTOKEN, _tic_staticdata_table, sizeof(long *)*BOXES);
    assert(_tic_cstaticaddr_translationtable);
  } else {
    /* send our offsets to box 0 */
    tic_AMRequestI(1,1,(0, TIC_AMIDX(staticdata_tablegather),
                        _tic_staticdata_addrs, sizeof(_tic_staticdata_addrs), MYBOX));
    if (MYBOX == 0) {
      int i;
      tic_poll_until(_tic_staticdata_tablegather == BOXES);
      /* distribute table */
      for (i = 0; i < BOXES; i++) {
        tic_AMRequestI(0,0,(i, TIC_AMIDX(staticdata_tablebcast),
                            _tic_staticdata_table, sizeof(long *)*BOXES));
      }
    }
    tic_poll_until(_tic_cstaticaddr_translationtable);
  }
  assert(_tic_cstaticaddr_translationtable[MYBOX] == 0);
}
#endif

/* attempt to maximize allowable cpu and memory resource limits for this process, ignoring any errors */
/*
 * For each RLIMIT_* resource in the table below that the platform defines,
 * the soft limit is raised to the hard limit (or to RLIM_INFINITY when either
 * limit is already RLIM_INFINITY).  Failures are reported with perror() only
 * when the local debug flag is non-zero; debug > 1 also prints the values.
 */
void tic_maxrlimit() {
  int debug = 0; /* 0 = silent, 1 = report failures, >1 = also trace values */
  struct res_s {
    int res;          /* RLIMIT_* resource id */
    const char *desc; /* its name, for debug output */
  } res[] = {
#ifdef RLIMIT_CPU
    { RLIMIT_CPU, "RLIMIT_CPU" },
#endif
#ifdef RLIMIT_DATA
    { RLIMIT_DATA, "RLIMIT_DATA" },
#endif
#ifdef RLIMIT_RSS
    { RLIMIT_RSS, "RLIMIT_RSS" },
#endif
#ifdef RLIMIT_STACK
    { RLIMIT_STACK, "RLIMIT_STACK" },
#endif
#ifdef RLIMIT_AS
    { RLIMIT_AS, "RLIMIT_AS" },
#endif
  };

  /* Shared loop body, parameterized over the rlimit struct type and the
     get/set functions so it can be expanded for both the plain and the
     64-bit interfaces. */
#define SET_RLIMITS(structname, getrlimit, setrlimit) do { \
    structname rval; size_t idx; \
    for (idx = 0; idx < sizeof(res)/sizeof(struct res_s); idx++) { \
      if (!getrlimit(res[idx].res, &rval)) { \
        if (debug > 1) printf("%s == { %i, %i }\n", res[idx].desc, \
                              (int)rval.rlim_cur, (int)rval.rlim_max); \
        if (rval.rlim_cur == RLIM_INFINITY || \
            rval.rlim_max == RLIM_INFINITY) rval.rlim_cur = RLIM_INFINITY; \
        else rval.rlim_cur = rval.rlim_max; \
        if (debug > 1) { \
          if (rval.rlim_cur == RLIM_INFINITY) \
            printf("setting %s to RLIM_INFINITY\n", res[idx].desc); \
          else printf("setting %s to %i\n", res[idx].desc, (int)rval.rlim_cur); \
        } \
        if (setrlimit(res[idx].res, &rval) && debug) perror("setrlimit"); \
      } else if (debug) perror("getrlimit"); \
    } \
  } while (0)

#if defined(HAVE_GETRLIMIT) && defined(HAVE_SETRLIMIT)
  SET_RLIMITS(struct rlimit, getrlimit, setrlimit);
#endif
  /* do 64-bit second, to favor the potentially higher limits */
#if defined(HAVE_GETRLIMIT64) && defined(HAVE_SETRLIMIT64)
  SET_RLIMITS(struct rlimit64, getrlimit64, setrlimit64);
#endif
}