2015-05-25 12 views
13

Szukałem możliwości dla Websphere MQ jako źródła danych dla strumieniowania iskier, ponieważ jest to potrzebne w jednym z naszych przypadków użycia. Dowiedziałem się, że MQTT jest protokołem, który obsługuje komunikację ze struktur danych MQ, ale ponieważ jestem początkującym dla iskrzenia strumieniowego, potrzebuję pewnych roboczych przykładów dla tego samego. Czy ktoś próbował połączyć się z MQ z strumieniowaniem iskier. Proszę wymyślić najlepszy sposób na zrobienie tego.Websphere MQ jako źródło danych dla Apache Spark Streaming

+1

Głosowanie zamknąć jako niezwiązane z tematem, ponieważ nie pasuje do wytycznych dotyczących pytań w Stack Overflow. Proponuję zadać te szerokie pytania dotyczące architektury i wykonalności na http://mqseries.net lub na jednym z internetowych forów MQ. –

+0

Myślę, że może to być problem z frazowaniem. Zamiast niejasnego: "Szukałem tego. Jakie jest najlepsze rozwiązanie?" - możesz zadać bezpośrednie pytanie. _ "Jak odczytać dane z Websphere MQ przez Apache Spark?" _ Jeśli wiesz więcej na temat strony Websphere MQ, możesz dodać więcej informacji na ten temat. Czy obsługuje SQL? Jak zwykle go odpytujesz? Jakie istnieją dla niego klienci? Wtedy ktoś, kto zna Sparka, prawdopodobnie może ci pomóc. –

Odpowiedz

3

Więc jestem delegowania tutaj kod pracy dla CustomMQReceiver który łączy WebSphere MQ i odczytuje dane:

public class CustomMQReciever extends Receiver<String> { String host = null; 
int port = -1; 
String qm=null; 
String qn=null; 
String channel=null; 
transient Gson gson=new Gson(); 
transient MQQueueConnection qCon= null; 

Enumeration enumeration =null; 

public CustomMQReciever(String host , int port, String qm, String channel, String qn) { 
    super(StorageLevel.MEMORY_ONLY_2()); 
    this.host = host; 
    this.port = port; 
    this.qm=qm; 
    this.qn=qn; 
    this.channel=channel; 

} 

public void onStart() { 
    // Start the thread that receives data over a connection 
    new Thread() { 
     @Override public void run() { 
      try { 
       initConnection(); 
       receive(); 
      } 
      catch (JMSException ex) 
      { 
       ex.printStackTrace(); 
      } 
     } 
    }.start(); 
} 
public void onStop() { 
    // There is nothing much to do as the thread calling receive() 
    // is designed to stop by itself isStopped() returns false 
} 

/** Create a MQ connection and receive data until receiver is stopped */ 
private void receive() { 
    System.out.print("Started receiving messages from MQ"); 

    try { 

    JMSMessage receivedMessage= null; 

     while (!isStopped() && enumeration.hasMoreElements()) 
     { 

      receivedMessage= (JMSMessage) enumeration.nextElement(); 
      String userInput = convertStreamToString(receivedMessage); 
      //System.out.println("Received data :'" + userInput + "'"); 
      store(userInput); 
     } 

     // Restart in an attempt to connect again when server is active again 
     //restart("Trying to connect again"); 

     stop("No More Messages To read !"); 
     qCon.close(); 
     System.out.println("Queue Connection is Closed"); 

    } 
    catch(Exception e) 
    { 
     e.printStackTrace(); 
     restart("Trying to connect again"); 
    } 
    catch(Throwable t) { 
     // restart if there is any other error 
     restart("Error receiving data", t); 
    } 
    } 

    public void initConnection() throws JMSException 
{ 
    MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory(); 
    conFactory.setHostName(host); 
    conFactory.setPort(port); 
    conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP); 
    conFactory.setQueueManager(qm); 
    conFactory.setChannel(channel); 


    qCon= (MQQueueConnection) conFactory.createQueueConnection(); 
    MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1); 
    MQQueue queue=(MQQueue) qSession.createQueue(qn); 
    MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue); 
    qCon.start(); 

    enumeration= browser.getEnumeration(); 
    } 

@Override 
public StorageLevel storageLevel() { 
    return StorageLevel.MEMORY_ONLY_2(); 
} 
} 
1

wierzę można użyć JMS, aby połączyć się połączyć WebSphere MQ i Apache Camel mogą być wykorzystane połączyć się z Websphere MQ. Można utworzyć własny odbiornik jak tak (zauważ, że ten wzór może być również stosowana bez JMS):

class JMSReceiver(topicName: String, cf: String, jndiProviderURL: String) 
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Serializable { 
    //Transient as this will get passed to the Workers from the Driver 
    @transient 
    var camelContextOption: Option[DefaultCamelContext] = None 

    def onStart() = { 
    camelContextOption = Some(new DefaultCamelContext()) 
    val camelContext = camelContextOption.get 
    val env = new Properties() 
    env.setProperty("java.naming.factory.initial", "???") 
    env.setProperty("java.naming.provider.url", jndiProviderURL) 
    env.setProperty("com.webmethods.jms.clientIDSharing", "true") 
    val namingContext = new InitialContext(env); //using the properties file to create context 

    //Lookup Connection Factory 
    val connectionFactory = namingContext.lookup(cf).asInstanceOf[javax.jms.ConnectionFactory] 
    camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)) 

    val builder = new RouteBuilder() { 
     def configure() = { 
      from(s"jms://topic:$topicName?jmsMessageType=Object&clientId=$clientId&durableSubscriptionName=${topicName}_SparkDurable&maxConcurrentConsumers=10") 
      .process(new Processor() { 
      def process(exchange: Exchange) = { 
       exchange.getIn.getBody match { 
       case s: String => store(s) 
       } 
      } 
      }) 
     } 
     } 
    } 
    builders.foreach(camelContext.addRoutes) 
    camelContext.start() 
    } 

    def onStop() = if(camelContextOption.isDefined) camelContextOption.get.stop() 
} 

można następnie utworzyć DStream swojego wydarzenia tak:

val myDStream = ssc.receiverStream(new JMSReceiver("MyTopic", "MyContextFactory", "MyJNDI"))