2012-01-18 9 views
7

Chcę wykonywać równolegle kilka różnych zadań, ale mam koncepcję, że jeśli zadanie jest już w kolejce lub jest w trakcie przetwarzania, nie zostanie ponownie ustawione w kolejce. Odczytywałem trochę na temat API Java i wymyśliłem poniższy kod, który wydaje się działać. Czy ktoś może rzucić światło na to, czy metoda, z której korzystam, jest najlepszym podejściem. Wszelkie zagrożenia (bezpieczeństwo wątków?) Lub lepsze sposoby na to? Kod jest jak poniżej:Obsługa wątków "duplikowanie" zadań

import java.util.HashMap; 
import java.util.concurrent.Future; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

public class TestExecution implements Runnable { 
    String key1; 
    String key2; 
    static HashMap<TestExecution, Future<?>> executions = new HashMap<TestExecution, Future<?>>(); 
    static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(); 
    static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q); 

    public static void main(String[] args) { 
     try { 
     execute(new TestExecution("A", "A")); 
     execute(new TestExecution("A", "A")); 
     execute(new TestExecution("B", "B")); 
     Thread.sleep(8000); 
     execute(new TestExecution("B", "B")); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 

    static boolean execute(TestExecution e) { 
     System.out.println("Handling "+e.key1+":"+e.key2); 
     if (executions.containsKey(e)) { 
     Future<?> f = (Future<?>) executions.get(e); 
     if (f.isDone()) { 
      System.out.println("Previous execution has completed"); 
      executions.remove(e); 
     } else { 
      System.out.println("Previous execution still running"); 
      return false; 
     }   
     } 
     else { 
     System.out.println("No previous execution"); 
     } 
     Future<?> f = tpe.submit(e); 
     executions.put(e, f);    
     return true; 
    } 

    public TestExecution(String key1, String key2) { 
     this.key1 = key1; 
     this.key2 = key2;  
    } 

    public boolean equals(Object obj) 
    { 
     if (obj instanceof TestExecution) 
     { 
      TestExecution t = (TestExecution) obj; 
      return (key1.equals(t.key1) && key2.equals(t.key2));   
     }  
     return false; 
    } 

    public int hashCode() 
    { 
     return key1.hashCode()+key2.hashCode(); 
    } 

    public void run() {  
     try { 
     System.out.println("Start processing "+key1+":"+key2); 
     Thread.sleep(4000); 
     System.out.println("Finish processing "+key1+":"+key2); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     }  
    }    
} 

Dołącz do komentowania poniżej:
Plan jest taki, że wyzwalanie zadań do wykonania będą obsługiwane przez cron wywołującego relaksującego usługę internetową. Na przykład poniżej znajduje się konfiguracja dla jednego zadania uruchamianego o godzinie 9:30 każdego dnia, plus kolejne zaplanowane co dwie minuty.

0/2 * * * * restclient.pl key11 key12 
30 09 * * * restclient.pl key21 key22 

W tym przypadku, jeśli key11 zadanie: key12 pracuje, czy już w kolejce do uruchomienia, nie chcę stać w kolejce kolejnej instancji. Rozumiem, że mamy inne opcje planowania, jednak zwykle używamy crona do innych zadań, więc chcę spróbować to zatrzymać.

Druga aktualizacja. W odpowiedzi na uwagi do tej pory ponownie napisałem kod, czy mógłbyś skomentować jakiekolwiek problemy z następującym zaktualizowanym rozwiązaniem?

import java.util.concurrent.LinkedBlockingQueue; 

public class TestExecution implements Runnable { 
    String key1; 
    String key2;  
    static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>()); 

    public static void main(String[] args) { 
     try { 
     tpe.execute(new TestExecution("A", "A")); 
     tpe.execute(new TestExecution("A", "A")); 
     tpe.execute(new TestExecution("B", "B")); 
     Thread.sleep(8000); 
     tpe.execute(new TestExecution("B", "B")); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 

    public TestExecution(String key1, String key2) { 
     this.key1 = key1; 
     this.key2 = key2;  
    } 

    public boolean equals(Object obj) 
    { 
     if (obj instanceof TestExecution) 
     { 
      TestExecution t = (TestExecution) obj; 
      return (key1.equals(t.key1) && key2.equals(t.key2));   
     }  
     return false; 
    } 

    public int hashCode() 
    { 
     return key1.hashCode()+key2.hashCode(); 
    } 

    public void run() {  
     try { 
     System.out.println("Start processing "+key1+":"+key2); 
     Thread.sleep(4000); 
     System.out.println("Finish processing "+key1+":"+key2); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     }  
    } 
} 


import java.util.Collections; 
import java.util.HashSet; 
import java.util.Set; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 


public class TestThreadPoolExecutor extends ThreadPoolExecutor { 
    Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>()); 

    public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {  
     super(2, 5, 1, TimeUnit.MINUTES, q);  
    } 

    public void execute(Runnable command) { 
     if (executions.contains(command)) { 
     System.out.println("Previous execution still running"); 
     return; 
     } 
     else { 
     System.out.println("No previous execution"); 
     } 
     super.execute(command);  
     executions.add(command);  
    } 

    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t);   
     executions.remove(r); 
    }  
} 
+0

Dlaczego nie użyć hashset dla TestExecution zamiast HashMap ?? –

Odpowiedz

2

Kilka uwag:

  • w realizacji, metodę dostaniesz wyścigowy stan pomiędzy czytaniem „egzekucji” (containsKey) i piśmie (usunąć lub put) czy kilka wątków wywołuje tę metodę w tym samym czasie. Musisz zawijać wszystkie wywołania do "executes", które mają być atomowe w bloku zsynchronizowanym. (W przypadku, dzięki czemu metoda zsynchronizowane będzie działać) http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
  • Należy obsłużyć stan użyciu singleton zamiast statycznych (czyli globalne) zmienne

Ale ja naprawdę chciałbym wiedzieć nieco więcej o tym projekcie aby zrozumieć, co próbujesz osiągnąć. Dlaczego zadanie ma być kolejkowane w kolejce kilka razy?

+0

Dzięki, zaktualizowałem pytanie, podając więcej informacji. – Patrick

+1

I dla bardziej zorientowanego obiektowo projektu, rozważałbym podklasowanie ThreadPoolExecutor i umieszczenie kodu do zarządzania mapą executes-map w funkcjach execute() i afterExecute(). (Wydaje mi się również, że bardziej poprawne jest wywoływanie metody execute() zamiast funkcji submit(), ale specyfikacja nie jest jednoznaczna w tym punkcie). –

+0

Pozdrowienia, napisałem przepisany kod na podstawie twoich sugestii. Czy to wydaje się być lepsze? – Patrick

3

Oto jak bym sobie i uniknąć duplikatów

import java.util.Collections; 
import java.util.Set; 
import java.util.concurrent.*; 

public class TestExecution implements Callable<Void> { 
    private static final ThreadPoolExecutor TPE = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); 
    private static final Set<TestExecution> TE_SET = Collections.newSetFromMap(new ConcurrentHashMap<TestExecution, Boolean>()); 

    private final String key1; 
    private final String key2; 

    public static void main(String... args) throws InterruptedException { 
     new TestExecution("A", "A").execute(); 
     new TestExecution("A", "A").execute(); 
     new TestExecution("B", "B").execute(); 
     Thread.sleep(8000); 
     new TestExecution("A", "A").execute(); 
     new TestExecution("B", "B").execute(); 
     new TestExecution("B", "B").execute(); 
     TPE.shutdown(); 
    } 

    public TestExecution(String key1, String key2) { 
     this.key1 = key1; 
     this.key2 = key2; 
    } 

    void execute() { 
     if (TE_SET.add(this)) { 
      System.out.println("Handling " + this); 
      TPE.submit(this); 
     } else { 
      System.out.println("... ignoring duplicate " + this); 
     } 
    } 

    public boolean equals(Object obj) { 
     return obj instanceof TestExecution && 
       key1.equals(((TestExecution) obj).key1) && 
       key2.equals(((TestExecution) obj).key2); 
    } 

    public int hashCode() { 
     return key1.hashCode() * 31 + key2.hashCode(); 
    } 

    @Override 
    public Void call() throws InterruptedException { 
     if (!TE_SET.remove(this)) { 
      System.out.println("... dropping duplicate " + this); 
      return null; 
     } 
     System.out.println("Start processing " + this); 
     Thread.sleep(4000); 
     System.out.println("Finish processing " + this); 
     return null; 
    } 

    public String toString() { 
     return key1 + ':' + key2; 
    } 
} 

drukuje

Handling A:A 
... ignoring duplicate A:A 
Handling B:B 
Start processing A:A 
Start processing B:B 
Finish processing A:A 
Finish processing B:B 
Handling A:A 
Handling B:B 
Start processing A:A 
Start processing B:B 
... ignoring duplicate B:B 
Finish processing B:B 
Finish processing A:A 
+0

OK, dziękuję, niektóre dobre wskaźniki tam, zwłaszcza unikanie problemów z wielowątkowością za pomocą ConcurrentHashMap i nadpisywanie metody toString. Kilka pytań.Dlaczego nie używać HashSet (czy to dlatego, że nie ma odpowiednika bezpiecznego wątku do użycia?) Również nie rozumiem kodu do usunięcia z HashMap. Wydaje się, że robisz to na początku przetwarzania, czy to nie powinno być na końcu przetwarzania? – Patrick

+1

Możesz użyć 'Collections.synchronizedSet (new HashSet())' To jest wątek bezpieczny, ale nie współbieżny. Czy to na początku, czy na końcu zależy od Twoich wymagań. Czy lepiej od czasu do czasu zrobić coś dwa razy lub od czasu do czasu nie robić czegoś (ponieważ nowe zadanie zostało dodane pomiędzy zakończeniem zadania a usunięciem) –

+0

OK, tak naprawdę nie znam różnicy między "bezpiecznym wątkiem" a "współbieżnym", może Mam tam jakieś dochodzenie. Ale do usunięcia elementu, jeśli nie chcę ponownie wykonać (lub umieścić w kolejce) zadania, które już się rozpoczęło, to TE_SET.remove powinien zostać przeniesiony na koniec funkcji Call, prawda? Czy mam rację zakładając, że przypadek "upuszczenia duplikatu" jest przypadkiem błędu? Jeśli jesteśmy w funkcji wywołania, to element powinien zawsze być zapisany w prawach HashSet? – Patrick

Powiązane problemy