Regular-Sampling-Quick-Sorting (MPI)

A. First Edition
This is a simple practice exercise, written to demonstrate a point about parallel sorting.
B.The problem

Admittedly this is intellectually shallow: it is a simple variant of merge sort, which is itself one of the external sorting algorithms used extensively in databases to sort very large lists that cannot fit entirely in memory.

 

C.The idea of program
 

Originally, the definition of sorting in MPI is that the list is scattered across a series of working nodes before sorting. After sorting, the pieces remain scattered across those machines, except that each segment is sorted and the segments are ordered relative to each other: if one element of segment A is bigger than some element of segment B, then no element of A is smaller than any element of B. (Is that complicated? No — it just says the segments themselves are in sorted order.)

D.The major functions
I modified the algorithm a little, because I think it is pointless for the sorted list to remain as segments
scattered across different machines. So, I require the whole list to be assembled on one node at the end.
1. firstPhase: sorting local segment list and sample elements in sorted list.
2. secondPhase: sampled elements are sent to one machine, say master node, and sorted then broadcasted to all
working node.
3. thirdPhase: each working node uses these samples to slice its sorted list into sub-lists and sends the lengths
of these sub-lists back to the master node, so that the master node can arrange to receive each sub-list at the
corresponding position in the next phase.
4. fourthPhase: receive these sub-lists into temporary buffers, then merge them into one final array.
5. zeroPhase: I added this function in order to compare against the speed of sorting on a single machine.
E.Further improvement
 
F.File listing
1. quick.cpp
 
file name: quick.cpp
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "mpi.h"

int myRank;	// this process's rank in MPI_COMM_WORLD (0 = master, 1..WorkerNumber = workers)
int mySize;	// total number of MPI processes (expected: WorkerNumber+1)

// Number of elements in each worker's local segment; the master's myArray
// holds WorkerNumber of these segments.
const int SingleArrayLength=10000000;
// Random data is rand()%MaxArrayElement+MinArrayElement, i.e. values in
// [MinArrayElement, MaxArrayElement+MinArrayElement-1].
const int MaxArrayElement=1000000;
const int MinArrayElement=100;
int* myArray;	// worker: local segment; master: full assembled array

const int WorkerNumber=7;	// number of worker ranks; rank 0 is the master

// Stride between regular samples taken from each sorted local segment.
const int SampleOffset=SingleArrayLength/WorkerNumber;
// Offset within each group of WorkerNumber sorted samples where the pivot is picked.
const int StartSampleOffset=WorkerNumber/2-1;


int* sampleArray;	// samples / pivots / cut points; sized per role in initialize()
int**mergeBuf;		// master only: one receive buffer per worker for phase 4

MPI_Request* sampleRequests;	// nonblocking-receive handles (master: WorkerNumber, worker: 1)

const int SAMPLE=100;	// message tag: worker samples -> master (phase 2)
const int PIVOT=101;	// message tag: worker cut points -> master (phase 3)
const int MaxIntegerPrintLength=10;	// chars reserved per int when formatting arrays for printing

// atexit hook; the per-rank completion message is disabled but kept for debugging.
void myExit()
{
	/* printf("\n\nrank %d finishes\n", myRank); */
}


// Print `length` ints from `array` on one line, prefixed with this rank and
// `comment`, emitted with a single fputs so output from concurrent MPI ranks
// is less likely to interleave.  Caller keeps ownership of both pointers.
void printArray(int* array, int length, char* comment="Array print out:");

void printArray(int* array, int length, char* comment)
{
	// Worst case per item: '-' + 10 digits + ',' = 12 chars (the old global
	// MaxIntegerPrintLength=10 was too small for INT_MIN).
	const int MaxItemLength=13;
	// The old header budget (strlen(comment)+5) undercounted the
	// "rank[%d] ...:*****" prefix and could overflow for length==0.
	int headerLength=strlen(comment)+32;
	char* buf=new char[length*MaxItemLength+headerLength];
	int pos=sprintf(buf, "rank[%d] %s:*****", myRank, comment);
	for (int i=0; i<length; i++)
	{
		// Append at the running offset; the old repeated strcat rescanned
		// the whole buffer each iteration (O(n^2)).
		pos+=sprintf(buf+pos, "%d,", array[i]);
	}
	buf[pos++]='\n';
	buf[pos]='\0';
	// The original `printf(buf)` treated data as a format string (a classic
	// format-string bug); fputs prints it verbatim.
	fputs(buf, stdout);
	delete []buf;
}


// Allocate per-role buffers and, on workers, fill the local segment with
// pseudo-random values.  Master (rank 0) gets the sample table, the full
// result array, and one merge buffer per worker; each worker gets a single
// segment plus a small sample vector.
void initialize()
{
	atexit(myExit);
	// Seed differs per rank so workers do not generate identical segments.
	srand(myRank*time(0));
	if (myRank==0)
	{
		sampleArray=new int[WorkerNumber*WorkerNumber];
		sampleRequests=new MPI_Request[WorkerNumber];
		myArray=new int[SingleArrayLength*WorkerNumber];
		mergeBuf=new int*[WorkerNumber];
		for (int w=0; w<WorkerNumber; w++)
		{
			mergeBuf[w]=new int[SingleArrayLength];
		}
	}
	else
	{
		sampleArray=new int[WorkerNumber];
		sampleRequests=new MPI_Request[1];
		myArray=new int[SingleArrayLength];
		for (int k=0; k<SingleArrayLength; k++)
		{
			myArray[k]=rand()%MaxArrayElement+MinArrayElement;
		}
	}
}


// qsort comparator for int, ascending.  Uses explicit comparisons instead
// of the subtraction idiom: `*a - *b` overflows (undefined behavior) when
// the operands differ by more than INT_MAX, e.g. comparing INT_MIN with a
// positive value.
int intComp(const void* first, const void* second)
{
	int a=*(const int*)first;
	int b=*(const int*)second;
	return (a>b) - (a<b);
}

// Baseline measurement: gather every worker's segment onto rank 0 and time a
// single-machine qsort of the complete array, for comparison with the
// distributed sort.  Workers just ship their (unsorted) segment and return.
void zeroPhase()
{
	if (myRank!=0)
	{
		MPI_Send(myArray, SingleArrayLength, MPI_INT, 0, 0, MPI_COMM_WORLD);
		return;
	}
	for (int w=0; w<WorkerNumber; w++)
	{
		sampleRequests[w]=MPI_REQUEST_NULL;
		// Segment from rank w+1 lands at its slot in the big array.
		MPI_Irecv(myArray+w*SingleArrayLength, SingleArrayLength, MPI_INT, w+1, 0, MPI_COMM_WORLD, sampleRequests+w);
	}
	MPI_Waitall(WorkerNumber, sampleRequests, MPI_STATUSES_IGNORE);
	double start=MPI_Wtime();
	qsort(myArray, WorkerNumber*SingleArrayLength, sizeof(int), intComp);
	double end=MPI_Wtime();
	printf("single machine sorting array of length %d takes %f\n",SingleArrayLength*WorkerNumber, end-start);
}



// Phase 1 (workers only): sort the local segment, then take WorkerNumber
// regular samples — every SampleOffset-th element — into sampleArray.
void firstPhase()
{
	if (myRank==0)
	{
		return;	// master holds no data segment during this phase
	}
	qsort(myArray, SingleArrayLength, sizeof(int), intComp);
	for (int s=0; s<WorkerNumber; s++)
	{
		sampleArray[s]=myArray[s*SampleOffset];
	}
}


// Phase 2: workers send their WorkerNumber samples to rank 0; rank 0 sorts
// all WorkerNumber*WorkerNumber samples, picks WorkerNumber-1 pivots at a
// regular stride, and broadcasts the pivots to every rank.
void secondPhase()
{
	if (myRank==0)
	{
		for (int w=0; w<WorkerNumber; w++)
		{
			// Row w of the sample table receives rank w+1's samples.
			MPI_Irecv(sampleArray+w*WorkerNumber, WorkerNumber, MPI_INT, w+1, SAMPLE, MPI_COMM_WORLD, sampleRequests+w);
		}
		MPI_Waitall(WorkerNumber, sampleRequests, MPI_STATUSES_IGNORE);
		qsort(sampleArray, WorkerNumber*WorkerNumber, sizeof(int), intComp);
		// Compact the chosen pivots into the front of sampleArray; each read
		// index WorkerNumber*p+StartSampleOffset lies beyond the write index
		// p-1, so the in-place overwrite is safe.
		for (int p=1; p<WorkerNumber; p++)
		{
			sampleArray[p-1]=sampleArray[WorkerNumber*p+StartSampleOffset];
		}
	}
	else
	{
		MPI_Ssend(sampleArray, WorkerNumber, MPI_INT, 0, SAMPLE, MPI_COMM_WORLD);
	}
	// Collective: all ranks (master included) obtain the WorkerNumber-1 pivots.
	MPI_Bcast(sampleArray, WorkerNumber-1, MPI_INT, 0, MPI_COMM_WORLD);
}

//it returns the smallest index of which the number is bigger than or equal to the key
int binarySearch(int key, int* array, int length)
{
	int front=0, end=length-1;
	if (key>array[end])
	{
		return length;
	}
	if (key<array[front])
	{
		return 0;
	}

	int pos=(front+end+1)/2;;
	while (front<=end)
	{		
		if (key>array[pos])
		{
			front=pos+1;
		}
		else
		{
			if (key<array[pos])
			{
				end=pos-1;
			}
			else
			{
				break;
			}
		}
		pos=(front+end+1)/2;
	}
	
	return pos;
}



// Phase 3: each worker converts the WorkerNumber-1 broadcast pivots into cut
// points (index of the first local element >= pivot), appends the segment
// length as the final cut point, and sends the vector to the master.  The
// master collects the vectors into the WorkerNumber x WorkerNumber table in
// sampleArray: row w holds worker w+1's cut points, consumed in phase 4.
// (The unused `flag` local from the original has been removed.)
void thirdPhase()
{
	int i;
	if (myRank!=0)
	{
		// Overwrite each pivot with its cut point; slice i of this worker is
		// the half-open range [sampleArray[i-1], sampleArray[i]).
		for (i=0; i<WorkerNumber-1; i++)
		{
			sampleArray[i]=binarySearch(sampleArray[i], myArray, SingleArrayLength);
		}
		sampleArray[WorkerNumber-1]=SingleArrayLength;
		MPI_Ssend(sampleArray, WorkerNumber, MPI_INT, 0, PIVOT, MPI_COMM_WORLD);
	}
	else
	{
		for (i=0; i<WorkerNumber; i++)
		{
			sampleRequests[i]=MPI_REQUEST_NULL;
			MPI_Irecv(sampleArray+i*WorkerNumber, WorkerNumber, MPI_INT, i+1, PIVOT, MPI_COMM_WORLD, sampleRequests+i);
		}
		MPI_Waitall(WorkerNumber, sampleRequests, MPI_STATUSES_IGNORE);
	}
}

void doMerge(int** mergeBuf, int* lengthArray,int& currentPos)
{
	int indexArray[WorkerNumber];
	int i, candidate, candidateIndex;
	bool beFirst=true, allOver=false;
	for (i=0; i<WorkerNumber; i++)
	{
		indexArray[i]=0;
	}
	do
	{		
		beFirst=true;
		allOver=true;
		for (i=0; i<WorkerNumber; i++)
		{
			if (indexArray[i]<lengthArray[i])
			{
				allOver=false;
				if (beFirst)
				{
					beFirst=false;
					candidate=mergeBuf[i][indexArray[i]];
					candidateIndex=i;
				}
				else
				{
					if (candidate>mergeBuf[i][indexArray[i]])
					{
						candidate=mergeBuf[i][indexArray[i]];
						candidateIndex=i;
					}
				}
			}
		}
		if (allOver)
		{
			break;
		}
		myArray[currentPos]=candidate;
		currentPos++;
		indexArray[candidateIndex]++;
	}
	while (true);
}
		
						
			
		


// Phase 4: each worker sends its WorkerNumber pivot-delimited slices to the
// master; for each partition index i the master receives one slice from every
// worker into mergeBuf and k-way merges them onto the end of myArray, so the
// result grows one fully-sorted partition at a time.  Messages are tagged
// j*10+i (worker index, partition index).  NOTE(review): the tag scheme
// assumes WorkerNumber <= 10, otherwise tags collide — confirm before scaling.
void fourthPhase()
{
	int i, j, flag;	// NOTE(review): `flag` is unused, left over from the MPI_Testall experiment below
	int* sizePtr;	// cursor into the cut-point data in sampleArray
	int* dataPtr;	// start of the slice a worker is about to send
	int currentPos=0;	// master: write position in myArray, advanced by doMerge
	int previous, current;	// current slice is the half-open range [previous, current)
	int length;
	//MPI_Request* tempRequests;
	int lengthArray[WorkerNumber];	// per-worker slice length for the current partition
	if (myRank==0)
	{
		//tempRequests=new MPI_Request[WorkerNumber*WorkerNumber];
		//printArray(sampleArray, WorkerNumber*WorkerNumber, "before 4th phase, let' see sample Array\n");
		for (i=0; i<WorkerNumber; i++)//the index of  worker node
		{
			for (j=0; j<WorkerNumber; j++)//the index within worker node index
			{
				// Row j of the table holds worker j+1's cut points; column i is
				// where that worker's slice for partition i ends.
				sizePtr=sampleArray+j*WorkerNumber+i;
				if (i==0)
				{
					previous=0;	
					current=*sizePtr;
				}
				else
				{
					if (i==WorkerNumber-1)
					{
						// Last partition always runs to the full segment length.
						current=SingleArrayLength;
					}
					else
					{
						current=*sizePtr;
					}
					previous=*(sampleArray+j*WorkerNumber+i-1);
				}
				//printf("\n current=%d, previous=%d\n", current, previous);
				lengthArray[j]=current - previous;
				//currentPos+=length;
				//dataPtr=myArray+currentPos;
				// Pre-set to NULL so Waitall skips workers with an empty slice.
				sampleRequests[j]=MPI_REQUEST_NULL;
				//printf("\nmaster begins\n");
				if (lengthArray[j]>0)
				{
					//MPI_Irecv(dataPtr, length, MPI_INT, j+1, j*10+i, MPI_COMM_WORLD, tempRequests+j*WorkerNumber+i);
					//printf("\nmaster begin to recv data from rank %d of length %d\n", j+1, lengthArray[j]);
					MPI_Irecv(mergeBuf[j], lengthArray[j], MPI_INT, j+1, j*10+i, MPI_COMM_WORLD, sampleRequests+j);
					//MPI_Recv(mergeBuf[j], lengthArray[j], MPI_INT, j+1, j*10+i, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
				}

				//printf("\nmaster after prints\n");

			}
			// All slices of partition i have arrived; merge them into myArray.
			MPI_Waitall(WorkerNumber, sampleRequests, MPI_STATUSES_IGNORE);
			doMerge(mergeBuf, lengthArray, currentPos);
			/*
			printf("\nmaster after tests of %d\n", i+1 );
			for (int k=0; k<WorkerNumber; k++)
			{	
				//printf("\nmaster going to print %d\n", lengthArray[k]);			
				if (lengthArray[k]>0)
				{
					printArray(mergeBuf[k], lengthArray[k], "Master receive segment");
				}
			}
			*/

		}
		//MPI_Testall(WorkerNumber*WorkerNumber, tempRequests, &flag, MPI_STATUSES_IGNORE);
	}
	else
	{
		// Worker: walk own cut points and ship each non-empty slice in order.
		for (i=0; i<WorkerNumber; i++)
		{
			sizePtr=sampleArray+i;
			if (i==0)
			{
				previous=0;
				current=*sizePtr;
			}
			else
			{
				if (i==WorkerNumber-1)
				{
					current=SingleArrayLength;
				}
				else
				{
					current=*sizePtr;
				}
				previous=*(sampleArray+i-1);

			}
			dataPtr=myArray+previous;
			length=current-previous;
			currentPos+=length;	// NOTE(review): running tally, never read afterwards
			if (length>0)
			{
				//printArray(dataPtr, length, "going to send segment");
				MPI_Send(dataPtr, length, MPI_INT, 0, (myRank-1)*10+i, MPI_COMM_WORLD);
			}
		}
	}
}



// Sanity check: scan the assembled myArray on the master and report every
// adjacent pair that is out of ascending order.  Prints nothing when the
// array is correctly sorted.  (Defined for debugging; not invoked by the
// driver as written.)
void testArray()
{
	for (int i=1; i<WorkerNumber*SingleArrayLength; i++)
	{
		if (myArray[i]<myArray[i-1])
		{
			printf("sorting error at %d with %d > %d\n", i, myArray[i-1], myArray[i]);
		}
	}
}

		





int main(int argc, char** argv)
{
	double start, end;
	MPI_Init(&argc, &argv);                 
   	MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
    	MPI_Comm_size(MPI_COMM_WORLD, &mySize);  
	initialize();
	
	zeroPhase();
	if (myRank==0)
	{
		start=MPI_Wtime();
	}


	firstPhase();
	//printf("\nfirst phase ends\n");

	secondPhase();	
	//printf("\nsecond phase ends\n");

	thirdPhase();
	//printf("\nthird phase ends\n");

	fourthPhase();

	//printf("\nfourth phase ends\n");

	
	if (myRank==0)
	{
		end=MPI_Wtime();
		printf("distributing system sorting takes %f\n", end-start);
	}
	

	


	return 0;
}
 
 
running result:
single machine sorting array of length 7000 takes 0.003865
distributing system sorting takes 0.901712
0.000u 0.012s 0:01.69 0.5% 0+0k 0+0io 0pf+0w

single machine sorting array of length 70000 takes 0.042404
distributing system sorting takes 0.899044
0.000u 0.008s 0:01.99 0.0% 0+0k 0+0io 0pf+0w


single machine sorting array of length 700000 takes 0.499217
distributing system sorting takes 0.818472
0.004u 0.012s 0:02.23 0.4% 0+0k 0+0io 0pf+0w


single machine sorting array of length 7000000 takes 10.278250
distributing system sorting takes 3.047682
0.000u 0.004s 0:18.02 0.0% 0+0k 0+0io 0pf+0w


single machine sorting array of length 70000000 takes 66.025549
distributing system sorting takes 13.169787
0.004u 0.004s 1:30.91 0.0% 0+0k 0+0io 0pf+0w
 


 
			
				 back.gif (341 bytes)       up.gif (335 bytes)         next.gif (337 bytes)