2015-04-30 16 views

Odpowiedz

6
import org.apache.spark.mllib.linalg.{Vectors,Vector,Matrix,SingularValueDecomposition,DenseMatrix,DenseVector} 
import org.apache.spark.mllib.linalg.distributed.RowMatrix 

def computeInverse(X: RowMatrix): DenseMatrix = { 
    val nCoef = X.numCols.toInt 
    val svd = X.computeSVD(nCoef, computeU = true) 
    if (svd.s.size < nCoef) { 
    sys.error(s"RowMatrix.computeInverse called on singular matrix.") 
    } 

    // Create the inv diagonal matrix from S 
    val invS = DenseMatrix.diag(new DenseVector(svd.s.toArray.map(x => math.pow(x,-1)))) 

    // U cannot be a RowMatrix 
    val U = new DenseMatrix(svd.U.numRows().toInt,svd.U.numCols().toInt,svd.U.rows.collect.flatMap(x => x.toArray)) 

    // If you could make V distributed, then this may be better. However its alreadly local...so maybe this is fine. 
    val V = svd.V 
    // inv(X) = V*inv(S)*transpose(U) --- the U is already transposed. 
    (V.multiply(invS)).multiply(U) 
    } 
3

miałem problemy przy użyciu tej funkcji z opcją

conf.set("spark.sql.shuffle.partitions", "12") 

Wiersze RowMatrix został tasuje.

Tutaj jest aktualizacja, która pracowała dla mnie

import org.apache.spark.mllib.linalg.{DenseMatrix,DenseVector} 
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix 

def computeInverse(X: IndexedRowMatrix) 
: DenseMatrix = 
{ 
    val nCoef = X.numCols.toInt 
    val svd = X.computeSVD(nCoef, computeU = true) 
    if (svd.s.size < nCoef) { 
    sys.error(s"IndexedRowMatrix.computeInverse called on singular matrix.") 
    } 

    // Create the inv diagonal matrix from S 
    val invS = DenseMatrix.diag(new DenseVector(svd.s.toArray.map(x => math.pow(x, -1)))) 

    // U cannot be a RowMatrix 
    val U = svd.U.toBlockMatrix().toLocalMatrix().multiply(DenseMatrix.eye(svd.U.numRows().toInt)).transpose 

    val V = svd.V 
    (V.multiply(invS)).multiply(U) 
} 
0

Matrix U zwrócone przez X.computeSVD ma wymiary MXK gdzie m jest liczba wierszy oryginalnego (rozproszonej) RowMatrix X. Można by oczekiwać m być duże (prawdopodobnie większe niż k), więc nie jest wskazane, aby zebrać je w sterowniku, jeśli chcemy, aby nasz kod do skalowania do bardzo dużych wartości m.

Powiedziałbym, że oba poniższe rozwiązania mają tę wadę. Odpowiedź podana przez @Alexander Kharlamov dzwoni val U = svd.U.toBlockMatrix().toLocalMatrix(), która zbiera matrycę w sterowniku. To samo dzieje się z odpowiedzią podaną przez @Climbs_lika_Spyder (twoja nić skał !!), która wywołuje svd.U.rows.collect.flatMap(x => x.toArray). Proponuję raczej polegać na mnożeniu macierzy rozproszonej, takiej jak kod Scala, opublikowany here.

+0

Nie widzę żadnych odwrotnych obliczeń w dodanym łączu. –

+0

@Climbs_lika_Spyder Link dotyczy mnożenia macierzy rozproszonych w celu zastąpienia mnożenia macierzy lokalnej '(V.multiply (invS)). Mnożenia (U)' w ostatniej linijce twojego rozwiązania, abyś nie musiał zbierać 'U' w kierowcy. Myślę, że 'V' i' invS' nie są wystarczająco duże, aby powodować problemy. – Pablo