2016-02-03 5 views
5

Mam tabelę z Guid jako klucz podstawowy. Tabela zawiera już wiele wierszy. Mam również win service, który wykonuje pewien zestaw akcji z każdym wierszem (możliwe potrzeby odczytu i zapisu danych z innych baz danych). Przetwarzanie jednego wiersza zajmuje dużo czasu. (Średnio około 100 sekund)Potrzebuję funkcji contributon dla moich instancji usługi wygranych

Moja wygrana usługa działa tak:

public class MyDto 
{ 
    public Guid Id { get; set; } 
} 

while (true){ 
    if(time to start){ 
      List<MyDto> rows = LoadData(); 
      foreach(MyDto obj in rows){ 
       Process(obj);//it takes in average about 100 sec 
      } 
    } 
} 

muszę skrócić czas wykonywania wszystkich moich wierszy. Z pewnych powodów zdecydowałem się na wzrost wartości mojej usługi wygranej. Potrzebuję więc, aby każda usługa wygrywała własny zestaw wierszy.

Mam parametryzowane mój LoadData() zabawy:

public List<MyDto> LoadData(int winServInstanceNumber){ 

} 

Więc muszę funkcja wkładu zależy od sumy wystąpień usług wygrana liczyć i concreate serwis wygrana liczba instancji.

można zaoferować coś lepszego niż

//on .net side 
obj.Id.GetHashCode()%totalWinServiceInstancesCount 

lub

--on sql side 
HASHBYTES('MD5', CAST(id as varbinary(16)))%totalWinServiceInstancesCount 

Odpowiedz

1

wygląda wszystko, co potrzebne jest, aby zakręcić więcej wątków do przetwarzania danych. Ale aby to zrobić, potrzebujesz kontroli nad tym, co przetwarzasz, aby nie przetwarzać tego samego dwa razy. Aby uzyskać kontrolę, możesz użyć na przykład MSMQ lub System.Collections.Queue. Twoja usługa powinna odpowiadać za zapytanie do bazy danych i ładowanie nieprzetworzonych wierszy do kolejki.

Następnie można wywołać metodę statyczną ProcessBatch. Przejdzie do kolejki i zakręci wątkiem (wątkami) i przekaże id (y) rzędu (ów) do procesora (ów)/pracowników. pracownik przetwarza tylko jeden wiersz. Pracownik może być oddzielnym plikiem EXE i kończy się proces. Twoja "ProcessBatch" powinna kontrolować, co zostało przetworzone/nie przetworzone. Powinien kontrolować, ile wątków jest aktualnie uruchomionych. Nie chcesz kręcić zbyt dużo.

Więc

Service   ProcessControl   Worker 
    |      |    | 
    |---Load Queue   |    | 
    |   |    |    | 
    |<--------|    |    | 
    |      |    | 
    |-----Call When Q ----->|---Queue  | 
    |      |  |  | 
    |      |<------|  | 
    |      |    | 
    |---Load Queue   |----Start------>| 
    |  |    |<---Success-----| 
    |<-------|    |    | 
    |      |---Permanent | 
    |-----Call When Q ----->| Dequeue  | 
    |      |  |  | 
    |      |<------|  | 

Jest to prawdopodobnie typowe obciążenie rozłam, który przyspiesza inaczej powolne procesy

+0

Dobra uwaga, dziękuję za odpowiedź, ale pytałem o funkcję składki. BTW zagłosuj na swoje rozwiązanie – isxaker

+0

@isxaker Przepraszamy, może coś pominę tutaj.Myślałem, że twoje pytanie dotyczyło przyspieszenia przetwarzania danych zamiast "foreach (MyDto obj in rows)". Czy możesz wyjaśnić, co to znaczy dla ciebie "funkcja składki"? Dzięki –

+0

Jest to funkcja (wygląda jak funkcja skrótu), zależy od jednego (numer aktualnej instancji win serv) lub dwóch (numer bieżącej instancji win serv i całkowitej liczby usług wygranych) ustawień globalnych. Również ta funkcja ma jeden parametr wejściowy (GUID w moim przypadku). Funkcja musi wykonać pewne "magiczne" z wartością wejściową i return int number należy [0; totalWinServInstancesNumber -1]; wygląda jak jakiś skrót od Guida w moim przypadku. – isxaker

1

Zamiast próbować uruchomić wiele wystąpień tej samej usługi, należy przyjąć asynchroniczny wzór producent/konsumentów. Użyj obiektu Task, aby uruchomić producenta, a następnie utworzyć wielu użytkowników. Jeśli twoje dane muszą zostać przetworzone w określonej kolejności, musisz zorganizować konsumentom pracę tylko nad przydzielonym blokiem danych. W przeciwnym razie mogą pobrać swoją pracę i rozpocząć przetwarzanie.

Poniższy przykład zakłada, że ​​praca może być kontynuowana w dowolnej kolejności. Możesz dostroić optymalizację liczby klientów w oparciu o zasoby systemowe. Użyj AppSetting, aby skonfigurować MaxConsumer i znaleźć idealną liczbę, która optymalizuje przetwarzanie.

Podłączenie metody start/stop z poziomu usługi start/stop, jak również wszelkie niezbędne rejestrowanie/obsługę wyjątków. Przykład tutaj jest uproszczony i pokazuje podstawy wzoru.

public class MyService 
{ 
    BlockingCollection<MyDto> sharedResource = new BlockingCollection<MyDto>(); 
    CancellationTokenSource cancellation; 
    private Task producer; 
    private List<Task> consumers; 
    //Load/Set this from configuration 
    private static readonly int MaxConsumer = 3; 

    public void Start() 
    { 
     this.cancellation = new CancellationTokenSource(); 

     // Start the producer & Consumers, as long running task 
     this.producer = Task.Factory.StartNew(() => this.Produce(), TaskCreationOptions.LongRunning); 
     this.consumers = new List<Task>(); 
     for(int i=0; i<MaxConsumer; i++) 
     { 
      this.consumers.Add(Task.Factory.StartNew(() => this.Consume() , TaskCreationOptions.LongRunning)); 
     } 

     // If you need primary service loop you can do 
     // something like the following 
     // while(!this.cancellation.IsCancellationRequested) 
     //{ 
     //  this.cancellation.Token.WaitHandle.WaitOne(1000); 
     //} 
    } 

    public void Stop() 
    { 
     this.cancellation.Cancel(); 
     WaitOnTask(producer); 
     foreach(var t in this.consumers) 
     { 
      WaitOnTask(t); 
     } 
     this.cancellation.Dispose(); 
    } 

    private void WaitOnTask(Task task) 
    { 
     try 
     { 
      if (!task.IsCompleted) 
      { 
       //May want to use timeout 
       //instead of blindly waiting 
       task.Wait(); 
      } 
     } 
     catch(ObjectDisposedException oex)   
     { 
      // Task might have been disposed/closed already 
     } 

    } 

    public void Produce() 
    { 
     var token = this.cancellation.Token; 
     while(!token.IsCancellationRequested) 
     { 
      //Code for your data loading 
      if (time to start) 
      { 
       List<MyDto> rows = LoadData(); 
       foreach(var data in rows) 
       { 
        this.sharedResource.Add(data, token); 
       } 
      } 

      //Wait and repeat 
      token.WaitHandle.WaitOne(1000); 
     } 
    } 

    public void Consume() 
    { 
     var token = this.cancellation.Token; 
     try 
     { 
      foreach (var data in this.sharedResource.GetConsumingEnumerable(token)) 
      { 
       // Code for your data processing 
       Process(data); 
      } 
     } 
     catch(OperationCanceledException ex) 
     { 
      // service stop requested, can log here 
      // or take action for saving state as needed 
     } 
    } 
} 
+0

Mój komentarz jest taki sam jak do T.S. odpowiedź - Dobra uwaga, dziękuję za odpowiedź, ale pytałem o funkcję składkową. BTW zagłosuj na swoje rozwiązanie – isxaker

Powiązane problemy