#include "sopnamsp.h" #include "zthread.h" #include #include #include #include #include #include #include #include "timing.h" /* --------------------------------------------------------- */ /* ------ -Programme de test des tuyaux unix (pipe) ------ */ /* Avec creation de threads (les ZThread de Sophya) */ /* Ou avec creation de process (fork()) */ /* Octobre 2003 - C. Magneville, R. Ansari */ /* --------------------------------------------------------- */ /* Identificateur de fichier de pipe */ /* pipe_desc[0] : lecture pipe_desc[1] : ecriture */ static int pipe_desc[2]; /* fonction a executer dans le thread 1 ou process 1 */ void write_to_pipe(void * arg); /* fonction a executer dans le thread 2 ou process 2 */ void read_frpipe(void * arg); /* Tailles en nombre de double */ static int NSample; /* Nombre total d'echantillons */ static int NBlk1; /* Taille de paquet en ecriture (Thr1 ou Proc1) */ static int NBlk2; /* Taille de paquet en lecture (Thr2 ou Proc2) */ static double DeltaX; /* pas du calcul de cosinus */ int main(int narg, char *arg[]) { if ((narg < 5) || ((narg>1)&&(strcmp(arg[1],"-h")==0)) ) { cout << " Usage: tpipe T/P NSample Blk1 Blk2 \n " << " T --> Threads , P --> 2 process \n" << " NSample: Nb total d'echantillons \n" << " NBlk1: Taille de paquet en ecriture \n" << " NBlk2: Taille de paquet en lecture \n" << endl; return(0); } if ((*arg[1] != 'T') && (*arg[1] != 'P')) { cout << " tpipe: arg erreur / specifiez T ou P , tpipe -h pour aide \n" << endl; return 1; } char proc_or_thread = *arg[1]; NSample = atoi(arg[2]); NBlk1 = atoi(arg[3]); NBlk2 = atoi(arg[4]); double DeltaX = 50.*M_PI/(double)NSample; cout << " =============== tpipe (Pipe Test Program) =================" << endl; if (proc_or_thread == 'T') cout << " ----> avec Threads " << endl; else cout << " ----> avec fork() / process " << endl; cout << " -- NSample= " << NSample << " NBlk1=" << NBlk1 << " NBlk2=" << NBlk2 << endl; cout << " ============================================================\n" << endl; InitTim(); cout << ">> tpipe: Opening pipe ... " << endl; int rc = pipe(pipe_desc); if (rc != 0) { cout << " ERREUR creation pipe !" << endl; return 99; } if (proc_or_thread == 'T') { cout << ">> tpipe: Creation des Threads ... " << endl; ZThread zt1; zt1.setAction(write_to_pipe, arg[1]); ZThread zt2; zt2.setAction(read_frpipe, arg[2]); cout << ">> tpipe: Start des Threads Z1 et Z2 ... " << endl; zt1.start(); zt2.start(); cout << ">> tpipe: Attente z1/2.join() ... " << endl; zt1.join(); zt2.join(); cout << ">> tpipe: Thread Z1 and Z2 joined " << endl; PrtTim("EndOftpipe_Threads "); } else { cout << ">> tpipe: fork() - creation de process ... " << endl; char zzz[32]; strcpy(zzz,"toto"); pid_t rcf = fork(); if (rcf == 0) { cout << ">> tpipe: Processus FILS - PID= " << getpid() << " -> execution de read_frpipe()" << endl; read_frpipe(zzz); cout << ">> tpipe: FIN execution read_frpipe() ds FILS - PID= " << getpid() << endl; PrtTim("EndOffun_Fils "); } else { cout << ">> tpipe: Processus PERE - PID= " << getpid() << " -> execution de read_frpipe()" << endl; write_to_pipe(zzz); cout << ">> tpipe: FIN execution write_to_pipe() ds PERE - PID= " << getpid() << endl; PrtTim("EndOffun_Pere "); cout << ">> tpipe: Attente processus fils PID=" << rcf << endl; int status_procs[20]; waitpid(rcf, status_procs, 0); cout << ">> tpipe: Status fils= " << status_procs[0] << endl; } } char buff[128]; sprintf(buff,"EndOftpipe-PID=%d ",getpid()); PrtTim(buff); cout << " ======= Fin du programme tpipe (PID= " << getpid() << ") ======= \n" << endl; return(0); } /* fonction a executer dans le thread 1 ou process 1 : write to pipe */ void write_to_pipe(void *arg) { double * d = new double[NBlk1]; int ns = 0; cout << " ====> fonction write_to_pipe : START write to pipe ... " << endl; for(int k=0; k NSample) nwrt = NSample-k; write(pipe_desc[1], d, nwrt*sizeof(double)); ns += nwrt; } cout << " ====> fonction write_to_pipe : END write to pipe NSample= " << ns << endl; delete[] d; return; } /* fonction a executer dans le thread 2 ou process 2 : read from pipe */ void read_frpipe(void *arg) { double * d = new double[NBlk2]; char * cd = (char *)d; char * ccd = cd; int ns = 0; int nerr = 0; int nwait = 0; cout << " ====> fonction read_frpipe : START read from pipe ... " << endl; int kkr = -1; for(int k=0; k NSample) nrd = NSample-k; ssize_t nrdok = read(pipe_desc[0], d, nrd*sizeof(double)); if (nrdok != nrd*sizeof(double)) { int nrdc = nrd*sizeof(double)-nrdok; ccd = cd + nrdok; while (nrdc != 0) { nwait++; usleep(1000); nrdok = read(pipe_desc[0], ccd, nrdc); nrdc -= nrdok; ccd += nrdok; } } ns += nrd; for(int i=0; i 1.e-39) nerr++ ; } } cout << " ====> fonction read_frpipe : END read from pipe NSample= " << ns << " NErr=" << nerr << endl; cout << " ====> fonction read_frpipe NWait= " << nwait << endl; delete[] d; return; }