Szukałem możliwości dla Websphere MQ jako źródła danych dla strumieniowania iskier, ponieważ jest to potrzebne w jednym z naszych przypadków użycia. Dowiedziałem się, że MQTT jest protokołem, który obsługuje komunikację ze struktur danych MQ, ale ponieważ jestem początkującym dla iskrzenia strumieniowego, potrzebuję pewnych roboczych przykładów dla tego samego. Czy ktoś próbował połączyć się z MQ z strumieniowaniem iskier. Proszę wymyślić najlepszy sposób na zrobienie tego.Websphere MQ jako źródło danych dla Apache Spark Streaming
13
A
Odpowiedz
3
Więc jestem delegowania tutaj kod pracy dla CustomMQReceiver który łączy WebSphere MQ i odczytuje dane:
public class CustomMQReciever extends Receiver<String> { String host = null;
int port = -1;
String qm=null;
String qn=null;
String channel=null;
transient Gson gson=new Gson();
transient MQQueueConnection qCon= null;
Enumeration enumeration =null;
public CustomMQReciever(String host , int port, String qm, String channel, String qn) {
super(StorageLevel.MEMORY_ONLY_2());
this.host = host;
this.port = port;
this.qm=qm;
this.qn=qn;
this.channel=channel;
}
public void onStart() {
// Start the thread that receives data over a connection
new Thread() {
@Override public void run() {
try {
initConnection();
receive();
}
catch (JMSException ex)
{
ex.printStackTrace();
}
}
}.start();
}
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a MQ connection and receive data until receiver is stopped */
private void receive() {
System.out.print("Started receiving messages from MQ");
try {
JMSMessage receivedMessage= null;
while (!isStopped() && enumeration.hasMoreElements())
{
receivedMessage= (JMSMessage) enumeration.nextElement();
String userInput = convertStreamToString(receivedMessage);
//System.out.println("Received data :'" + userInput + "'");
store(userInput);
}
// Restart in an attempt to connect again when server is active again
//restart("Trying to connect again");
stop("No More Messages To read !");
qCon.close();
System.out.println("Queue Connection is Closed");
}
catch(Exception e)
{
e.printStackTrace();
restart("Trying to connect again");
}
catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
public void initConnection() throws JMSException
{
MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory();
conFactory.setHostName(host);
conFactory.setPort(port);
conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
conFactory.setQueueManager(qm);
conFactory.setChannel(channel);
qCon= (MQQueueConnection) conFactory.createQueueConnection();
MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1);
MQQueue queue=(MQQueue) qSession.createQueue(qn);
MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
qCon.start();
enumeration= browser.getEnumeration();
}
@Override
public StorageLevel storageLevel() {
return StorageLevel.MEMORY_ONLY_2();
}
}
1
wierzę można użyć JMS, aby połączyć się połączyć WebSphere MQ i Apache Camel mogą być wykorzystane połączyć się z Websphere MQ. Można utworzyć własny odbiornik jak tak (zauważ, że ten wzór może być również stosowana bez JMS):
class JMSReceiver(topicName: String, cf: String, jndiProviderURL: String)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Serializable {
//Transient as this will get passed to the Workers from the Driver
@transient
var camelContextOption: Option[DefaultCamelContext] = None
def onStart() = {
camelContextOption = Some(new DefaultCamelContext())
val camelContext = camelContextOption.get
val env = new Properties()
env.setProperty("java.naming.factory.initial", "???")
env.setProperty("java.naming.provider.url", jndiProviderURL)
env.setProperty("com.webmethods.jms.clientIDSharing", "true")
val namingContext = new InitialContext(env); //using the properties file to create context
//Lookup Connection Factory
val connectionFactory = namingContext.lookup(cf).asInstanceOf[javax.jms.ConnectionFactory]
camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
val builder = new RouteBuilder() {
def configure() = {
from(s"jms://topic:$topicName?jmsMessageType=Object&clientId=$clientId&durableSubscriptionName=${topicName}_SparkDurable&maxConcurrentConsumers=10")
.process(new Processor() {
def process(exchange: Exchange) = {
exchange.getIn.getBody match {
case s: String => store(s)
}
}
})
}
}
}
builders.foreach(camelContext.addRoutes)
camelContext.start()
}
def onStop() = if(camelContextOption.isDefined) camelContextOption.get.stop()
}
można następnie utworzyć DStream swojego wydarzenia tak:
val myDStream = ssc.receiverStream(new JMSReceiver("MyTopic", "MyContextFactory", "MyJNDI"))
Powiązane problemy
- 1. AMQP vs Websphere MQ
- 2. Okresowa transmisja w Apache Spark Streaming
- 3. Spark Streaming - przetwarzanie pliku danych binarnych
- 4. Jaka jest najnowsza wersja klienta WebSphere MQ?
- 5. Spark Streaming Kafka stream
- 6. W Spark Streaming, jak wykryć pustą partię?
- 7. Apache Camel z IBM MQ
- 8. Apache Spark vs Apache Spark 2
- 9. Używanie produktu Websphere MQ z JMS z aplikacji .NET
- 10. Apache Spark vs. Apache Storm
- 11. Apache Spark vs Apache Ignite
- 12. Nie znaleziono klasy KafkaUtils w Spark streaming
- 13. Spark streaming 1.6.0 - Executory odbijające się
- 14. Apache wdrażanie aplikacji Spark najlepsze praktyki
- 15. Spark streaming z Kafka - createDirectStream vs createStream
- 16. Źródło danych dla kontroli użytkownika
- 17. Apache Spark ALS Rekomendacja
- 18. Klasy typu Scalaz dla Apache Spark RDD
- 19. Jak przeglądać wiadomość Websphere MQ bez jej usuwania?
- 20. Co oznacza tryb powiązania w produkcie WebSphere MQ?
- 21. Elasticsearch + wydajność Apache Spark
- 22. Czytanie z Cassandry za pomocą Spark Streaming
- 23. Spark streaming mapWithState timeout bez usuwania
- 24. Apache Drill kontra Spark
- 25. Równość DataFrame w Apache Spark
- 26. Http Live Streaming z serwerem WWW Apache
- 27. Spark Streaming: foreachRDD aktualizuje moje mongo RDD
- 28. Spark Streaming: Jak okresowo odświeżać buforowane RDD?
- 29. Przetwarzanie w kolejnoś ci w Spark Streaming
- 30. Zalecenia Apache Spark ALS podejście
Głosowanie zamknąć jako niezwiązane z tematem, ponieważ nie pasuje do wytycznych dotyczących pytań w Stack Overflow. Proponuję zadać te szerokie pytania dotyczące architektury i wykonalności na http://mqseries.net lub na jednym z internetowych forów MQ. –
Myślę, że może to być problem z frazowaniem. Zamiast niejasnego: "Szukałem tego. Jakie jest najlepsze rozwiązanie?" - możesz zadać bezpośrednie pytanie. _ "Jak odczytać dane z Websphere MQ przez Apache Spark?" _ Jeśli wiesz więcej na temat strony Websphere MQ, możesz dodać więcej informacji na ten temat. Czy obsługuje SQL? Jak zwykle go odpytujesz? Jakie istnieją dla niego klienci? Wtedy ktoś, kto zna Sparka, prawdopodobnie może ci pomóc. –