2015-08-10 11 views
6

Jestem bardzo nowy, aby iskrzyć, ale chcę utworzyć wykres z relacji, które otrzymuję z tabeli Hive. Znalazłem funkcję, która ma na to pozwolić bez definiowania wierzchołków, ale nie mogę jej uruchomić.Jak utworzyć wykres z Array [(Any, Any)] za pomocą Graph.fromEdgeTuples

wiem, że to nie jest powtarzalna przykład, ale tutaj jest mój kod:

import org.apache.spark.SparkContext 
import org.apache.spark.graphx._ 
import org.apache.spark.rdd.RDD 
val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc) 
val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect() 
val data_2010 = data.filter(line => line(0)==2010) 
val couples = data_2010.map(line=>(line(2),line(3)) //country to country 

val graph = Graph.fromEdgeTuples(couples, 1) 

Ostatnia linia generuje następujący błąd:

val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1) 
<console>:31: error: type mismatch; 
found : Array[(Any, Any)] 
required: Seq[(org.apache.spark.graphx.VertexId,org.apache.spark.graphx.VertexId)] 
Error occurred in an application involving default arguments. 
val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1) 

pary wyglądać następująco:

couples: Array[(Any, Any)] = Array((MWI,MOZ), (WSM,AUS), (MDA,CRI), (KNA,HTI), (PER,ERI), (SWE,CUB), (DEU,PRK), (THA,DJI), (BIH,SVK), (RUS,THA), (SGP,BLR), (MEX,TGO), (TUR,ZAF), (ZWE,SYC), (UGA,GHA), (OMN,SVN), (NZL,SYR), (CHE,SLV), (CZE,LUX), (TGO,COM), (TTO,WLF), (NGA,PAN), (FJI,UKR), (BRA,ECU), (EGY,SWE), (ITA,ARG), (MUS,MLT), (MDG,DZA), (ARE,SUR), (CAN,GUY), (OMN,COG), (NAM,FIN), (ITA,HMD), (SWE,CHE), (SDN,NER), (TUN,USA), (THA,GMB), (HUN,TTO), (FRA,BEN), (NER,TCD), (CHN,JPN), (DNK,ZAF), (MLT,UKR), (ARM,OMN), (PRT,IDN), (BEN,PER), (TTO,BRA), (KAZ,SMR), (CPV,""), (ARG,ZAF), (BLR,TJK), (AZE,SVK), (ITA,STP), (MDA,IRL), (POL,SVN), (PRY,ETH), (HKG,MOZ), (QAT,GAB), (THA,MUS), (PHL,MOZ), (ITA,SGS), (ARM,KHM), (ARG,KOR), (AUT,GMB), (SYR,COM), (CZE,GBR), (DOM,USA), (CYP,LAO), (USA,LBR) 

Jak mogę przekonwertować na odpowiedni format?

Odpowiedz

7

Przede wszystkim nie można używać String jako VertexId, więc trzeba zamapować etykiety na Long. Następnie musimy przygotować mapowanie z etykiety na identyfikator. Dopóki liczba unikalnych wartości jest stosunkowo niewielka, najprostszym rozwiązaniem jest utworzenie zmiennej nadanie:

val idMap = sc.broadcast(couples // -> Array[(Any, Any)] 
    // Make sure we use String not Any returned from Row.apply 
    // And convert to Seq so we can flatten results 
    .flatMap{case (x: String, y: String) => Seq(x, y)} // -> Array[String] 
    // Get different keys 
    .distinct // -> Array[String] 
    // Create (key, value) pairs 
    .zipWithIndex // -> Array[(String, Int)] 
    // Convert values to Long so we can use it as a VertexId 
    .map{case (k, v) => (k, v.toLong)} // -> Array[(String, Long)] 
    // Create map 
    .toMap) // -> Map[String,Long] 

Następny możemy użyć powyższego wykonać mapowanie:

val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples 
    .map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))} 
) 

Ostatecznie otrzymujemy wykres:

val graph = Graph.fromEdgeTuples(edges, 1) 
+0

Waw, dzięki! Spróbuję tego jutro. Czy mógłbyś opracować różne metody, których używałeś? Mam ogólny pomysł, ale byłoby mi bardzo przydatnym móc zrozumieć każdy krok, a nie tylko kopiować to. –

+1

Pewnie, dodałem komentarze i informacje o typie. – zero323

+0

Całkowicie działa, wielkie dzięki za wyjaśnienie. Czy wiesz, czy to możliwe do wizualizacji wykresu ze Spark, pracuję z konsolą, więc nie ma chyba graficznego interfejsu? –

Powiązane problemy