Skip to content

Commit

Permalink
More bugfixes for weak thread finalization
Browse files Browse the repository at this point in the history
If a thread was held in an upvalue, then it was possible for the runtime
to think that this thread was dead when it was not and reset it.

To prevent this, the runtime has been changed to mark the *targets* of
all live upvalues as live during finalization, even though they are held
through a weak thread ptr.

This requires two-phase finalization, one to make sure all live upvalues
are actually marked as live, *then* one to reset every dead thread.
Multi-phase finalization has always been on the roadmap so this is not
really a problem.
  • Loading branch information
kyren committed Jun 16, 2024
1 parent 892bfa2 commit b079a68
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 15 deletions.
23 changes: 22 additions & 1 deletion src/finalizers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::{thread::ThreadInner, Thread};
pub struct Finalizers<'gc>(Gc<'gc, RefLock<FinalizersState<'gc>>>);

impl<'gc> Finalizers<'gc> {
const THREAD_ERR: &'static str = "thread finalization was missed";

pub(crate) fn new(mc: &Mutation<'gc>) -> Self {
Finalizers(Gc::new(mc, RefLock::default()))
}
Expand All @@ -15,10 +17,29 @@ impl<'gc> Finalizers<'gc> {
self.0.borrow_mut(mc).threads.push(Gc::downgrade(ptr));
}

/// First stage of two-stage finalization.
///
/// This stage can cause resurrection, so the arena must be *fully re-marked* before stage two
/// (`Finalizers::finalize`).
pub(crate) fn prepare(&self, fc: &Finalization<'gc>) {
let state = self.0.borrow();
for &ptr in &state.threads {
let thread = Thread::from_inner(ptr.upgrade(fc).expect(Self::THREAD_ERR));
thread.resurrect_live_upvalues(fc).unwrap();
}
}

/// Second stage of two-stage finalization.
///
/// Assuming stage one was called (`Finalizers::prepare`) and the arena fully re-marked, this
/// method will *not* cause any resurrection.
///
/// The arena must *immediately* transition to `CollectionPhase::Collecting` afterwards to not
/// miss any finalizers.
pub(crate) fn finalize(&self, fc: &Finalization<'gc>) {
let mut state = self.0.borrow_mut(fc);
state.threads.retain(|&ptr| {
let ptr = ptr.upgrade(fc).expect("thread finalization was missed");
let ptr = ptr.upgrade(fc).expect(Self::THREAD_ERR);
if Gc::is_dead(fc, ptr) {
Thread::from_inner(ptr).reset(fc).unwrap();
false
Expand Down
18 changes: 8 additions & 10 deletions src/lua.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ impl<'gc> ops::Deref for Context<'gc> {

pub struct Lua {
arena: Arena<Rootable![State<'_>]>,
finalized: bool,
}

impl Default for Lua {
Expand All @@ -102,7 +101,6 @@ impl Lua {
pub fn empty() -> Self {
Lua {
arena: Arena::<Rootable![State<'_>]>::new(|mc| State::new(mc)),
finalized: false,
}
}

Expand Down Expand Up @@ -156,15 +154,17 @@ impl Lua {

/// Finish the current collection cycle completely, calls `gc_arena::Arena::collect_all()`.
pub fn gc_collect(&mut self) {
if !self.finalized {
if self.arena.collection_phase() != CollectionPhase::Collecting {
self.arena.mark_all().unwrap().finalize(|fc, root| {
root.finalizers.prepare(fc);
});
self.arena.mark_all().unwrap().finalize(|fc, root| {
root.finalizers.finalize(fc);
});
}

self.arena.collect_all();
assert!(self.arena.collection_phase() == CollectionPhase::Sleeping);
self.finalized = false;
}

pub fn gc_metrics(&self) -> &Metrics {
Expand All @@ -190,20 +190,18 @@ impl Lua {

let r = self.arena.mutate(move |mc, state| f(state.ctx(mc)));
if self.arena.metrics().allocation_debt() > COLLECTOR_GRANULARITY {
if self.finalized {
if self.arena.collection_phase() == CollectionPhase::Collecting {
self.arena.collect_debt();

if self.arena.collection_phase() == CollectionPhase::Sleeping {
self.finalized = false;
}
} else {
if let Some(marked) = self.arena.mark_debt() {
marked.finalize(|fc, root| {
root.finalizers.prepare(fc);
});
self.arena.mark_all().unwrap().finalize(|fc, root| {
root.finalizers.finalize(fc);
});
// Immediately transition to `CollectionPhase::Collecting`.
self.arena.mark_all().unwrap().start_collecting();
self.finalized = true;
}
}
}
Expand Down
54 changes: 51 additions & 3 deletions src/thread/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ use std::{
};

use allocator_api2::vec;
use gc_arena::{allocator_api::MetricsAlloc, lock::RefLock, Collect, Gc, GcWeak, Mutation};
use gc_arena::{
allocator_api::MetricsAlloc, lock::RefLock, Collect, Finalization, Gc, GcWeak, Mutation,
};
use thiserror::Error;

use crate::{
closure::{UpValue, UpValueState},
meta_ops,
types::{RegisterIndex, VarCount},
BoxSequence, Callback, Closure, Context, Error, FromMultiValue, Fuel, Function, IntoMultiValue,
TypeError, VMError, Value,
String, Table, TypeError, UserData, VMError, Value,
};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -179,6 +181,27 @@ impl<'gc> Thread<'gc> {
}
}

/// For each open upvalue pointing to this thread, if the upvalue itself is live, then resurrect
/// the actual value that it is pointing to.
///
/// Because open upvalues keep a *weak* pointer to their parent thread, their target values will
/// not be properly marked as live until until they are manually marked with this method.
pub(crate) fn resurrect_live_upvalues(
self,
fc: &Finalization<'gc>,
) -> Result<(), BadThreadMode> {
// If this thread is not dead, then none of the held stack values can be dead, so we don't
// need to resurrect them.
if Gc::is_dead(fc, self.0) {
let state = self.0.try_borrow().map_err(|_| BadThreadMode {
found: ThreadMode::Running,
expected: None,
})?;
state.resurrect_live_upvalues(fc);
}
Ok(())
}

fn check_mode(
&self,
mc: &Mutation<'gc>,
Expand Down Expand Up @@ -447,7 +470,7 @@ impl<'gc> ThreadState<'gc> {
for &upval in &self.open_upvalues[start..] {
match upval.get() {
UpValueState::Open(open_upvalue) => {
assert!(open_upvalue.thread.upgrade(mc).unwrap().as_ptr() == this_ptr);
debug_assert!(open_upvalue.thread.upgrade(mc).unwrap().as_ptr() == this_ptr);
upval.set(
mc,
UpValueState::Closed(self.stack[open_upvalue.stack_index]),
Expand All @@ -460,6 +483,31 @@ impl<'gc> ThreadState<'gc> {
self.open_upvalues.truncate(start);
}

fn resurrect_live_upvalues(&self, fc: &Finalization<'gc>) {
for &upval in &self.open_upvalues {
if !Gc::is_dead(fc, UpValue::into_inner(upval)) {
match upval.get() {
UpValueState::Open(open_upvalue) => {
match self.stack[open_upvalue.stack_index] {
Value::String(s) => Gc::resurrect(fc, String::into_inner(s)),
Value::Table(t) => Gc::resurrect(fc, Table::into_inner(t)),
Value::Function(Function::Closure(c)) => {
Gc::resurrect(fc, Closure::into_inner(c))
}
Value::Function(Function::Callback(c)) => {
Gc::resurrect(fc, Callback::into_inner(c))
}
Value::Thread(t) => Gc::resurrect(fc, Thread::into_inner(t)),
Value::UserData(u) => Gc::resurrect(fc, UserData::into_inner(u)),
_ => {}
}
}
UpValueState::Closed(_) => panic!("upvalue is not open"),
}
}
}
}

fn reset(&mut self, mc: &Mutation<'gc>) {
self.close_upvalues(mc, 0);
assert!(self.open_upvalues.is_empty());
Expand Down
1 change: 0 additions & 1 deletion tests/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ fn test_table_iter() {
assert!(matches!(pairs[5], (Value::String(s), Value::Integer(3)) if s == "3" ));

for (k, _) in table.iter() {
dbg!(k);
table.set(ctx, k, Value::Nil).unwrap();
}

Expand Down
42 changes: 42 additions & 0 deletions tests/weak_threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,49 @@ fn weak_threads_close() -> Result<(), StaticError> {
lua.gc_collect();
let executor = lua.try_enter(|ctx| {
let closure = Closure::load(ctx, None, format!("assert(closure() == {i})").as_bytes())?;
Ok(ctx.stash(Executor::start(ctx, closure.into(), ())))
})?;
lua.execute::<()>(&executor)?;
}

Ok(())
}

#[test]
fn live_upvalues_not_dead() -> Result<(), StaticError> {
let mut lua = Lua::core();

let executor = lua.try_enter(|ctx| {
let closure = Closure::load(
ctx,
None,
&br#"
local co = coroutine.create(function()
local thread = coroutine.create(function()
local i = 1
while true do
coroutine.yield(i)
i = i + 1
end
end)
coroutine.yield(function()
return coroutine.continue(thread)
end)
end)
_, go = coroutine.resume(co)
"#[..],
)?;

Ok(ctx.stash(Executor::start(ctx, closure.into(), ())))
})?;
lua.execute::<()>(&executor)?;

for i in 1..4 {
lua.gc_collect();
let executor = lua.try_enter(|ctx| {
let closure = Closure::load(ctx, None, format!("assert(go() == {i})").as_bytes())?;
Ok(ctx.stash(Executor::start(ctx, closure.into(), ())))
})?;
lua.execute::<()>(&executor)?;
Expand Down

0 comments on commit b079a68

Please # to comment.