Go4Expert

Go4Expert (http://www.go4expert.com/)
-   C (http://www.go4expert.com/forums/c/)
-   -   Synchronizing two threads (http://www.go4expert.com/forums/synchronizing-two-threads-t18753/)

kami1219 30Jul2009 20:25

Synchronizing two threads
 
Hi I have two threads. One thread (capture) captures packets on NIC-1 and store them in a circular linked list and the other thread (sender) retrieves these packets from the list and sends the packet through NIC-2 and frees the memory. I need proper synchronization of these threads using semphore or mutex locks. Here I am facing problem to implement the mutex lock properly.

Need your urgent help in completing this task. Code is given below. Thanks in advance.



Code:

pthread_mutex_t mutex2=PTHREAD_MUTEX_INITIALIZER;

void* capture(void* ptr)
{
  printf("I am inside the capture thread to capture packets \n");

  struct sockaddr_ll packet_info;
  int packet_info_size = sizeof(packet_info);
  struct timeval time2;
  union semun args;
  capProcess *cProc;
  int sd;
  char* nic;
  extern int semaphore;
  int pktCnt;

  cProc=(capProcess*)ptr;
  sd=cProc->sd;
  nic=cProc->nic;
  semaphore=cProc->semaphore;
  pktCnt=cProc->pktCnt;
  int len1;
  // int pktCnt=1;
 
  printf("CAP1 THREAD--sd is %d \n",sd);
 
  // wPos=0;
  int  packets_to_sniff=10000000;
  args.val=1;
  unsigned char *pkt=(unsigned char *)malloc(2048*(sizeof(unsigned char *)));
 
 
    while( packets_to_sniff--)
    {

      if((len1=recvfrom(sd,pnode->pkt,2048,0,(struct sockaddr*)&packet_info,&packet_info_size)) == -1)
    {
      perror("Recvfrom() returned -1");
      exit(-1);
    }
      else
    {
     
      ioctl(sd,SIOCGSTAMP,&time2);


      pthread_mutex_lock(&mutex2);

          pnode->pkt_id=pktCnt;
          printf("packet number is %d \n",pnode->pkt_id);
          //      memcpy(&pnode->pkt,&pkt,len1);
          pnode->pkt_len=len1;
          printf("packet of length %d bytes received \n",pnode->pkt_len);
          pnode->ts.tv_sec=time2.tv_sec;
          printf("packet of time seconds are  %ld  \n",pnode->ts.tv_sec);
          pnode->ts.tv_usec=time2.tv_usec;
          printf("packet of time micro seconds are  %ld  \n",pnode->ts.tv_usec);
          pnode->wPos=wPos;
          rPos=wPos-3;
          pnode->rPos=rPos;
          pnode->wFlag=1;
          pnode->rFlag=0;
          char s[20];
          gettimeofday(&time2);     
          wPos++;
          //  }


      /* current system clocke time in hour-min-sec*/
      time_t rawtime;
      struct tm *timeinfo,*timeinfo2;
      char buffer[80],hr[10],min[10],sec[10];
      int str;int cnt;

      time(&rawtime);
      timeinfo=localtime(&rawtime);
      strftime(hr,10,"%I",timeinfo);
      strftime(min,10,"%M",timeinfo);
      strftime(sec,10,"%S",timeinfo);

      if(strcmp(min,"00")==0)
        {
          printf("New Hour starts \n");
          cnt=0;
        }
      else
        {
          int whichsecond;
          int mins, secs;
          mins = atoi(min);
          printf("mins=%d\n",mins);
          secs=atoi(sec);
          printf("secs=%d\n",secs);

          whichsecond= (mins*60)+secs;
          printf("whichsecond is %d",whichsecond);
          cnt=whichsecond;
       




          /* ADD DELAY AMOUNT TAKEN FROM THE ARRAY IN THE TIMESTAMP OF THE PACKETS*/

          struct timeval delay;
          delay.tv_usec=(1000*Shaper[cnt])+time2.tv_usec;
          pnode->ts.tv_usec=delay.tv_usec;
          printf("pnode->ts.tv_usec = %ld.......... \n",pnode->ts.tv_usec);
          cnt++;

        }

                pthread_mutex_unlock(&mutex2);     

   
      int n=0;int m=0;
   

           
      if(pktlist == NULL)
        {

     
          pktlist=pnode;
          phead=pnode;
          pcurr=pnode;
        }
      else
        {
         
          pcurr->next =pnode;
          pcurr=pcurr->next;

        }


     
    }

      pktCnt++;

    }

-----------------------------------------------
Code:

pthread_mutex_t mutex_send=PTHREAD_MUTEX_INITIALIZER;

pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void* sender(void* ptr)
{
  printf("sender1....-4");
  printf("SENDER1 --I am inside the sender1 thread to send packets \n");
  printf("sender1...-3");
  //  rPos=-3;
  printf("sender1... -2");
  sendProcess *mySend;
  int Fd;
  int nics;
  printf("sender 1... -1");
  extern int semaphore;
  struct sembuf sWait={0,-1,0};
  union semun sSet;
  unsigned char destAddr[6];
  char* nic;
  struct timeval curr_time;
  mySend=(sendProcess*)ptr;

  int sentPkts=0;
  // pktCnt=cProc->pktCnt;
  int len1=0;
  int pktCnt=0;

  sSet.val=1;
  semaphore=mySend->semaphore;
  Fd=mySend->Fd;
  //  rPos=-1;
  //  printf("sender1 .. 1");
  unsigned char buf[4092];

  int retVal;
  printf("SENDER1--Fd is %d",Fd);

  printf("sender 1... 1");


  //  printf("sender1 .. 2");
 
  snode=scurr=pktlist;
 
  printf(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>.snode=%d,scurr=%d,pktlist=%d",snode,scurr,pktlist);
do
  {
  pthread_mutex_lock( &mut );


  //    pthread_mutex_lock( &mut );
      while(scurr->next!=NULL);
    {
      if(snode->ts.tv_sec < scurr->ts.tv_sec && snode->ts.tv_usec < scurr->ts.tv_usec)
        {
          TMPNode=snode;
          snode=scurr;
          scurr=TMPNode;
        }
     
      scurr=scurr->next;
    }
 
   
   
      gettimeofday(&curr_time,NULL);     
      if(curr_time.tv_sec >= snode->ts.tv_sec  && curr_time.tv_usec >= snode->ts.tv_usec)
    {
     
      struct sockaddr_ll destAddr;
      memcpy(buf,&snode->pkt,snode->pkt_len);
      memcpy(&(destAddr.sll_addr),&buf,MAC_ADDR_LEN);
      destAddr.sll_family=PF_PACKET;
      destAddr.sll_protocol=htons(ETH_P_ALL);
      destAddr.sll_ifindex=2;
      destAddr.sll_pkttype=PACKET_OTHERHOST;
      destAddr.sll_halen=ETH_ALEN;
      if(snode2->rFlag==0 && snode2->wFlag==1)
        {
         
          if((retVal=write(Fd,snode->pkt,snode->pkt_len))==-1)
        {
          printf("retVal is %d",retVal);
          printf("sendto() error \n");
        }
          printf("SENDER 1 --- packet number is %d OF length %d Sent \n",snode2->pkt_id,retVal);
          snode2->rFlag=1;
          snode2->wFlag=0;
        }
     
    }
  pthread_mutex_unlock( &mut );
      snode=snode->next;
  }while(snode->next != NULL);
     
//      pthread_mutex_unlock( &mut );
     

}


kami1219 3Aug2009 18:50

Re: Synchronizing two threads
 
Quote:

Originally Posted by kami1219 (Post 53889)
Hi I have two threads. One thread (capture) captures packets on NIC-1 and store them in a circular linked list and the other thread (sender) retrieves these packets from the list and sends the packet through NIC-2 and frees the memory. I need proper synchronization of these threads using semphore or mutex locks. Here I am facing problem to implement the mutex lock properly.

Need your urgent help in completing this task. Code is given below. Thanks in advance.



Code:

pthread_mutex_t mutex2=PTHREAD_MUTEX_INITIALIZER;

void* capture(void* ptr)
{
  printf("I am inside the capture thread to capture packets \n");

  struct sockaddr_ll packet_info;
  int packet_info_size = sizeof(packet_info);
  struct timeval time2;
  union semun args;
  capProcess *cProc;
  int sd;
  char* nic;
  extern int semaphore;
  int pktCnt;

  cProc=(capProcess*)ptr;
  sd=cProc->sd;
  nic=cProc->nic;
  semaphore=cProc->semaphore;
  pktCnt=cProc->pktCnt;
  int len1;
  // int pktCnt=1;
 
  printf("CAP1 THREAD--sd is %d \n",sd);
 
  // wPos=0;
  int  packets_to_sniff=10000000;
  args.val=1;
  unsigned char *pkt=(unsigned char *)malloc(2048*(sizeof(unsigned char *)));
 
 
    while( packets_to_sniff--)
    {

      if((len1=recvfrom(sd,pnode->pkt,2048,0,(struct sockaddr*)&packet_info,&packet_info_size)) == -1)
    {
      perror("Recvfrom() returned -1");
      exit(-1);
    }
      else
    {
     
      ioctl(sd,SIOCGSTAMP,&time2);


      pthread_mutex_lock(&mutex2);

          pnode->pkt_id=pktCnt;
          printf("packet number is %d \n",pnode->pkt_id);
          //      memcpy(&pnode->pkt,&pkt,len1);
          pnode->pkt_len=len1;
          printf("packet of length %d bytes received \n",pnode->pkt_len);
          pnode->ts.tv_sec=time2.tv_sec;
          printf("packet of time seconds are  %ld  \n",pnode->ts.tv_sec);
          pnode->ts.tv_usec=time2.tv_usec;
          printf("packet of time micro seconds are  %ld  \n",pnode->ts.tv_usec);
          pnode->wPos=wPos;
          rPos=wPos-3;
          pnode->rPos=rPos;
          pnode->wFlag=1;
          pnode->rFlag=0;
          char s[20];
          gettimeofday(&time2);     
          wPos++;
          //  }


      /* current system clocke time in hour-min-sec*/
      time_t rawtime;
      struct tm *timeinfo,*timeinfo2;
      char buffer[80],hr[10],min[10],sec[10];
      int str;int cnt;

      time(&rawtime);
      timeinfo=localtime(&rawtime);
      strftime(hr,10,"%I",timeinfo);
      strftime(min,10,"%M",timeinfo);
      strftime(sec,10,"%S",timeinfo);

      if(strcmp(min,"00")==0)
        {
          printf("New Hour starts \n");
          cnt=0;
        }
      else
        {
          int whichsecond;
          int mins, secs;
          mins = atoi(min);
          printf("mins=%d\n",mins);
          secs=atoi(sec);
          printf("secs=%d\n",secs);

          whichsecond= (mins*60)+secs;
          printf("whichsecond is %d",whichsecond);
          cnt=whichsecond;
       




          /* ADD DELAY AMOUNT TAKEN FROM THE ARRAY IN THE TIMESTAMP OF THE PACKETS*/

          struct timeval delay;
          delay.tv_usec=(1000*Shaper[cnt])+time2.tv_usec;
          pnode->ts.tv_usec=delay.tv_usec;
          printf("pnode->ts.tv_usec = %ld.......... \n",pnode->ts.tv_usec);
          cnt++;

        }

                pthread_mutex_unlock(&mutex2);     

   
      int n=0;int m=0;
   

           
      if(pktlist == NULL)
        {

     
          pktlist=pnode;
          phead=pnode;
          pcurr=pnode;
        }
      else
        {
         
          pcurr->next =pnode;
          pcurr=pcurr->next;

        }


     
    }

      pktCnt++;

    }

-----------------------------------------------
Code:

pthread_mutex_t mutex_send=PTHREAD_MUTEX_INITIALIZER;

pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void* sender(void* ptr)
{
  printf("sender1....-4");
  printf("SENDER1 --I am inside the sender1 thread to send packets \n");
  printf("sender1...-3");
  //  rPos=-3;
  printf("sender1... -2");
  sendProcess *mySend;
  int Fd;
  int nics;
  printf("sender 1... -1");
  extern int semaphore;
  struct sembuf sWait={0,-1,0};
  union semun sSet;
  unsigned char destAddr[6];
  char* nic;
  struct timeval curr_time;
  mySend=(sendProcess*)ptr;

  int sentPkts=0;
  // pktCnt=cProc->pktCnt;
  int len1=0;
  int pktCnt=0;

  sSet.val=1;
  semaphore=mySend->semaphore;
  Fd=mySend->Fd;
  //  rPos=-1;
  //  printf("sender1 .. 1");
  unsigned char buf[4092];

  int retVal;
  printf("SENDER1--Fd is %d",Fd);

  printf("sender 1... 1");


  //  printf("sender1 .. 2");
 
  snode=scurr=pktlist;
 
  printf(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>.snode=%d,scurr=%d,pktlist=%d",snode,scurr,pktlist);
do
  {
  pthread_mutex_lock( &mut );


  //    pthread_mutex_lock( &mut );
      while(scurr->next!=NULL);
    {
      if(snode->ts.tv_sec < scurr->ts.tv_sec && snode->ts.tv_usec < scurr->ts.tv_usec)
        {
          TMPNode=snode;
          snode=scurr;
          scurr=TMPNode;
        }
     
      scurr=scurr->next;
    }
 
   
   
      gettimeofday(&curr_time,NULL);     
      if(curr_time.tv_sec >= snode->ts.tv_sec  && curr_time.tv_usec >= snode->ts.tv_usec)
    {
     
      struct sockaddr_ll destAddr;
      memcpy(buf,&snode->pkt,snode->pkt_len);
      memcpy(&(destAddr.sll_addr),&buf,MAC_ADDR_LEN);
      destAddr.sll_family=PF_PACKET;
      destAddr.sll_protocol=htons(ETH_P_ALL);
      destAddr.sll_ifindex=2;
      destAddr.sll_pkttype=PACKET_OTHERHOST;
      destAddr.sll_halen=ETH_ALEN;
      if(snode2->rFlag==0 && snode2->wFlag==1)
        {
         
          if((retVal=write(Fd,snode->pkt,snode->pkt_len))==-1)
        {
          printf("retVal is %d",retVal);
          printf("sendto() error \n");
        }
          printf("SENDER 1 --- packet number is %d OF length %d Sent \n",snode2->pkt_id,retVal);
          snode2->rFlag=1;
          snode2->wFlag=0;
        }
     
    }
  pthread_mutex_unlock( &mut );
      snode=snode->next;
  }while(snode->next != NULL);
     
//      pthread_mutex_unlock( &mut );
     

}


Hey guys. It has been a week that I did not hear any reply to my post. Please help me to complete this task. I am not getting how to use locks in this code to synchronize these two threads.


All times are GMT +5.5. The time now is 13:39.