#define TI_NO_SRCPOS #include #include /************************** * INITIALIZATION/STATICS * *************************/ #ifndef COMM_AM2 int _tic_dummy_newbulk_c = 0; /* define at least one symbol to avoid warnings */ #else /* COMM_AM2 */ /* preallocate buffer size in bytes */ static int tic_prealloc; /* pipelining is 1 if using pipelining */ static int tic_pipelining; /* struct definition to hold information about each remote buffer */ struct Buffer{ void *ptr; int size; }; /* holds information about each remote buffer each thread has its own array array has the same length as the number of nodes */ static struct Buffer **bufferArray; /* initialize data structure for buffer reuse assumes only one thread in the box will run this code so it does not acquire any locks */ void buffer_init(){ int i, j; bufferArray = (struct Buffer **) ti_malloc_atomic_uncollectable(sizeof(struct Buffer *)*MYBOXPROCS); for (i = 0; i < MYBOXPROCS; i++){ struct Buffer *tmp; bufferArray[i] = (struct Buffer *) ti_malloc_atomic_uncollectable(sizeof(struct Buffer)*BOXES); tmp = bufferArray[i]; for (j = 0; j < BOXES; j++){ tmp->size = -1; tmp->ptr = NULL; tmp++; } } } /* in the case of preallocate buffer, return a pointer to the buffer of size datasz or greater for node remote_box */ void *get_remote_buffer(int datasz, int remote_box){ void * volatile remoteAllocBuf = NULL; struct Buffer *myBuffers = bufferArray[MYBOXPROC]; struct Buffer *buffer = myBuffers+remote_box; /* Allocate a buffer to hold the array data on the remote sideif there is no preallocated buffer of this size or greater. */ if (buffer->size >= datasz){ remoteAllocBuf = buffer->ptr; } else if (buffer->size == -1){ tic_AMRequest(2,3,(remote_box, TIC_AMIDX(misc_alloc_request), datasz, TIC_AMSEND_PTR(&remoteAllocBuf))); tic_poll_until(remoteAllocBuf); assert((((jUIntPointer)remoteAllocBuf) % sizeof(void *) == 0)); buffer->size = datasz; buffer->ptr = remoteAllocBuf; } else{ /* first delete old buffer that is too small */ tic_AMRequest(1,2,(remote_box, TIC_AMIDX(misc_delete_request), TIC_AMSEND_PTR(buffer->ptr))); /* allocate new buffer with size datasz */ tic_AMRequest(2,3,(remote_box, TIC_AMIDX(misc_alloc_request), datasz, TIC_AMSEND_PTR(&remoteAllocBuf))); tic_poll_until(remoteAllocBuf); assert((((jUIntPointer)remoteAllocBuf) % sizeof(void *) == 0)); buffer->size = datasz; buffer->ptr = remoteAllocBuf; } return remoteAllocBuf; } typedef void *(*pack_method_t)(void *, int *, void *); typedef void *(*unpack_method_t)(void *, void *); /* ------------------------------------------------------------------------------------ */ /*************************** * Miscellaneous handlers. * ***************************/ TI_INLINE(misc_null_reply) void misc_null_reply(tic_amtoken_t token) { return; } TIC_AMSHORT(misc_null_reply, 0, 0, (token), (token)); /* ------------------------------------------------------------------------------------ */ TI_INLINE(misc_delete_request) void misc_delete_request(tic_amtoken_t token, void *addr) { ti_free_handlersafe(addr); TIC_NULL_REPLY(token); } TIC_AMSHORT(misc_delete_request, 1, 2, (token, TIC_AMRECV_PTR32(a0) ), (token, TIC_AMRECV_PTR64(a0, a1))); /* ------------------------------------------------------------------------------------ */ /* Size is in bytes. */ TI_INLINE(misc_alloc_request) void misc_alloc_request(tic_amtoken_t token, int size, void *destptr) { void *buf = ti_malloc_handlersafe(size); tic_AMReply(2,4,(token, TIC_AMIDX(misc_alloc_reply), TIC_AMSEND_PTR(buf), TIC_AMSEND_PTR(destptr))); } TIC_AMSHORT(misc_alloc_request, 2, 3, (token, a0, TIC_AMRECV_PTR32(a1) ), (token, a0, TIC_AMRECV_PTR64(a1, a2))); /* ------------------------------------------------------------------------------------ */ TI_INLINE(misc_alloc_reply) void misc_alloc_reply(tic_amtoken_t token, void *buf, void *destptr) { void ** premoteAllocBuf = (void **)destptr; (*premoteAllocBuf) = (void *)buf; } TIC_AMSHORT(misc_alloc_reply, 2, 4, (token, TIC_AMRECV_PTR32(a0), TIC_AMRECV_PTR32(a1)), (token, TIC_AMRECV_PTR64(a0, a1), TIC_AMRECV_PTR64(a2, a3))); /* ------------------------------------------------------------------------------------ * * Rectangular strided copy * ------------------------------------------------------------------------------------ */ /************************************ * Get an array from a remote node. * ***********************************/ /* Call _packMethod to pack the array with copy_desc as its "copy descriptor", returning the packed data to remote address remoteBuffer. */ TI_INLINE(strided_pack_request) void strided_pack_request(tic_amtoken_t token, void *copy_desc, size_t copy_desc_size, void *_packMethod, void *remoteBuffer, void *pack_info_ptr, int atomicelements) { int array_data_len; void *array_data; pack_method_t packMethod = (pack_method_t)_packMethod; /* A NULL buffer pointer tells the packer to allocate its own buffer. * (void*)-1 tells it to use handlersafe, uncollectable memory so it doesn't disappear on us */ array_data = (*packMethod)(copy_desc, &array_data_len, (void*)-1); #if defined(USE_DISTRIBUTED_GC) && !defined(USE_GC_NONE) /* inform distributed GC that some pointers may be escaping */ if (!atomicelements) dgc_ptr_esc_all_gps(array_data, array_data_len, 1); #endif if (array_data_len <= TIC_AM_MAX_MEDIUM) { /* Common, fast case. Send back the packed data with the reply. */ tic_AMReplyI(4,7,(token, TIC_AMIDX(strided_pack_reply), array_data, array_data_len, TIC_AMSEND_PTR(remoteBuffer), TIC_AMSEND_PTR(0), 0, TIC_AMSEND_PTR(pack_info_ptr))); ti_free_handlersafe(array_data); } else { #if 0 tic_local_wmb(); /* ensure data is committed before signalling initiator to pull data */ #endif /* Slower case. Why doesn't the darned AM_ReplyXfer() work? */ tic_AMReplyI(4,7,(token, TIC_AMIDX(strided_pack_reply), array_data, TIC_AM_MAX_MEDIUM, TIC_AMSEND_PTR(remoteBuffer), TIC_AMSEND_PTR(array_data), array_data_len, TIC_AMSEND_PTR(pack_info_ptr))); } } TIC_AMMEDIUM(strided_pack_request, 4, 7, (token,addr,nbytes, TIC_AMRECV_PTR32(a0), TIC_AMRECV_PTR32(a1), TIC_AMRECV_PTR32(a2), a3), (token,addr,nbytes, TIC_AMRECV_PTR64(a0, a1), TIC_AMRECV_PTR64(a2, a3), TIC_AMRECV_PTR64(a4, a5), a6)); /* ------------------------------------------------------------------------------------ */ /* The reply handler needs to pass this information back to the caller. */ typedef struct { volatile int pack_spin; /* 0 while waiting, 1 if completed, 2 if using fragmentation */ void * volatile remoteBuffer; volatile int dataSize; } pack_return_info_t; /* Sets the spin flag to 1 if the array fit in the message reply and we're done; otherwise it sets the spinlock variable to be the remote address of the packed data, which must be sent over with a libtic bulk transfer. */ TI_INLINE(strided_pack_reply) void strided_pack_reply(tic_amtoken_t token, void *array_data, size_t array_data_size, void *localBuffer, void *remoteBuf, int dataSz, void *pack_info_ptr) { int moreWaiting = (remoteBuf != 0 && dataSz != 0); pack_return_info_t* pinfo = (pack_return_info_t*)pack_info_ptr; memcpy((void *)localBuffer, array_data, array_data_size); if (!moreWaiting) { /* got it all in a single chunk */ tic_local_wmb(); /* ensure data is committed before signalling */ pinfo->pack_spin = 1; } else { /* need more chunks */ assert(array_data_size == TIC_AM_MAX_MEDIUM); /* first chunk always max medium in size */ pinfo->remoteBuffer = (jbyte *)remoteBuf; pinfo->dataSize = dataSz; tic_local_wmb(); /* ensure data is committed before signalling */ pinfo->pack_spin = 2; } } TIC_AMMEDIUM(strided_pack_reply, 4, 7, (token,addr,nbytes, TIC_AMRECV_PTR32(a0), TIC_AMRECV_PTR32(a1), a2, TIC_AMRECV_PTR32(a3) ), (token,addr,nbytes, TIC_AMRECV_PTR64(a0, a1), TIC_AMRECV_PTR64(a2, a3), a4, TIC_AMRECV_PTR64(a5, a6))); /* ------------------------------------------------------------------------------------ */ extern void get_array(void *pack_method, void *copy_desc, int copy_desc_size, int tgt_box, void *buffer, int atomicelements) { pack_return_info_t info; info.pack_spin = 0; /* A copy_desc is an object that contains the array descriptor. */ assert(copy_desc && copy_desc_size > 0 && copy_desc_size <= TIC_AM_MAX_MEDIUM); tic_AMRequestI(4,7,(tgt_box, TIC_AMIDX(strided_pack_request), copy_desc, copy_desc_size, TIC_AMSEND_PTR(TIC_TRANSLATE_FUNCTION_ADDR(pack_method,tgt_box)), TIC_AMSEND_PTR(buffer), TIC_AMSEND_PTR(&info), atomicelements)); tic_poll_until(info.pack_spin); /* If the data was too big to be sent in the reply, use a Tic call to do the job. */ if (info.pack_spin != 1) { /* pull the data to use using a bulk copy (first fragment has already been copied) */ ti_bulk_read((void *)((jUIntPointer)buffer+TIC_AM_MAX_MEDIUM), tgt_box, ((jbyte *)info.remoteBuffer) + TIC_AM_MAX_MEDIUM, info.dataSize - TIC_AM_MAX_MEDIUM, tic_no_ptrs); /* now tell remote to delete temp space */ tic_AMRequest(1,2,(tgt_box, TIC_AMIDX(misc_delete_request), TIC_AMSEND_PTR(info.remoteBuffer))); } } /* ------------------------------------------------------------------------------------ */ /************************************ * Write an array to a remote node. * ***********************************/ /* The fastest way of doing things....take the data given and unpack it. */ TI_INLINE(strided_unpackAll_request) void strided_unpackAll_request(tic_amtoken_t token, void *packedArrayData, size_t nBytes, void *_unpackMethod, int copyDescSize, void *unpack_spin_ptr) { void *temp = NULL; if (((jUIntPointer)packedArrayData) % 8 != 0) { /* it seems that some crappy AM implementations (ahem.. the NOW) * fail to double-word align the medium-sized message buffer * fix that problem if it occurs */ temp = ti_malloc_handlersafe(nBytes+8); memcpy(temp, packedArrayData, nBytes); packedArrayData = temp; } { unpack_method_t unpackMethod = (unpack_method_t)_unpackMethod; void *copyDesc = packedArrayData; void *arrayData = (void *)((jUIntPointer)packedArrayData + copyDescSize); assert(((jUIntPointer)copyDesc) % 8 == 0 && ((jUIntPointer)arrayData) % 8 == 0); (*unpackMethod)(copyDesc, arrayData); #if 0 tic_local_wmb(); /* ensure data is committed before signalling completion */ #endif tic_AMReply(1,2,(token, TIC_AMIDX(strided_unpack_reply), TIC_AMSEND_PTR(unpack_spin_ptr))); } if (temp) ti_free_handlersafe(temp); } TIC_AMMEDIUM(strided_unpackAll_request, 3, 5, (token,addr,nbytes, TIC_AMRECV_PTR32(a0), a1, TIC_AMRECV_PTR32(a2) ), (token,addr,nbytes, TIC_AMRECV_PTR64(a0, a1), a2, TIC_AMRECV_PTR64(a3, a4))); /* ------------------------------------------------------------------------------------ */ TI_INLINE(strided_unpack_reply) void strided_unpack_reply(tic_amtoken_t token, void *unpack_spin_ptr) { int *punpack_spin = (int *)unpack_spin_ptr; *(punpack_spin) = 1; } TIC_AMSHORT(strided_unpack_reply, 1, 2, (token, TIC_AMRECV_PTR32(a0) ), (token, TIC_AMRECV_PTR64(a0, a1))); /* ------------------------------------------------------------------------------------ */ /* The data was provided by previous calls; just unpack it with the given descriptor. */ TI_INLINE(strided_unpackOnly_request) void strided_unpackOnly_request(tic_amtoken_t token, void *copyDesc, size_t copyDescSize, void *bufAddr, void *_unpackMethod, void *unpack_spin_ptr) { unpack_method_t unpackMethod = (unpack_method_t)_unpackMethod; (*unpackMethod)(copyDesc, (void *)bufAddr); ti_free_handlersafe((void *)bufAddr); #if 0 tic_local_wmb(); /* ensure data is committed before signalling completion */ #endif tic_AMReply(1,2,(token, TIC_AMIDX(strided_unpack_reply), TIC_AMSEND_PTR(unpack_spin_ptr))); } TIC_AMMEDIUM(strided_unpackOnly_request, 3, 6, (token,addr,nbytes, TIC_AMRECV_PTR32(a0), TIC_AMRECV_PTR32(a1), TIC_AMRECV_PTR32(a2) ), (token,addr,nbytes, TIC_AMRECV_PTR64(a0, a1), TIC_AMRECV_PTR64(a2, a3), TIC_AMRECV_PTR64(a4, a5))); /* ------------------------------------------------------------------------------------ */ /* Send a contiguous array of data to a node, and have it get unpacked into a Titanium array. */ extern void put_array(void *unpack_method, void *copy_desc, int copy_desc_size, void *array_data, int array_data_size, int tgt_box) { void *data; /* ensure double-word alignment for array data */ int copy_desc_size_padded = ((copy_desc_size-1)/8 + 1) * 8; int data_size = copy_desc_size_padded + array_data_size; volatile int unpack_spin = 0; /* Fast(er), hopefully common case. */ if (data_size <= TIC_AM_MAX_MEDIUM) { /* The remote end needs the descriptor and the array data. Since the data is small, copying the array data will be faster than sending two messages. */ data = (void *) ti_malloc_atomic_huge(data_size); assert(data && copy_desc_size_padded % 8 == 0); memcpy(data, copy_desc, copy_desc_size); /* All math is done in bytes. sizeof(void*) should be irrelevant. */ memcpy((void *)((jUIntPointer)data + copy_desc_size_padded), array_data, array_data_size); tic_AMRequestI(3,5,(tgt_box, TIC_AMIDX(strided_unpackAll_request), data, data_size, TIC_AMSEND_PTR(TIC_TRANSLATE_FUNCTION_ADDR(unpack_method,tgt_box)), copy_desc_size_padded, TIC_AMSEND_PTR(&unpack_spin))); tic_poll_until(unpack_spin); ti_free(data); } else { /* Slow case. */ /* Allocate a buffer to hold the array data on the remote side. */ void * volatile remoteAllocBuf = NULL; tic_AMRequest(2,3,(tgt_box, TIC_AMIDX(misc_alloc_request), array_data_size, TIC_AMSEND_PTR(&remoteAllocBuf))); tic_poll_until(remoteAllocBuf); /* Transfer the data to the buffer with libtic. */ ti_bulk_write(tgt_box, remoteAllocBuf, array_data, array_data_size); /* Tell the remote side to unpack the data. */ assert(copy_desc_size <= TIC_AM_MAX_MEDIUM); tic_AMRequestI(3,6,(tgt_box, TIC_AMIDX(strided_unpackOnly_request), copy_desc, copy_desc_size, TIC_AMSEND_PTR(remoteAllocBuf), TIC_AMSEND_PTR(TIC_TRANSLATE_FUNCTION_ADDR(unpack_method,tgt_box)), TIC_AMSEND_PTR(&unpack_spin))); tic_poll_until(unpack_spin); } } /* ------------------------------------------------------------------------------------ * * Sparse scatter-gather * ------------------------------------------------------------------------------------ */ /* use for packing in gather handlers */ #define FAST_PACK(elem_sz, data, addr_list) do { \ switch(elem_sz){ \ case sizeof(jubyte): \ { \ jubyte *ubyteData = (jubyte *) data; \ jubyte **ubyteAddrList = (jubyte **) addr_list; \ for (i = 0; i < num_elem; i++) { \ ubyteData[i] = *(ubyteAddrList[i]); \ } \ } \ break; \ case sizeof(jchar): \ { \ jchar *charData = (jchar *) data; \ jchar **charAddrList = (jchar **) addr_list; \ for (i = 0; i < num_elem; i++) { \ charData[i] = *(charAddrList[i]); \ } \ } \ break; \ case sizeof(juint): \ { \ juint *uintData = (juint *) data; \ juint **uintAddrList = (juint **) addr_list; \ for (i = 0; i < num_elem; i++) { \ uintData[i] = *(uintAddrList[i]); \ } \ } \ break; \ case sizeof(julong): \ { \ julong *ulongData = (julong *) data; \ julong **ulongAddrList = (julong **) addr_list; \ for (i = 0; i < num_elem; i++) { \ ulongData[i] = *(ulongAddrList[i]); \ } \ } \ break; \ default: \ for (i = 0; i < num_elem; i++) { \ memcpy(data + i*elem_sz, addr_list[i], elem_sz);\ } \ }} while(0) /* use for unpacking in scatter handlers */ #define FAST_UNPACK(elem_sz, data, addr_list) do { \ switch(elem_sz){ \ case sizeof(jubyte): \ { \ jubyte *ubyteData = (jubyte *) data; \ jubyte **ubyteAddrList = (jubyte **) addr_list; \ for (i = 0; i < num_elem; i++) { \ *(ubyteAddrList[i]) = ubyteData[i]; \ } \ } \ break; \ case sizeof(jchar): \ { \ jchar *charData = (jchar *) data; \ jchar **charAddrList = (jchar **) addr_list; \ for (i = 0; i < num_elem; i++) { \ *(charAddrList[i]) = charData[i]; \ } \ } \ break; \ case sizeof(juint): \ { \ juint *uintData = (juint *) data; \ juint **uintAddrList = (juint **) addr_list; \ for (i = 0; i < num_elem; i++) { \ *(uintAddrList[i]) = uintData[i]; \ } \ } \ break; \ case sizeof(julong): \ { \ julong *ulongData = (julong *) data; \ julong **ulongAddrList = (julong **) addr_list; \ for (i = 0; i < num_elem; i++) { \ *(ulongAddrList[i]) = ulongData[i]; \ } \ } \ break; \ default: \ for (i = 0; i < num_elem; i++) { \ memcpy(addr_list[i], data + i*elem_sz, elem_sz);\ } \ }} while(0) /* this macro requires dest and src have alignment of at least length */ #define FAST_MEMCPY(dest, src, length) do { \ switch(length) { \ case sizeof(jubyte): \ *((jubyte *)(dest)) = *((jubyte *)(src)); \ break; \ case sizeof(jchar): \ *((jchar *)(dest)) = *((jchar *)(src)); \ break; \ case sizeof(juint): \ *((juint *)(dest)) = *((juint *)(src)); \ break; \ case sizeof(julong): \ *((julong *)(dest)) = *((julong *)(src)); \ break; \ default: \ memcpy(dest, src, length); \ } } while(0) extern void sparse_scatter_serial(void **remote_addr_list, void *src_data_list, int remote_box, int num_elem, int elem_sz, int atomic_elements) { volatile int done_ctr = 0; int datasz; int offset; assert(src_data_list && remote_addr_list && remote_box >= 0 && remote_box < BOXES && num_elem > 0 && elem_sz > 0); offset = num_elem * sizeof(void *); datasz = offset + num_elem * elem_sz; if (datasz <= TIC_AM_MAX_MEDIUM) { /* fast case - everything fits in a medium msg */ char *data = ti_malloc_atomic_huge(datasz); memcpy(data, remote_addr_list, offset); memcpy(data+offset, src_data_list, num_elem * elem_sz); #if defined(USE_DISTRIBUTED_GC) && !defined(USE_GC_NONE) /* inform distributed GC that some pointers may be escaping */ if (!atomic_elements) GC_PTR_ESC_ALL_GPS(data+offset, num_elem * elem_sz); #endif tic_AMRequestI(3,4,(remote_box, TIC_AMIDX(sparse_simpleScatter_request), data, datasz, num_elem, elem_sz, TIC_AMSEND_PTR(&done_ctr))); tic_poll_until(done_ctr); ti_free(data); } else { /* slower case - need multiple messages */ void * volatile remoteAllocBuf = NULL; if (datasz <= tic_prealloc){ /* use preallocate buffer if one is available */ remoteAllocBuf = get_remote_buffer(datasz, remote_box); } else{ /* Allocate a buffer to hold the array data on the remote side. */ tic_AMRequest(2,3,(remote_box, TIC_AMIDX(misc_alloc_request), datasz, TIC_AMSEND_PTR(&remoteAllocBuf))); tic_poll_until(remoteAllocBuf); } /* Transfer the addresses and data to the buffer with libtic (this could be optimized somewhat) */ ti_bulk_write_weak(remote_box, remoteAllocBuf, remote_addr_list, offset); #if defined(USE_DISTRIBUTED_GC) && !defined(USE_GC_NONE) /* inform distributed GC that some pointers may be escaping */ if (!atomic_elements) GC_PTR_ESC_ALL_GPS(src_data_list, num_elem * elem_sz); #endif ti_bulk_write_weak(remote_box, ((char *)remoteAllocBuf)+offset, src_data_list, num_elem * elem_sz); ti_write_sync(); /* Tell the remote side to scatter the data. */ tic_AMRequest(4,6,(remote_box, TIC_AMIDX(sparse_generalScatter_request), TIC_AMSEND_PTR(remoteAllocBuf), num_elem, elem_sz, TIC_AMSEND_PTR(&done_ctr))); tic_poll_until(done_ctr); } } /* ------------------------------------------------------------------------------------ */ extern void sparse_scatter_pipeline(void **remote_addr_list, void *src_data_list, int remote_box, int num_elem, int elem_sz, int atomic_elements) { volatile int done_ctr = 0; int datasz; int offset; assert(src_data_list && remote_addr_list && remote_box >= 0 && remote_box < BOXES && num_elem > 0 && elem_sz > 0); offset = num_elem * sizeof(void *); datasz = offset + num_elem * elem_sz; if (datasz <= TIC_AM_MAX_MEDIUM) { /* fast case - everything fits in a medium msg */ char *data = ti_malloc_atomic_huge(datasz); memcpy(data, remote_addr_list, offset); memcpy(data+offset, src_data_list, num_elem * elem_sz); #if defined(USE_DISTRIBUTED_GC) && !defined(USE_GC_NONE) /* inform distributed GC that some pointers may be escaping */ if (!atomic_elements) GC_PTR_ESC_ALL_GPS(data+offset, num_elem * elem_sz); #endif tic_AMRequestI(3,4,(remote_box, TIC_AMIDX(sparse_simpleScatter_request), data, datasz, num_elem, elem_sz, TIC_AMSEND_PTR(&done_ctr))); tic_poll_until(done_ctr); ti_free(data); } else { /* slower case - need multiple messages */ int total = num_elem; int loadSize; int count = 0; int messageCount; int pairSize = sizeof(void *)+elem_sz; char *localBuffer; char *localCurBuf; int bufferSize; int loadSizeInBytes; int bufferPadding; int lastLoadSize; int i; int addrLoadSize; int dataLoadSize; #if AM2_HAS_HUGE_SEGMENTS void * volatile remoteAllocBuf = NULL; void *curAllocBuf = NULL; loadSize = TIC_AM_MAX_LONG_REQUEST / pairSize; #else loadSize = TIC_AM_MAX_MEDIUM / pairSize; #endif addrLoadSize = loadSize*sizeof(void *); dataLoadSize = loadSize*elem_sz; loadSizeInBytes = addrLoadSize+dataLoadSize; bufferPadding = loadSizeInBytes % sizeof(void *); if (bufferPadding != 0){ bufferPadding = sizeof(void *) - bufferPadding; } loadSizeInBytes += bufferPadding; lastLoadSize = num_elem % loadSize; if (lastLoadSize == 0){ messageCount = num_elem / loadSize; bufferSize = loadSizeInBytes * messageCount; /* combine the address list and data list into a local buffer before sending */ localBuffer = ti_malloc_atomic_huge(bufferSize); localCurBuf = localBuffer; for (i = 0; i < messageCount; i++){ memcpy(localCurBuf, remote_addr_list+i*loadSize, addrLoadSize); memcpy(localCurBuf+addrLoadSize, ((char *)src_data_list+i*dataLoadSize), dataLoadSize); localCurBuf += loadSizeInBytes; } } else{ messageCount = num_elem / loadSize + 1; bufferSize = loadSizeInBytes * (messageCount - 1) + lastLoadSize * pairSize; /* combine the address list and data list into a buffer before sending */ localBuffer = ti_malloc_atomic_huge(bufferSize); localCurBuf = localBuffer; for (i = 0; i < messageCount-1; i++){ memcpy(localCurBuf, remote_addr_list+i*loadSize, addrLoadSize); memcpy(localCurBuf+addrLoadSize, ((char *)src_data_list+i*dataLoadSize), dataLoadSize); localCurBuf += loadSizeInBytes; } memcpy(localCurBuf, remote_addr_list+(messageCount-1)*loadSize, lastLoadSize*sizeof(void *)); memcpy(localCurBuf+lastLoadSize*sizeof(void *), ((char *)src_data_list+(messageCount-1)*dataLoadSize), lastLoadSize*elem_sz); } localCurBuf = localBuffer; #if AM2_HAS_HUGE_SEGMENTS if (bufferSize <= tic_prealloc){ /* use preallocate buffer if one is available */ remoteAllocBuf = get_remote_buffer(bufferSize, remote_box); } else{ /* get a buffer for this gather use only */ tic_AMRequest(2,3,(remote_box, TIC_AMIDX(misc_alloc_request), bufferSize, TIC_AMSEND_PTR(&remoteAllocBuf))); tic_poll_until(remoteAllocBuf); } curAllocBuf = remoteAllocBuf; #endif #if defined(USE_DISTRIBUTED_GC) && !defined(USE_GC_NONE) /* inform distributed GC that some pointers may be escaping */ if (!atomic_elements) GC_PTR_ESC_ALL_GPS(src_data_list, num_elem * elem_sz); #endif { int volatile * done_ctr_array = (int *)ti_malloc_atomic_uncollectable(sizeof(int) * messageCount); int *curCtr; for (i = 0; i < messageCount; i++){ done_ctr_array[i] = 0; } while (total > 0){ int currentLoadSize; int currentLoadSizeInBytes; if (total >= loadSize){ currentLoadSize = loadSize; currentLoadSizeInBytes = loadSizeInBytes; } else{ currentLoadSize = total; currentLoadSizeInBytes = total*pairSize; } curCtr = (int *) (done_ctr_array+count); #if AM2_HAS_HUGE_SEGMENTS tic_AMRequestXfer(3,4,(remote_box, (jUIntPointer)(((char*)curAllocBuf)-TIC_AM_SEGOFFSET), TIC_AMIDX(sparse_largeScatterNoDelete_request), (void *) (localCurBuf), currentLoadSizeInBytes, currentLoadSize, elem_sz, TIC_AMSEND_PTR(curCtr))); curAllocBuf = (void *) (((char *) curAllocBuf) + loadSizeInBytes); #else tic_AMRequestI(3,4,(remote_box, TIC_AMIDX(sparse_simpleScatter_request), localCurBuf, currentLoadSizeInBytes, currentLoadSize, elem_sz, TIC_AMSEND_PTR(curCtr))); #endif total -= currentLoadSize; localCurBuf += loadSizeInBytes; count++; } for (i = 0; i < messageCount; i++){ tic_poll_until(done_ctr_array[i]); } ti_free((void *) done_ctr_array); ti_free((void *) localBuffer); #if AM2_HAS_HUGE_SEGMENTS if (bufferSize > tic_prealloc){ tic_AMRequest(1,2,(remote_box, TIC_AMIDX(misc_delete_request), TIC_AMSEND_PTR(remoteAllocBuf))); } #endif } } } /* ------------------------------------------------------------------------------------ */ extern void sparse_scatter(void **remote_addr_list, void *src_data_list, int remote_box, int num_elem, int elem_sz, int atomic_elements) { if (tic_pipelining){ sparse_scatter_pipeline(remote_addr_list, src_data_list, remote_box, num_elem, elem_sz, atomic_elements); } else{ sparse_scatter_serial(remote_addr_list, src_data_list, remote_box, num_elem, elem_sz, atomic_elements); } } /* ------------------------------------------------------------------------------------ */ TI_INLINE(sparse_simpleScatter_request) void sparse_simpleScatter_request(tic_amtoken_t token, void *addr_data_list, size_t addr_data_list_size, int num_elem, int elem_sz, void *_done_ctr) { int i; void **addr_list = (void**)addr_data_list; int offset = num_elem * sizeof(void *); char *data = ((char*)addr_data_list) + offset; assert(addr_data_list_size == offset + num_elem * elem_sz); FAST_UNPACK(elem_sz, data, addr_list); tic_AMReply(1,2,(token, TIC_AMIDX(sparse_done_reply), TIC_AMSEND_PTR(_done_ctr))); } TIC_AMMEDIUM(sparse_simpleScatter_request, 3, 4, (token,addr,nbytes, a0, a1, TIC_AMRECV_PTR32(a2) ), (token,addr,nbytes, a0, a1, TIC_AMRECV_PTR64(a2, a3))); /* ------------------------------------------------------------------------------------ */ TI_INLINE(sparse_done_reply) void sparse_done_reply(tic_amtoken_t token, void *_done_ctr) { int *done_ctr = (int *)_done_ctr; *done_ctr = 1; } TIC_AMSHORT(sparse_done_reply, 1, 2, (token, TIC_AMRECV_PTR32(a0) ), (token, TIC_AMRECV_PTR64(a0, a1))); /* ------------------------------------------------------------------------------------ */ TI_INLINE(sparse_generalScatter_request) void sparse_generalScatter_request(tic_amtoken_t token, void *addr_data_list, int num_elem, int elem_sz, void *_done_ctr) { int i; void **addr_list = (void**)addr_data_list; int offset = num_elem * sizeof(void *); char *data = ((char*)addr_data_list) + offset; FAST_UNPACK(elem_sz, data, addr_list); tic_AMReply(1,2,(token, TIC_AMIDX(sparse_done_reply), TIC_AMSEND_PTR(_done_ctr))); if ((num_elem*elem_sz+offset) > tic_prealloc){ ti_free_handlersafe((void*)addr_data_list); } } TIC_AMSHORT(sparse_generalScatter_request, 4, 6, (token, TIC_AMRECV_PTR32(a0), a1, a2, TIC_AMRECV_PTR32(a3) ), (token, TIC_AMRECV_PTR64(a0, a1), a2, a3, TIC_AMRECV_PTR64(a4, a5))); /* ------------------------------------------------------------------------------------ */ TI_INLINE(sparse_largeScatterNoDelete_request) void sparse_largeScatterNoDelete_request(tic_amtoken_t token, void *addr_data_list, size_t addr_data_list_size, int num_elem, int elem_sz, void *_done_ctr){ int i; void **addr_list = (void**)addr_data_list; int offset = num_elem * sizeof(void *); char *data = ((char*)addr_data_list) + offset; FAST_UNPACK(elem_sz, data, addr_list); tic_AMReply(1,2,(token, TIC_AMIDX(sparse_done_reply), TIC_AMSEND_PTR(_done_ctr))); } TIC_AMLONG(sparse_largeScatterNoDelete_request, 3, 4, (token,addr,nbytes, a0, a1, TIC_AMRECV_PTR32(a2)), (token,addr,nbytes, a0, a1, TIC_AMRECV_PTR64(a2, a3))); /* ------------------------------------------------------------------------------------ */ TI_INLINE(sparse_largeGather_request) void sparse_largeGather_request(tic_amtoken_t token, void *_addr_list, size_t addr_list_size, int num_elem, int elem_sz, void *_tgt_data_list, void *_done_ctr, int atomic_elements) { int i; void **addr_list = (void**)_addr_list; int datasz = num_elem * elem_sz; int offset = num_elem * sizeof(void *); char *data = ((char*)_addr_list) + offset; assert(addr_list_size == sizeof(void *)*num_elem); FAST_PACK(elem_sz, data, addr_list); #if defined(USE_DISTRIBUTED_GC) && !defined(USE_GC_NONE) /* inform distributed GC that some pointers may be escaping */ if (!atomic_elements) dgc_ptr_esc_all_gps(data, datasz, 1); #endif tic_AMReplyXfer(1,2,(token, (jUIntPointer)(((char*)_tgt_data_list)-TIC_AM_SEGOFFSET), TIC_AMIDX(sparse_largeGather_reply), data, datasz, TIC_AMSEND_PTR(_done_ctr))); } TIC_AMLONG(sparse_largeGather_request, 5, 7, (token,addr,nbytes, a0, a1, TIC_AMRECV_PTR32(a2), TIC_AMRECV_PTR32(a3), a4), (token,addr,nbytes, a0, a1, TIC_AMRECV_PTR64(a2, a3), TIC_AMRECV_PTR64(a4, a5), a6)); /* ------------------------------------------------------------------------------------ */ TI_INLINE(sparse_largeGather_reply) void sparse_largeGather_reply(tic_amtoken_t token, void *data_list, size_t data_list_size, void *_done_ctr){ int *done_ctr = (int *)_done_ctr; *done_ctr = 1; } TIC_AMLONG(sparse_largeGather_reply, 1, 2, (token,addr,nbytes, TIC_AMRECV_PTR32(a0) ), (token,addr,nbytes, TIC_AMRECV_PTR64(a0, a1))); /* ------------------------------------------------------------------------------------ */ extern void sparse_gather_pipeline(void *tgt_data_list, void **remote_addr_list, int remote_box, int num_elem, int elem_sz, int atomic_elements) { volatile int done_ctr = 0; assert(tgt_data_list && remote_addr_list && remote_box >= 0 && remote_box < BOXES && num_elem > 0 && elem_sz > 0); if (num_elem * sizeof(void *) <= TIC_AM_MAX_MEDIUM && num_elem * elem_sz <= TIC_AM_MAX_MEDIUM) { /* fast case - everything fits in a medium msg */ tic_AMRequestI(5,7,(remote_box, TIC_AMIDX(sparse_simpleGather_request), remote_addr_list, num_elem * sizeof(void *), num_elem, elem_sz, TIC_AMSEND_PTR(tgt_data_list), TIC_AMSEND_PTR(&done_ctr), atomic_elements)); tic_poll_until(done_ctr); } else { /*pipeline gather requests */ int total = num_elem; int loadSize; int ptrLimit; int dataLimit; int count = 0; int messageCount; #if AM2_HAS_HUGE_SEGMENTS void * volatile remoteAllocBuf = NULL; void *curAllocBuf = NULL; int bufferSize; int loadSizeInBytes; int bufferPadding; int lastLoadSize; ptrLimit = TIC_AM_MAX_LONG_REQUEST / sizeof(void *); dataLimit = TIC_AM_MAX_LONG_REPLY / elem_sz; #else ptrLimit = TIC_AM_MAX_MEDIUM / sizeof(void *); dataLimit = TIC_AM_MAX_MEDIUM / elem_sz; #endif if (ptrLimit > dataLimit){ loadSize = dataLimit; } else{ loadSize = ptrLimit; } #if AM2_HAS_HUGE_SEGMENTS loadSizeInBytes = loadSize*(sizeof(void *) + elem_sz); bufferPadding = loadSizeInBytes % sizeof(void *); if (bufferPadding != 0){ bufferPadding = sizeof(void *) - bufferPadding; } loadSizeInBytes += bufferPadding; lastLoadSize = num_elem % loadSize; if (lastLoadSize == 0){ messageCount = num_elem / loadSize; bufferSize = loadSizeInBytes * messageCount; } else{ messageCount = num_elem / loadSize + 1; bufferSize = loadSizeInBytes * (messageCount - 1) + lastLoadSize * (sizeof(void *) + elem_sz); } if (bufferSize <= tic_prealloc){ /* use preallocate buffer if one is available */ remoteAllocBuf = get_remote_buffer(bufferSize, remote_box); } else{ /* get a buffer for this gather use only */ tic_AMRequest(2,3,(remote_box, TIC_AMIDX(misc_alloc_request), bufferSize, TIC_AMSEND_PTR(&remoteAllocBuf))); tic_poll_until(remoteAllocBuf); } curAllocBuf = remoteAllocBuf; #else if (num_elem % loadSize == 0){ messageCount = num_elem / loadSize; } else{ messageCount = num_elem / loadSize + 1; } #endif { int i; int volatile * done_ctr_array = (int *)ti_malloc_atomic_uncollectable(sizeof(int) * messageCount); int *curCtr; for (i = 0; i < messageCount; i++){ done_ctr_array[i] = 0; } while (total > 0){ int currentLoadSize; if (total >= loadSize){ currentLoadSize = loadSize; } else{ currentLoadSize = total; } curCtr = (int *) (done_ctr_array+count); #if AM2_HAS_HUGE_SEGMENTS tic_AMRequestXfer(5,7,(remote_box, (jUIntPointer)(((char*)curAllocBuf)-TIC_AM_SEGOFFSET), TIC_AMIDX(sparse_largeGather_request), (void *) (remote_addr_list + count*loadSize), currentLoadSize*sizeof(void*), currentLoadSize, elem_sz, TIC_AMSEND_PTR(((char *) tgt_data_list + count*elem_sz*loadSize)), TIC_AMSEND_PTR(curCtr), atomic_elements)); curAllocBuf = (void *) (((char *) curAllocBuf) + loadSizeInBytes); #else tic_AMRequestI(5,7,(remote_box, TIC_AMIDX(sparse_simpleGather_request), (void *) (remote_addr_list + count*loadSize), currentLoadSize * sizeof(void *), currentLoadSize, elem_sz, TIC_AMSEND_PTR(((char *) tgt_data_list + count*elem_sz*loadSize)), TIC_AMSEND_PTR(curCtr), atomic_elements)); #endif total -= currentLoadSize; count++; } for (i = 0; i < messageCount; i++){ tic_poll_until(done_ctr_array[i]); } ti_free((void *) done_ctr_array); #if AM2_HAS_HUGE_SEGMENTS if (bufferSize > tic_prealloc){ tic_AMRequest(1,2,(remote_box, TIC_AMIDX(misc_delete_request), TIC_AMSEND_PTR(remoteAllocBuf))); } #endif } } } /* ------------------------------------------------------------------------------------ */ void sparse_gather_serial(void *tgt_data_list, void **remote_addr_list, int remote_box, int num_elem, int elem_sz, int atomic_elements) { volatile int done_ctr = 0; assert(tgt_data_list && remote_addr_list && remote_box >= 0 && remote_box < BOXES && num_elem > 0 && elem_sz > 0); if (num_elem * sizeof(void *) <= TIC_AM_MAX_MEDIUM && num_elem * elem_sz <= TIC_AM_MAX_MEDIUM) { /* fast case - everything fits in a medium msg */ tic_AMRequestI(5,7,(remote_box, TIC_AMIDX(sparse_simpleGather_request), remote_addr_list, num_elem * sizeof(void *), num_elem, elem_sz, TIC_AMSEND_PTR(tgt_data_list), TIC_AMSEND_PTR(&done_ctr), atomic_elements)); tic_poll_until(done_ctr); } else { /* slower case - need multiple messages */ int offset = num_elem * sizeof(void *); int datasz = offset + num_elem * elem_sz; void * volatile remoteAllocBuf = NULL; if (datasz <= tic_prealloc){ /* use a preallocated buffer if one is available */ remoteAllocBuf = get_remote_buffer(datasz, remote_box); } else{ /* Allocate a buffer to hold the array data on the remote side. */ tic_AMRequest(2,3,(remote_box, TIC_AMIDX(misc_alloc_request), datasz, TIC_AMSEND_PTR(&remoteAllocBuf))); tic_poll_until(remoteAllocBuf); } /* Transfer the addresses to the buffer with libtic (this could be optimized somewhat) */ ti_bulk_write(remote_box, remoteAllocBuf, remote_addr_list, offset); /* Tell the remote side to gather the data. */ tic_AMRequest(4,6,(remote_box, TIC_AMIDX(sparse_generalGather_request), TIC_AMSEND_PTR(remoteAllocBuf), num_elem, elem_sz, TIC_AMSEND_PTR(&done_ctr))); tic_poll_until(done_ctr); /* Transfer the data from the buffer with libtic (this could be optimized somewhat) */ ti_bulk_read(tgt_data_list, remote_box, ((char *)remoteAllocBuf)+offset, num_elem * elem_sz, (atomic_elements?tic_no_ptrs:tic_gp_only)); /* (handles gp escape too) */ if (datasz > tic_prealloc){ tic_AMRequest(1,2,(remote_box, TIC_AMIDX(misc_delete_request), TIC_AMSEND_PTR(remoteAllocBuf))); } } } /* ------------------------------------------------------------------------------------ */ void sparse_gather(void *tgt_data_list, void **remote_addr_list, int remote_box, int num_elem, int elem_sz, int atomic_elements) { if (tic_pipelining){ sparse_gather_pipeline(tgt_data_list, remote_addr_list, remote_box, num_elem, elem_sz, atomic_elements); } else{ sparse_gather_serial(tgt_data_list, remote_addr_list, remote_box, num_elem, elem_sz, atomic_elements); } } /* ------------------------------------------------------------------------------------ */ TI_INLINE(sparse_simpleGather_request) void sparse_simpleGather_request(tic_amtoken_t token, void *_addr_list, size_t addr_list_size, int num_elem, int elem_sz, void *_tgt_data_list, void *_done_ctr, int atomic_elements) { int i; void **addr_list = (void**)_addr_list; int datasz = num_elem * elem_sz; char *data = ti_malloc_handlersafe(datasz); assert(addr_list_size == sizeof(void *)*num_elem); FAST_PACK(elem_sz, data, addr_list); #if defined(USE_DISTRIBUTED_GC) && !defined(USE_GC_NONE) /* inform distributed GC that some pointers may be escaping */ if (!atomic_elements) dgc_ptr_esc_all_gps(data, datasz, 1); #endif tic_AMReplyI(4,6,(token, TIC_AMIDX(sparse_simpleGather_reply), data, datasz, num_elem, elem_sz, TIC_AMSEND_PTR(_tgt_data_list), TIC_AMSEND_PTR(_done_ctr))); ti_free_handlersafe(data); } TIC_AMMEDIUM(sparse_simpleGather_request, 5, 7, (token,addr,nbytes, a0, a1, TIC_AMRECV_PTR32(a2), TIC_AMRECV_PTR32(a3), a4), (token,addr,nbytes, a0, a1, TIC_AMRECV_PTR64(a2, a3), TIC_AMRECV_PTR64(a4, a5), a6)); /* ------------------------------------------------------------------------------------ */ TI_INLINE(sparse_simpleGather_reply) void sparse_simpleGather_reply(tic_amtoken_t token, void *data_list, size_t data_list_size, int num_elem, int elem_sz, void *_tgt_data_list, void *_done_ctr) { void *tgt_data_list = (void *)_tgt_data_list; int *done_ctr = (int *)_done_ctr; assert(data_list_size == num_elem * elem_sz); memcpy(tgt_data_list, data_list, data_list_size); *done_ctr = 1; } TIC_AMMEDIUM(sparse_simpleGather_reply, 4, 6, (token,addr,nbytes, a0, a1, TIC_AMRECV_PTR32(a2), TIC_AMRECV_PTR32(a3) ), (token,addr,nbytes, a0, a1, TIC_AMRECV_PTR64(a2, a3), TIC_AMRECV_PTR64(a4, a5))); /* ------------------------------------------------------------------------------------ */ TI_INLINE(sparse_generalGather_request) void sparse_generalGather_request(tic_amtoken_t token, void *addr_data_list, int num_elem, int elem_sz, void *_done_ctr) { int i; void **addr_list = (void**)addr_data_list; int offset = num_elem * sizeof(void *); char *data = ((char*)addr_data_list) + offset; FAST_PACK(elem_sz, data, addr_list); tic_AMReply(1,2,(token, TIC_AMIDX(sparse_done_reply), TIC_AMSEND_PTR(_done_ctr))); } TIC_AMSHORT(sparse_generalGather_request, 4, 6, (token, TIC_AMRECV_PTR32(a0), a1, a2, TIC_AMRECV_PTR32(a3) ), (token, TIC_AMRECV_PTR64(a0, a1), a2, a3, TIC_AMRECV_PTR64(a4, a5))); /* ------------------------------------------------------------------------------------ */ /* read environment variables prealloc and pipelining */ void gather_init(){ char *preallocstr; char *pipeliningstr; preallocstr = (char *) getenvMaster("TI_PREALLOC"); if (preallocstr == NULL){ tic_prealloc = 0; } else{ tic_prealloc = atoi(preallocstr); assert(tic_prealloc >= 0); if (tic_prealloc > 0){ buffer_init(); /* convert KB to bytes */ tic_prealloc *= 1024; } } pipeliningstr = (char *) getenvMaster("TI_PIPELINING"); if (pipeliningstr == NULL){ tic_pipelining = 0; } else{ tic_pipelining = atoi(pipeliningstr); } } #endif /* COMM_AM2 */