
I am trying to get interactive queries working with Kafka Streams. I have Zookeeper and Kafka running locally (on Windows), and I am using C:\temp as the storage folder, for both Zookeeper and Kafka. The Kafka Streams allMetadata() method returns an empty list.

I have set up the topics like this:

kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-submit-topic 
kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-output-topic 

Reading I have done on this problem

I have read this documentation page: http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application

I have also read the Java example here: https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java

And I have also read this similar post, which initially sounded like the same issue as mine: Cannot access KTable from a different app as StateStore

That is my setup. So what is the issue?

As I say, I am trying to create my own application that allows interactive queries via a custom REST API in Akka Http (using RPC calls, as recommended), to allow me to query my KTable. The actual stream processing seems to happen as expected, and I am able to print the contents of the KTable, and they match what was produced to the topic.

So the storage side of things seems to be working.

The problem seems to arise when I try to use the streams.allMetadata() method, which returns an empty list.

I am using

  • A list of instances
  • Scala 2.12
  • SBT
  • Akka.Http 10.9 for the REST API
  • Kafka 11.0

Producer code

Here is the code for my producer:

package Processing.Ratings { 

    import java.util.concurrent.TimeUnit 

    import Entities.Ranking 
    import Serialization.JSONSerde 
    import Topics.RatingsTopics 

    import scala.util.Random 
    import org.apache.kafka.clients.producer.ProducerRecord 
    import org.apache.kafka.clients.producer.KafkaProducer 
    import org.apache.kafka.common.serialization.Serdes 
    import Utils.Settings 
    import org.apache.kafka.clients.producer.ProducerConfig 

    object RatingsProducerApp extends App { 

    run() 

    private def run(): Unit = { 

     val jSONSerde = new JSONSerde[Ranking] 
     val random = new Random 
     val producerProps = Settings.createBasicProducerProperties 
     val rankingList = List(
     Ranking("[email protected]","[email protected]", 1.5f), 
     Ranking("[email protected]","[email protected]", 1.5f), 
     Ranking("[email protected]","[email protected]", 3.5f), 
     Ranking("[email protected]","[email protected]", 2.5f), 
     Ranking("[email protected]","[email protected]", 1.5f)) 

     producerProps.put(ProducerConfig.ACKS_CONFIG, "all") 

     System.out.println("Connecting to Kafka cluster via bootstrap servers " + 
     s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}") 

     // send a random string from List event every 100 milliseconds 
     val rankingProducer = new KafkaProducer[String, Array[Byte]](
     producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer) 

     //while (true) { 
     for (i <- 0 to 10) { 
     val ranking = rankingList(random.nextInt(rankingList.size)) 
     val rankingBytes = jSONSerde.serializer().serialize("", ranking) 
     System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}") 
     rankingProducer.send(new ProducerRecord[String, Array[Byte]](
      RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes)) 
     Thread.sleep(100) 
     } 

     Runtime.getRuntime.addShutdownHook(new Thread(() => { 
     rankingProducer.close(10, TimeUnit.SECONDS) 
     })) 
    } 
    } 
} 
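
The JSONSerde[T] used above is not shown here. For completeness, this is a minimal sketch of what such a serde might look like; assuming json4s-jackson is used for the JSON handling (the actual implementation may well differ):

package Serialization

import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.{read, write}

// Serializes/deserializes values of type T as UTF-8 JSON bytes
class JSONSerde[T <: AnyRef : Manifest] extends Serde[T] with Serializer[T] with Deserializer[T] {

    implicit val formats = DefaultFormats

    override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
    override def close(): Unit = ()

    override def serializer(): Serializer[T] = this
    override def deserializer(): Deserializer[T] = this

    // case class -> JSON bytes
    override def serialize(topic: String, data: T): Array[Byte] =
      if (data == null) null else write(data).getBytes("UTF-8")

    // JSON bytes -> case class
    override def deserialize(topic: String, bytes: Array[Byte]): T =
      if (bytes == null) null.asInstanceOf[T] else read[T](new String(bytes, "UTF-8"))
}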

Streams code

Here is the streams code:

def createRatingStreamsProperties() : Properties = { 
    val props = createBasicStreamProperties 
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ratings-application") 
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "ratings-application-client") 
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass) 
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass) 
    props 
} 

private def createBasicStreamProperties() : Properties = { 
    val props = new Properties() 
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers) 
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass) 
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass) 
    // Records should be flushed every 10 seconds. This is less than the default 
    // in order to keep this example interactive. 
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object]) 
    // For illustrative purposes we disable record caches 
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object]) 
    props 
} 

and the actual streams processing code:

import java.util.Properties 
import java.util.concurrent.TimeUnit 
import org.apache.kafka.common.serialization._ 
import org.apache.kafka.streams._ 
import org.apache.kafka.streams.kstream._ 
import Entities.Ranking 
import Serialization.JSONSerde 
import Topics.RatingsTopics 
import Utils.Settings 

package Processing.Ratings { 

import Stores.StateStores 
import org.apache.kafka.streams.state.HostInfo 


class DummyRankingReducer extends Reducer[Ranking] { 
    override def apply(value1: Ranking, value2: Ranking): Ranking = { 
    value2 
    } 
} 

class RankingByEmailInitializer extends Initializer[List[Ranking]] { 
    override def apply(): List[Ranking] = List[Ranking]() 
} 

class RankingByEmailAggregator extends Aggregator[String, Ranking,List[Ranking]] { 
    override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = { 
    value :: aggregate 
    } 
} 


object RatingStreamProcessingApp extends App { 

    run() 

    private def run() : Unit = { 
    val stringSerde = Serdes.String 
    val rankingSerde = new JSONSerde[Ranking] 
    val listRankingSerde = new JSONSerde[List[Ranking]] 
    val builder: KStreamBuilder = new KStreamBuilder 
    val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC) 

    val rankingTable = rankings.groupByKey(stringSerde,rankingSerde) 
     .aggregate(
     new RankingByEmailInitializer(), 
     new RankingByEmailAggregator(), 
     listRankingSerde, 
     StateStores.RANKINGS_BY_EMAIL_STORE 
    ) 

    rankingTable.toStream.print() 

    val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties) 
    val restEndpoint:HostInfo = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort) 
    System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}") 
    System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}") 

    // Always (and unconditionally) clean local state prior to starting the processing topology. 
    // We opt for this unconditional call here because this will make it easier for you to play around with the example 
    // when resetting the application for doing a re-run (via the Application Reset Tool, 
    // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool). 
    // 
    // The drawback of cleaning up local state prior is that your app must rebuilt its local state from scratch, which 
    // will take time and will require reading all the state-relevant data from the Kafka cluster over the network. 
    // Thus in a production scenario you typically do not want to clean up always as we do here but rather only when it 
    // is truly needed, i.e., only under certain conditions (e.g., the presence of a command line flag for your app). 
    // See `ApplicationResetExample.java` for a production-like example. 
    //streams.cleanUp(); 
    streams.start() 
    val restService = new RatingRestService(streams, restEndpoint) 
    restService.start() 


    //**************************************************************** 
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE 
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE 
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE 
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE 
    //**************************************************************** 


    val SIZE = streams.allMetadata.size() 
    val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size() 

    import org.apache.kafka.streams.state.KeyValueIterator 
    import org.apache.kafka.streams.state.QueryableStoreTypes 
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore 
    val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore) 

    val range = keyValueStore.all 
    val HASNEXT = range.hasNext 
    import org.apache.kafka.streams.KeyValue 
    while (range.hasNext  ) { 
     val next = range.next 
     System.out.println(String.format("key: %s | value: %s", next.key, next.value)) 
    } 

    Runtime.getRuntime.addShutdownHook(new Thread(() => { 
     streams.close(10, TimeUnit.SECONDS) 
     restService.stop 
    })) 

    //return unit 
    () 
    } 
} 

}

Where I have this config stuff:

kafka { 
    bootStrapServers = "localhost:9092" 
    zooKeepers = "zookeeper:2181" 
    schemaRegistryUrl = "http://localhost:8081" 
    partition = 0, 
    restApiDefaultHostName = "localhost", 
    restApiDefaultPort = "8080" 
} 
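
The Settings object referenced throughout the code is not shown here either. A minimal sketch of what it might look like, assuming the Typesafe Config library is used to read the kafka block above (the values it exposes mirror how Settings is used in the snippets; anything beyond that is an assumption):

package Utils

import java.util.Properties
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerConfig

object Settings {

    private val cfg = ConfigFactory.load().getConfig("kafka")

    val bootStrapServers = cfg.getString("bootStrapServers")
    val restApiDefaultHostName = cfg.getString("restApiDefaultHostName")
    val restApiDefaultPort = cfg.getInt("restApiDefaultPort")

    // basic producer properties used by RatingsProducerApp
    def createBasicProducerProperties : Properties = {
      val props = new Properties()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
      props
    }

    // createRatingStreamsProperties / createBasicStreamProperties shown earlier
    // would live in here as well
}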

REST Service

This is a Scala port of this example file: https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/MetadataService.java

package Processing.Ratings 

import org.apache.kafka.streams.KafkaStreams 
import org.apache.kafka.streams.state.StreamsMetadata 
import java.util.stream.Collectors 
import Entities.HostStoreInfo 
import org.apache.kafka.common.serialization.Serializer 
import org.apache.kafka.connect.errors.NotFoundException 
import scala.collection.JavaConverters._ 


/** 
    * Looks up StreamsMetadata from KafkaStreams 
    */ 
class MetadataService(val streams: KafkaStreams) { 


    /** 
    * Get the metadata for all of the instances of this Kafka Streams application 
    * 
    * @return List of { @link HostStoreInfo} 
    */ 
    def streamsMetadata() : List[HostStoreInfo] = { 

    // Get metadata for all of the instances of this Kafka Streams application 
    val metadata = streams.allMetadata 
    return mapInstancesToHostStoreInfo(metadata) 
    } 


    /** 
    * Get the metadata for all instances of this Kafka Streams application that currently 
    * has the provided store. 
    * 
    * @param store The store to locate 
    * @return List of { @link HostStoreInfo} 
    */ 
    def streamsMetadataForStore(store: String) : List[HostStoreInfo] = { 

    // Get metadata for all of the instances of this Kafka Streams application hosting the store 
    val metadata = streams.allMetadataForStore(store) 
    return mapInstancesToHostStoreInfo(metadata) 
    } 


    /** 
    * Find the metadata for the instance of this Kafka Streams Application that has the given 
    * store and would have the given key if it exists. 
    * 
    * @param store Store to find 
    * @param key The key to find 
    * @return { @link HostStoreInfo} 
    */ 
    def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = { 
    // Get metadata for the instances of this Kafka Streams application hosting the store and 
    // potentially the value for key 
    val metadata = streams.metadataForKey(store, key, serializer) 
    if (metadata == null) 
     throw new NotFoundException(
     s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}") 

    return new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList) 
    } 




    def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = { 

    metadatas.stream.map[HostStoreInfo](metadata => 
     HostStoreInfo(
     metadata.host(), 
     metadata.port, 
     metadata.stateStoreNames.asScala.toList)) 
     .collect(Collectors.toList()) 
     .asScala.toList 
    } 



} 

And here is the REST service (at the moment I am only trying to get the "instances" route working).

package Processing.Ratings 

import org.apache.kafka.streams.KafkaStreams 
import org.apache.kafka.streams.state.HostInfo 
import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model._ 
import akka.http.scaladsl.server.Directives._ 
import akka.stream.ActorMaterializer 
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ 
import spray.json.DefaultJsonProtocol._ 
import Entities.AkkaHttpEntitiesJsonFormats._ 
import Entities._ 
import akka.http.scaladsl.marshalling.ToResponseMarshallable 

import scala.concurrent.Future 


object RestService { 
    val DEFAULT_REST_ENDPOINT_HOSTNAME = "localhost" 
} 


class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) { 

    val metadataService = new MetadataService(streams) 
    var bindingFuture: Future[Http.ServerBinding] = null 

    implicit val system = ActorSystem("rating-system") 
    implicit val materializer = ActorMaterializer() 
    implicit val executionContext = system.dispatcher 


    def start() : Unit = { 
    val emailRegexPattern = """\w+""".r 


    val route = 
     path("ratingByEmail"/emailRegexPattern) { email => 
     get { 

      //TODO : This would come from Kafka store, either local or remote 

      complete(ToResponseMarshallable.apply(List[Ranking](
      Ranking("[email protected]", "[email protected]", 4.0f), 
      Ranking("[email protected]", "[email protected]", 2.0f))) 
     ) 
     } 
     } ~ 
     path("instances") { 
     get { 
      val x = metadataService.streamsMetadata 
      complete(ToResponseMarshallable.apply(metadataService.streamsMetadata)) 
     } 
     } 


    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port) 
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n") 

    Runtime.getRuntime.addShutdownHook(new Thread(() => { 
     bindingFuture 
     .flatMap(_.unbind()) // trigger unbinding from the port 
     .onComplete(_ => system.terminate()) // and shutdown when done 
    })) 
    } 


    def stop() : Unit = { 
    bindingFuture 
     .flatMap(_.unbind()) // trigger unbinding from the port 
     .onComplete(_ => system.terminate()) // and shutdown when done 
    } 

    def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = { 
    hostStoreInfo.host.equals(hostInfo.host()) && 
     hostStoreInfo.port == hostInfo.port 
    } 
} 
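
The Entities and the AkkaHttpEntitiesJsonFormats imported by the REST service are not shown in the question. Based on how they are constructed in MetadataService and in the hard-coded route, a minimal sketch might look like this (field names that are not visible in the snippets above, such as fromEmail/score/storeNames, are assumptions):

package Entities

import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

// as constructed in MetadataService.mapInstancesToHostStoreInfo
case class HostStoreInfo(host: String, port: Int, storeNames: List[String])

// as constructed in RatingsProducerApp and the hard-coded test route
case class Ranking(fromEmail: String, toEmail: String, score: Float)

object AkkaHttpEntitiesJsonFormats {
    implicit val rankingFormat: RootJsonFormat[Ranking] = jsonFormat3(Ranking)
    implicit val hostStoreInfoFormat: RootJsonFormat[HostStoreInfo] = jsonFormat3(HostStoreInfo)
}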

Here is some proof that there is data in the store.

The producer running: [screenshot]

The streams app running: [screenshot]

This is me running the producer first, then the streams app, and then the producer again (another run).

See how the results from the KTable are shown, and then I started the producer again and pushed a few more messages through the topic, which the streams app picked up.

But when I query my REST endpoint to try to get the metadata, using localhost:8080/instances, all I get back is an empty list [].

[screenshot]

I was expecting these lines from the streams code above to return some metadata; there is clearly something in the store, so why is there no metadata?

val SIZE = streams.allMetadata.size() 
val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size() 

Both return 0, while iterating the items in the store using this code

import org.apache.kafka.streams.state.KeyValueIterator 
import org.apache.kafka.streams.state.QueryableStoreTypes 
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore 
val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore) 

val range = keyValueStore.all 
val HASNEXT = range.hasNext 
import org.apache.kafka.streams.KeyValue 
while (range.hasNext  ) { 
    val next = range.next 
    System.out.println(String.format("key: %s | value: %s", next.key, next.value)) 
} 

produces data from the store:

[screenshot]

I know the REST API is working correctly, because the hard-coded test route works fine:

[screenshot]

What am I doing wrong???

Answer


So I figured it out. It turns out it was due to this missing config value:

props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080") 
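
For context, this goes into the streams configuration, and the value should be the host:port that this particular instance's REST endpoint is reachable on. A sketch of where it could sit in the createRatingStreamsProperties shown earlier (deriving the value from the Settings fields is my assumption):

def createRatingStreamsProperties() : Properties = {
    val props = createBasicStreamProperties
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ratings-application")
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "ratings-application-client")
    // advertise this instance's RPC endpoint so that allMetadata() /
    // allMetadataForStore() have something to return
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
      s"${restApiDefaultHostName}:${restApiDefaultPort}")
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    props
}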

Once I added that, the Akka Http REST API at http://localhost:8080/instances started working. But then I started getting this strange exception:

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-key-value-store, may have migrated to another instance. 
    at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49) 
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55) 
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699) 

So after reading about it here: http://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance

I decided that what I needed to do was implement some retry logic, which I did like this:

Retry

which I borrowed from here: https://gist.github.com/Mortimerp9/5430595

package Utils 

import scala.concurrent._ 
import scala.concurrent.duration._ 


object Retry { 

    /** 
    * exponential back off for retry 
    */ 
    def exponentialBackoff(r: Int): Duration = scala.math.pow(2, r).round * 500 milliseconds 

    def noIgnore(t: Throwable): Boolean = false 

    /** 
    * retry a particular block that can fail 
    * 
    * @param maxRetry how many times to retry before to giveup 
    * @param deadline how long to retry before giving up; default None 
    * @param backoff  a back-off function that returns a Duration after which to retry. default is an exponential backoff at 100 milliseconds steps 
    * @param ignoreThrowable  if you want to stop retrying on a particular exception 
    * @param block a block of code to retry 
    * @param ctx an execution context where to execute the block 
    * @returns an eventual Future succeeded with the value computed or failed with one of: 
    * `TooManyRetriesException` if there were too many retries without an exception being caught. Probably impossible if you pass decent parameters 
    * `DeadlineExceededException` if the retry didn't succeed before the provided deadline 
    * `TimeoutException` if you provide a deadline and the block takes too long to execute 
    * `Throwable` the last encountered exception 
    */ 
    def retry[T](maxRetry: Int, 
       deadline: Option[Deadline] = None, 
       backoff: (Int) => Duration = exponentialBackoff, 
       ignoreThrowable: Throwable => Boolean = noIgnore)(block: => T)(implicit ctx: ExecutionContext): Future[T] = { 

    class TooManyRetriesException extends Exception("too many retries without exception") 
    class DeadlineExceededException extends Exception("deadline exceded") 

    val p = Promise[T] 

    def recursiveRetry(retryCnt: Int, exception: Option[Throwable])(f:() => T): Option[T] = { 
     if (maxRetry == retryCnt 
     || deadline.isDefined && deadline.get.isOverdue) { 
     exception match { 
      case Some(t) => 
      p failure t 
      case None if deadline.isDefined && deadline.get.isOverdue => 
      p failure (new DeadlineExceededException) 
      case None => 
      p failure (new TooManyRetriesException) 
     } 
     None 
     } else { 
     val success = try { 
      val rez = if (deadline.isDefined) { 
      Await.result(future(f()), deadline.get.timeLeft) 
      } else { 
      f() 
      } 
      Some(rez) 
     } catch { 
      case t: Throwable if !ignoreThrowable(t) => 
      blocking { 
       val interval = backoff(retryCnt).toMillis 
       Thread.sleep(interval) 
      } 
      recursiveRetry(retryCnt + 1, Some(t))(f) 
      case t: Throwable => 
      p failure t 
      None 
     } 
     success match { 
      case Some(v) => 
      p success v 
      Some(v) 
      case None => None 
     } 
     } 
    } 

    def doBlock() = block 

    Future { 
     recursiveRetry(0, None)(doBlock) 
    } 

    p.future 
    } 

} 

which I call like this:

def printStoreMetaData(streams:KafkaStreams) : Unit = { 

    import org.apache.kafka.streams.state.KeyValueIterator 
    import org.apache.kafka.streams.state.QueryableStoreTypes 
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore 
    import scala.util.{Success, Failure} 

    val keyValueStoreTry = waitUntilStoreIsQueryable(
     StateStores.RANKINGS_BY_EMAIL_STORE, 
     QueryableStoreTypes.keyValueStore[String,List[Ranking]](), 
     streams 
    ) match { 
     case Success(keyValueStore) => { 
     val SIZE = streams.allMetadata.size() 
     val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size() 
     val range = keyValueStore.all 
     val HASNEXT = range.hasNext 
     import org.apache.kafka.streams.KeyValue 
     while (range.hasNext  ) { 
      val next = range.next 
      System.out.println(String.format("key: %s | value: %s", next.key, next.value)) 
     } 
     } 
     case Failure(f) => println(f) 
    } 

} 
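
The waitUntilStoreIsQueryable function used above isn't shown; a minimal sketch of what it might look like, built on the Retry helper and returning a Try so it can be pattern matched on Success/Failure as in printStoreMetaData (the retry count and timeout here are assumptions):

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.QueryableStoreType
import Utils.Retry

def waitUntilStoreIsQueryable[T](storeName: String,
                                 queryableStoreType: QueryableStoreType[T],
                                 streams: KafkaStreams) : Try[T] = {
    Try {
      // streams.store(..) throws InvalidStateStoreException while the store is
      // still initialising or being migrated, so keep retrying with a back-off
      Await.result(
        Retry.retry(maxRetry = 10) {
          streams.store(storeName, queryableStoreType)
        },
        60.seconds)
    }
}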

After doing that, it's all happy days for me.
