Многонишково програмиране

30 ноември 2020

Административни неща

Административни неща

Fearless concurrency

Must be this tall

Нишки

1 2 3 4 5 6 7
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}

Нишки

1 2 3 4 5 6 7
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}

Нишки

1 2 3 4 5 6 7
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}

Нишки

1 2 3 4 5 6 7
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
}

Нишки

1 2 3 4 5 6 7 8
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
    let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
    let _ = handle.join();
}

Нишки

1 2 3 4 5 6 7 8
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
    let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
    let _ = handle.join();
}

Нишки

1 2 3 4 5 6 7 8
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
    let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
    let _ = handle.join();
}

Нишки

1 2 3 4 5 6 7 8
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
    let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
    let _ = handle.join();
}

Нишки

1 2 3 4 5 6 7 8
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
    let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from spawned thread"));

    println!("hi from main thread");
    let _ = handle.join();
}

Panic

Panic

Panic

Panic

Panic

Panic

Споделяне на стойности

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = (0..5).collect::<Vec<_>>();

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне на стойности

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = (0..5).collect::<Vec<_>>();

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function --> src/bin/main_70d75afc36316cf9348e1b9a1b43baada82fd5b2.rs:6:32 | 6 | let handle = thread::spawn(|| { | ^^ may outlive borrowed value `nums` 7 | for i in &nums { | ---- `nums` is borrowed here | note: function requires argument type to outlive `'static` --> src/bin/main_70d75afc36316cf9348e1b9a1b43baada82fd5b2.rs:6:18 | 6 | let handle = thread::spawn(|| { | __________________^ 7 | | for i in &nums { 8 | | println!("number {}", i); 9 | | } 10 | | }); | |______^ help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword | 6 | let handle = thread::spawn(move || { | ^^^^^^^
use std::thread;

fn main() {
    let nums = (0..5).collect::>();

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне на стойности

Споделяне на стойности

Споделяне на стойности

1 2 3 4
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static

Споделяне на стойности

Можем да преместим стойността в новата нишка

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = (0..5).collect::<Vec<_>>();

    let handle = thread::spawn(move || {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}
number 0 number 1 number 2 number 3 number 4
use std::thread;

fn main() {
    let nums = (0..5).collect::>();

    let handle = thread::spawn(move || {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне между няколко нишки

Как бихме споделили стойност между няколко нишки?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use std::thread;

fn main() {
    let nums = (0..5).collect::<Vec<_>>();

    let handles = (0..2).map(|_| {
            thread::spawn(move || {
                for i in &nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::<Vec<_>>();

    for h in handles {
        let _ = h.join();
    }
}

Споделяне между няколко нишки

Как бихме споделили стойност между няколко нишки?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use std::thread;

fn main() {
    let nums = (0..5).collect::<Vec<_>>();

    let handles = (0..2).map(|_| {
            thread::spawn(move || {
                for i in &nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::<Vec<_>>();

    for h in handles {
        let _ = h.join();
    }
}

Споделяне между няколко нишки

Как бихме споделили стойност между няколко нишки?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use std::thread;

fn main() {
    let nums = (0..5).collect::<Vec<_>>();

    let handles = (0..2).map(|_| {
            thread::spawn(move || {
                for i in &nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::<Vec<_>>();

    for h in handles {
        let _ = h.join();
    }
}

Споделяне между няколко нишки

Можем да пробваме с Rc

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
use std::rc::Rc;
use std::thread;

fn main() {
    let nums_vec = (0..5).collect::<Vec<_>>();
    let nums = Rc::new(nums_vec);

    let handles = (0..2)
        .map(|_| {
            let nums = Rc::clone(&nums);

            thread::spawn(move || {
                for i in &*nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::<Vec<_>>();

    for h in handles {
        let _ = h.join();
    }
}

Споделяне между няколко нишки

Можем да пробваме с Rc

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
use std::rc::Rc;
use std::thread;

fn main() {
    let nums_vec = (0..5).collect::<Vec<_>>();
    let nums = Rc::new(nums_vec);

    let handles = (0..2)
        .map(|_| {
            let nums = Rc::clone(&nums);

            thread::spawn(move || {
                for i in &*nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::<Vec<_>>();

    for h in handles {
        let _ = h.join();
    }
}
error[E0277]: `Rc<Vec<i32>>` cannot be sent between threads safely --> src/bin/main_a64edfb8be3b487ac4411ab31257712271d4ea64.rs:12:13 | 12 | thread::spawn(move || { | _____________^^^^^^^^^^^^^_- | | | | | `Rc<Vec<i32>>` cannot be sent between threads safely 13 | | for i in &*nums { 14 | | println!("number {}", i); 15 | | } 16 | | }) | |_____________- within this `[closure@src/bin/main_a64edfb8be3b487ac4411ab31257712271d4ea64.rs:12:27: 16:14]` | ::: /home/andrew/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:607:8 | 607 | F: Send + 'static, | ---- required by this bound in `spawn` | = help: within `[closure@src/bin/main_a64edfb8be3b487ac4411ab31257712271d4ea64.rs:12:27: 16:14]`, the trait `Send` is not implemented for `Rc<Vec<i32>>` = note: required because it appears within the type `[closure@src/bin/main_a64edfb8be3b487ac4411ab31257712271d4ea64.rs:12:27: 16:14]`
use std::rc::Rc;
use std::thread;

fn main() {
    let nums_vec = (0..5).collect::>();
    let nums = Rc::new(nums_vec);

    let handles = (0..2)
        .map(|_| {
            let nums = Rc::clone(&nums);

            thread::spawn(move || {
                for i in &*nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::>();

    for h in handles {
        let _ = h.join();
    }
}

Send и Sync

1 2 3 4
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static

Send и Sync

Send и Sync

Send и Sync

Send и Sync

Send и Sync

Send и Sync

Send и Sync

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send и Sync

Send

Send + Sync

Sync

Send + Sync

Sync

Send + Sync

Sync

Send + Sync

Sync

Send + Sync

Sync

Send + Sync

Sync

Send + Sync

Sync

Send + Sync

Sync

Това значи ли че Vec<T> е Sync?

Send + Sync

Sync

Това значи ли че Vec<T> е Sync?

Send + Sync

Sync

Това значи ли че Vec<T> е Sync?

Send + Sync

Sync

Това значи ли че Vec<T> е Sync?

Send + Sync

Sync

Това значи ли че Vec<T> е Sync?

Send и Sync

Аuto traits

1
pub struct Token(u32);
pub struct Token(u32);
fn main() {}

Auto trait docs

Send + Sync

Unsafe traits

1 2 3 4
struct MyBox(*mut u8);

unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}
fn main() {}
struct MyBox(*mut u8);

unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}

Send + Sync

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Send + Sync

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Send + Sync

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Send + Sync

Деимплементация

Хак за stable

1 2 3
use std::marker::PhantomData;

struct SpecialToken(u8, PhantomData<*const ()>);
fn main() {}
use std::marker::PhantomData;

struct SpecialToken(u8, PhantomData<*const ()>);

Arc

Да се върнем на кода, който не се компилираше

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
use std::rc::Rc;
use std::thread;

fn main() {
    let nums = Rc::new((0..5).collect::<Vec<_>>());

    let handles = (0..2)
        .map(|_| {
            let nums = Rc::clone(&nums);

            thread::spawn(move || {
                for i in &*nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::<Vec<_>>();

    for h in handles {
        let _ = h.join();
    }
}
error[E0277]: `Rc<Vec<i32>>` cannot be sent between threads safely --> src/bin/main_8e08fc7cb2794a1557ea5e32c268dd5d930f61fb.rs:11:13 | 11 | thread::spawn(move || { | _____________^^^^^^^^^^^^^_- | | | | | `Rc<Vec<i32>>` cannot be sent between threads safely 12 | | for i in &*nums { 13 | | println!("number {}", i); 14 | | } 15 | | }) | |_____________- within this `[closure@src/bin/main_8e08fc7cb2794a1557ea5e32c268dd5d930f61fb.rs:11:27: 15:14]` | ::: /home/andrew/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:607:8 | 607 | F: Send + 'static, | ---- required by this bound in `spawn` | = help: within `[closure@src/bin/main_8e08fc7cb2794a1557ea5e32c268dd5d930f61fb.rs:11:27: 15:14]`, the trait `Send` is not implemented for `Rc<Vec<i32>>` = note: required because it appears within the type `[closure@src/bin/main_8e08fc7cb2794a1557ea5e32c268dd5d930f61fb.rs:11:27: 15:14]`
use std::rc::Rc;
use std::thread;

fn main() {
    let nums = Rc::new((0..5).collect::>());

    let handles = (0..2)
        .map(|_| {
            let nums = Rc::clone(&nums);

            thread::spawn(move || {
                for i in &*nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::>();

    for h in handles {
        let _ = h.join();
    }
}

Arc

Решението е да заменим std::rc::Rc с std::sync::Arc

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
use std::sync::Arc;
use std::thread;

fn main() {
    let nums = Arc::new((0..5).collect::<Vec<_>>());

    let handles = (0..2)
        .map(|_| {
            let nums = Arc::clone(&nums);

            thread::spawn(move || {
                for i in &*nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::<Vec<_>>();

    for h in handles {
        let _ = h.join();
    }
}
number 0 number 1 number 2 number 3 number 4 number 0 number 1 number 2 number 3 number 4
use std::sync::Arc;
use std::thread;

fn main() {
    let nums = Arc::new((0..5).collect::>());

    let handles = (0..2)
        .map(|_| {
            let nums = Arc::clone(&nums);

            thread::spawn(move || {
                for i in &*nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::>();

    for h in handles {
        let _ = h.join();
    }
}

Arc

Arc

Arc

Arc

Arc

Синхронизация

Примитиви за синхронизация

Примитиви за синхронизация

Стандартния пример за грешен многонишков алгоритъм:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
let v = Arc::new((0..100).collect::<Vec<_>>());
let mut sum = 0;

let t1 = {
    let v = Arc::clone(&v);
    let sum = &mut sum;
    thread::spawn(move || for i in &v[0..50] { *sum += i; })
};

let t2 = {
    let v = Arc::clone(&v);
    let sum = &mut sum;
    thread::spawn(move || for i in &v[51..100] { *sum += i; })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", sum);
error[E0597]: `sum` does not live long enough --> src/bin/main_92ae06cbac3bb0b057de58f0494c471228f42c4d.rs:9:15 | 9 | let sum = &mut sum; | ^^^^^^^^ borrowed value does not live long enough 10 | thread::spawn(move || for i in &v[0..50] { *sum += i; }) | -------------------------------------------------------- argument requires that `sum` is borrowed for `'static` ... 22 | } | - `sum` dropped here while still borrowed error[E0499]: cannot borrow `sum` as mutable more than once at a time --> src/bin/main_92ae06cbac3bb0b057de58f0494c471228f42c4d.rs:15:15 | 9 | let sum = &mut sum; | -------- first mutable borrow occurs here 10 | thread::spawn(move || for i in &v[0..50] { *sum += i; }) | -------------------------------------------------------- argument requires that `sum` is borrowed for `'static` ... 15 | let sum = &mut sum; | ^^^^^^^^ second mutable borrow occurs here error[E0502]: cannot borrow `sum` as immutable because it is also borrowed as mutable --> src/bin/main_92ae06cbac3bb0b057de58f0494c471228f42c4d.rs:21:21 | 9 | let sum = &mut sum; | -------- mutable borrow occurs here 10 | thread::spawn(move || for i in &v[0..50] { *sum += i; }) | -------------------------------------------------------- argument requires that `sum` is borrowed for `'static` ... 21 | println!("sum: {}", sum); | ^^^ immutable borrow occurs here
use std::sync::Arc;
use std::thread;
fn main() {
let v = Arc::new((0..100).collect::>());
let mut sum = 0;

let t1 = {
    let v = Arc::clone(&v);
    let sum = &mut sum;
    thread::spawn(move || for i in &v[0..50] { *sum += i; })
};

let t2 = {
    let v = Arc::clone(&v);
    let sum = &mut sum;
    thread::spawn(move || for i in &v[51..100] { *sum += i; })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", sum);
}

Примитиви за синхронизация

Можем ли да го накараме да работи?

Примитиви за синхронизация

Можем ли да го накараме да работи?

Примитиви за синхронизация

Можем ли да го накараме да работи?

Примитиви за синхронизация

Можем ли да го накараме да работи?

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Модула std::sync

Mutex

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
use std::sync::Mutex;

fn main() {
    // мутекса опакова стойността, която предпазва
    let mutex = Mutex::new(10);

    {
        // заключваме мутекса
        // `lock` връща умен указател с deref до `&T` и `&mut T`
        let mut lock = mutex.lock().unwrap();
        *lock += 32;

        // мутекса се отключва когато `lock` се деалокира
    }
}
use std::sync::Mutex;

fn main() {
    // мутекса опакова стойността, която предпазва
    let mutex = Mutex::new(10);

    {
        // заключваме мутекса
        // `lock` връща умен указател с deref до `&T` и `&mut T`
        let mut lock = mutex.lock().unwrap();
        *lock += 32;

        // мутекса се отключва когато `lock` се деалокира
    }
}

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Mutex

Мutex

Обикновенно мутекса се възприема като примитива която определя критична секция

1 2 3 4 5 6 7
lock(my_mutex);
// начало на критичната секция

do_stuff(shared_data);

// край на критичната секция
unlock(my_mutex);

В Ръст това не би било удобно, защото не дава достатъчна информация на компилатора как ползваме данните.
Затова Mutex е generic и опакова данните.

Mutex

Mutex

Mutex

Mutex

Panic

Mutex

Panic

Mutex

Panic

Mutex

Panic

Mutex

Panic

Mutex

Panic

Mutex

Panic

RwLock

RwLock

RwLock

RwLock

Mutex срещу RwLock

Mutex срещу RwLock

Mutex срещу RwLock

Mutex срещу RwLock

Arc + Mutex

Подобно на Rc<RefCell<T>>, може често да виждате Arc<Mutex<T>> или Arc<RwLock<T>>

Condvar

Condvar

Condvar

Condvar

Condvar

1
let pair = Arc::new((Mutex::new(false), Condvar::new()));
use std::sync::{Arc, Mutex, Condvar};
fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
}

1 2 3 4 5 6 7
// Thread A
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();

while !*started {
    started = cvar.wait(started).unwrap();
}

use std::sync::{Arc, Mutex, Condvar};
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
// Thread A
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();

while !*started {
    started = cvar.wait(started).unwrap();
}
}

1 2 3 4 5 6
// Thread B
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();

*started = true;
cvar.notify_one();

use std::sync::{Arc, Mutex, Condvar};
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
// Thread B
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();

*started = true;
cvar.notify_one();
}

Външни библиотеки

Parking lot

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Атомарни числа

Глобални променливи

Глобални променливи

Immutable static

Искаме да имаме глобална променлива, която се инициализира веднъж и повече не се модифицира

1
static MY_HASHMAP: HashMap<String, String> = /* ??? */;

Глобални променливи

Immutable static

Ако стойността може да се сметне по време на компилация сме готови

1
static MY_VEC: Vec<String> = Vec::new();
static MY_VEC: Vec = Vec::new();
fn main() {}

Глобални променливи

Immutable static

Ако стойността може да се сметне по време на компилация сме готови

1
static MY_VEC: Vec<String> = Vec::new();
static MY_VEC: Vec = Vec::new();
fn main() {}

Но това рядко се случва

1 2 3 4 5
static MY_VEC: Vec<String> = {
    let mut vec = Vec::new();
    vec.push(String::new());
    vec
};
error[E0764]: mutable references are not allowed in statics --> src/bin/main_6d890bc65513ef5d48a4d4a6c21499639c0709df.rs:3:5 | 3 | vec.push(String::new()); | ^^^ `&mut` is only allowed in `const fn` error[E0015]: calls in statics are limited to constant functions, tuple structs and tuple variants --> src/bin/main_6d890bc65513ef5d48a4d4a6c21499639c0709df.rs:3:5 | 3 | vec.push(String::new()); | ^^^^^^^^^^^^^^^^^^^^^^^ error[E0493]: destructors cannot be evaluated at compile-time --> src/bin/main_6d890bc65513ef5d48a4d4a6c21499639c0709df.rs:2:9 | 2 | let mut vec = Vec::new(); | ^^^^^^^ statics cannot evaluate destructors ... 5 | }; | - value is dropped here
static MY_VEC: Vec = {
    let mut vec = Vec::new();
    vec.push(String::new());
    vec
};
fn main() {}

Глобални променливи

Immutable static

За целта можем да използваме библиотеката lazy_static

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
use std::collections::HashMap;
use lazy_static::lazy_static;

lazy_static! {
    static ref FOOS: HashMap<u32, &'static str> = {
        let mut m = HashMap::new();
        m.insert(0, "foo");
        m.insert(1, "bar");
        m.insert(2, "baz");
        m
    };
}

fn main()  {
    println!("len: {}", FOOS.len());   // инициализира се при първи достъп
    println!("map: {:?}", *FOOS);      // deref magic
}
error[E0432]: unresolved import `lazy_static` --> src/bin/main_2ba49c35864d5b670a584634af87e855eb49d110.rs:2:5 | 2 | use lazy_static::lazy_static; | ^^^^^^^^^^^ maybe a missing crate `lazy_static`? error: cannot determine resolution for the macro `lazy_static` --> src/bin/main_2ba49c35864d5b670a584634af87e855eb49d110.rs:4:1 | 4 | lazy_static! { | ^^^^^^^^^^^ | = note: import resolution is stuck, try simplifying macro imports error[E0425]: cannot find value `FOOS` in this scope --> src/bin/main_2ba49c35864d5b670a584634af87e855eb49d110.rs:15:25 | 15 | println!("len: {}", FOOS.len()); // инициализира се при първи достъп | ^^^^ not found in this scope error[E0425]: cannot find value `FOOS` in this scope --> src/bin/main_2ba49c35864d5b670a584634af87e855eb49d110.rs:16:28 | 16 | println!("map: {:?}", *FOOS); // deref magic | ^^^^ not found in this scope warning: unused import: `std::collections::HashMap` --> src/bin/main_2ba49c35864d5b670a584634af87e855eb49d110.rs:1:5 | 1 | use std::collections::HashMap; | ^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default
use std::collections::HashMap;
use lazy_static::lazy_static;

lazy_static! {
    static ref FOOS: HashMap = {
        let mut m = HashMap::new();
        m.insert(0, "foo");
        m.insert(1, "bar");
        m.insert(2, "baz");
        m
    };
}

fn main()  {
    println!("len: {}", FOOS.len());   // инициализира се при първи достъп
    println!("map: {:?}", *FOOS);      // deref magic
}

Отзаде се използва std::sync::Once

Глобални променливи

Mutable static

Искаме да имаме глобална променлива, чиято стойност се променя по време на изпълнение на програмата

Глобални променливи

Mutable static

Искаме да имаме глобална променлива, чиято стойност се променя по време на изпълнение на програмата

Глобални променливи

Mutable static

Искаме да имаме глобална променлива, чиято стойност се променя по време на изпълнение на програмата

Глобални променливи

Mutable static

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
use std::sync::Mutex;
use std::collections::HashMap;
use lazy_static::lazy_static;

// `Mutex::new` не е константна функция
lazy_static! {
    static ref FOOS: Mutex<HashMap<u32, String>> = Mutex::new(HashMap::new());
}

fn main() {
    FOOS.lock().unwrap().insert(0, "foo".to_string());
    FOOS.lock().unwrap().insert(1, "bar".to_string());
    FOOS.lock().unwrap().insert(2, "baz".to_string());

    println!("{:?}", FOOS.lock().unwrap());
}
error[E0432]: unresolved import `lazy_static` --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:3:5 | 3 | use lazy_static::lazy_static; | ^^^^^^^^^^^ maybe a missing crate `lazy_static`? error: cannot determine resolution for the macro `lazy_static` --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:6:1 | 6 | lazy_static! { | ^^^^^^^^^^^ | = note: import resolution is stuck, try simplifying macro imports error[E0425]: cannot find value `FOOS` in this scope --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:11:5 | 11 | FOOS.lock().unwrap().insert(0, "foo".to_string()); | ^^^^ not found in this scope error[E0425]: cannot find value `FOOS` in this scope --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:12:5 | 12 | FOOS.lock().unwrap().insert(1, "bar".to_string()); | ^^^^ not found in this scope error[E0425]: cannot find value `FOOS` in this scope --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:13:5 | 13 | FOOS.lock().unwrap().insert(2, "baz".to_string()); | ^^^^ not found in this scope error[E0425]: cannot find value `FOOS` in this scope --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:15:22 | 15 | println!("{:?}", FOOS.lock().unwrap()); | ^^^^ not found in this scope warning: unused import: `std::sync::Mutex` --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:1:5 | 1 | use std::sync::Mutex; | ^^^^^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default warning: unused import: `std::collections::HashMap` --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:2:5 | 2 | use std::collections::HashMap; | ^^^^^^^^^^^^^^^^^^^^^^^^^
use std::sync::Mutex;
use std::collections::HashMap;
use lazy_static::lazy_static;

// `Mutex::new` не е константна функция
lazy_static! {
    static ref FOOS: Mutex> = Mutex::new(HashMap::new());
}

fn main() {
    FOOS.lock().unwrap().insert(0, "foo".to_string());
    FOOS.lock().unwrap().insert(1, "bar".to_string());
    FOOS.lock().unwrap().insert(2, "baz".to_string());

    println!("{:?}", FOOS.lock().unwrap());
}

Глобални променливи

Thread local

Искаме да имаме отделна глобална променлива за всяка нишка

Глобални променливи

Thread local

Искаме да имаме отделна глобална променлива за всяка нишка

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
thread_local! {
    static FOOS: RefCell<HashMap<u32, String>> = RefCell::new(HashMap::new());
}

fn main() {
    FOOS.with(|foos| foos.borrow_mut().insert(0, "foo".to_string()));

    std::thread::spawn(|| {
        FOOS.with(|foos| {
            foos.borrow_mut().insert(1, "bar".to_string());
            foos.borrow_mut().insert(2, "baz".to_string());
            println!("thread 1: {:?}", foos.borrow());
        });
    })
    .join()
    .unwrap();

    FOOS.with(|foos| {
        println!("thread main: {:?}", foos.borrow());
    });
}
thread 1: {2: "baz", 1: "bar"} thread main: {0: "foo"}
use std::collections::HashMap;
use std::cell::RefCell;
thread_local! {
    static FOOS: RefCell> = RefCell::new(HashMap::new());
}

fn main() {
    FOOS.with(|foos| foos.borrow_mut().insert(0, "foo".to_string()));

    std::thread::spawn(|| {
        FOOS.with(|foos| {
            foos.borrow_mut().insert(1, "bar".to_string());
            foos.borrow_mut().insert(2, "baz".to_string());
            println!("thread 1: {:?}", foos.borrow());
        });
    })
    .join()
    .unwrap();

    FOOS.with(|foos| {
        println!("thread main: {:?}", foos.borrow());
    });
}

Канали

MPSC

Канали

Go-lang motto

Don't communicate by sharing memory,
share memory by communicating

Канали в стандартната библиотека

1 2 3 4 5 6 7 8 9 10 11 12
use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        sender.send(10).unwrap();
    });

    println!("received {}", receiver.recv().unwrap());
}
received 10
use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        sender.send(10).unwrap();
    });

    println!("received {}", receiver.recv().unwrap());
}

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    sender.send(3).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    sender.send(3).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
}

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Ограничен канал

1 2 3 4 5 6 7 8 9 10 11 12
let (sender, receiver) = mpsc::sync_channel(1);

thread::spawn(move || {
    // записва съобщението и връща веднага
    sender.send(1).unwrap();

    // ще блокира докато главната нишка не извика `receiver.recv()`
    sender.send(2).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::sync_channel(1);

thread::spawn(move || {
    // записва съобщението и връща веднага
    sender.send(1).unwrap();

    // ще блокира докато главната нишка не извика `receiver.recv()`
    sender.send(2).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
}

Множество изпращачи

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
});

thread::spawn(move || {
    sender2.send(3).unwrap();
    sender2.send(4).unwrap();
});

println!("{} {} {} {}",
    receiver.recv().unwrap(), receiver.recv().unwrap(),
    receiver.recv().unwrap(), receiver.recv().unwrap());
1 2 3 4
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
});

thread::spawn(move || {
    sender2.send(3).unwrap();
    sender2.send(4).unwrap();
});

println!("{} {} {} {}",
    receiver.recv().unwrap(), receiver.recv().unwrap(),
    receiver.recv().unwrap(), receiver.recv().unwrap());
}

Sender

Методи

1 2 3
// изпраща `t`
// връща грешка ако получателят е бил унищожен
fn send(&self, t: T) -> Result<(), SendError<T>>

Sender

Методи

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

assert_eq!(sender.send(12), Ok(()));

// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);

// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
use std::mem;
use std::sync::mpsc::{self, SendError};
fn main() {
let (sender, receiver) = mpsc::channel();

assert_eq!(sender.send(12), Ok(()));

// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);

// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
}

SyncSender

Методи

1 2 3 4 5
// блокира ако буфера е пълен
fn send(&self, t: T) -> Result<(), SendError<T>>

// връща грешка ако буфера е пълен или получателят е бил унищожен
fn try_send(&self, t: T) -> Result<(), TrySendError<T>>

SyncSender

Методи

1 2 3 4 5 6 7 8
let (sender, receiver) = mpsc::sync_channel(1);

assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));

mem::drop(receiver);

assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
use std::mem;
use std::sync::mpsc::{self, TrySendError};
fn main() {
let (sender, receiver) = mpsc::sync_channel(1);

assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));

mem::drop(receiver);

assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
}

Множество получатели

Множество получатели

Множество получатели

Множество получатели

Receiver

Методи

1 2 3 4 5 6 7 8 9 10 11
// блокира докато не получи съобщение
// връща грешка ако всички изпращачи са унищожени
fn recv(&self) -> Result<T, RecvError>

// не блокира
// връща грешка ако всички изпращачи са унищожени или няма съобщение в опашката
fn try_recv(&self) -> Result<T, TryRecvError>

// блокира за определено време
// връща грешка ако всички изпращачи са унищожени или е изтекло времето
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>

Receiver

Методи

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

while let Ok(msg) = receiver.recv() {
    println!("received {}", msg);
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

while let Ok(msg) = receiver.recv() {
    println!("received {}", msg);
}
}

Receiver

Итератори

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

for msg in receiver.iter() {
    println!("received {}", msg);
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

for msg in receiver.iter() {
    println!("received {}", msg);
}
}

Външни библиотеки

Crossbeam channel

Външни библиотеки

Crossbeam

Външни библиотеки

Rayon

Въпроси