Мьютексе в мире асинхронного кода

failed guard


Фото: James P. Blair/National Geographic Creative


Вы когда-нибудь сталкивались со следующей проблемой в rust, когда использовали std::sync::Mutex в асинхронном коде?


 7  |     tokio::spawn(/* some future here */);
    |     ^^^^^^^^^^^^ future returned by `fut` is not `Send`
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |

Как указывает ошибка, это происходит из-за нереализованного типажа Send для переданной футуры. В мире async-await это происходит из-за использования !Send объекта между вызовами await (далее await-точки). В частности, один из подобных интересных случаев — это блокировака мьютекса из стандартной библиотеки.


Как только Mutex заблокирован, можно работать с вложенным объектом с помощью обертки MutexGuard. Это тот самый тип, который реализует !Send и является причиной указанной ошибки. Мотивация в реализации этого типажа прямо следует из спецификаций соответствующих вызовов операционных систем (к примеру pthread_mutex_unlock и ReleaseMutex).


Давайте попробуем решить эту ошибку со следующим (неработающим) примером:


use std::sync::Mutex;

#[tokio::main]
async fn main() {
    let counter = Mutex::new(0);

    tokio::spawn(async move {
        let _guard = counter.lock().unwrap();
        async {}.await;
    });
}

Этот достаточно упрощенный пример (мьютекс без Arc в большинстве случаев бесполезен) и еще, вдобавок, переусложнен (к примеру вместо числового типа за мьютексом может быть использован атомарный тип), но для иллюстрации сгодится. Далее примеры будут использовать tokio 2.0, по большей части из-за того что я просто привык к нему. Тем не менее, очень похожий код можно получить для futures 3.0, async-std, smol или возможно другого асинхронного рантайма, если не указано иначе.


Закрепление задачи за потоком


Поскольку Send — требование для многопоточности, один из способов убрать ошибку компиляции — это закрепить задачу (прим.пер: task) за одним потоком, либо же воспользоваться специализированным однопоточным асинхронным рантаймом, к примеру futures::executor::LocalPool. В таком случае экзекьютор будет выполнять задачу только на том же самом потоке, где она была порождена. Тогда код будет выглядить следующим образом:


// ...

let local = tokio::task::LocalSet::new();
local.spawn_local(async move {
        let _guard = counter.lock().unwrap();
        async {}.await;
    });

// Need to be explicitly awaited unlike `tokio::spawn`
local.await;

Этот код, впрочем, имеет семантические различия от от исходного. Если текущий поток перегружен, он не может выгрузить задачи на другие потоки, в крайних случаях это может привести к голоданию ядер процессора. Более того, это скорее обходной путь, нежели полноценное решение, которое вдобавок может привести к куда более серьезным проблемам. Как вы возможно знаете, rust не защищает от взаимных блокировок, и это один и из тех способов, где их очень легко получить.


Допустим другая задача также работает с тем же мьютексом и await-точка является ожиданием этой задачи. После переключение конектста, вторая задача будет заблокирована, что и приведет к состоянию взаимной блокировки. Так что в общем случае использование общего состояния вместе с !Send-порождением задач — сомнительный подход в большинстве случаев. В однопоточном случае как правило достаточно RefCell, в ином случае же придется воспользоваться другим решением.


Ограничение времени жизни


Другая уловка, нежели общее решение, является подход с пересмотром времени жизни MutexGuard. В примере, вызов drop происходил в конце области видимости, т.е. после await. Поскольку переключение между задачами может происходить только в await-точках, все еще возможно использовать !Send объекты в этих рамках. Конкретно для этого примера, будет работать ограничение жизни для MutexGuard прямо перед вызовом асинхронной функции:


// ...

tokio::spawn(async move {
    // For one-liner it looks better with omitted variable declaration.
    let guard = counter.lock().unwrap();
    // ..
    drop(guard);

    async {}.await;
});

На практике, указанный подход работает для многих случаев, но тем не менее не для всех. К примеру, критическая секция может быть использована для нескольких операций для обеспечения транзакционности. В таком случае операционная система не может управлять мьютексом, а значит быть использован из стандартной библиотеке, вместо этого он должен быть реализован на основе среды выполнения асинхронного экзекьютора.


Асинхронный мьютекс


Последнее, и наконец общее, решение — это асинхронный мьютекс. Хочу отметить, что smol не представляет данного примитива, но он может быть реализован самостоятельно либо использован через сторонний крейт (например async_mutex). Вот полный пример с асинхронным мьютексом из токио:


use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let counter = Mutex::new(0);

    tokio::spawn(async move {
        let _guard = counter.lock().await;
        async {}.await;
    });
}

Как можно заметить, помимо выделения await части из метода lock(), убран unwrap. Для стандартного мьютекса блокировка возвращает Result, который несет информацию о возникшей панике на предыдущих блокировках мьютекса в других потоках. В асинхронном мьютексе блокировка — это футура, разрешающаяся в тип аналогичный MutexGuard. А значит в асинхронном случае информация о панике теряется, просто закрывая критическую секцию при раскрутке стэка. С одной стороны это достаточно удобно, поскольку не возникает взаимных блокировок, с другой достаточно просто получить неконсистентное состояние синхронизируемого объекта.


Другая проблема с асинхронным мьютексом — это поведение при взаимных блокировках. Как я уже упоминал, rust не защищает от них и само собой асинхронный подход не исключение. Однако достаточно важный момент, что отладка в последнем случае существенно сложнее. Взаимная блокировка асинхронного мьютекса блокирует задачу, а не поток. Так что при адекватной нагрузке на процессор можно не заметить заблокированные задачи. При взаимной блокировке в случае с потоками можно заметить, что процесс простаивает, и при подключении отладчиком можно увидеть в трассировке стека блокирующий системный вызов. В случае с асинхронным мьютексом, состояние блокировки хранится во внутренней структуре экзекьютора (или другого крейта) и отследить это состояние уже существенно сложнее.


Выводы


При выборе мьютекса в асинхронном коде необходимо определится с компромисами того или иного подхода. В первую очередь, стоит убедиться в необходимости мьютекса и многопоточного экзекьютора. После этого решение достаточно простое: либо использовать мьютекс из стандартной библиотеки, если он не используется между await-точками, иначе в лучше использовать асинхронную версию примитива. Последнее утверждение вероятно не всегда истинно, для места критичного к производительности имеет смысл написать релевантный бенчмарк.

Источник: habr.ru