2013-04-19 11 views
5

Mam program, w którym próbuję wdrożyć ustawienia wielu producentów i wielu klientów. Mam kod, który wydaje się działać dobrze, gdy mam jednego konsumenta i wielu producentów, ale wprowadzenie wielu wątków konsumenckich wydaje się wywoływać kilka dziwnych problemów.Problem z wieloma wątkami dla wielu producentów

Oto co mam teraz:

#include <stdio.h> 
#include <pthread.h> 
#include <unistd.h> 
#include <stdlib.h> 

#define MAX 10 

typedef struct travs { 
    int id; 
    int numBags; 
    int arrTime; 
    struct travs *next; 
} travs; 


travs *queue; 
//travs *servicing[MAX]; 

int produced; // The total # of produced in the queue 

pthread_mutex_t queue_lock; 
//pthread_mutex_t staff_lock; 
pthread_cond_t ct, cs; 

int CheckIn(){ 
    sleep(1); 
    if(produced != 0) return 1; 
    else return 0; 
} 



void *producerThread(void *args){ 
    travs *traveler = (travs *)args; 
    // Acquire the mutex 
    pthread_mutex_lock(&queue_lock); 
    produced++; 
// pthread_cond_signal(&cs); 
    pthread_cond_wait(&ct, &queue_lock); 
    printf("Producer %d is now checked in at time %d.\n", queue->id, (1+queue- >arrTime)); 
    queue = queue->next; 
    pthread_mutex_unlock(&queue_lock); 

    return; 
}  

int Producer(int id, int numBags, int arrTime){ 

    int ret; 
    pthread_t ttid; 
    travs *traveler = malloc(sizeof(travs)); 
    traveler->id = id; 
    traveler->numBags = numBags; 
    traveler->arrTime = arrTime; 
    sleep(arrTime); 
    pthread_mutex_lock(&queue_lock); 
    if(queue != NULL) { 
     travs *check_in = malloc(sizeof(travs)); 
     check_in = queue; 
     while(check_in->next != NULL){ 
      check_in = check_in->next; 
     } 
     check_in->next = traveler; 
    } 
    else { queue = traveler; } 
    pthread_mutex_unlock(&queue_lock); 
    // Create a new traveler thread 
    ret = pthread_create(&ttid, NULL, producerThread, (void *)traveler); 

    // Check if thread creation was successful 
    if(ret == 0) { 
     printf("Producer %d has entered the check-in line at time %d; s/he is at  position %d and has %d bags.\n", id, arrTime, produced, numBags); 
     pthread_cond_signal(&cs); 
     return 0; 
    } 
    else return -1; 

} 


void *consumerThread(void *arg){ 

    int i = 0; // travelers serviced 
    char *name = (char *)arg; 
    while(1) { // run iteratively 

     // If 20 producers have been served, the consumer's work is done. 
     if(i == 20) { 
      printf("Consumer %s's service has completed!\n", name); 
       pthread_exit(NULL); 
      } 
     // Sleep for 10s if 5 travelers have been checked in 
     if (((i+1) % 5) == 0) { 
       // Wake up sleeping travelers 
       printf("Consumer %s is taking a break.\n", name); 
       sleep(2); 
       printf("Consumer %s's break is over.\n", name); 
     } 

     if(CheckIn()) { 
      pthread_mutex_lock(&queue_lock); 
      int j = 1; 
        pthread_cond_wait(&cs, &queue_lock); 
        printf("Producer %d presents ticket to consumer  %s.\n", queue->id, name); 
        printf("Consumer %s gives boarding pass to producer  %d.\n", name, queue->id); 
        while(j <= queue->numBags){ 
         printf("Consumer %s checks in bag %d for  producer %d; baggage tag is _X_.\n", name, j, queue->id); 
         j++; 
       } 
      // Signal producer being serviced that their check in is complete. 
      i++; 
      pthread_mutex_unlock(&queue_lock); 
      produced--; 
      pthread_cond_signal(&ct); 
     } 
    sleep(3); 
    } 
} 

int Consumer(char *Name) { 

    sleep(5); 
    int ret; 
    pthread_t stid; 
    // Create a staff thread 

    ret = pthread_create(&stid, NULL, consumerThread, (void *)Name); 
    // Acquire the lock 
    if(ret == 0) { 
     printf("Producer %s's service has begun!\n", Name); 
     return 0; 
    } 
    else return -1; 
} 

int main() { 
    int ret = 0; 
    char *staff_name = malloc(sizeof(char)); 
    int staff_check = 0; 
    int trav_check = 0; 
    int id; 
    int bagnum; 
    int travtime; 
    FILE *consumer_fp; 
    FILE *producer_fp; 
    queue = malloc(sizeof(travs)); 
    queue = NULL; 
    /*while(ret < 10){ 
     servicing[ret] = malloc(sizeof(travs)); 
     servicing[ret] = NULL; 
    }*/ 

    // Initilize mutexes 
    pthread_mutex_init(&queue_lock, NULL); 
    //pthread_mutex_init(&staff_lock, NULL); 

    // Initialize condition variables 
    pthread_cond_init(&ct, NULL); 
    pthread_cond_init(&cs, NULL); 

    // Open the file so we can start reading from it 

    consumer_fp = fopen("staff.txt", "r"); 
    producer_fp = fopen("travelers.txt", "r"); 

    staff_check = fscanf(consumer_fp, "%s", staff_name); 
    trav_check = fscanf(producer_fp, "%d %d %d", &id, &bagnum, &travtime); 
    while(1){ 

     K: 
     while(staff_check == 1) { 
      Consumer(staff_name); 
      staff_check = fscanf(consumer_fp, "%s", staff_name); 
      goto L; 
     } 
     L: 
     while(trav_check == 3) { 
      Producer(id, bagnum, travtime); 
      trav_check = fscanf(producer_fp, "%d %d %d", &id, &bagnum,  &travtime); 
      goto K; 
     } 

    pthread_exit(NULL); 
    } 

} 

W tym ustawieniu, każdy wątek producent żyje tylko przez krótki czas przed powrotem, a nie czyni samo prawdziwe obliczenie oprócz dodawania nowego elementu do kolejki globalnej i kilka trafnie wyprowadzonych linii wyjściowych.

Jednak, gdy przedstawię wielu producentów, tylko ostatni wątek producenta robi cokolwiek.

Z tego co mogę przypuszczać, że potrzebne są następujące elementy:

I) oddzielnych kolejek dla producentów oczekujących na zameldowaliśmy się i producentów, które są obecnie sprawdzane w (wykomentowane jak travs * serwisowania [max] powyżej)

ii) Odrębny muteks dla konsumentów.

Jednak nie jestem pewien, jak to wdrożyć. Jest to pomysł miałem na myśli:

  1. CheckIn() wątek producent i kopia * * kolejka do serwisowania [i] (w wątku konsumentów).

  2. Kolejka ustaw = kolejka-> następna (w wątku producenta).

Ale, w jaki sposób mogę zagwarantować, że podczas kopiowania * kolejki nad tym nie będzie już zaawansowany krok? Czy mogę zasygnalizować oczekujący wątek z blokadą inną niż ta, którą obecnie ma wątek? I, co ważniejsze, w jaki sposób różne wątki konsumenckie będą przetwarzać wątki różnych podróżnych?

Każda pomoc będzie bardzo ceniona!

+0

'travs * traveler = malloc (sizeof (travs)); traveler = (travs *) args; 'to wyciek pamięci ... – Sebivor

+0

Ah, dziękuję za połów. Dodam tam darmową linię. – user991710

Odpowiedz

3

Użyj jednej kolejki.

Napisz dwie funkcje, jedną, aby dodać istniejącą pozycję do kolejki, a drugą, aby usunąć element z kolejki. Nie należy blokować tych funkcji. Przetestuj je w aplikacji z jednym gwintem.

Następnie napisz dwa opakowania do takich dwóch funkcji dodawania i usuwania. Owijarki te powinny wziąć dodatkowy argument mutex jako argument. Zablokuj ten mutex w opakowaniach przed wywołaniem funkcji dodawania lub usuwania, a następnie odblokuj muteks.

Napisz funkcję wątku producenta, tworząc nowy element i wywołując opakowanie dodatkowe. Napisz funkcję wątku konsumenta wywołującego element do usuwania elementów i niszczącą usunięty element.

Skonfiguruj funkcję main() deklarując i inicjując muteks, a następnie przejdź do tworzenia różnych wystąpień producentów i konsumentów, używając pthread_create(). Przekaż mutex jako argument funkcji wątków.

+0

Dziękuję za odpowiedź krok po kroku. Przepisałem cały program, zaczynając od małych kroków i działając poprawnie dla wielu producentów i pojedynczego klienta. Nadal mam problemy z wieloma klientami, których nie mogę zdiagnozować, ale mogę to opisać jako inne pytanie. Tak czy inaczej, ponieważ doprowadziło mnie to do pół-roboczego rozwiązania, przyjmuję to jako odpowiedź. Dziękuję Ci. – user991710

4

Jak już wspomniałem, jest to wyciek pamięci:

travs *traveler = malloc(sizeof(travs)); 
traveler = (travs *)args; 

Nie będę wchodzić w szczegóły na temat „co tak źle o przecieki pamięci?”. Jeśli chcesz otrzymać tę odpowiedź, zadaj Google to pytanie. Prawdopodobnie miałeś na myśli: travs *traveler = args;.


if(queue != NULL) { 
    travs *check_in = malloc(sizeof(travs)); 
    check_in = queue; 
    while(check_in->next != NULL){ 
     check_in = check_in->next; 
    } 
    check_in->next = traveler; 
} 
else { queue = traveler; } 

wyciek pamięci odłożyć, dlaczego jest kolejka mutex strzeżone wcześniej w innych funkcji, podczas gdy nie ma mutex pilnuje wcale w tym kodzie? Wygląda na to, że przegapiłeś punkt muteksów. Twoje wyścigi kodu, tutaj.

Być może pthread_rwlock_t s będzie bardziej odpowiedni dla tego rodzaju kodu.

+0

Dziękuję za uwagę wycieku i brak mutex w tej części kodu, ale to niestety wciąż nie odpowiada na pytanie o projekt wielu konsumentów! – user991710

Powiązane problemy