2013-02-12 7 views
5

Jestem nowy w Apache Flume.
Próbuję zobaczyć, jak mogę uzyskać json (jako źródło http), parsować go i przechowywać na dynamicznej ścieżce na hdfs zgodnie z zawartością.
Na przykład:
jeśli JSON jest:Czy apache flume zlew hdfs akceptuje dynamiczną ścieżkę do pisania?

[{ 
    "field1" : "value1", 
    "field2" : "value2" 
}] 

następnie ścieżka HDFS będą:
/some-default-root-path/wartosc1/value2/niektóre wartości-nazwy-pliku
Czy istnieje taka konfiguracja flume, która pozwala mi to zrobić?

Oto moja obecna konfiguracja (akceptuje JSON przez HTTP i przechowuje go w ścieżce według znacznika czasu):

#flume.conf: http source, hdfs sink 

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource 
a1.sources.r1.port = 9000 
#a1.sources.r1.handler = org.apache.flume.http.JSONHandler 

# Describe the sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = /user/uri/events/%y-%m-%d/%H%M/%S 
a1.sinks.k1.hdfs.filePrefix = events- 
a1.sinks.k1.hdfs.round = true 
a1.sinks.k1.hdfs.roundValue = 10 
a1.sinks.k1.hdfs.roundUnit = minute 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

Dzięki!

Odpowiedz

8

Roztwór w korycie documentation for the hdfs sink:

Oto zmieniony konfiguracja:

#flume.conf: http source, hdfs sink 

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource 
a1.sources.r1.port = 9000 
#a1.sources.r1.handler = org.apache.flume.http.JSONHandler 

# Describe the sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = /user/uri/events/%{field1} 
a1.sinks.k1.hdfs.filePrefix = events- 
a1.sinks.k1.hdfs.round = true 
a1.sinks.k1.hdfs.roundValue = 10 
a1.sinks.k1.hdfs.roundUnit = minute 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

a zagięciem:

curl -X POST -d '[{ "headers" : {   "timestamp" : "434324343", "host" :"random_host.example.com", "field1" : "val1"   }, "body" : "random_body" }]' localhost:9000 
+0

mi stosując inne źródło (RabbitMQ); i samodzielnie przekazuję ładunek JSON. Opisana przez Ciebie metoda nie działa w moim przypadku. Zakładam, że coś jest nie tak z mojego końca, chyba że spotkasz się z podobnymi problemami. –

+0

Dzięki, to zadziałało dla mnie –