Dmitry Leskov
 

Scala Stream Hygiene I: Avoiding Memory Leaks

The source code for this post is available on GitHub.

Update 25-Jul-2014: Part II is out with surprise findings — don’t miss it!

Update 11-Aug-2014: Part III is available too.

Lazy evaluation, also known as call-by-need, is commonly found in functional languages. Some of them go as far as to make it the default evaluation strategy; perhaps the most prominent example is Haskell. Language authors however seem to prefer eager (strict) evaluation, whether because it results in better performance in the majority of practical use cases, or because it plays better with the imperative features of their languages, such as I/O and exceptions, or because the authors find it easier to implement. So they add a number of features to the language and the standard library that enable the developers to use lazy evaluation if they really want to.

In Scala, the language features are the lazy modifiers for vals and by-name function parameters. And in the standard library, amongs others, there is the class Stream (scala.collection.immutable.Stream). It was the subject of my recent studies, the results of which I share in this series.

Memoization

Lazy evaluation often goes hand-in-hand with memoization. Without memoization, many programs implemented using lazy evaluation would exhibit terrible performance characteristics. Stream implements memoization and hence can be reasoned about as a List of elements computed on-demand.

However, there are also scenarios in which memoization is highly undesirable. One benefit of lazy evaluation is the ability to define potentially infinite data structures. In case of streams, these can be a stream of natural numbers or a stream of data packets coming in from the network. Problem is, all computers commercially available today only support a finite amount of memory, so memoization of such data structures is very capable of making your program throw an OutOfMemoryError.

When you either have no need to use stream elements more then once, as in the network packet filtering scenario, or stream elements can be re-computed very cheaply (natural numbers), you better ensure that they do not get memoized.

Below are the rules that will help you avoid memoization of Scala streams. I’ve collected them from various sources and confirmed by compiliing and decompiling test programs. If you know of any other techniques or edge cases, please post in the comments.

Rules for Avoiding Stream Memoization

  1. Define streams using def and never store them in vals.

    This should be obvious, because val ensures that memoizaion occurs – see Stream scaladoc – but obvious things are often worth stating explicitly.

  2. Consume streams in tail-recursive functions.

    Again, this is rather obvious – if the consuming function is recursive, but not tail-recursive, a reference to the original stream will remain on the call stack until the recursion completes, effectively holding the entire stream in memory. (Not to mention that such a function would likely throw a StackOverflowError when there is still plenty of memory available on the heap.)

    How the tail-recursive functions manage to avoid the OOM? Let’s decompile an example:

    @tailrec
    def sum(xs: Stream[Int], z: Int = 0): Int = 
      if (xs.isEmpty) z else sum(xs.tail, z + xs.head)

    Here is what the decompiler produces:

    public int sum(Stream<Object> xs, int z) {
      for (;;) {
        if (xs.isEmpty()) return z;
        z += BoxesRunTime.unboxToInt(xs.head());
        xs = (Stream)xs.tail();
      }
    }

    Notice that the xs parameter is reused. It gets overwritten on each loop iteration, so it always holds a reference to the not-yet processed remainder of the original stream.

  3. Pass streams around via by-name parameters. (Make sure to read the corollary below.)

    Sometimes you need to pass a stream through intermediate functions before its consumption, but that would leave references to the stream on the call stack.

    Typical example:

    def sum(xs: Stream[Int]): Int = {
      @tailrec
      def loop(acc: Int, xs: Stream[Int]): Int =
        if (xs.isEmpty) acc else loop(acc + xs.head, xs.tail)
      loop(0, xs)
    }

    Although the inner function loop is tail-recursive, the sum function that calls it will hold a reference to the head of the stream in its parameter.

    The advice commonly found on the Net is to pass the stream around in a "container", such as a single-element array or an AtomicReference, and nullify its contents in the consuming function. But this results in awkward-looking, impure code. I am not sure why the built-in language feature that achieves the same effect gets overlooked.

    In the above example, if you make xs a by-name parameter to sum, what gets actually passed is a function, computed right before the call to loop, so its result does not hold the entire stream:

    def sum(xs: => Stream[Int]): Int = {
       .  .  .

    As you may have noticed when reading about Rule #2, you could also get rid of the outer function altogether using a default parameter value:

    @tailrec
    def sum(xs: Stream[Int], z: Int = 0): Int = 
      if (xs.isEmpty) z else sum(xs.tail, z + xs.head)

    but that is not always possible.

    Corollary: When defining stream-consuming functions in traits, wrap them in functions accepting streams as by-name parameters.

    This one is subtle, and I would say the most unfortunate, because you have no control over the root cause of this restriction. The root cause is that trait methods are not called directly, but via a forwarder method generated by the compiler, even if the caller is a member of the same trait. The forwarder method will hold a reference to the entire stream, that is, unless the stream is passed as a by-name parameter.

    Example:

    trait StreamConsumers {
      @tailrec
      final def sum(xs: Stream[Int], z: Int = 0): Int = {
        if (xs.isEmpty) z else sum(xs.tail, z + xs.head)
      }
      def sumByName(xs: => Stream[Int]): Int = {
        @tailrec def loop(acc: Int, xs: Stream[Int]): Int =
          if (xs.isEmpty) acc else loop(acc+xs.head, xs.tail)
        loop(0, xs)
      }
    
       .  .  .
    
    object Main extends StreamConsumers {
       .  .  .

    And here are the forwarders in the decompiled code of Main:

    public final class Main$
      implements StreamConsumers
    {
      public static final  MODULE$;
    
      public final int sum(Stream<Object> xs, int z) {
        return StreamConsumers.class.sum(this, xs, z);
      }
      public int sumByName(Function0<Stream<Object>> xs) {
        return StreamConsumers.class.sumByName(this, xs);
      }
       .  .  .

    You may also notice that enclosing a tail-recursive function in a wrapper method relieves you from the need to declare that method as final.

  4. Do not pattern match against streams outside the consuming functions.

    It is perfectly okay to use pattern matching inside a tail-recursive function that consumes the stream:

    def sumPatMatInner(xs: => Stream[Int]): Int = {
      @tailrec
      def loop(acc: Int, xs: Stream[Int]): Int =
        xs match {
          case Stream.Empty => acc
          case y #:: ys => loop(acc + y, ys)
        }
      loop(0, xs)
    }

    Hence a pattern matching addict might write something like this:

    def sumPatMat(xs: => Stream[Int]): Int = {
      @tailrec
      def loop(acc: Int, xs: Stream[Int]): Int =
        xs match {
          case Stream.Empty => acc
          case y #:: ys => loop(acc + y, ys)
        }
      xs match {
        case Stream.Empty => 0
        case x #:: Stream.Empty => x
        case y #:: ys => loop(y, ys)
      }
    }

    Why can this lead to an OOM? Let’s consider a simpler example:

    createStream match {
      case x #:: xs => consumeStream(x, xs)
      case _ => println("No data to process")
    }

    As of Scala 2.10, this code is an exact equivalent of the following:

    val foo: Option[(A, Stream[A])] = Stream.#::.unapply(createStream)
    if (foo.isEmpty) println("No data to process")
    else {val x = foo.get._1; val xs = foo.get._2; consumeStream(x, xs)}

    where A is the type of stream elements and foo is a unique name not used anywhere else.

    As you may see, if the first pattern matches, val xs will hold a reference to the tail of the stream returned by createStream. In fact, the temporary val foo will contain a reference to the entire stream.

    StackOverflow user Daniel Martin described a solution for safely matching on stream tail, which I think is a nice demostration of Scala implicits, but otherwise an overkill, so I won’t reproduce it here.

  5. Only call the eagerly evaluated Stream methods that are marked as "optimized for GC". The methods foreach, foldLeft, and reduceLeft have been specialized for the class Stream. length is also "GC-safe". (Of course, they all loop forever if the receiver is an infinite stream.)

    However, the method /: was left with the default implementation from scala.collections.TraversableOnce, which simply calls foldLeft, effectively holding the reference to the receiver on its stack frame:

    def /:[B](z: B)(op: (B, A) => B): B = foldLeft(z)(op)

    This also applies to methods forall, exists, find, max, min, sum, product, and possibly others.

Now I have some news for you. Under certain circumstances, your program can get away with breaking Rules 2 to 5. Hop over to Part II for details.

Tags: , ,

« | »

Talkback

  1. Justin
    03-Mar-2015
    10:00 pm
    1

    Hi, interesting article. I noticed you used trough instead of through early on.

  2. Dmitry Leskov
    29-Mar-2015
    12:34 pm
    2

    Fixed, thanks!

  3. Alex
    10-Aug-2015
    2:47 pm
    3

    “Problem is,
    all computers commercially available today
    only support a finite amount of memory” -
    very funny joke

* Copy This Password *

* Type Or Paste Password Here *