3. Communication

Athapascan-0 provides various functions to transfer data between threads on different nodes. They allow the user to specify the origin or destination of the data, as well as the kind of data to be transferred. There are blocking and non-blocking (also called asynchronous) communication functions, the latter to take advantage of architectures that can compute and communicate at the same time.

The basic communication functions operate on typed data. All basic scalar data types of the C language are supported. A higher-level library, called Athapascan-0 Formats, is available to express complex data, pipelined communications and remote memory accesses.

3.1. Requests

In both synchronous and asynchronous communication, every function call takes a request descriptor among its arguments. In a synchronous call, the kernel uses it to hold the context of the blocked thread until the communication terminates. In an asynchronous call, it stores the current status of the communication (completed or not).

A send is complete when the data in user space have been buffered or sent to the destination node by the underlying communication library. The sent data must not be deallocated or modified before the request completes. A receive is complete when the incoming data have been stored in user space. The received data must not be read before the request completes.

After a communication completes, its request holds no information that Athapascan-0 still needs, so it can be reused as soon as the synchronous communication function that used it returns. After a receive call returns, however, the request still holds the number of elements actually received and the source of the message. Both can be obtained from the request by calling the function a0GetRequestData().

When the user does not intend to inspect the request after the communication completes, NULL can be passed instead. The synchronous Athapascan-0 communication functions then use an automatic variable to track the request rather than requiring the user to supply one. Asynchronous functions may also be called with a NULL request, but their completion then cannot be tested. The programmer must have another way to detect the end of the communication (a reply message, for example).
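The role of the request descriptor described above can be modelled in a few lines of plain C. This is only a sketch under stated assumptions: the type sketch_request and the sketch_request_* functions are hypothetical stand-ins, not the real a0tRequest or any Athapascan-0 call, and the field layout is invented for illustration.

```c
#include <stddef.h>
#include <stdbool.h>

/* Hypothetical stand-in for a request descriptor; NOT the real a0tRequest. */
typedef struct {
    bool done;    /* has the communication completed? */
    int  count;   /* elements actually received (receive only) */
    int  source;  /* node the message came from (receive only) */
} sketch_request;

/* Mark a request as pending before the communication starts. */
void sketch_request_start(sketch_request *req)
{
    if (req != NULL) {
        req->done = false;
        req->count = 0;
        req->source = -1;
    }
}

/* Called by the simulated runtime when a receive finishes. */
void sketch_request_complete(sketch_request *req, int count, int source)
{
    if (req != NULL) {          /* with a NULL request nothing is recorded */
        req->done = true;
        req->count = count;
        req->source = source;
    }
}

/* Returns 1 if complete, 0 if pending, -1 if there is no descriptor to test. */
int sketch_request_test(const sketch_request *req)
{
    if (req == NULL)
        return -1;
    return req->done ? 1 : 0;
}
```

The -1 result mirrors the restriction stated above: a communication started with a NULL request leaves nothing to test, so its end has to be detected by other means.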

3.2. Ports

The origin and destination of a message are identified by a port, a node number and a tag. When a message is sent, the port and the tag are combined with the data into a message, which is sent to the destination node. On the destination node, a receive completes on the arrival of a message whose port, tag and source node match those specified. For a given port, node and tag, messages arrive in exactly the order in which they were sent.
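The matching rule can be illustrated with a small single-process model. It is a sketch under assumptions, not the kernel's implementation: sketch_mailbox, sketch_deliver() and sketch_receive() are hypothetical names, the payload is reduced to one int, and a receive that finds no match returns -1 instead of blocking.

```c
#include <stddef.h>

#define SKETCH_MAX_MSGS 16

/* One message as seen by the destination node (hypothetical layout). */
typedef struct {
    int port, tag, source, payload;
} sketch_msg;

/* Messages are kept in arrival order. */
typedef struct {
    sketch_msg slot[SKETCH_MAX_MSGS];
    size_t     count;
} sketch_mailbox;

void sketch_mailbox_init(sketch_mailbox *mb) { mb->count = 0; }

/* Simulate the arrival of a message; returns -1 if the mailbox is full. */
int sketch_deliver(sketch_mailbox *mb, int port, int tag, int source, int payload)
{
    if (mb->count == SKETCH_MAX_MSGS)
        return -1;
    mb->slot[mb->count].port = port;
    mb->slot[mb->count].tag = tag;
    mb->slot[mb->count].source = source;
    mb->slot[mb->count].payload = payload;
    mb->count++;
    return 0;
}

/* Take the oldest message whose port, tag and source all match.
   Returns 0 on success, -1 if no such message has arrived yet. */
int sketch_receive(sketch_mailbox *mb, int port, int tag, int source, int *payload)
{
    size_t i, j;
    for (i = 0; i < mb->count; i++) {
        const sketch_msg *m = &mb->slot[i];
        if (m->port == port && m->tag == tag && m->source == source) {
            *payload = m->payload;
            for (j = i + 1; j < mb->count; j++)   /* close the gap, keep order */
                mb->slot[j - 1] = mb->slot[j];
            mb->count--;
            return 0;
        }
    }
    return -1;
}
```

Because the scan always starts from the oldest entry, two messages with the same port, tag and source are received in the order in which they arrived, while a message with a different tag can be taken out of turn.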

Sender and receiver have to agree on which port and which tag to use. It is up to the programmer to choose the correct values, but Athapascan-0 helps the programmer use the same ports on different nodes. Because Athapascan-0 identifier generators are deterministic, all objects get the same identifier on all nodes if the initialization sequence is the same on all nodes. This makes it possible to create the same ports on different nodes at the same time.

There are two ways of creating ports: global and local creation. Note that it is the creation that is either local or global; ports themselves are always local to a given node. Global creation must be done between the calls to a0Init() and a0InitCommit(). Local creation can be performed anywhere in the program. The function a0NewPort() performs both local and global creation.

If the parameter type is A0GlobalCreation, all the nodes must call it simultaneously, and the returned value will be the same on all nodes. Remember that the creation can be global only if a0NewPort() is called between a0Init() and a0InitCommit(). In that part of the program, all nodes are expected to execute exactly the same code, and port generation is deterministic, so every node obtains the same port.

int main(int argc, char *argv[])
{
  ...
  a0Init(&argc, &argv);
  a0NewPort(&Port1, 1, A0GlobalCreation); /* all nodes create the same port */
  a0InitCommit();

  /* node 0 sends an integer to node 1 */
  if (a0SelfNode == 0)
    a0Send(&Port1, /*node*/ 1, ...);

  /* node 1 receives an integer from node 0 */
  if (a0SelfNode == 1)
    a0Receive(&Port1, /*node*/ 0, ...);
}
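Why identical initialization sequences yield identical ports can be seen in a toy model of a deterministic identifier generator. The source only states that the generators are deterministic; the simple counter used here, and the names sketch_idgen, sketch_idgen_init() and sketch_idgen_next(), are assumptions made for illustration.

```c
/* Hypothetical per-node identifier generator: a plain counter.
   The real Athapascan-0 generator is only known to be deterministic. */
typedef struct {
    int next_id;
} sketch_idgen;

/* Every node starts from the same state. */
void sketch_idgen_init(sketch_idgen *g)
{
    g->next_id = 0;
}

/* Hand out 'how_many' consecutive identifiers and return the first one. */
int sketch_idgen_next(sketch_idgen *g, int how_many)
{
    int first = g->next_id;
    g->next_id += how_many;
    return first;
}
```

Two generators that start from the same state and receive the same sequence of calls return the same identifiers. As soon as one node makes an extra call, the sequences diverge, which is why global creation is restricted to a region where all nodes execute the same code.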

If type is A0LocalCreation when calling a0NewPort(), the port created will never be returned by any other call to a0NewPort(), whether on the same node or on another one. As ports are just identifiers, which can be referenced by any thread on any node, a port can be created on one node and used on another. Ports can therefore be passed inside a message or in the parameters of a remote thread creation.

A common means of communication between two threads, when one creates the other, is to pass a port in the arguments of the created thread. The parent thread creates a port and stores it in a buffer, which is then used to create the child thread remotely. The port is thus known to both parent and child, which can use it to communicate with each other.

a0tError my_service(a0tBuffer *input)
{
  ...
  a0Unpack(input, ..., &port);
  a0Receive(&port, /*node*/ 0, ...);
  ...
}

  ...
  a0NewPort(&port, 1, A0LocalCreation);
  a0Pack(buff, ..., &port);
  a0CreateRemoteThread(/*my_service at node 1*/, buff);
  a0Send(&port, /*node*/ 1, ...);

A thread usually communicates with its neighbors, which are conveniently designated by a single index. For this reason, the communication functions take both a port and a tag. When allocating a port, the program specifies the maximum number of tags to be used with it. The kernel creates as many consecutive ports as the number of tags requested and returns the number of the first one. Every communication identifies its destination by a base port number and a tag, the latter being added to the former to obtain the actual port number.
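The base-plus-tag addressing amounts to a single addition, as the following sketch shows. The struct sketch_port and the function sketch_actual_port() are hypothetical, and the range check on the tag is an assumption: the source does not say how the kernel reacts to a tag outside the allocated range.

```c
/* Hypothetical view of an allocated port: first port number and tag count. */
typedef struct {
    int base;    /* number of the first of the consecutive ports */
    int ntags;   /* number of tags requested at allocation time */
} sketch_port;

/* Actual port number for a given tag, or -1 if the tag is out of range.
   The range check is an assumption made for this sketch. */
int sketch_actual_port(const sketch_port *p, int tag)
{
    if (tag < 0 || tag >= p->ntags)
        return -1;
    return p->base + tag;
}
```

With a port allocated for four tags starting at number 10, tags 0 to 3 designate ports 10 to 13, so a thread can address each of four neighbors by its index alone.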

3.3. Synchronous Communication

The functions a0Send() and a0Receive() are the basic blocking communication operators. They allow communication between any threads in the parallel machine, so any data can be exchanged between remote threads. For example, additional arguments can be passed to a remotely created thread and its results can be received.

When a thread calls a synchronous communication function, it stays blocked until the communication completes. This means that the user data can be freely accessed as soon as a blocking call returns. All other threads keep running without interference while the communicating thread is blocked.

  ...
  /* node 0 sends an integer to node 1 */
  if (a0SelfNode == 0)
    a0Send(port, /*node*/ 1, 0, &req, a0_INT, &data, 1);

  /* node 1 receives an integer from node 0 */
  if (a0SelfNode == 1)
    a0Receive(port, /*node*/ 0, 0, &req, a0_INT, &data, 1);
  ...

3.4. Asynchronous Communication

Athapascan-0 provides non-blocking send and receive in order to exploit the ability of the underlying communication kernel to advance independent communications as soon as possible. Every call to an asynchronous communication function returns immediately, and the associated request holds the current status of the communication. The completion of such a communication can be tested with the function a0TestRequest(). Alternatively, the function a0WaitRequest() blocks the calling thread until the communication request completes.
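The difference between testing and waiting can be shown with a single-threaded model. It is a sketch only: sketch_async_request and the sketch_async_* functions are hypothetical and do not reproduce a0TestRequest() or a0WaitRequest(). The communication is assumed to finish after a fixed number of progress steps, and the wait is written as a loop, whereas the real kernel blocks the calling thread and lets other threads run.

```c
#include <stdbool.h>

/* Hypothetical request that completes after a fixed number of progress steps. */
typedef struct {
    int steps_left;
} sketch_async_request;

/* Start a simulated communication that needs 'steps' progress steps. */
void sketch_async_start(sketch_async_request *r, int steps)
{
    r->steps_left = steps;
}

/* Stand-in for the communication layer advancing the transfer by one step. */
void sketch_async_progress(sketch_async_request *r)
{
    if (r->steps_left > 0)
        r->steps_left--;
}

/* Non-blocking: report the current status and return at once. */
bool sketch_async_test(const sketch_async_request *r)
{
    return r->steps_left == 0;
}

/* Blocking: drive progress until completion; returns the steps waited. */
int sketch_async_wait(sketch_async_request *r)
{
    int waited = 0;
    while (!sketch_async_test(r)) {
        sketch_async_progress(r);
        waited++;
    }
    return waited;
}
```

A caller can interleave computation with calls to the test function and fall back to the wait function only when it has nothing left to compute.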

In the following example, a thread sends data asynchronously to several others, does some processing, and then waits for the communications to complete before reusing the sent variables. Note that the function a0ISend()[1] is used instead of a0Send().

  float Data[SIZE*NUMBER];
  ...
  a0tRequest Request[NUMBER];
  for(i=0; i<NUMBER ; i++)       /* scatter the arguments asynchronously */
    a0ISend(&Port[i], Node[i], TAG, &Request[i],
            a0_FLOAT, &Data[SIZE*i], SIZE);
  ... compute ...
  /* wait for completion before reusing Data[] array */
  for(i=0; i<NUMBER ; i++)
    a0WaitRequest(&Request[i]);

The same construction is possible with a0IReceive(). In this case, a0WaitRequest() must be called before the received data are used. The main advantage of asynchronous communication is that it provides an easy way to write race-free, deadlock-free and buffer-size-independent communications, as in the following exchange with four neighbors.

  int Neighbor[4];               /* Four neighbors */
  a0tRequest Request[8];         /* 8 communications */
  ...
  for(i=0;i<4;i++) {             /* initialize the exchange */
    a0ISend(&Port, Neighbor[i], TAG1, &Request[i],
            &SendType, &SendData[i], count);
    a0IReceive(&Port, Neighbor[i], TAG2, &Request[i+4],
            &ReceiveType, &ReceiveData[i], count);
  }
  ...
  for(i=0; i<8; i++)             /* wait for completion */
    a0WaitRequest(&Request[i]);

3.5. Buffers

When the data to send are stored in memory regions with variable offsets that cannot be described by a simple data type, the user can pack them into a buffer and then send the buffer. On the other side, the user receives the data into a buffer and unpacks them. However, data sent with a normal a0Send() cannot be received into a buffer, or vice versa. It is also up to the user to ensure that data unpacked from the buffer have the same type as the data packed into it.

A buffer must be created with the function a0NewBuffer() before it is used. The function a0Pack() packs data into a buffer and a0Unpack() unpacks them. The functions to send and receive a buffer are a0SendBuffer() and a0ReceiveBuffer(), respectively. There are also non-blocking variants, a0ISendBuffer() and a0IReceiveBuffer().

  a0tBuffer Buf;
  float F[10];
  int I[20];
  ...
  a0NewBuffer(&Buf, A0SendBufferType, BUFSIZE);
  ...
  a0Pack(&Buf, a0_FLOAT, &F[0], 10);
  a0Pack(&Buf, a0_INT,   &I[0], 20);
  a0SendBuffer(&Port, Dest, Tag, &Buf);
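The packing discipline, and the reason why the receiver must unpack with the same types in the same order, can be sketched in plain C with a byte array and memcpy(). The names sketch_buffer, sketch_pack() and sketch_unpack() are hypothetical; nothing here reflects the internal layout of a0tBuffer, and the fixed capacity SKETCH_BUFSIZE is an assumption.

```c
#include <stddef.h>
#include <string.h>

#define SKETCH_BUFSIZE 256

/* Hypothetical pack buffer: raw bytes plus write and read cursors. */
typedef struct {
    unsigned char bytes[SKETCH_BUFSIZE];
    size_t write_pos;   /* next free byte for packing */
    size_t read_pos;    /* next byte to hand out when unpacking */
} sketch_buffer;

void sketch_buffer_init(sketch_buffer *b)
{
    b->write_pos = 0;
    b->read_pos = 0;
}

/* Append 'count' elements of 'elem_size' bytes; -1 if they do not fit. */
int sketch_pack(sketch_buffer *b, const void *data, size_t elem_size, size_t count)
{
    size_t n = elem_size * count;
    if (n > SKETCH_BUFSIZE - b->write_pos)
        return -1;
    memcpy(b->bytes + b->write_pos, data, n);
    b->write_pos += n;
    return 0;
}

/* Copy out the next 'count' elements; -1 if fewer bytes remain.
   The buffer stores no type information, so the caller must unpack
   with the same types and in the same order as they were packed. */
int sketch_unpack(sketch_buffer *b, void *data, size_t elem_size, size_t count)
{
    size_t n = elem_size * count;
    if (n > b->write_pos - b->read_pos)
        return -1;
    memcpy(data, b->bytes + b->read_pos, n);
    b->read_pos += n;
    return 0;
}
```

Packing ten floats followed by twenty ints and unpacking them in that order returns the original values. Unpacking in a different order would silently reinterpret the bytes, which is the mismatch the user is asked to avoid.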


[1] The I stands for immediate return.