Ractor - Ruby's Actor 并发模型

最后更新于
Ref:
Ractor 下多线程 Ruby 程序指南
Ruby 仓库里的文档

由于 GIL 的存在,CRuby 以往只能同时执行一个线程。现在,我们通过限制线程间传递消息的手段,实现了真正的并发编程,这就是 Ractor,其实就是 Ruby + Actor 并发模型。Actor 模型其实就是用通信代替锁,类似的例子还可以参考 JS 里的 Worker,这里不做展开。

一个 Ractor 就是一个真正并行的行为受限的 Thread,我们可以看到他和原来的 Thread 的诸多相似之处,下面大概是一些使用笔记。

消息模型,可以参考官方文档里的示意图:Sending/Receiving ports

A Ractor has:
    incoming << msg
        msg = receive()   阻塞地从 incoming 里取出一条消息
                          如果当前没有消息,会一直等到他有
    outgoing >> msg
        Ractor.yield(msg) 往 outgoing 里放一条消息
                          并且阻塞,直到外面有个人 take 了这条消息
    a thread to run ruby code in parallel

外部使用这个 ractor:
    r = Ractor.new { some code to run in parallel }
    r << msg              非阻塞地往 r 的 incoming 中放一条消息
    r.take                阻塞地从 r 的 outgoing 中获取一条消息

下面举一些具体的并发编程例子,部分来自文档。

Hello, Ractor

Ractor.new { 42 }.take #=> 42

计算 0 到 10000000 的平方和

t = Time.new
p Array.new(4) { |i|
  Ractor.new(i) { |i|
    a = (10000000 / 4) * i ... (10000000 / 4) * (i + 1)
    a.map { _1 ** 2 }.sum
  }
}.map { _1.take }.sum
p Time.new - t

所有权

我们可以在 send() 时添加 move: true 来把所有权移交给某个 Ractor。

r.take() 可以获取 Ractor.yield() 或最终的返回值,其中我们获取到最终的返回值时,这个值的所有权会移交给 take 它的地方,也就是说这里没有对消息不可变的限制。

r = Ractor.new do
  a = "hello"
  binding
end
r.take.eval("p a") #=> "hello"

Promise.race

r1 = Ractor.new{'r1'}
r2 = Ractor.new{'r2'}
rs = [r1, r2]
as = []

# Wait for r1 or r2's Ractor.yield
r, obj = Ractor.select(*rs)
rs.delete(r)
as << obj

# Second try (rs only contain not-closed ractors)
r, obj = Ractor.select(*rs)
rs.delete(r)
as << obj
as.sort == ['r1', 'r2'] #=> true

用一个 Ractor 当消息队列

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

RN = 10
rs = RN.times.map{|i|
  Ractor.new pipe, i do |pipe, i|
    msg = pipe.take
    msg # ping-pong
  end
}
RN.times{|i|
  pipe << i
}
RN.times.map{
  r, n = Ractor.select(*rs)
  rs.delete r
  n
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

类似的,可以用有限个 Ractor 和一个队列来做 Worker Pool:

require 'prime'

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

N = 1000
RN = 10
workers = (1..RN).map do
  Ractor.new pipe do |pipe|
    while n = pipe.take
      Ractor.yield [n, n.prime?]
    end
  end
end

(1..N).each{|i|
  pipe << i
}

pp (1..N).map{
  _r, (n, b) = Ractor.select(*workers)
  [n, b]
}.sort_by{|(n, b)| n}

自动 freeze 所有可以作为值类型的常量字面量

# shareable_constant_value: literal

TABLE = {a: 'ko1', b: 'ko2', c: 'ko3'}
TABLE.frozen? #=> true

依次执行

# pipeline with yield/take
r1 = Ractor.new do
  'r1'
end
r2 = Ractor.new r1 do |r1|
  r1.take + 'r2'
end
r3 = Ractor.new r2 do |r2|
  r2.take + 'r3'
end
p r3.take #=> 'r1r2r3'

# pipeline with send/receive
r3 = Ractor.new Ractor.current do |cr|
  cr.send Ractor.receive + 'r3'
end
r2 = Ractor.new r3 do |r3|
  r3.send Ractor.receive + 'r2'
end
r1 = Ractor.new r2 do |r2|
  r2.send Ractor.receive + 'r1'
end
r1 << 'r0'
p Ractor.receive #=> "r0r1r2r3"

一个更复杂的例子

# ring example with supervisor and re-start

def make_ractor r, i
  Ractor.new r, i do |r, i|
    loop do
      msg = Ractor.receive
      raise if /e/ =~ msg
      r.send msg + "r#{i}"
    end
  end
end

r = Ractor.current
rs = (1..10).map{|i|
  r = make_ractor(r, i)
}

msg = 'e0' # error causing message
begin
  r.send msg
  p Ractor.select(*rs, Ractor.current)
rescue Ractor::RemoteError
  r = rs[-1] = make_ractor(rs[-2], rs.size-1)
  msg = 'x0'
  retry
end

#=> [:receive, "x0r9r9r8r7r6r5r4r3r2r1"]