| kvnsmnsn@hotmail.com 2007-09-29, 1:47 am |
| For an assignment in an Internet programming class, I have to start
a number of threads that generate random votes for eleven candidates
and send them from one machine to another across a socket. A second
program, running on the destination machine, takes the votes (each
encoded in the first character of the transmitted message): it starts
one thread that reads them into a queue, and a number of other threads
that take the votes off the queue and increment the total vote count
for each of the eleven candidates.
I've verified that the program running on the source machine is work-
ing just fine. The votes are making it to my <doRead()> function, and
it's successfully putting them into the queue. But each of my
<counter> threads is getting stuck on either the <mtlExclsn> or the
<queueEmpty> semaphores. I don't understand that at all.
I initialize my <mtlExclsn> semaphore to one and my <queueEmpty> se-
maphore to zero, so it's understandable that my threads would get
stuck on one or the other. But when I signal on <queueEmpty> in my
<doRead()> function that should wake up whichever of my <counter>
threads had made it past the <mtlExclsn> semaphore, shouldn't it? And
yet as far as I can tell that never happens.
I'm also curious why <doRead()> succeeds in putting all fifteen votes
into the queue. Each time it does a wait on the <queueFull> semaphore
that should decrement its count (which was initialized to ten, the
size of the queue). So if the <counter> threads are never getting to
the point where they can signal on <queueFull> my <doRead()> should
fill the queue up in ten reads and get stuck. But that's not happen-
ing; instead it goes merrily on its way reading in all fifteen votes.
This has got me stumped. If anyone can take a look at my code and see
what the problem is and can point it out to me I'd be extremely grate-
ful.
---Kevin Simonson
"You'll never get to heaven, or even to LA,
if you don't believe there's a way."
from _Why Not_
### client ########################################
#################
#include <pthread.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define SOCKET_ERROR -1
#define NMBR_THREADS 3
#define VTS_PER_THRD 5
/*
 * Fourth argument of semctl().  Some systems (Linux among them) require
 * the calling program to declare this union itself.
 */
union semun {
    int              val;    /* value used with SETVAL */
    struct semid_ds *buf;    /* buffer used with IPC_STAT / IPC_SET */
    unsigned short  *array;  /* array used with GETALL / SETALL */
    struct seminfo  *__buf;  /* buffer used with IPC_INFO (Linux-specific) */
};
/* Semaphore id obtained from semget() in main; generate() decrements it
   before each write to voteSocket and increments it afterwards. */
int socketAccess;
/* Socket created and connected in main; every vote message is written to it. */
int voteSocket;
/*
 * Write one two-byte message { code, '\0' } to voteSocket while holding the
 * socketAccess semaphore, so that messages from different threads are never
 * interleaved on the shared socket.
 *
 * Returns 0 on success, -1 if a semaphore operation or the write failed.
 */
static int sendVote ( char code)
{
    struct sembuf operations[ 1];
    char          voteBuffer[ 2];
    size_t        sent   = 0;
    int           status = 0;

    voteBuffer[ 0] = code;
    voteBuffer[ 1] = '\0';
    operations[ 0].sem_num = 0;
    operations[ 0].sem_flg = SEM_UNDO;

    operations[ 0].sem_op = -1;                       // wait( socketAccess)
    if (semop( socketAccess, operations, 1) == -1)
    {   perror( "semop (wait socketAccess)");
        return -1;                   // never write without holding the lock
    }

    // write() may accept fewer bytes than requested; keep going until the
    // whole message has been handed to the socket.
    while (sent < sizeof voteBuffer)
    {   ssize_t n = write( voteSocket, voteBuffer + sent
                         , sizeof voteBuffer - sent);
        if (n <= 0)
        {   perror( "write (voteSocket)");
            status = -1;
            break;
        }
        sent += (size_t) n;
    }

    operations[ 0].sem_op = 1;                        // signal( socketAccess)
    if (semop( socketAccess, operations, 1) == -1)
    {   perror( "semop (signal socketAccess)");
        status = -1;
    }
    return status;
}

/*
 * Thread body: sends VTS_PER_THRD random votes, each a raw byte value in
 * the range 0..10, followed by a single 'q' message that marks the end of
 * this thread's votes.
 *
 * pntr is unused.  Always returns NULL.
 */
void* generate ( void* pntr)
{
    enum { CANDIDATES = 11 };
    int itrtn;

    (void) pntr;
    for (itrtn = 0; itrtn < VTS_PER_THRD; itrtn++)
    {   // The modulo keeps the vote inside 0..10.  Dividing by RAND_MAX / 11
        // instead yields 11 when random() returns its maximum value.
        if (sendVote( (char) (random() % CANDIDATES)) == -1)
        {   break;
        }
    }
    // Still try to send the terminator after a failure so the receiving
    // side is not left waiting for it.
    sendVote( 'q');
    return NULL;
}
/*
 * Vote generator (client).
 *
 * Usage: ./VoteGenerate <counter-host> <counter-port>
 *
 * Connects to the counting program, creates the semaphore that serialises
 * access to the socket, runs NMBR_THREADS generate() threads, and finally
 * sends a single 'x' message once every thread has finished.
 *
 * Returns 0 on success and 1 on a usage error or any set-up failure.
 */
int main ( int argCount
         , char** arguments)
{
    pthread_t          Generate[ NMBR_THREADS];
    struct hostent*    pHostInfo;
    struct sockaddr_in Address;
    union semun        argument;
    char               finalBuffer[ 2] = { 'x', '\0' };
    int                started = 0;
    int                status  = 0;
    int                thrd;

    if (argCount != 3)
    {   printf( "Usage is\n ./VoteGenerate <counter-host> <counter-port>\n");
        return 1;
    }

    printf( "Creating vote socket.\n");
    voteSocket = socket( AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (voteSocket == SOCKET_ERROR)
    {   printf( "Couldn't make a socket.\n");
        return 1;
    }

    pHostInfo = gethostbyname( arguments[ 1]);
    if (  pHostInfo == NULL || pHostInfo->h_addrtype != AF_INET
       || pHostInfo->h_addr_list[ 0] == NULL)
    {   printf( "Couldn't resolve host.\n");
        close( voteSocket);
        return 1;
    }
    memset( &Address, 0, sizeof( Address));
    // Copy the IPv4 address straight into sin_addr.  Routing it through a
    // wider integer leaves part of that integer uninitialised and depends
    // on byte order.
    memcpy( &Address.sin_addr, pHostInfo->h_addr_list[ 0]
          , sizeof( Address.sin_addr));
    Address.sin_port   = htons( (unsigned short) atoi( arguments[ 2]));
    Address.sin_family = AF_INET;
    if (  connect( voteSocket, (struct sockaddr*) &Address, sizeof( Address))
       == SOCKET_ERROR)
    {   printf( "Couldn't connect to host.\n");
        close( voteSocket);
        return 1;
    }

    printf( "Creating vote semaphore.\n");
    // IPC_PRIVATE always creates a fresh semaphore set.  A fixed key would
    // be shared with any other program on this machine using the same key.
    socketAccess = semget( IPC_PRIVATE, 1, 0700 | IPC_CREAT);
    if (socketAccess == -1)
    {   perror( "semget");
        close( voteSocket);
        return 1;
    }
    argument.val = 1;
    if (semctl( socketAccess, 0, SETVAL, argument) == -1)
    {   perror( "semctl (SETVAL)");
        semctl( socketAccess, 0, IPC_RMID);
        close( voteSocket);
        return 1;
    }

    printf( "Creating threads.\n");
    for (thrd = 0; thrd < NMBR_THREADS; thrd++)
    {   if (pthread_create( Generate + thrd, NULL, &generate, NULL) != 0)
        {   printf( "Couldn't create thread %d.\n", thrd);
            status = 1;
            break;
        }
        started++;
    }
    printf( "Threads running.\n");
    for (thrd = 0; thrd < started; thrd++)
    {   // The thread result is not needed.  Passing the address of an int
        // here would let pthread_join store a whole pointer into it.
        pthread_join( Generate[ thrd], NULL);
    }
    printf( "All threads done.\n");

    if (  write( voteSocket, finalBuffer, sizeof finalBuffer)
       != (ssize_t) sizeof finalBuffer)
    {   perror( "write (final message)");
        status = 1;
    }

    // A semaphore id is not a file descriptor; it is removed with semctl().
    semctl( socketAccess, 0, IPC_RMID);
    close( voteSocket);
    return status;
}
### server ########################################
#################
#include <pthread.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define SOCKET_ERROR -1
#define NMBR_THREADS 3
/*
 * Fourth argument of semctl().  Some systems (Linux among them) require
 * the calling program to declare this union itself.
 */
union semun {
    int              val;    /* value used with SETVAL */
    struct semid_ds *buf;    /* buffer used with IPC_STAT / IPC_SET */
    unsigned short  *array;  /* array used with GETALL / SETALL */
    struct seminfo  *__buf;  /* buffer used with IPC_INFO (Linux-specific) */
};
/* Semaphore id; main sets its value to 0.  doRead() increments it after
   storing a vote and count() decrements it before removing one. */
int queueEmpty;
/* Semaphore id; main sets its value to 10.  doRead() decrements it before
   storing a vote and count() increments it after removing one. */
int queueFull;
/* Semaphore id; main sets its value to 1.  count() holds it while it
   removes a vote from the queue. */
int mtlExclsn;
/* One semaphore id per candidate; count() holds candidateAccess[ c] while
   it increments candidateCount[ c]. */
int candidateAccess[ 11];
/* Vote totals, indexed by candidate number; printed by main at the end. */
int candidateCount[ 11];
/* Socket returned by accept() in main; doRead() reads the votes from it. */
int voteSocket;
/* Circular buffer of received vote bytes, indexed modulo 10. */
char queue[ 10];
/* Index of the next queue slot count() reads from. */
int nextRead = 0;
/* Index of the next queue slot doRead() writes to. */
int nextWrite = 0;
void* doRead ( void* pntr)
{
struct sembuf operations_e[ 1], operations_f[ 1];
char pBuffer[ 2];
for (;;)
{ read( voteSocket, pBuffer, 2);
if (pBuffer[ 0] == 'x')
{ return NULL;
}
operations_f[ 0].sem_op = -1;
semop( queueFull, operations_f, 1); //
wait( queueFull)
queue[ nextWrite] = pBuffer[ 0];
nextWrite = (nextWrite + 1) % 10;
operations_e[ 0].sem_op = 1;
semop( queueEmpty, operations_e, 1); //
signal( queueEmpty)
}
}
void* count ( void* pntr)
{
struct sembuf operations_e[ 1], operations_f[ 1];
struct sembuf operations_me[ 1], operations_c[ 1];
char candidate;
for (;;)
{ operations_me[ 0].sem_op = -1;
semop( mtlExclsn, operations_me, 1); //
wait( mtlExclsn)
operations_e[ 0].sem_op = -1;
semop( queueEmpty, operations_e, 1); //
wait( queueEmpty)
candidate = queue[ nextRead];
nextRead = (nextRead + 1) % 10;
operations_f[ 0].sem_op = 1;
semop( queueFull, operations_f, 1); //
signal( queueFull)
operations_me[ 0].sem_op = 1;
semop( mtlExclsn, operations_me, 1); //
signal( mtlExclsn)
if (candidate == 'q')
{ return NULL;
}
operations_c[ 0].sem_op = -1;
semop( candidateAccess[ candidate], operations_c, 1); //
wait( ca[ cand])
candidateCount[ candidate]++;
operations_c[ 0].sem_op = 1;
semop( candidateAccess[ candidate], operations_c, 1); //
signal( ca[ cand])
}
}
int main ( int argCount
, char** arguments)
{
if (argCount == 2)
{ pthread_t reader;
pthread_t counter[ NMBR_THREADS];
struct hostent* pHostInfo;
struct sockaddr_in Address;
int nAddressSize = sizeof( struct sockaddr_in);
union semun argument;
key_t key = 1;
int flags = 0700 | IPC_CREAT;
int thrd;
int candidate;
void* result;
printf( "Creating listening and voting sockets.\n");
int listenSocket = socket( AF_INET, SOCK_STREAM, 0);
if (listenSocket == SOCKET_ERROR)
{ printf( "Couldn't make a socket.\n");
return 0;
}
Address.sin_addr.s_addr = INADDR_ANY;
Address.sin_port = htons( atoi( arguments[ 1]));
Address.sin_family = AF_INET;
if ( bind( listenSocket, (struct sockaddr *) &Address,
sizeof( Address))
== SOCKET_ERROR)
{ printf( "Couldn't connect to host.\n");
return 0;
}
getsockname
( listenSocket, (struct sockaddr *) &Address
, (socklen_t *) &nAddressSize);
if (listen( listenSocket, 5) == SOCKET_ERROR)
{ printf( "Couldn't listen.\n");
return 0;
}
voteSocket
= accept
( listenSocket, (struct sockaddr *) &Address
, (socklen_t *) &nAddressSize);
printf( "Creating queue semaphore and counting semaphores.\n");
queueEmpty = semget( key, 1, flags);
argument.val = 0;
semctl( queueEmpty, 0, SETVAL, argument);
queueFull = semget( key, 1, flags);
argument.val = 10;
semctl( queueFull, 0, SETVAL, argument);
mtlExclsn = semget( key, 1, flags);
argument.val = 1;
semctl( mtlExclsn, 0, SETVAL, argument);
for (candidate = 0; candidate < 11; candidate++)
{ candidateCount[ candidate] = 0;
candidateAccess[ candidate] = semget( key, 1, flags);
argument.val = 1;
semctl( candidateAccess[ candidate], 0, SETVAL, argument);
}
printf( "Creating threads.\n");
pthread_create( &reader, NULL, &doRead, NULL);
for (thrd = 0; thrd < NMBR_THREADS; thrd++)
{ pthread_create( counter + thrd, NULL, &count
, (void *) thrd);
}
printf( "Threads running.\n");
pthread_join( reader, &result);
for (thrd = 0; thrd < NMBR_THREADS; thrd++)
{ pthread_join( counter[ thrd], &result);
}
printf( "All threads done.\n\n");
for (candidate = 0; candidate < 11; candidate++)
{ printf
( "Candidate #%2d got %3d votes.\n", candidate
,
candidateCount[ candidate]);
close( candidateAccess[ candidate]);
}
close( queueEmpty);
close( queueFull);
close( mtlExclsn);
close( voteSocket);
}
else
{ printf( "Usage is\n ./VoteCount <counter-port>\n");
}
}
|