2013-04-29 11 views
5

Ostatnio grałem z wykorzystaniem pamięci współdzielonej dla IPC. Jedną rzeczą, którą próbowałem wdrożyć, jest prosty bufor pierścieniowy z 1 procesem produkcyjnym i 1 procesem zużywającym. Każdy proces ma swój własny numer sekwencyjny do śledzenia swojej pozycji. Te numery sekwencji są aktualizowane za pomocą op atomowych, aby zapewnić prawidłowe wartości są widoczne dla drugiego procesu. Producent zablokuje się po zapełnieniu bufora pierścieniowego. Kod jest wolny od blokady, ponieważ nie używa się semaforów ani muteksów.Bufor pojedynczego producenta/bufora konsumenta we wspólnej pamięci

Wydajność mądry Dostaję około 20 milionów wiadomości na sekundę na moje skromne VM - Całkiem zadowolony z tego :)

Co jestem ciekaw jak „poprawny” kod jest mój. Czy ktoś może wykryć nieodłączne problemy/warunki wyścigu? Oto mój kod. Z góry dziękuję za wszelkie uwagi.

#include <stdlib.h> 
#include <stdio.h> 
#include <fcntl.h> 
#include <sys/mman.h> 
#include <sys/stat.h> 
#include <time.h> 
#include <unistd.h> 
#include <string.h> 

#define SHM_ID "/mmap-test" 
#define BUFFER_SIZE 4096 
#define SLEEP_NANOS 1000 // 1 micro 

struct Message 
{ 
    long _id; 
    char _data[128]; 
}; 

struct RingBuffer 
{ 
    size_t _rseq; 
    char _pad1[64]; 

    size_t _wseq; 
    char _pad2[64]; 

    Message _buffer[BUFFER_SIZE]; 
}; 

void 
producerLoop() 
{ 
    int size = sizeof(RingBuffer); 
    int fd = shm_open(SHM_ID, O_RDWR | O_CREAT, 0600); 
    ftruncate(fd, size+1); 

    // create shared memory area 
    RingBuffer* rb = (RingBuffer*)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 
    close(fd); 

    // initialize our sequence numbers in the ring buffer 
    rb->_wseq = rb->_rseq = 0; 
    int i = 0; 

    timespec tss; 
    tss.tv_sec = 0; 
    tss.tv_nsec = SLEEP_NANOS; 

    while(1) 
    { 
     // as long as the consumer isn't running behind keep producing 
     while((rb->_wseq+1)%BUFFER_SIZE != rb->_rseq%BUFFER_SIZE) 
     { 
      // write the next entry and atomically update the write sequence number 
      Message* msg = &rb->_buffer[rb->_wseq%BUFFER_SIZE]; 
      msg->_id = i++; 
      __sync_fetch_and_add(&rb->_wseq, 1); 
     } 

     // give consumer some time to catch up 
     nanosleep(&tss, 0); 
    } 
} 

void 
consumerLoop() 
{ 
    int size = sizeof(RingBuffer); 
    int fd = shm_open(SHM_ID, O_RDWR, 0600); 
    if(fd == -1) { 
     perror("argh!!!"); return; 
    } 

    // lookup producers shared memory area 
    RingBuffer* rb = (RingBuffer*)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 

    // initialize our sequence numbers in the ring buffer 
    size_t seq = 0; 
    size_t pid = -1; 

    timespec tss; 
    tss.tv_sec = 0; 
    tss.tv_nsec = SLEEP_NANOS; 

    while(1) 
    { 
     // while there is data to consume 
     while(seq%BUFFER_SIZE != rb->_wseq%BUFFER_SIZE) 
     { 
      // get the next message and validate the id 
      // id should only ever increase by 1 
      // quit immediately if not 
      Message msg = rb->_buffer[seq%BUFFER_SIZE]; 
      if(msg._id != pid+1) { 
       printf("error: %d %d\n", msg._id, pid); return; 
      } 
      pid = msg._id; 
      ++seq; 
     } 

     // atomically update the read sequence in the ring buffer 
     // making it visible to the producer 
     __sync_lock_test_and_set(&rb->_rseq, seq); 

     // wait for more data 
     nanosleep(&tss, 0); 
    } 
} 

int 
main(int argc, char** argv) 
{ 
    if(argc != 2) { 
     printf("please supply args (producer/consumer)\n"); return -1; 
    } else if(strcmp(argv[1], "consumer") == 0) { 
     consumerLoop(); 
    } else if(strcmp(argv[1], "producer") == 0) { 
     producerLoop(); 
    } else { 
     printf("invalid arg: %s\n", argv[1]); return -1; 
    } 
} 

Odpowiedz

1

Na pierwszy rzut oka wydaje mi się to poprawne. Rozumiem, że jesteś zadowolony z wydajności, ale fajnym eksperymentem może być użycie czegoś o wiele lżejszego niż __sync_fetch_and_add. AFAIK to pełna bariera pamięci, która jest droga. Ponieważ istnieje jeden producent i jeden konsument, wydanie i odpowiednia operacja nabycia powinny zapewnić lepszą wydajność. Biblioteka Folly na Facebooku ma pojedynczą pojedynczą kolejkę dla pojedynczego producenta, która używa nowych atomów C++ 11 tutaj: https://github.com/facebook/folly/blob/master/folly/ProducerConsumerQueue.h

Powiązane problemy