
Transcription: Rust concurrency

Lars Bergstrom edited this page Nov 14, 2013 · 2 revisions

Brian Anderson: Concurrency

  • This morning, I will talk about concurrency in Rust and walk through a few common patterns you see in Rust. Yesterday, we talked about the threading model. Rust has tasks that are memory-isolated from each other, so they are data race free. You cannot share memory between two separate tasks. We transfer data between tasks by message passing and transfer of ownership. If one task wants to send data that another task needs, it sends a message to the other task and transfers ownership. Message passing is the primary way that tasks in Rust communicate, and it is almost always the mechanism you want to use for Rust concurrency.

Simple interactions

  • The first thing is spawning tasks, doing computations, and getting the results back. Today, we will pretend that we are writing an image loading task, like the one we already have in Servo. Our work for this demo is load_image. load_image takes a string and gives you back something that is like an image (the details are not important). We will do a few different forms of work that involve loading an image.
  • A worker is a concurrent task that goes off and does one thing, like compute something or do I/O for a while in parallel, in the background. When it is done, it will send you a message with the result. To set up communication between tasks, you need channels. We call them pipes. You do that by:
  let (port, chan) = stream();
  • This gives us the two endpoints: the receiver (port) and the sender (channel). We build it with the stream function. stream is a constructor in std::comm. It is imported via the standard Rust prelude into all programs. A stream gives you a pipe that may send and receive an unbounded number of messages. You can send and receive as many messages as you like without worrying about blocking or running out of buffer space in the pipe. It is the most general form of communication, and it is almost always what you want by default. There are other, more specialized forms of pipes, but they are rarely used.
  • Once you have a pipe, you can create another task with:
do spawn {
}
  • This creates another task and runs it in parallel. Our worker task just has to load the image. So it says:
let (port, chan) = stream();
let url = "foo.png";
do spawn {
    let image = load_image(url.to_str());
}
  • spawn creates a task by running the block (which is a closure, or first-class function) passed as an argument to it. The closure can capture values from the environment. Here, it implicitly captures a string (url) into the task. Since we referenced url in the spawned closure, it is moved into it. Any variable that we reference from the enclosing environment will be automatically moved into the closure's environment.
  • The to_str() changes the string to an owned ("twiddle", or ~) str.
  • When we created the task, we moved data into it through the capture. To use the result, we need to send the data back to the main task.
  • If we want to print our image, we need to use our channel to send a message back to the main task.
do spawn {
    ...
    chan.send(image);
}
println!("{:?}", port.recv());
  • port.recv() gives us back the image that was sent on the channel. This pattern is used often for one-off computations. The image cache task in Servo works exactly like this for each image. It creates a task for each load, and runs them.
  • The closure we spawn the task with is a unique closure (a ~fn(), or "twiddle function"). spawn looks like fn spawn(task: ~fn()). Twiddle functions own their captured variables. url is captured by the spawned closure, and at that point the url is owned by the closure. Since the function load_image takes an owned string (~str), the problem is that the spawned closure owns the url, so it can't pass it to load_image. The reason we cannot pass ownership of the string url to load_image is that a ~fn could be called multiple times, so ownership of the string cannot be transferred out of it.
  • QUESTION: why did we transfer url to the other task?
  • This is an argument capture. I will expand the code so you can see exactly what is happening.
let task: ~fn() = || {
    let image = load_image(url);
    chan.send(image);
};
  • So the owned function we created, task, references the value url, which lived in the main function. Because we referenced it, the url is moved into the owned function. You can imagine url being an argument to the owned function, but for a closure the capture is implicit.
  • The reason url can't be moved out of the closure when load_image wants to take ownership of the string is that doing so would leave a "hole" in the closure (named task).
  • If we called task twice, on the second call, there would be a null pointer because the string url was taken out of the closure and ownership was transferred to the function load_image.
  • One way to solve this problem is to clone the url, which makes the program work. That pattern (doing the work in a spawned task and cloning what it needs) is the easiest way to do it, as shown below.
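  • Here is a minimal sketch of that clone workaround, in the same style as the snippets above (load_image is the hypothetical loader from this demo): the closure keeps ownership of the captured url and hands a fresh copy to load_image, so nothing is moved out of the closure.
let (port, chan) = stream();
let url = ~"foo.png";
do spawn {
    // pass a copy to load_image so the captured url stays in the closure
    chan.send(load_image(url.clone()));
}
println!("{:?}", port.recv());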
  • QUESTION: You are passing the url and the channel to the created task automatically?
  • Yes, the channel gets moved in automatically.
  • QUESTION: When do we pass the object automatically vs. express it explicitly (with spawn_with)?
  • spawn_with is explicit and may make the code clearer. Personally, I do not use it.
  • pcwalton: prevents you from having to use Cell.
  • Great! This problem comes up over and over, and Cell solves it. It is like an Option where you can put in a value and take it out. It lives in std::cell. A common pattern you will see to work around this exact problem (moving values out of closures) is to put them in a Cell.
  let url_cell = Cell::new(url);

  do spawn {
      chan.send(load_image(url_cell.take()));
  }
  
  • So, in the closure, we take the value out of the cell. Taking it leaves the cell empty, so we can pass ownership to load_image without using clone, and we avoid one allocation. spawn_with does this exact thing without creating the cell.
  • QUESTION: What is the type signature of spawn_with?
  • Parameterized over <T:Send>, so you can send multiple values.
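  • As a hedged sketch of what that can look like in use, assuming std::task::spawn_with is in scope and takes the value to move in plus a ~fn that receives it as an argument (as described above):
  do spawn_with(url) |url| {
      // url arrives as the closure's argument, so it can be consumed freely
      chan.send(load_image(url));
  }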
  • QUESTION: patrick's new code uses Slot, what is it?
  • pcwalton: Complicated version of Cell in Servo. Slot allows dynamic borrowing, but does not work with concurrency. It's like @mut, but with one less pointer indirection. I can talk more about it offline. I would like to get rid of Cell and replace it with Slot.
  • QUESTION: Confused about Cell and clone(); Why would you use Cell? Should I just use clone() instead of Cell and .take()?
  • The difference is that clone() makes a copy, which can perform an allocation if url is an allocated string.
  • QUESTION: so cell moves the argument?
  • Yes, using the Cell means you allocate only a small cell object. Calling clone() can cause a large allocation to copy the string url. But, you cannot call .take() from the Cell object twice, so if you called the closure twice, it would fail (dynamically) the second time.
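  • A minimal sketch of that failure mode (illustrative values only):
  let cell = Cell::new(~"foo.png");
  let first = cell.take();   // ok: the cell is now empty
  let second = cell.take();  // fails dynamically: there is nothing left to take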
  • There is an upcoming change to closures. Currently, the type ~fn can be called multiple times. So, code like this is legal today:
let foo: ~fn() = ||{};
foo();
foo();
  • We will make a change soon so that a ~fn can only be called once (since that is how they are almost always used). It will be consumed when it is called (a linear/affine type). That change will mean that you can write the more intuitive code, with neither the clone nor the Cell.
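  • Once that change lands, a sketch of the original spawn would simply move url into the closure and then straight into load_image, with no clone and no Cell:
  do spawn {
      chan.send(load_image(url));
  }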

Using a trait so that you can call a closure multiple times

  • In cases where you do want to create an owned function that can be called multiple times, once we make the change above there will be no Rust feature that lets you do that directly. If you have a callback (like in the CSS parser), closures that are called multiple times will have to be rewritten as traits.
  type Callback = ~fn(); // will no longer work
  trait Callback {
      fn run(&self);
  }

  trait CallbackInt {
      fn run(&self, n: int);
  }

  fn load(cb: ~fn(int)); // old
  fn load(cb: ~CallbackInt); // new

  struct CallbackIntData { i: int }
  impl CallbackInt for CallbackIntData {
      fn run(&self, n: int) { ... }
  }

  // before:
  do load { ... }

  // after:
  let cb = ~CallbackIntData { i: 3 };
  load(cb as ~CallbackInt);
  • That run method has the same signature as the original closure. So, the code that took the callback as a closure will now take the trait.
  • To implement the trait, you will need to create a struct and an impl. There are not a lot of examples of this pattern in the codebase currently.

Actor pattern

  • An actor is like an object, but where in an object you call methods, in an actor you send messages. Actor languages are a common concurrency abstraction. Rust is not an actor language, but it makes it easy to model the actor pattern. The script task, layout task, render tasks, image cache tasks, etc. are all actors. They wait in a loop for messages, respond to them, and continue to wait for additional messages.
enum ImageLoaderMsg {
    LoadImage(~str, Chan<~[u8]>),
    Exit
}

fn main() {
    // port is what the actor receives on; chan is how we communicate with it
    let (port, chan) = stream();
    let url_to_load = ~"foo.png";

    do spawn {
        loop {
            match port.recv() {
                LoadImage(url, result_chan) => {
                    let image = load_image(url);
                    result_chan.send(image);
                },
                Exit => { break; }
            }
        }
    }

    for _ in range(0, 10) {
        // make an image request by creating a port/chan to receive the result
        let (result_port, result_chan) = stream();
        // then, send the LoadImage message
        chan.send(LoadImage(url_to_load.clone(), result_chan));
        println!("{:?}", result_port.recv());
    }

    chan.send(Exit);
}
  • You need to think about the set of messages an actor responds to. These are always enums. The actor's event loop repeatedly calls port.recv(). Let's talk about the parameters to the LoadImage message. Think about the messages as method calls. If it were a method, it might be load_image(url: ~str) -> ~[u8]. That same interface as a method is what we build now. The argument will be a url.
  • When we send the image to load, we send the url to load along with a pipe on which to send the result. That is what the LoadImage message contains.
  • Our actor just sits in a loop receiving LoadImage messages over and over, so it can load as many images as we need in the program.
  • For Exit, we just send the Exit message and the program exits.
  • jack: The layout, script, and compositor tasks all do this. Every task in Servo has exactly this loop but with many more messages. This model is at the core of Servo's task architecture.
  • In practice, Servo's actors are a bit more complicated: we create a struct that contains all of the state, rather than just capturing variables as part of the task closure. Example with a cache:
  let mut cache = HashMap::new();
  ...
  // (inside the closure)
  if cache.contains_key(&url) {
      result_chan.send(cache.get_copy(&url));
  } else {
      // load the image, store it in the cache, and then send a copy
  }
  • In Servo, we factor out everything like the HashMap into members of a struct. This avoids relying on the closure and cleans up the code.
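  • A rough sketch of that factoring, with assumed names (ImageLoader and its fields are illustrative, not Servo's actual types):
  struct ImageLoader {
      port: Port<ImageLoaderMsg>,
      cache: HashMap<~str, ~[u8]>,
  }

  impl ImageLoader {
      fn run(&mut self) {
          loop {
              match self.port.recv() {
                  LoadImage(url, result_chan) => {
                      // check self.cache, load on a miss, then send a copy
                  },
                  Exit => { break; }
              }
          }
      }
  }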

Shared channel

  • QUESTION: can we extend the example to use a shared channel?
  • A shared channel is also pretty common. If I have a bunch of tasks that all want to request images, we will need a shared channel. The challenge is that if, in our for loop, we want to spawn tasks to load and print the images, we need to use a special channel that permits cloning. We convert our normal channel into a shared channel.
  let shared_chan = SharedChan::new(chan);
  for _ in range(0, 10) {
      let new_chan = shared_chan.clone();
      do spawn {
          let (rp, rc) = stream();
          new_chan.send(LoadImage(~"image.png", rc));
          let image = rp.recv();
          println!("{:?}", image);
      }
  }
        
  • Do NOT try to do the shared_chan.clone() inside of the spawn. It must happen outside so that the main task has ownership of the shared_chan. SharedChan is in std::comm. If you do the clone inside of the spawn, ownership of the shared_chan would move to the spawned task.
  • pcwalton: SharedChan is slower than a regular channel, which is why they are not the default. SharedChan needs a lock, which makes it more expensive.
  • QUESTION: better to have 10 separate channels than one shared channel?
  • No, it's just that if you don't need a SharedChan, we use the faster one. It's not that it's slow, but just a little bit slower.
  • QUESTION: if I want to send data to multiple tasks, what should I use?
  • Multiple pipes. Do not use SharedPort - its semantics are complicated.
  • jack: So in Servo, if we want to send to the compositor and script tasks, we have individual channels for each one.
  • QUESTION: What if I want to split the data, send the pieces out, and then combine the results (split a vector into N pieces across tasks and then combine them)? SharedChan?
  • jack: Multiple channels and send the channels. You need a piece of state to know what order to combine the work. The port is that piece of information.
  • pcwalton: If you want to shard work from a vector into 5 pieces, then what you do is make 5 channels. The first task creates all of the channels, and each worker has a channel back to the main task. The main task spawns all 5 of them to do work, and then calls .recv() on the first one, then the second, and so on. Since you do the receives in the same order as the sends, you will get the results in order and can fill the vector back up. You can use 5 regular channels and 5 ports with no SharedChan or SharedPort. A sketch follows below.
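  • A minimal sketch of that fan-out/fan-in pattern, in the era's syntax (work is a hypothetical function that processes one shard):
  let mut ports = ~[];
  for i in range(0, 5) {
      let (port, chan) = stream();
      do spawn {
          // each worker sends its result back on its own channel
          chan.send(work(i));
      }
      ports.push(port);
  }
  // receive in the same order the workers were spawned, so results stay ordered
  let mut results = ~[];
  for port in ports.iter() {
      results.push(port.recv());
  }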
  • QUESTION: because of performance?
  • pcwalton: There's only one endpoint for each of the channels and streams, so no need for a SharedChan. There is only one owner per task for the 5 separate streams. You would use a SharedChan if you had something like the History task, where order does not matter. Basically, the design choice is whether the order matters or not, since multiple channels let you control the order you receive the results. SharedChan does not let you control the order that you receive the responses in.
  • QUESTION: So with a SharedChan, the 5 calls to port.recv() come back in order...?
  • pcwalton: You have no idea what the order is with SharedChan.
  • QUESTION: If the order does not matter, I can use SharedChan.
  • pcwalton: Yes.
  • jack: When we create a SharedChan to the compositor, we don't care which message gets processed first. We just receive and process the messages in any order.
  • QUESTION: What is the performance issue with SharedChan?
  • pcwalton: Probably faster than multiple channels.
  • jack: The compositor would have to listen on 5 channels, which is much more complicated as well. We only use multiple channels when we need to order the results. We do it for design reasons, not for performance reasons.
  • The performance of sending on a SharedChan is slightly worse than on a non-SharedChan. The performance of recv on a single SharedChan is much better than receiving on many non-SharedChans. So, try to use a single SharedChan instead of receiving on 5 different ports at the same time; the latter is possible with select in the libraries, but it's a little flaky.
  • jack: In the compositor, we want to take messages from the renderer and windowing system events. So there, we have two channels. We peek on the individual channels and only call recv() when there is a message ready. In compositing/run.rs, we have an example of it. The main loop in the compositor checks on the compositor port and on the window events (resize, etc.). This is not great because we don't know which port has the newest message, so it has to use peek to see if there is any work to do. Kind of clunky.
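  • A rough sketch of that peek-based loop (the port names and handler functions are assumptions, not Servo's actual code):
  loop {
      // only recv() when peek() says a message is ready, so neither port blocks the other
      if compositor_port.peek() {
          handle_compositor_msg(compositor_port.recv());
      }
      if window_port.peek() {
          handle_window_event(window_port.recv());
      }
  }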
  • You picked the most complex example possible.
  • pcwalton: It needs to integrate with the OS native event loop, which will clean this up.
  • QUESTION: when you talk about perf, what is the difference?
  • I don't have numbers on SharedChan vs. Chan. The only difference is that SharedChan does one additional atomic swap. The performance impact will be related to the contention on that cache line, so it will be proportional to the number of concurrent accessors. The implementation of all these message passing types is going to change to increase throughput; we are working hard on that for the near future.
  • jack: These messages are sent once per render, not 1000 per second. There are not that many, so the performance of message send is not a huge problem. Script to layout, though, is critical. Or if you are farming out work to many tasks.