Hogyan küzdhetünk meg a Rust élettartamok mitikus világával
Aszinkron Rust
A megszokott szinkron kódban ha meghívunk egy függvényt vagy metódust, akkor mindig meg is várjuk hogy az elvégezze a feladatát és visszaadja az eredményt. Pl. egy lassú I/O vagy hálózati műveletnél másodpercekre, percekre blokkolódhat a program futása. Egy egyszálú programban ez azt jelenti hogy amíg a válaszra várunk, semmi más feladatot nem tudunk folytatni. Az első megoldás erre a problémára a többszálúság, multithreading volt. Ezt lehetett úgy használni hogy a lassú I/O műveleteket külön szálakra szerveztük ki. Ez a megoldás addig működik jól amíg nincs nagyságrendekkel több szál mint ahány valós processzormag. Ha túl sok a szál, akkor nagyon sok idő megy el a szálak létrehozására és a szálak közötti környezetváltásokra (context switching).
Erre a problémára találták ki az aszinkron programozást. Egy aszinkron környezetben amikor elindítunk egy feladatot (Task) nem várjuk meg hogy az befejezze a működését, csak egy ígéretet (Future) kapunk vissza hogy a feladat egyszer majd visszatér valamilyen értékkel. A feladatot ezek után átadjuk egy futtató környezetnek (async runtime), ami addig futtatja azt, amíg nem blokkolódik valamilyen műveleten (lehet ez I/O, hálózati kérés, Mutex zárolására várakozás, stb.). Ilyenkor a runtime félreteszi a feladatot és futtatja a következőt, szintén addig amíg nem blokkolódik. Időről időre rákérdez (polling) azokra a feladatokra is amiket félretett, hogy tudják-e már folytatni a munkájukat. Ha tudják, akkor újra futtatni kezdi őket a következő blokkolásig vagy addig amíg véget nem érnek.
Az aszinkron runtime futhat egyetlen szálon (ilyen pl. a node.js) vagy párhuzamosan több szálon is egyszerre (ilyen pl. a Go és a Rust). A Go és Rust többszálú módban jellemzően annyi szálat indít el ahány virtuális processzormag van a futtató gépen, de ez igény szerint hangolható. Lehetnek eltérések abban is, hogy a feladatok önként adják vissza a vezérlést a runtime-nak mielőtt blokkolódnának (ez a cooperative megoldás) vagy a runtime dönt úgy hogy időnként megszakítja a feladat futását és másikat kezd futtatni (ez a preemptive megoldás). Az operációs rendszer pl. preemptive módon futtatja a szálakat, bármikor megszakíthat egy szálat hogy egy másiknak adja a processzoridőt. A Rust aszinkron működése viszont cooperative, a feladatok önként adják vissza vezérlést a runtime-nak amikor várakozásra kényszerülnek. Ebből következik hogy a Rust aszinkron taszkjai nem jelentenek jó megoldást arra az esetre ha hosszú ideig futó, elsősorban CPU igényes feladatokat kell párhuzamosítani. Erre az esetre az operációs rendszer szálai jobbak.
Tipikusan a webszerverek, webes alkalmazásszerverek egy olyan terület, ahol nagyon sok párhuzamos hálózati művelet történik egy időben (akár több tízezer), ezek tudnak profitálni az aszinkron működésből.
A Rust aszinkron feladatainak példakódja így néz ki (ez egy egyszerűsített példa, a valós működés ennél kicsit komplikáltabb):
trait SimpleFuture {
type Output;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
A SimpleFuture
definiál egy társított típust (Output) ami a feladat visszatérési értéke
lesz, és egy poll
metódust amivel a futtató környezet "rá tud kérdezni" hogy futtatható-e
a feladat. A poll
hatására a feladat addig fut, amíg az nem blokkolódik vagy be nem fejezi
a futását. A poll
visszatérési értéke a Poll<T>
enum, aminek két ága a Ready(T)
és a Pending
. A Ready(T)
azt jelenti hogy a feladat futása véget ért és visszaadja
a visszatérési értéket (a T típus itt a SimpleFuture Output típusával azonos). A Pending
azt jelenti hogy a feladat blokkolódott és visszaadja a vezérlést a runtime-nak.
A wake: fn()
paraméter egy callback, ami arra szolgál hogy a feladat értesíteni tudja
a runtime-ot ha újra futtathatóvá válik. Egy részlet a példakódból:
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// van adat, visszaadjuk
Poll::Ready(self.socket.read_buf())
} else {
// várni kell, a socket callback-jének a wake függvényt állítjuk be
self.socket.set_readable_callback(wake);
// és visszaadjuk a vezérlést a runtime-nak
Poll::Pending
}
}
}
Ebben a példában a socket
-nek továbbítjuk a wake
callback-et, amit a socket
akkor hív meg, ha érkezik adat.
A Rust-ban kétféle módon lehet aszinkron kódot létrehozni: aszinkron függvény/metódus vagy aszinkron blokk formájában:
async fn foo() -> u8 { 5 }
fn bar() -> impl Future<Output = u8> {
// This `async` block results in a type that implements
// `Future<Output = u8>`.
async {
let x: u8 = foo().await;
x + 5
}
}
Az async
kulcsszó lényegében azt jelenti hogy a függvény vagy blokk nem egy direkt
visszatérési értéket (u8
) hanem egy Future
-t fog visszaadni: Future<Output = u8>
ami majd idővel egy u8
értéket fog eredményezni. Az await
meghívásával indul el
a feladat futtatása.
A fenti egyszerű példában persze valójában semmi aszinkronitás nincs, mert a függvény
soha nem blokkolódik, rögtön az első await
hívás vissza fogja adni a visszatérési értéket.
Aszinkron környezetben az élettartamok úgy alakulnak, hogy a Future
élettartama
függ a bemeneti paraméterek élettartamától. Egy példa:
async fn foo(x: &u8) -> u8 { *x }
// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
async move { *x }
}
Itt a Future
élettartama függ a bemeneti paraméterként kapott x
referencia
élettartamától.
Ez azt jelenti hogy az await
-et meg kell hívni mielőtt a bemeneti paraméterek
élettartama végetér. Mi az az async move
? Hasonló a closure esetén látott move
kulcsszóhoz: az x
változó tulajdonjogát átadja az aszinkron Future
-nek, enélkül
ugyanis nem élne annyi ideig mint maga visszaadott a Future
. Fontos hogy a move
csak az x
lokális változó tulajdonjogát adja át (amiben egy referencia van),
nem pedig a referencia által hivatkozott u8
érték tulajdonjogát!
Egy teljes példa (ez már futtatható pl. a rust playground-on):
use std::future::Future;
async fn borrow_x(x: &str) {
println!("value: {}", x);
}
/*
fn bad() -> impl Future<Output = ()> {
let x = String::from("valami");
async {
borrow_x(&x).await // ERROR: `x` does not live long enough
}
}
*/
fn good() -> impl Future<Output = ()> {
async {
let x = String::from("valami");
borrow_x(&x).await
}
}
#[tokio::main]
async fn main() {
// bad().await;
good().await;
}
Itt a bad
függvény azért nem jó, mert deklarál egy lokális változót, aminek
az élettartama végetér a bad
függvény végén, a visszadott Future
viszont túléli
a függvényt és egy már nem létező lokális változóra hivatkozna amikor meghívjuk
az await
-et. A megoldást az async move
jelentené. A good
függvényben viszont
eleve az async
blokkon belül deklaráljuk a lokális változót, így az már együtt
él a visszaadott Future
-rel.
Mi az a #[tokio::main]
? Egy deklaratív makró, ami egy async runtime-ot indít el,
így a main()
függvény már lehet async, az elindított runtime fogja futtatni.
A Rust furcsasága, hogy nincs egyetlen "standard" async runtime, több különböző
implementáció létezik. Webes alkalmazási területen talán a tokio.rs
a legelterjedtebb, de van még néhány (async-std, smol). Ebből azért akadnak
kompatibilitási problémák is, egy tokio
-ra megírt library nem feltétlenül fog
működni async-std
-vel és fordítva (a legtöbb library azért igyekszik minden elterjedt
runtime-ot támogatni).
Még egy fontos dolog az aszinkron kódokkal kapcsolatban: többszálú futtató
környezet esetén a taszk lényegében véletlenszerűen fog különböző szálakhoz
kerülni, ha blokkolódik akkor az újraindítás után könnyen lehet hogy már másik
szálon fog futni mint a blokkolódás előtt. Ilyenkor a taszk által hivatkozott adatokat
át kell küldeni (Send) egy másik szálnak. Ennek érdekében a Rust elvárja hogy ha
egy aszinkron taszk működése megszakad (Pending értékkel tér vissza a poll), akkor
a taszk aktuális állapota ne tartalmazzon hivatkozást olyan adatra, ami nem küldhető
át biztonságosan szálak között (vagyis nem implementálja a Send
trait-et).
Egy példa:
use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
let x = NotSend::default();
bar().await;
}
fn require_send(_: impl Send) {}
fn main() {
require_send(foo());
}
Az Rc
referenciaszámlálós típusról már említettem hogy nem thread-safe, vagyis nem
implementálja a Send
trait-et. Amikor a fenti példában a bar().await
-et meghívjuk,
a környezetnek még élő hivatkozása van a nem thread-safe x
változóra, ezért a Rust
hibát fog dobni.
Ilyenkor két lehetőségünk van: az egyik hogy még azelőtt elengedjük az x
változót,
mielőtt az await
-et meghívjuk:
async fn foo() {
{
let x = NotSend::default();
}
bar().await;
}
A másik lehetőség hogy az Rc
helyett a thread-safe Arc
-t használjuk, ami
implementálja a Send
trait-et.