A Rust borrow checker működése, 4.

Posted 2024-01-04 08:00:00 by sanyi ‐ 7 min read

Hogyan küzdhetünk meg a Rust élettartamok mitikus világával

Aszinkron Rust

A megszokott szinkron kódban ha meghívunk egy függvényt vagy metódust, akkor mindig meg is várjuk hogy az elvégezze a feladatát és visszaadja az eredményt. Pl. egy lassú I/O vagy hálózati műveletnél másodpercekre, percekre blokkolódhat a program futása. Egy egyszálú programban ez azt jelenti hogy amíg a válaszra várunk, semmi más feladatot nem tudunk folytatni. Az első megoldás erre a problémára a többszálúság, multithreading volt. Ezt lehetett úgy használni hogy a lassú I/O műveleteket külön szálakra szerveztük ki. Ez a megoldás addig működik jól amíg nincs nagyságrendekkel több szál mint ahány valós processzormag. Ha túl sok a szál, akkor nagyon sok idő megy el a szálak létrehozására és a szálak közötti környezetváltásokra (context switching).

Erre a problémára találták ki az aszinkron programozást. Egy aszinkron környezetben amikor elindítunk egy feladatot (Task) nem várjuk meg hogy az befejezze a működését, csak egy ígéretet (Future) kapunk vissza hogy a feladat egyszer majd visszatér valamilyen értékkel. A feladatot ezek után átadjuk egy futtató környezetnek (async runtime), ami addig futtatja azt, amíg nem blokkolódik valamilyen műveleten (lehet ez I/O, hálózati kérés, Mutex zárolására várakozás, stb.). Ilyenkor a runtime félreteszi a feladatot és futtatja a következőt, szintén addig amíg nem blokkolódik. Időről időre rákérdez (polling) azokra a feladatokra is amiket félretett, hogy tudják-e már folytatni a munkájukat. Ha tudják, akkor újra futtatni kezdi őket a következő blokkolásig vagy addig amíg véget nem érnek.

Az aszinkron runtime futhat egyetlen szálon (ilyen pl. a node.js) vagy párhuzamosan több szálon is egyszerre (ilyen pl. a Go és a Rust). A Go és Rust többszálú módban jellemzően annyi szálat indít el ahány virtuális processzormag van a futtató gépen, de ez igény szerint hangolható. Lehetnek eltérések abban is, hogy a feladatok önként adják vissza a vezérlést a runtime-nak mielőtt blokkolódnának (ez a cooperative megoldás) vagy a runtime dönt úgy hogy időnként megszakítja a feladat futását és másikat kezd futtatni (ez a preemptive megoldás). Az operációs rendszer pl. preemptive módon futtatja a szálakat, bármikor megszakíthat egy szálat hogy egy másiknak adja a processzoridőt. A Rust aszinkron működése viszont cooperative, a feladatok önként adják vissza vezérlést a runtime-nak amikor várakozásra kényszerülnek. Ebből következik hogy a Rust aszinkron taszkjai nem jelentenek jó megoldást arra az esetre ha hosszú ideig futó, elsősorban CPU igényes feladatokat kell párhuzamosítani. Erre az esetre az operációs rendszer szálai jobbak.

Tipikusan a webszerverek, webes alkalmazásszerverek egy olyan terület, ahol nagyon sok párhuzamos hálózati művelet történik egy időben (akár több tízezer), ezek tudnak profitálni az aszinkron működésből.

A Rust aszinkron feladatainak példakódja így néz ki (ez egy egyszerűsített példa, a valós működés ennél kicsit komplikáltabb):

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

A SimpleFuture definiál egy társított típust (Output) ami a feladat visszatérési értéke lesz, és egy poll metódust amivel a futtató környezet "rá tud kérdezni" hogy futtatható-e a feladat. A poll hatására a feladat addig fut, amíg az nem blokkolódik vagy be nem fejezi a futását. A poll visszatérési értéke a Poll<T> enum, aminek két ága a Ready(T) és a Pending. A Ready(T) azt jelenti hogy a feladat futása véget ért és visszaadja a visszatérési értéket (a T típus itt a SimpleFuture Output típusával azonos). A Pending azt jelenti hogy a feladat blokkolódott és visszaadja a vezérlést a runtime-nak. A wake: fn() paraméter egy callback, ami arra szolgál hogy a feladat értesíteni tudja a runtime-ot ha újra futtathatóvá válik. Egy részlet a példakódból:

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // van adat, visszaadjuk
            Poll::Ready(self.socket.read_buf())
        } else {
            // várni kell, a socket callback-jének a wake függvényt állítjuk be
            self.socket.set_readable_callback(wake);
            // és visszaadjuk a vezérlést a runtime-nak
            Poll::Pending
        }
    }
}

Ebben a példában a socket-nek továbbítjuk a wake callback-et, amit a socket akkor hív meg, ha érkezik adat.

A Rust-ban kétféle módon lehet aszinkron kódot létrehozni: aszinkron függvény/metódus vagy aszinkron blokk formájában:

async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // This `async` block results in a type that implements
    // `Future<Output = u8>`.
    async {
        let x: u8 = foo().await;
        x + 5
    }
}

Az async kulcsszó lényegében azt jelenti hogy a függvény vagy blokk nem egy direkt visszatérési értéket (u8) hanem egy Future-t fog visszaadni: Future<Output = u8> ami majd idővel egy u8 értéket fog eredményezni. Az await meghívásával indul el a feladat futtatása.

A fenti egyszerű példában persze valójában semmi aszinkronitás nincs, mert a függvény soha nem blokkolódik, rögtön az első await hívás vissza fogja adni a visszatérési értéket.

Aszinkron környezetben az élettartamok úgy alakulnak, hogy a Future élettartama függ a bemeneti paraméterek élettartamától. Egy példa:

async fn foo(x: &u8) -> u8 { *x }

// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}

Itt a Future élettartama függ a bemeneti paraméterként kapott x referencia élettartamától.

Ez azt jelenti hogy az await-et meg kell hívni mielőtt a bemeneti paraméterek élettartama végetér. Mi az az async move? Hasonló a closure esetén látott move kulcsszóhoz: az x változó tulajdonjogát átadja az aszinkron Future-nek, enélkül ugyanis nem élne annyi ideig mint maga visszaadott a Future. Fontos hogy a move csak az x lokális változó tulajdonjogát adja át (amiben egy referencia van), nem pedig a referencia által hivatkozott u8 érték tulajdonjogát!

Egy teljes példa (ez már futtatható pl. a rust playground-on):

use std::future::Future;

async fn borrow_x(x: &str) {
    println!("value: {}", x);
}

/*
fn bad() -> impl Future<Output = ()> {
    let x = String::from("valami");
    async {
        borrow_x(&x).await // ERROR: `x` does not live long enough
    }
}
*/

fn good() -> impl Future<Output = ()> {
    async {
        let x = String::from("valami");
        borrow_x(&x).await
    }
}

#[tokio::main]
async fn main() {
    // bad().await;
    good().await;
}

Itt a bad függvény azért nem jó, mert deklarál egy lokális változót, aminek az élettartama végetér a bad függvény végén, a visszadott Future viszont túléli a függvényt és egy már nem létező lokális változóra hivatkozna amikor meghívjuk az await-et. A megoldást az async move jelentené. A good függvényben viszont eleve az async blokkon belül deklaráljuk a lokális változót, így az már együtt él a visszaadott Future-rel.

Mi az a #[tokio::main]? Egy deklaratív makró, ami egy async runtime-ot indít el, így a main() függvény már lehet async, az elindított runtime fogja futtatni.

A Rust furcsasága, hogy nincs egyetlen "standard" async runtime, több különböző implementáció létezik. Webes alkalmazási területen talán a tokio.rs a legelterjedtebb, de van még néhány (async-std, smol). Ebből azért akadnak kompatibilitási problémák is, egy tokio-ra megírt library nem feltétlenül fog működni async-std-vel és fordítva (a legtöbb library azért igyekszik minden elterjedt runtime-ot támogatni).

Még egy fontos dolog az aszinkron kódokkal kapcsolatban: többszálú futtató környezet esetén a taszk lényegében véletlenszerűen fog különböző szálakhoz kerülni, ha blokkolódik akkor az újraindítás után könnyen lehet hogy már másik szálon fog futni mint a blokkolódás előtt. Ilyenkor a taszk által hivatkozott adatokat át kell küldeni (Send) egy másik szálnak. Ennek érdekében a Rust elvárja hogy ha egy aszinkron taszk működése megszakad (Pending értékkel tér vissza a poll), akkor a taszk aktuális állapota ne tartalmazzon hivatkozást olyan adatra, ami nem küldhető át biztonságosan szálak között (vagyis nem implementálja a Send trait-et).

Egy példa:

use std::rc::Rc;

#[derive(Default)]
struct NotSend(Rc<()>);

async fn bar() {}
async fn foo() {
    let x = NotSend::default();
    bar().await;
}

fn require_send(_: impl Send) {}

fn main() {
    require_send(foo());
}

Az Rc referenciaszámlálós típusról már említettem hogy nem thread-safe, vagyis nem implementálja a Send trait-et. Amikor a fenti példában a bar().await -et meghívjuk, a környezetnek még élő hivatkozása van a nem thread-safe x változóra, ezért a Rust hibát fog dobni.

Ilyenkor két lehetőségünk van: az egyik hogy még azelőtt elengedjük az x változót, mielőtt az await-et meghívjuk:

async fn foo() {
    {
        let x = NotSend::default();
    }
    bar().await;
}

A másik lehetőség hogy az Rc helyett a thread-safe Arc-t használjuk, ami implementálja a Send trait-et.

Címkék:
rust borrow-checker async