#include "lapi_comm.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* LAPI-AM handlers to perform bulk array copies. */

/* Helpful note: LAPI AM differs in design from Berkeley AM.  There is no
   request-reply structure; you send a message and that's it.  If you want
   a reply back, you need to send the address of some memory where a reply
   can be put.

   LAPI AM also has this notion of a "header handler" and a "completion
   handler".  When you send an AM, you send over two pieces of data: a
   user-defined header, which has to be small, and a chunk of data, which
   can be large.  This is like Berkeley AM having AM function parameters as
   well as piggybacked data.  The header handler runs first, and it is
   given the header data only.  When the header handler returns, the system
   pulls in all the data, then calls the completion handler.  The job of
   the header handler is to provide buffer space where the incoming data
   can be put, and to set up data that the completion handler needs to do
   your real work, such as the address of the buffer space. */

/* A get from a remote node works as follows:

   The local node sends a "pack" AM to the remote node telling it to pack
   up the array data it wants.  The header carries the address of the pack
   method, the requesting node's id and the address of the requester's
   receive buffer; the copy descriptor travels as the message data.

   The remote node allocates space for the descriptor (header handler),
   then calls the pack method, LAPI_Put()s the packed bytes straight into
   the requester's buffer and waits for that put to complete (completion
   handler).

   The local node waits on the completion counter it passed to
   LAPI_Amsend().  Unpacking the received bytes into the local array is not
   done in this file; it is left to the caller. */

typedef struct {
  void *pack_method; /* address of the pack routine to run on the target   */
  void *copy_desc;   /* filled in on the target: copy of the descriptor    */
  int sender;        /* node that issued the get                           */
  void *put_here;    /* buffer on the sender that receives the packed data */
} pack_hdr_t;

int reply_ready;
void *reply_data;

/* pack(copy_desc, &out_len, dest_buf_or_NULL) -> packed buffer.  When the
   third argument is NULL the result is released with free() by
   pack_comp_handler below. */
typedef void *(*pack_method_t)(void *, int *, void *);

void *pack_reply_hdr_handler(lapi_handle_t *hndl, void *uhdr, int *uhdr_len,
                             int *msg_len, compl_hndlr_t **comp_h,
                             void **user_info);

void get_array(void *pack_method, void *copy_desc, int copy_desc_size,
               int tgt_node, void *buffer);

/* malloc() that never hands back an unusable pointer: a failed non-empty
   request prints a diagnostic naming the caller and aborts.  The handlers
   below have no way to report an error to their caller, and continuing
   with NULL would crash in memcpy() or inside LAPI anyway. */
static void *lapi_array_xmalloc(size_t nbytes, const char *who)
{
  void *mem = malloc(nbytes);

  if (mem == NULL && nbytes != 0) {
    fprintf(stderr, "%s: out of memory allocating %lu bytes\n", who,
            (unsigned long)nbytes);
    abort();
  }
  return mem;
}

/* Completion handler for a "pack" AM.  Called when the copy descriptor
   shows up, which happens sometime after pack_hdr_handler is called.

   udata is the heap copy of the pack_hdr_t made by pack_hdr_handler; its
   copy_desc field points at the received descriptor.  Both are freed here,
   as is the buffer returned by the pack method. */
void pack_comp_handler(lapi_handle_t *hndl, void *udata)
{
  pack_hdr_t *pack_hdr = (pack_hdr_t *)udata;
  pack_method_t pack_method = (pack_method_t)pack_hdr->pack_method;
  void *array_data;
  int array_data_len;
  lapi_cntr_t comp_cntr;

  /* Call the pack method with the pack_hdr we are given.
     WARNING: LAPI states that you can't assume that SPMD programs will
     have mirrored code addresses; LAPI wants you to exchange all
     addresses.  I have not actually found a case where addresses are not
     mirrored, however. */
  array_data = (*pack_method)(pack_hdr->copy_desc, &array_data_len, NULL);

  /* Ship the packed bytes into the requester's buffer and wait until the
     put has completed before releasing the source buffer. */
  comp_cntr = 0;
  LAPI_Put(*hndl, pack_hdr->sender, array_data_len, pack_hdr->put_here,
           array_data, NULL, NULL, &comp_cntr);
  LAPI_WAIT(comp_cntr);

  free(pack_hdr->copy_desc);
  free(pack_hdr);
  free(array_data);
}

/* Part 1 of the LAPI two-step dance: header handler for a "pack" AM.

   Accepts a pack_hdr_t, which has the address of the array's pack routine,
   the sender ID, and the place on the sender where the data should be
   placed.  Returns the buffer into which LAPI should deposit the message
   data (the copy descriptor, *msg_len bytes) and arranges for
   pack_comp_handler to run with a private copy of the header. */
void *pack_hdr_handler(lapi_handle_t *hndl, void *uhdr, int *uhdr_len,
                       int *msg_len, compl_hndlr_t **comp_h,
                       void **user_info)
{
  pack_hdr_t *pack_hdr;

  /* The user data header on LAPI (the pack_hdr) doesn't stay around.
     Since the completion handler needs it, we need to copy it. */
  pack_hdr = (pack_hdr_t *)lapi_array_xmalloc(sizeof(pack_hdr_t),
                                              "pack_hdr_handler");
  memcpy(pack_hdr, uhdr, sizeof(pack_hdr_t));

  /* Allocate space for the copy descriptor, put a pointer to the space in
     the header, and tell LAPI to pass the header on to the completion
     handler. */
  pack_hdr->copy_desc = lapi_array_xmalloc(*msg_len, "pack_hdr_handler");
  *user_info = (void *)pack_hdr;
  *comp_h = pack_comp_handler;

  /* Control passes over to the comp handler. */
  return pack_hdr->copy_desc;
}

/* Static staging areas for the "fast" get path.  pack_buf is filled on
   the target by fastpack_hdr_handler and read remotely with LAPI_Get();
   hdr_buf holds the outgoing header on the requester.
   NOTE(review): both are shared by every request on a node, so concurrent
   fast gets against the same target would presumably overwrite each
   other's data -- confirm how callers serialise this. */
int pack_buf[1000000];
int hdr_buf[900];

/* Header handler for the "fast" get path.  The whole request fits in the
   header, so the packing is done right here into pack_buf and no
   completion handler is registered.

   Header layout: the first word is the address of the pack method, the
   rest is the copy descriptor. */
void *fastpack_hdr_handler(lapi_handle_t *hndl, void *uhdr, int *uhdr_len,
                           int *msg_len, compl_hndlr_t **comp_h,
                           void **user_info)
{
  pack_method_t pack_method = *((pack_method_t *)uhdr);
  /* Skip the same number of bytes the sender reserved for the method
     pointer; char * arithmetic keeps the full pointer width. */
  void *copy_desc = (void *)((char *)uhdr + sizeof(pack_method_t));
  int packed_len;

  (void)(*pack_method)(copy_desc, &packed_len, pack_buf);

  *user_info = NULL;
  *comp_h = NULL;
  return NULL;
}

/* The actual fast path.  The caller guarantees that the descriptor fits in
   hdr_buf behind the method pointer and that data_size fits in pack_buf. */
static void fast_get_array_impl(void *pack_method, void *copy_desc,
                                int copy_desc_size, int tgt_node,
                                void *buffer, int data_size)
{
  lapi_cntr_t comp_cntr;
  /* Unused by the visible code; kept in case the COMMVIEW macros refer to
     them -- TODO confirm and remove. */
  double startTime, endTime;

  COMMVIEW_START;

  /* Build the header: pack method address followed by the descriptor. */
  *((pack_method_t *)hdr_buf) = (pack_method_t)pack_method;
  memcpy((char *)hdr_buf + sizeof(pack_method_t), copy_desc, copy_desc_size);

  /* Step 1: ask the target to pack into its pack_buf. */
  comp_cntr = 0;
  LAPI_Amsend(lapi_hndl, tgt_node, (void *)fastpack_hdr_handler, hdr_buf,
              copy_desc_size + sizeof(pack_method_t), NULL, NULL, NULL, NULL,
              &comp_cntr);
  LAPI_WAIT(comp_cntr);

  /* Step 2: pull the packed bytes out of the target's pack_buf.  This
     relies on pack_buf having the same address on every node. */
  comp_cntr = 0;
  LAPI_Get(lapi_hndl, tgt_node, data_size, pack_buf, buffer, NULL,
           &comp_cntr);
  LAPI_WAIT(comp_cntr);

  COMMVIEW_END(MYPROC, tgt_node);
}

/* Gets an array from a remote node using the static staging buffers: one
   header-only AM makes the target pack into pack_buf, then a LAPI_Get()
   copies data_size bytes from there into buffer.

   pack_method     address of the pack routine to run on tgt_node
   copy_desc       copy descriptor handed to the pack routine
   copy_desc_size  size of copy_desc in bytes
   tgt_node        node that owns the data
   buffer          local destination for the packed data
   data_size       number of packed bytes to fetch

   Requests whose descriptor would not fit in hdr_buf, or whose data would
   not fit in pack_buf, are routed through get_array() instead of
   overrunning the static buffers. */
void fast_get_array(void *pack_method, void *copy_desc, int copy_desc_size,
                    int tgt_node, void *buffer, int data_size)
{
  if (copy_desc_size > (int)(sizeof(hdr_buf) - sizeof(pack_method_t)) ||
      data_size > (int)sizeof(pack_buf)) {
    get_array(pack_method, copy_desc, copy_desc_size, tgt_node, buffer);
    return;
  }
  fast_get_array_impl(pack_method, copy_desc, copy_desc_size, tgt_node,
                      buffer, data_size);
}

/* Gets a Titanium array from a remote node.  The data shows up packed
   into a contiguous buffer (which is the *buffer parm) and must be
   unpacked by the caller.

   Blocks until the completion counter passed to LAPI_Amsend() has been
   signalled. */
void get_array(void *pack_method, void *copy_desc, int copy_desc_size,
               int tgt_node, void *buffer)
{
  pack_hdr_t pack_hdr;
  lapi_cntr_t comp_cntr;

  COMMVIEW_START;

  pack_hdr.pack_method = pack_method;
  pack_hdr.copy_desc = NULL; /* filled in on the target */
  pack_hdr.sender = MYPROC;
  pack_hdr.put_here = buffer;

  comp_cntr = 0;
  LAPI_Amsend(lapi_hndl, tgt_node, (void *)pack_hdr_handler, &pack_hdr,
              sizeof(pack_hdr_t), copy_desc, copy_desc_size, NULL, NULL,
              &comp_cntr);
  LAPI_WAIT(comp_cntr);

  COMMVIEW_END(MYPROC, tgt_node);
}

/* A put to a remote node works like this:

   The sender sends over an active message whose header holds the address
   of the unpack handler, the size of the copy descriptor and its node
   number, and whose data is the copy descriptor immediately followed by
   the packed array data.  The receiver receives the data and calls the
   unpack handler on it.  No explicit reply is sent; the sender waits on
   the completion counter it passed to LAPI_Amsend(). */

typedef struct {
  void *unpack_method; /* address of the unpack routine to run on the target */
  int copy_desc_size;  /* bytes of descriptor at the start of the data       */
  void *data;          /* filled in on the target: received descriptor+data  */
  int sender;          /* node that issued the put                           */
} unpack_hdr_t;

typedef void *(*unpack_method_t)(void *, void *);

/* Completion handler for a put.  Called after the copy descriptor and
   data show up.  udata is the heap copy of the unpack_hdr_t made by
   unpack_hdr_handler; it and the receive buffer are freed here. */
void unpack_comp_handler(lapi_handle_t *hndl, void *udata)
{
  unpack_hdr_t *unpack_hdr = (unpack_hdr_t *)udata;
  unpack_method_t unpack_method = (unpack_method_t)unpack_hdr->unpack_method;
  /* The receive buffer holds the descriptor followed by the array data. */
  void *copy_desc = unpack_hdr->data;
  void *array_data =
      (void *)((char *)unpack_hdr->data + unpack_hdr->copy_desc_size);

  /* Call the unpack method with the unpack_hdr we are given.
     WARNING: LAPI states that you can't assume that SPMD programs will
     have mirrored code addresses; LAPI wants you to exchange all
     addresses.  I have not actually found a case where addresses are not
     mirrored, however. */
  (*unpack_method)(copy_desc, array_data);

  free(unpack_hdr->data);
  free(unpack_hdr);
}

/* Header handler for a put.  Returns the buffer into which LAPI should
   deposit the message data (*msg_len bytes) and arranges for
   unpack_comp_handler to run with a private copy of the header. */
void *unpack_hdr_handler(lapi_handle_t *hndl, void *uhdr, int *uhdr_len,
                         int *msg_len, compl_hndlr_t **comp_h,
                         void **user_info)
{
  unpack_hdr_t *unpack_hdr;

  /* The user data header on LAPI (the unpack_hdr) doesn't stay around.
     Since the completion handler needs it, we need to copy it. */
  unpack_hdr = (unpack_hdr_t *)lapi_array_xmalloc(sizeof(unpack_hdr_t),
                                                  "unpack_hdr_handler");
  memcpy(unpack_hdr, uhdr, sizeof(unpack_hdr_t));

  /* Allocate space for the array data, put a pointer to the space in the
     header, and tell LAPI to pass the header on to the completion
     handler. */
  unpack_hdr->data = lapi_array_xmalloc(*msg_len, "unpack_hdr_handler");
  *user_info = (void *)unpack_hdr;
  *comp_h = unpack_comp_handler;

  return unpack_hdr->data;
}

/* Puts packed array data to a remote node and has it unpacked there.

   unpack_method    address of the unpack routine to run on tgt_node
   copy_desc        copy descriptor handed to the unpack routine
   copy_desc_size   size of copy_desc in bytes
   array_data       packed array data
   array_data_size  size of array_data in bytes
   tgt_node         destination node

   Blocks until the completion counter has been signalled.  LAPI_Amsend()
   is non-blocking, so the staging buffer, the header and the counter (the
   latter two live on this stack frame) must all stay valid until then;
   returning earlier would free memory that is still in use. */
void put_array(void *unpack_method, void *copy_desc, int copy_desc_size,
               void *array_data, int array_data_size, int tgt_node)
{
  unpack_hdr_t unpack_hdr;
  lapi_cntr_t comp_cntr;
  void *data;

  COMMVIEW_START;

  /* Stage descriptor and array data back to back in one message body. */
  data = lapi_array_xmalloc(copy_desc_size + array_data_size, "put_array");
  memcpy(data, copy_desc, copy_desc_size);
  memcpy((char *)data + copy_desc_size, array_data, array_data_size);

  unpack_hdr.unpack_method = unpack_method;
  unpack_hdr.copy_desc_size = copy_desc_size;
  unpack_hdr.data = NULL; /* filled in on the target */
  unpack_hdr.sender = MYPROC;

  comp_cntr = 0;
  LAPI_Amsend(lapi_hndl, tgt_node, (void *)unpack_hdr_handler, &unpack_hdr,
              sizeof(unpack_hdr_t), data, copy_desc_size + array_data_size,
              NULL, NULL, &comp_cntr);
  LAPI_WAIT(comp_cntr);

  free(data);

  COMMVIEW_END(MYPROC, tgt_node);
}