2010-11-09 13 views
20

Czy możliwe jest zastosowanie programowania funkcjonalnego w strumieniach Scala, tak, że strumień jest przetwarzany sekwencyjnie, ale już przetworzona część strumienia może zostać zebrana?Funkcjonalne przetwarzanie strumieni Scala bez błędów OutOfMemory

Na przykład, zdefiniować Stream który zawiera numery od start do end:

def fromToStream(start: Int, end: Int) : Stream[Int] = { 
    if (end < start) Stream.empty 
    else start #:: fromToStream(start+1, end) 
} 

Jeśli zsumować wartości w funkcjonalnym stylu:

println(fromToStream(1,10000000).reduceLeft(_+_)) 

Dostaję OutOfMemoryError - być może dlatego, że ramka stosu połączenia do reduceLeft zawiera odniesienie do nagłówka strumienia. Ale jeśli mogę to zrobić w stylu iteracyjny, to działa:

var sum = 0 
for (i <- fromToStream(1,10000000)) { 
    sum += i 
} 

Czy istnieje sposób, aby zrobić to w stylu funkcjonalnym bez coraz OutOfMemory?

AKTUALIZACJA: To był a bug in scala, który jest teraz ustalony. Teraz jest to mniej lub bardziej przestarzałe.

+2

Chociaż to w żaden sposób nie odpowiada na twoje pytanie, uważam, że składnia '# ::' dla strumieni jest dużo bardziej czytelna niż "Stream.cons" –

Odpowiedz

13

Tak, możesz. Sztuczka polega na użyciu metod rekursywnych ogona, tak aby ramka stosu lokalnego zawierała tylko odniesienie do instancji Stream. Ponieważ metoda jest rekursywna, lokalne odwołanie do poprzedniej Stream zostanie usunięte po wywołaniu rekursywnie, co umożliwia GC zbieranie początków Stream.

Welcome to Scala version 2.9.0.r23459-b20101108091606 (Java HotSpot(TM) Server VM, Java 1.6.0_20). 
Type in expressions to have them evaluated. 
Type :help for more information. 

scala> import collection.immutable.Stream 
import collection.immutable.Stream 

scala> import annotation.tailrec 
import annotation.tailrec 

scala> @tailrec def last(s: Stream[Int]): Int = if (s.tail.isEmpty) s.head else last(s.tail) 
last: (s: scala.collection.immutable.Stream[Int])Int 

scala> last(Stream.range(0, 100000000))                    
res2: Int = 99999999 

Ponadto, należy upewnić się, że rzeczą, jaką można przekazać do metody last powyżej ma tylko jedno odniesienie na stosie. Jeśli przechowujesz wartość Stream w zmiennej lokalnej lub wartości, nie będzie ona wywoływana podczas wywoływania metody last, ponieważ jej argument nie jest jedyną referencją pozostawioną do Stream. Poniższy kod zabraknie pamięci.

scala> val s = Stream.range(0, 100000000)                   
s: scala.collection.immutable.Stream[Int] = Stream(0, ?)                

scala> last(s)                          
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space            
     at sun.net.www.ParseUtil.encodePath(ParseUtil.java:84)              
     at sun.misc.URLClassPath$JarLoader.checkResource(URLClassPath.java:674)          
     at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:759)          
     at sun.misc.URLClassPath.getResource(URLClassPath.java:169)             
     at java.net.URLClassLoader$1.run(URLClassLoader.java:194)             
     at java.security.AccessController.doPrivileged(Native Method)            
     at java.net.URLClassLoader.findClass(URLClassLoader.java:190)            
     at java.lang.ClassLoader.loadClass(ClassLoader.java:307)              
     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)            
     at java.lang.ClassLoader.loadClass(ClassLoader.java:248)              
     at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:978)      
     at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:976)      
     at scala.util.control.Exception$Catch.apply(Exception.scala:80) 
     at scala.tools.nsc.Interpreter$Request.loadAndRun(Interpreter.scala:984)          
     at scala.tools.nsc.Interpreter.loadAndRunReq$1(Interpreter.scala:579)          
     at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:599)            
     at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:576) 
     at scala.tools.nsc.InterpreterLoop.reallyInterpret$1(InterpreterLoop.scala:472)        
     at scala.tools.nsc.InterpreterLoop.interpretStartingWith(InterpreterLoop.scala:515)       
     at scala.tools.nsc.InterpreterLoop.command(InterpreterLoop.scala:362) 
     at scala.tools.nsc.InterpreterLoop.processLine$1(InterpreterLoop.scala:243) 
     at scala.tools.nsc.InterpreterLoop.repl(InterpreterLoop.scala:249) 
     at scala.tools.nsc.InterpreterLoop.main(InterpreterLoop.scala:559) 
     at scala.tools.nsc.MainGenericRunner$.process(MainGenericRunner.scala:75) 
     at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:31) 
     at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala) 

Podsumowując:

  1. Stosować metody ogon rekurencyjnej
  2. Opisywanie ich za ogon rekurencyjnej
  3. Kiedy je nazwać, upewnić się, że ich argument jest tylko odniesienie do Stream

EDYTOWANIE:

Zauważ, że ten również działa i nie powoduje out błędu pamięci:

scala> def s = Stream.range(0, 100000000)             
s: scala.collection.immutable.Stream[Int] 

scala> last(s)                    
res1: Int = 99999999 

EDIT2:

A w przypadku reduceLeft, które wymagają, trzeba by zdefiniować metody pomocnika z argumentem akumulatora dla wyniku.

Dla argumentu reduceLeft potrzebny jest argument dotyczący akumulatorów, który można ustawić na określoną wartość przy użyciu domyślnych argumentów. Uproszczony przykład:

scala> @tailrec def rcl(s: Stream[Int], acc: Int = 0): Int = if (s.isEmpty) acc else rcl(s.tail, acc + s.head) 
rcl: (s: scala.collection.immutable.Stream[Int],acc: Int)Int 

scala> rcl(Stream.range(0, 10000000)) 
res6: Int = -2014260032 
+2

Gdzie zdefiniowałbyś metodę pomocnika? Gdyby w wewnętrznej metody 'reduceLeft', czy osoba wywołująca metody pomocnika nie ryzykowałaby trzymając się nagłówka strumienia? – huynhjl

+0

Hmmm. Dobra uwaga - w istocie. Optymalizację połączeń końcowych można zastosować tylko do metod rekurencyjnych. Masz rację. Ale można wtedy grać z domyślnymi parametrami. Zobacz moją drugą edycję. – axel22

+0

Mam ten sam problem z OutOfMemory, ale używając stream.foreach - jak mogę go rozwiązać? –

2

Możesz chcieć spojrzeć na Scalaz's ephemeral streams.

+8

Fragment byłby świetny, aby zobaczyć, jak efemeryczne strumienie odnoszą się do tego konkretnego pytania . Link, który podasz, wskazuje na plik źródłowy bez komentarza. – huynhjl

19

Kiedy zacząłem się uczyć o Stream Myślałem, że to było fajne. Wtedy zdałem sobie sprawę, że to, co chcę używać prawie przez cały czas, to Iterator.

W przypadku, gdy potrzebujemy Stream ale chcą, aby reduceLeft pracy:

fromToStream(1,10000000).toIterator.reduceLeft(_ + _) 

Jeśli spróbujesz powyżej linii, będzie zbierać śmieci po prostu w porządku. Odkryłem, że używanie Strumienia jest trudne, ponieważ łatwo jest go przytrzymać, nie zdając sobie z tego sprawy. Czasami standardowa biblioteka będzie dla ciebie dostępna - w bardzo subtelny sposób.

2

Jak się okazuje, jest to a bug w bieżącej realizacji reduceLeft. Problem polega na tym, że metoda reduceLeft wywołuje funkcję foldLeft, a zatem ramka stosu metody reduceLeft zawiera odniesienie do nagłówka strumienia podczas całego wywołania. FoldLeft używa rekursji ogonowej, aby uniknąć tego problemu. Porównaj:

(1 to 10000000).toStream.foldLeft(0)(_+_) 
(1 to 10000000).toStream.reduceLeft(_+_) 

Są one semantycznie równoważne. W Scala w wersji 2.8.0 wywołanie foldLeft działa, ale wywołanie metody reduceLeft powoduje wyrzucenie OutOfMemory. Jeśli funkcja reduceLeft wykona swoją własną pracę, problem ten nie wystąpi.

Powiązane problemy