2013-02-22 3 views
5

W moim wiosennym zadaniu wsadowym mój procesor przedmiotów dzieli obiekt, który czyta czytnik przedmiotu, na siedem list o zmiennej długości. Listy te muszą być zapisane w siedmiu tabelach w bazie danych, a wszelkie błędy (jak rekordy odrzucające bazę danych z dowolnego powodu) muszą spowodować wycofanie transakcji na wszystkich siedmiu tabelach.w Spring Batch, czy wiele JdbcBatchItemWriters może być skonfigurowanych do pisania równolegle?

Obecnie tworzę owinięty obiekt z tych siedmiu list, które są przekazywane do niestandardowego programu piszącego. Program piszący przyjmuje wszystkie te elementy, tworzy własne siedem list tak, że ma tylko siedem zapisów w pakietach (przy użyciu DAO opartych na JdbcTemplate) dla partii pakowanych obiektów zwróconych przez procesor przedmiotów.

Mój program piszący wywołuje funkcję wstawiania dla każdej z tych tabel sekwencyjnie, którą chciałbym przyspieszyć. Zastanawiam się, czy mogę napisać listy, do ich odpowiednich tabel, równolegle, tak aby ogólny czas wykonania był czasem najdłuższego zapisu. Jednym z wymogów, którego nie mogę złamać, jest to, że musi to być jedna transakcja, którą należy wycofać, jeśli którykolwiek z autorów ma jakieś wyjątki.

+1

dlaczego nie używać normalnego 'java.util.concurrent.ThreadPoolExecutor' w ItemWriter do wysyłania do równoległych wątków? –

Odpowiedz

6

Oto proste rozwiązanie z wykorzystaniem TaskExecutor i rozszerzenie na org.springframework.batch.item.support.CompositeItemWriter.

package de.incompleteco.spring.batch.item.support; 

import java.util.List; 

import org.springframework.batch.item.ItemWriter; 
import org.springframework.batch.item.support.CompositeItemWriter; 
import org.springframework.core.task.TaskExecutor; 
import org.springframework.util.Assert; 

import de.incompleteco.spring.domain.SimpleEntity; 

public class ParallelCompositeItemWriter extends CompositeItemWriter<SimpleEntity> { 

    private List<ItemWriter<? super SimpleEntity>> delegates; 

    private TaskExecutor taskExecutor; 

    @Override 
    public void write(final List<? extends SimpleEntity> item) throws Exception { 
     for (final ItemWriter<? super SimpleEntity> writer : delegates) { 
      taskExecutor.execute(new Runnable() { 
       @Override 
       public void run() { 
        try { 
         writer.write(item); 
        } catch (Throwable t) { 
         rethrow(t); 
        } 
       } 

       private void rethrow(Throwable t) { 
        if (t instanceof RuntimeException) { 
         throw (RuntimeException) t; 
        } 
        else if (t instanceof Error) { 
         throw (Error) t; 
        } 
        throw new IllegalStateException(t); 
       }  
      }); 
     }//end for 
    } 


    public void setTaskExecutor(TaskExecutor taskExecutor) { 
     this.taskExecutor = taskExecutor; 
    } 

    @Override 
    public void setDelegates(List<ItemWriter<? super SimpleEntity>> delegates) { 
     this.delegates = delegates; 
     super.setDelegates(delegates); 
    } 

    @Override 
    public void afterPropertiesSet() throws Exception { 
     super.afterPropertiesSet(); 
     Assert.notNull(taskExecutor,"Task executor needs to be set"); 
    } 



} 

przykładowa konfiguracja wyglądałaby tak:

<batch:job id="simpleJob"> 
    <batch:step id="simpleJob.step1"> 
     <batch:tasklet> 
      <batch:chunk reader="reader" writer="writer" commit-interval="10"/> 
     </batch:tasklet> 
    </batch:step> 
</batch:job> 

<bean id="reader" class="org.springframework.batch.item.support.IteratorItemReader"> 
    <constructor-arg ref="itemList"/> 
</bean> 

<bean id="writer" class="de.incompleteco.spring.batch.item.support.ParallelCompositeItemWriter"> 
    <property name="delegates" ref="writerDelegates"/> 
    <property name="taskExecutor" ref="writerTaskExecutor"/> 
</bean> 

<util:list id="writerDelegates"> 
    <bean class="org.springframework.batch.item.database.JdbcBatchItemWriter"> 
     <property name="dataSource" ref="dataSource1"/> 
     <property name="sql" value="insert into test_table (idcol,stuff) values (:idCol,:stuff)"/> 
     <property name="itemSqlParameterSourceProvider"> 
      <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/> 
     </property> 
    </bean> 
    <bean class="org.springframework.batch.item.database.JdbcBatchItemWriter"> 
     <property name="dataSource" ref="dataSource2"/> 
     <property name="sql" value="insert into test_table (idcol,stuff) values (:idCol,:stuff)"/> 
     <property name="itemSqlParameterSourceProvider"> 
      <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/> 
     </property> 
    </bean>  
</util:list> 

<util:list id="itemList"> 
    <bean class="de.incompleteco.spring.domain.SimpleEntity"> 
     <constructor-arg value="stuff1"/> 
    </bean> 
    <bean class="de.incompleteco.spring.domain.SimpleEntity"> 
     <constructor-arg value="stuff2"/> 
    </bean>  
    <bean class="de.incompleteco.spring.domain.SimpleEntity"> 
     <constructor-arg value="stuff3"/> 
    </bean>  
</util:list> 

<task:executor id="writerTaskExecutor" pool-size="3"/> 


<bean id="dataSource1" class="bitronix.tm.resource.jdbc.PoolingDataSource" init-method="init" destroy-method="close"> 
    <property name="className" value="org.h2.jdbcx.JdbcDataSource" /> 
    <property name="uniqueName" value="#{T(System).currentTimeMillis()}" /> 
    <property name="allowLocalTransactions" value="true"/> 
    <property name="maxPoolSize" value="2" /> 
    <property name="driverProperties"> 
     <props> 
      <prop key="URL">jdbc:h2:mem:a;DB_CLOSE_DELAY=-1</prop> 
     </props> 
    </property> 
</bean> 

<bean id="dataSource2" class="bitronix.tm.resource.jdbc.PoolingDataSource" init-method="init" destroy-method="close"> 
    <property name="className" value="org.h2.jdbcx.JdbcDataSource" /> 
    <property name="uniqueName" value="#{T(System).currentTimeMillis()}" /> 
    <property name="allowLocalTransactions" value="true"/> 
    <property name="maxPoolSize" value="2" /> 
    <property name="driverProperties"> 
     <props> 
      <prop key="URL">jdbc:h2:mem:b;DB_CLOSE_DELAY=-1</prop> 
     </props> 
    </property> 
</bean>  

<jdbc:initialize-database data-source="dataSource1"> 
    <jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/> 
</jdbc:initialize-database> 

<jdbc:initialize-database data-source="dataSource2"> 
    <jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/> 
</jdbc:initialize-database> 
<!-- XA transaction --> 

<bean id="btmConfig" factory-method="getConfiguration" class="bitronix.tm.TransactionManagerServices"/> 

<bean id="BitronixTransactionManager" factory-method="getTransactionManager" 
    class="bitronix.tm.TransactionManagerServices" depends-on="btmConfig" destroy-method="shutdown" /> 

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> 
    <property name="transactionManager" ref="BitronixTransactionManager" /> 
    <property name="userTransaction" ref="BitronixTransactionManager" /> 
</bean> 

w tym przykładzie użyto poniższego;

  • Bitronix JTA do obsługi transakcji w wielu bazach danych
  • bardzo prosty model prosty podmiotu w prosty zapis jdbc

(materiał w bazie jest bardzo surowy i tylko przykład)

Powiązane problemy