2015-04-09 7 views
8

Obecnie mam dwa tabele Hbase (można je nazwać tableA i tableB). Za pomocą jednoetapowego zadania MapReduce dane w tableA są odczytywane i zapisywane do tableB. Obecnie obie tabele znajdują się w tym samym klastrze HBase. Muszę jednak przenieść tableB do jego klastra.Jak mogę odczytać z jednej instancji HBase, ale piszę do innej?

Czy można skonfigurować jednoetapowe zadanie zmniejszania mapy w Hadoop do odczytu i zapisu z oddzielnych instancji HBase?

+0

Możesz użyć iskry do tego typu pracy. – Tinku

Odpowiedz

3

Jest możliwe, HBase na CopyTable MapReduce job robi to za pomocą TableMapReduceUtil.initTableReducerJob() który pozwala ustawić alternatywny quorumAddress w przypadku trzeba napisać do zdalnych klastrów:

public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, org.apache.hadoop.mapreduce.Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl) 

quorumAddress - Distant klaster napisać do; wartość domyślna to null dla danych wyjściowych do klastra, który jest określony w hbase-site.xml. Ustaw ten ciąg na zespół zookeeper w alternatywnym klastrze zdalnym, aby zmniejszyć liczbę zapisów klastrów, które są inne niż domyślne wartości ; na przykład kopiując tabele między klastrami, źródłem byłby oznaczony przez hbase-site.xml, a ten parametr miałby adres zespołu zdalnego. Format do przekazania jest szczególny. Pass :: takich jak serwer, serwer2, serwer3: 2181:/hbase.


Inną opcją jest wdrożenie swój własny reduktor napisać do zdalnej tabeli zamiast pisać do kontekstu. Coś podobnego do tego:

public static class MyReducer extends Reducer<Text, Result, Text, Text> { 

    protected Table remoteTable; 
    protected Connection connection; 

    @Override 
    protected void setup(Context context) throws IOException, InterruptedException { 
     super.setup(context); 
     // Clone configuration and provide a new quorum address for the remote cluster 
     Configuration config = HBaseConfiguration.create(context.getConfiguration()); 
     config.set("hbase.zookeeper.quorum","quorum1,quorum2,quorum3"); 
     connection = ConnectionFactory.createConnection(config); // HBase 0.99+ 
     //connection = HConnectionManager.createConnection(config); // HBase <0.99 
     remoteTable = connection.getTable("myTable".getBytes()); 
     remoteTable.setAutoFlush(false); 
     remoteTable.setWriteBufferSize(1024L*1024L*10L); // 10MB buffer 
    } 

    public void reduce(Text boardKey, Iterable<Result> results, Context context) throws IOException, InterruptedException { 
     /* Write puts to remoteTable */ 
    } 

    @Override 
    protected void cleanup(Context context) throws IOException, InterruptedException { 
     super.cleanup(context); 
     if (remoteTable!=null) { 
      remoteTable.flushCommits(); 
      remoteTable.close(); 
     } 
     if(connection!=null) { 
      connection.close(); 
     } 
    } 
} 
+0

Muszę kochać ludzi, którzy negatywnie odnoszą się do słusznych odpowiedzi tylko dla zabawy. –

+0

Dziękujemy za przedstawienie przykładu niestandardowej redukcji – slayton

Powiązane problemy