2011-08-24 25 views
6

Mam wiele kolekcji MongoDB, które pobierają wiele dokumentów JSON z różnych źródeł strumieniowych. Innymi słowy istnieje szereg procesów, które nieustannie wprowadzają dane do zbioru zbiorów MongoDB.MongoDb Streaming Out Wstawione dane w czasie rzeczywistym (lub prawie w czasie rzeczywistym)

Potrzebuję sposobu na przesyłanie danych z MongoDB do aplikacji końcowych. Więc chcę, że koncepcyjnie system wygląda tak:

App Stream1 --> 
App Stream2 -->  MONGODB  ---> Aggregated Stream 
App Stream3 --> 

albo to:

App Stream1 -->     ---> MongoD Stream1 
App Stream2 -->  MONGODB  ---> MongoD Stream2 
App Stream3 -->     ---> MongoD Stream3 

pytanie brzmi, jak mogę przesyłać dane z Mongo bez konieczności ciągłego odpytywania/kwerendy bazy danych?

Oczywistą odpowiedzią na pytanie jest "dlaczego nie zmieniasz procesów strumieniowania aplikacji, aby wysyłać wiadomości do kolejki, takich jak Rabbit, Zero lub ActiveMQ, które następnie wysyłają do procesów Mongo Streaming i Mongo od razu w ten sposób":

    MONGODB 
        /|\ 
        | 
App Stream1 -->  |   ---> MongoD Stream1 
App Stream2 --> SomeMQqueue ---> MongoD Stream2 
App Stream3 -->    ---> MongoD Stream3 

w idealnym świecie tak, że byłoby dobrze, ale musimy Mongo aby zapewnić, że wiadomości są zapisywane po pierwsze, aby uniknąć duplikatów i upewnić się, że identyfikatory są generowane itp Mongo musi siedzieć w środku jak trwający warstwa.

Jak przesyłać wiadomości z kolekcji Mongo (nie używając GridFS itp.) Do tych aplikacji w dół strumienia. Podstawową szkołą myślenia było po prostu odpytywanie o nowe dokumenty, a każdy dokument, który jest gromadzony, aktualizuje go, dodając kolejne pole do dokumentów JSON przechowywanych w bazie danych, podobnie jak flaga procesu w tabeli SQL, która przechowuje przetworzony znacznik czasu. To znaczy. co 1 sekunda ankieta dla dokumentów, w których przetworzono == null .... add processed = now() .... dokument aktualizacji.

Czy istnieje bardziej efektywna i wydajna metoda obliczeniowa?

FYI - To są wszystkie procesy Java.

Pozdrawiam!

Odpowiedz

3

Jeśli piszesz do kolekcji z ograniczeniami (lub kolekcji), możesz użyć numeru tailablecursor, aby przesłać nowe dane w strumieniu lub w kolejce komunikatów, skąd można je przesłać. To jednak nie zadziała w przypadku kolekcji bez limitu.

+0

Dzięki za link. Niestety nie korzystamy z ograniczonych kolekcji, ale nie jest to zła cecha usługi wiadomości. Brzmi jak indeks na przetworzonej banderoli, a odpytywanie jest jedyną opcją ... Jeśli element indeksu ma wartość null, to nadal jest on przywoływany w indeksie lub zawiera zapytanie o zerowe, wciąż niewystarczające skanowanie kolekcji? – NightWolf

+1

Albo spoujemy, że możemy mieć kolekcję z limitem w ustalonym rozmiarze, zachowywać się jak pamięć podręczna, następnie wyciągnąć przedmioty z jednego kupienia 1 i umieścić je z powrotem w zwykłej kolekcji. Pojawia się pytanie, w jaki sposób zapisać nasz kursor pozycji między biegami aplikacji? Zakładam, że po prostu używamy pola _id generowanego automatycznie w Mongo i wybieramy wszystko, co jest większe niż to pole ID ... Czy wszystkie mongoidy generują _ID w porządku rosnącym? – NightWolf

+1

Indeksy przechowują wpisy dla 'null'. Jeśli szukasz kolekcji z ograniczeniami, musisz zapisać ostatni wpis, który zobaczyłeś (możesz go zachować, jak chcesz, używając innej kolekcji Mongo, działa dobrze), a następnie rozpocząć ruchomy kursor w tym elemencie za pomocą '$ min 'i' skip (1) ', aby wznowić. Zobacz http://www.mongodb.org/display/DOCS/Advanced+Queries#AdvancedQueries-%24minand%24max – dcrosta

Powiązane problemy