Многонишково програмиране
30 ноември 2020
Административни неща
- второ домашно приключи
Административни неща
- второ домашно приключи
- следващата седмица няма да имаме лекции (7 и 9 декември)
Fearless concurrency
Нишки
use std::thread;
fn main() {
thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
}
use std::thread; fn main() { thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); }
Нишки
use std::thread;
fn main() {
thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
}
use std::thread; fn main() { thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); }
thread::spawn
пуска нова нишка (на операционната система)
Нишки
use std::thread;
fn main() {
thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
}
use std::thread; fn main() { thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); }
thread::spawn
пуска нова нишка (на операционната система)- и изпълнява подадената функция в нея
Нишки
use std::thread;
fn main() {
thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
}
use std::thread; fn main() { thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); }
thread::spawn
пуска нова нишка (на операционната система)- и изпълнява подадената функция в нея
- когато функцията завърши, нишката се спира
Нишки
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread; fn main() { let handle = thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); let _ = handle.join(); }
Нишки
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread; fn main() { let handle = thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); let _ = handle.join(); }
- програмата приключва когато главната нишка завърши
Нишки
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread; fn main() { let handle = thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); let _ = handle.join(); }
- програмата приключва когато главната нишка завърши
spawn
връщаJoinHandle
Нишки
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread; fn main() { let handle = thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); let _ = handle.join(); }
- програмата приключва когато главната нишка завърши
spawn
връщаJoinHandle
- можем да използваме
join
за да изчакаме пуснатите нишки
Нишки
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread; fn main() { let handle = thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); let _ = handle.join(); }
- програмата приключва когато главната нишка завърши
spawn
връщаJoinHandle
- можем да използваме
join
за да изчакаме пуснатите нишки - когато
JoinHandle
се drop-не нишката се detach-ва
Panic
Panic
panic!
в главната нишка спира програмата
Panic
panic!
в главната нишка спира програматаpanic!
в друга нишка спира нишката
Panic
panic!
в главната нишка спира програматаpanic!
в друга нишка спира нишкатаJoinHandle::join
връща резултат
Panic
panic!
в главната нишка спира програматаpanic!
в друга нишка спира нишкатаJoinHandle::join
връща резултатOk(T)
ако функцията е завършила успешно
Panic
panic!
в главната нишка спира програматаpanic!
в друга нишка спира нишкатаJoinHandle::join
връща резултатOk(T)
ако функцията е завършила успешноErr(Box<Any>)
ако е имало паника
Споделяне на стойности
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handle = thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
Споделяне на стойности
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handle = thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function --> src/bin/main_70d75afc36316cf9348e1b9a1b43baada82fd5b2.rs:6:32 | 6 | let handle = thread::spawn(|| { | ^^ may outlive borrowed value `nums` 7 | for i in &nums { | ---- `nums` is borrowed here | note: function requires argument type to outlive `'static` --> src/bin/main_70d75afc36316cf9348e1b9a1b43baada82fd5b2.rs:6:18 | 6 | let handle = thread::spawn(|| { | __________________^ 7 | | for i in &nums { 8 | | println!("number {}", i); 9 | | } 10 | | }); | |______^ help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword | 6 | let handle = thread::spawn(move || { | ^^^^^^^
use std::thread; fn main() { let nums = (0..5).collect::>(); let handle = thread::spawn(|| { for i in &nums { println!("number {}", i); } }); let _ = handle.join(); }
Споделяне на стойности
- новосъздадената нишка може да надживее функцията в която е извикана
Споделяне на стойности
- новосъздадената нишка може да надживее функцията в която е извикана
- затова rust не позволява да подадем референции към локални променливи
Споделяне на стойности
- новосъздадената нишка може да надживее функцията в която е извикана
- затова rust не позволява да подадем референции към локални променливи
- това се налага от ограничението на
spawn
, която приемаF: 'static
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
Споделяне на стойности
Можем да преместим стойността в новата нишка
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handle = thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
number 0 number 1 number 2 number 3 number 4
use std::thread; fn main() { let nums = (0..5).collect::>(); let handle = thread::spawn(move || { for i in &nums { println!("number {}", i); } }); let _ = handle.join(); }
Споделяне между няколко нишки
Как бихме споделили стойност между няколко нишки?
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handles = (0..2).map(|_| {
thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
Споделяне между няколко нишки
Как бихме споделили стойност между няколко нишки?
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handles = (0..2).map(|_| {
thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
- прехвърляне на собствеността няма как да работи
Споделяне между няколко нишки
Как бихме споделили стойност между няколко нишки?
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handles = (0..2).map(|_| {
thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
- прехвърляне на собствеността няма как да работи
- не можем да ползваме референция, защото нишката може да надживее
main
Споделяне между няколко нишки
Можем да пробваме с Rc
use std::rc::Rc;
use std::thread;
fn main() {
let nums_vec = (0..5).collect::<Vec<_>>();
let nums = Rc::new(nums_vec);
let handles = (0..2)
.map(|_| {
let nums = Rc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
Споделяне между няколко нишки
Можем да пробваме с Rc
use std::rc::Rc;
use std::thread;
fn main() {
let nums_vec = (0..5).collect::<Vec<_>>();
let nums = Rc::new(nums_vec);
let handles = (0..2)
.map(|_| {
let nums = Rc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
error[E0277]: `Rc<Vec<i32>>` cannot be sent between threads safely --> src/bin/main_a64edfb8be3b487ac4411ab31257712271d4ea64.rs:12:13 | 12 | thread::spawn(move || { | _____________^^^^^^^^^^^^^_- | | | | | `Rc<Vec<i32>>` cannot be sent between threads safely 13 | | for i in &*nums { 14 | | println!("number {}", i); 15 | | } 16 | | }) | |_____________- within this `[closure@src/bin/main_a64edfb8be3b487ac4411ab31257712271d4ea64.rs:12:27: 16:14]` | ::: /home/andrew/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:607:8 | 607 | F: Send + 'static, | ---- required by this bound in `spawn` | = help: within `[closure@src/bin/main_a64edfb8be3b487ac4411ab31257712271d4ea64.rs:12:27: 16:14]`, the trait `Send` is not implemented for `Rc<Vec<i32>>` = note: required because it appears within the type `[closure@src/bin/main_a64edfb8be3b487ac4411ab31257712271d4ea64.rs:12:27: 16:14]`
use std::rc::Rc; use std::thread; fn main() { let nums_vec = (0..5).collect::>(); let nums = Rc::new(nums_vec); let handles = (0..2) .map(|_| { let nums = Rc::clone(&nums); thread::spawn(move || { for i in &*nums { println!("number {}", i); } }) }) .collect:: >(); for h in handles { let _ = h.join(); } }
Send и Sync
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
Send и Sync
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
)
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
) - marker traits
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
) - marker traits
- имплементирани са за повечето типове
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
) - marker traits
- имплементирани са за повечето типове
- auto traits
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
) - marker traits
- имплементирани са за повечето типове
- auto traits
- unsafe traits
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:Rc
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:Rc
*const T
и*mut T
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:Rc
*const T
и*mut T
- thread local типове, напр.
rand::rngs::ThreadRng
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:Rc
*const T
и*mut T
- thread local типове, напр.
rand::rngs::ThreadRng
- и други
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:Cell
,RefCell
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:Cell
,RefCell
Rc
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:Cell
,RefCell
Rc
*const T
и*mut T
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:Cell
,RefCell
Rc
*const T
и*mut T
- и други
Send + Sync
Sync
Това значи ли че Vec<T>
е Sync
?
Send + Sync
Sync
Това значи ли че Vec<T>
е Sync
?
- да, ако
T: Sync
Send + Sync
Sync
Това значи ли че Vec<T>
е Sync
?
- да, ако
T: Sync
- ако нашата нишка има
&Vec<_>
значи никой не може да модифицира вектора
Send + Sync
Sync
Това значи ли че Vec<T>
е Sync
?
- да, ако
T: Sync
- ако нашата нишка има
&Vec<_>
значи никой не може да модифицира вектора - ако нашата нишка има
&mut Vec<_>
значи никой друг няма референция до вектора
Send + Sync
Sync
Това значи ли че Vec<T>
е Sync
?
- да, ако
T: Sync
- ако нашата нишка има
&Vec<_>
значи никой не може да модифицира вектора - ако нашата нишка има
&mut Vec<_>
значи никой друг няма референция до вектора - типове, които не са
Sync
, обикновено имат internal mutability без синхронизация
Send и Sync
Аuto traits
- имплементират се автоматично ако всичките полета са съответно
Send
иSync
pub struct Token(u32);
pub struct Token(u32); fn main() {}
Send + Sync
Unsafe traits
- unsafe са за ръчна имплементация
struct MyBox(*mut u8);
unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}
fn main() {} struct MyBox(*mut u8); unsafe impl Send for MyBox {} unsafe impl Sync for MyBox {}
Send + Sync
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
Send + Sync
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
- автоматичната имплементация никога няма да е грешна от само себе си
Send + Sync
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
- автоматичната имплементация никога няма да е грешна от само себе си
- но може да пишем код, който разчита, че определен тип не може да се прехвърля / споделя
Send + Sync
Деимплементация
Хак за stable
use std::marker::PhantomData;
struct SpecialToken(u8, PhantomData<*const ()>);
fn main() {} use std::marker::PhantomData; struct SpecialToken(u8, PhantomData<*const ()>);
Arc
Да се върнем на кода, който не се компилираше
use std::rc::Rc;
use std::thread;
fn main() {
let nums = Rc::new((0..5).collect::<Vec<_>>());
let handles = (0..2)
.map(|_| {
let nums = Rc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
error[E0277]: `Rc<Vec<i32>>` cannot be sent between threads safely --> src/bin/main_8e08fc7cb2794a1557ea5e32c268dd5d930f61fb.rs:11:13 | 11 | thread::spawn(move || { | _____________^^^^^^^^^^^^^_- | | | | | `Rc<Vec<i32>>` cannot be sent between threads safely 12 | | for i in &*nums { 13 | | println!("number {}", i); 14 | | } 15 | | }) | |_____________- within this `[closure@src/bin/main_8e08fc7cb2794a1557ea5e32c268dd5d930f61fb.rs:11:27: 15:14]` | ::: /home/andrew/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:607:8 | 607 | F: Send + 'static, | ---- required by this bound in `spawn` | = help: within `[closure@src/bin/main_8e08fc7cb2794a1557ea5e32c268dd5d930f61fb.rs:11:27: 15:14]`, the trait `Send` is not implemented for `Rc<Vec<i32>>` = note: required because it appears within the type `[closure@src/bin/main_8e08fc7cb2794a1557ea5e32c268dd5d930f61fb.rs:11:27: 15:14]`
use std::rc::Rc; use std::thread; fn main() { let nums = Rc::new((0..5).collect::>()); let handles = (0..2) .map(|_| { let nums = Rc::clone(&nums); thread::spawn(move || { for i in &*nums { println!("number {}", i); } }) }) .collect:: >(); for h in handles { let _ = h.join(); } }
Arc
Решението е да заменим std::rc::Rc
с std::sync::Arc
use std::sync::Arc;
use std::thread;
fn main() {
let nums = Arc::new((0..5).collect::<Vec<_>>());
let handles = (0..2)
.map(|_| {
let nums = Arc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
number 0 number 1 number 2 number 3 number 4 number 0 number 1 number 2 number 3 number 4
use std::sync::Arc; use std::thread; fn main() { let nums = Arc::new((0..5).collect::>()); let handles = (0..2) .map(|_| { let nums = Arc::clone(&nums); thread::spawn(move || { for i in &*nums { println!("number {}", i); } }) }) .collect:: >(); for h in handles { let _ = h.join(); } }
Arc
Arc
- Atomic Reference Counter
Arc
- Atomic Reference Counter
- аналогично на Rc (споделена собственост, позволява само взимане на
&T
към вътрешността)
Arc
- Atomic Reference Counter
- аналогично на Rc (споделена собственост, позволява само взимане на
&T
към вътрешността) - но използва атомарни операции за броене на референциите
Arc
- Atomic Reference Counter
- аналогично на Rc (споделена собственост, позволява само взимане на
&T
към вътрешността) - но използва атомарни операции за броене на референциите
- може да се използва за споделяне на стойности между нишки, ако
T: Send + Sync
Синхронизация
Примитиви за синхронизация
Примитиви за синхронизация
Стандартния пример за грешен многонишков алгоритъм:
let v = Arc::new((0..100).collect::<Vec<_>>());
let mut sum = 0;
let t1 = {
let v = Arc::clone(&v);
let sum = &mut sum;
thread::spawn(move || for i in &v[0..50] { *sum += i; })
};
let t2 = {
let v = Arc::clone(&v);
let sum = &mut sum;
thread::spawn(move || for i in &v[51..100] { *sum += i; })
};
let _ = t1.join();
let _ = t2.join();
println!("sum: {}", sum);
error[E0597]: `sum` does not live long enough --> src/bin/main_92ae06cbac3bb0b057de58f0494c471228f42c4d.rs:9:15 | 9 | let sum = &mut sum; | ^^^^^^^^ borrowed value does not live long enough 10 | thread::spawn(move || for i in &v[0..50] { *sum += i; }) | -------------------------------------------------------- argument requires that `sum` is borrowed for `'static` ... 22 | } | - `sum` dropped here while still borrowed error[E0499]: cannot borrow `sum` as mutable more than once at a time --> src/bin/main_92ae06cbac3bb0b057de58f0494c471228f42c4d.rs:15:15 | 9 | let sum = &mut sum; | -------- first mutable borrow occurs here 10 | thread::spawn(move || for i in &v[0..50] { *sum += i; }) | -------------------------------------------------------- argument requires that `sum` is borrowed for `'static` ... 15 | let sum = &mut sum; | ^^^^^^^^ second mutable borrow occurs here error[E0502]: cannot borrow `sum` as immutable because it is also borrowed as mutable --> src/bin/main_92ae06cbac3bb0b057de58f0494c471228f42c4d.rs:21:21 | 9 | let sum = &mut sum; | -------- mutable borrow occurs here 10 | thread::spawn(move || for i in &v[0..50] { *sum += i; }) | -------------------------------------------------------- argument requires that `sum` is borrowed for `'static` ... 21 | println!("sum: {}", sum); | ^^^ immutable borrow occurs here
use std::sync::Arc; use std::thread; fn main() { let v = Arc::new((0..100).collect::>()); let mut sum = 0; let t1 = { let v = Arc::clone(&v); let sum = &mut sum; thread::spawn(move || for i in &v[0..50] { *sum += i; }) }; let t2 = { let v = Arc::clone(&v); let sum = &mut sum; thread::spawn(move || for i in &v[51..100] { *sum += i; }) }; let _ = t1.join(); let _ = t2.join(); println!("sum: {}", sum); }
Примитиви за синхронизация
Можем ли да го накараме да работи?
Примитиви за синхронизация
Можем ли да го накараме да работи?
&mut i32
- не можем да имаме два пъти&mut
, а иspawn
очаква'static
Примитиви за синхронизация
Можем ли да го накараме да работи?
&mut i32
- не можем да имаме два пъти&mut
, а иspawn
очаква'static
Arc<i32>
- нямаме как да модифицираме съдържанието
Примитиви за синхронизация
Можем ли да го накараме да работи?
&mut i32
- не можем да имаме два пъти&mut
, а иspawn
очаква'static
Arc<i32>
- нямаме как да модифицираме съдържаниетоArc<Cell<i32>>
,Arc<RefCell<i32>>
-Cell
иRefCell
не саSync
Примитиви за синхронизация
Можем да го накараме да работи
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>
- атомарни числа
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>
- атомарни числа
- да връщаме резултат от нишката
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>
- атомарни числа
- да връщаме резултат от нишката
- …
Примитиви за синхронизация
Модула std::sync
- std::sync
Arc
Mutex
,RwLock
Condvar
,Barrier
atomic
mpsc
- …
Mutex
use std::sync::Mutex;
fn main() {
// мутекса опакова стойността, която предпазва
let mutex = Mutex::new(10);
{
// заключваме мутекса
// `lock` връща умен указател с deref до `&T` и `&mut T`
let mut lock = mutex.lock().unwrap();
*lock += 32;
// мутекса се отключва когато `lock` се деалокира
}
}
use std::sync::Mutex; fn main() { // мутекса опакова стойността, която предпазва let mutex = Mutex::new(10); { // заключваме мутекса // `lock` връща умен указател с deref до `&T` и `&mut T` let mut lock = mutex.lock().unwrap(); *lock += 32; // мутекса се отключва когато `lock` се деалокира } }
Mutex
- mutual exclusion
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
- ако мутекса е отключен и извикаме
lock
- заключваме го
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
- ако мутекса е отключен и извикаме
lock
- заключваме го - ако мутекса е заключен и извикаме
lock
- нишката ни се спира
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
- ако мутекса е отключен и извикаме
lock
- заключваме го - ако мутекса е заключен и извикаме
lock
- нишката ни се спира - операционната система ще я събуди когато мутекса е свободен
Мutex
Обикновенно мутекса се възприема като примитива която определя критична секция
lock(my_mutex);
// начало на критичната секция
do_stuff(shared_data);
// край на критичната секция
unlock(my_mutex);
В Ръст това не би било удобно, защото не дава достатъчна информация на компилатора как ползваме данните.
Затова Mutex
е generic и опакова данните.
Mutex
Mutex<()>
може да се използва като флаг
Mutex
Mutex<()>
може да се използва като флаг- дали е наш ред да изпълним някаква операция
Mutex
Mutex<()>
може да се използва като флаг- дали е наш ред да изпълним някаква операция
- понякога
Condvar
е по-подходящ за този случай
Mutex
Panic
Mutex
Panic
- ако нишка е заключила мутекс и влезе в
panic!
по това време, мутекса се зачита за отровен
Mutex
Panic
- ако нишка е заключила мутекс и влезе в
panic!
по това време, мутекса се зачита за отровен - може данните пазени от мутекса да са останали в невалидно състояние
Mutex
Panic
- ако нишка е заключила мутекс и влезе в
panic!
по това време, мутекса се зачита за отровен - може данните пазени от мутекса да са останали в невалидно състояние
- затова
Mutex::lock()
иMutex::try_lock()
връщат резултат
Mutex
Panic
- ако нишка е заключила мутекс и влезе в
panic!
по това време, мутекса се зачита за отровен - може данните пазени от мутекса да са останали в невалидно състояние
- затова
Mutex::lock()
иMutex::try_lock()
връщат резултат Ok(MutexGuard)
Mutex
Panic
- ако нишка е заключила мутекс и влезе в
panic!
по това време, мутекса се зачита за отровен - може данните пазени от мутекса да са останали в невалидно състояние
- затова
Mutex::lock()
иMutex::try_lock()
връщат резултат Ok(MutexGuard)
Err(PoisonError)
Mutex
Panic
- ако нишка е заключила мутекс и влезе в
panic!
по това време, мутекса се зачита за отровен - може данните пазени от мутекса да са останали в невалидно състояние
- затова
Mutex::lock()
иMutex::try_lock()
връщат резултат Ok(MutexGuard)
Err(PoisonError)
- често срещано е резултата от
lock
просто да сеunwrap
-не
RwLock
- Reader-writer lock
RwLock
- Reader-writer lock
- позволява четене от много места
RwLock
- Reader-writer lock
- позволява четене от много места
- или писане от едно място
RwLock
- Reader-writer lock
- позволява четене от много места
- или писане от едно място
- подобно на
RefCell
в многонишков контекст
Mutex срещу RwLock
- Mutex е по-бърз и по-лек от RwLock
Mutex срещу RwLock
- Mutex е по-бърз и по-лек от RwLock
- добра практика е критичните секции да са възможно най-кратки
Mutex срещу RwLock
- Mutex е по-бърз и по-лек от RwLock
- добра практика е критичните секции да са възможно най-кратки
- ако спазваме това няма да имаме много съперничество между нишките и мутекса е добра идея
Mutex срещу RwLock
- Mutex е по-бърз и по-лек от RwLock
- добра практика е критичните секции да са възможно най-кратки
- ако спазваме това няма да имаме много съперничество между нишките и мутекса е добра идея
RwLock
може да се използва да опаковаме стари C++ библиотеки
Arc + Mutex
Подобно на Rc<RefCell<T>>
, може често да виждате Arc<Mutex<T>>
или Arc<RwLock<T>>
Condvar
- Conditional variable
Condvar
- Conditional variable
- позволява да изчакаме за някакво събитие или състояние
Condvar
- Conditional variable
- позволява да изчакаме за някакво събитие или състояние
- нишката ни ще заспи докато това не се изпълни
Condvar
- Conditional variable
- позволява да изчакаме за някакво събитие или състояние
- нишката ни ще заспи докато това не се изпълни
- и ще бъде събудена от операционната система
Condvar
let pair = Arc::new((Mutex::new(false), Condvar::new()));
use std::sync::{Arc, Mutex, Condvar}; fn main() { let pair = Arc::new((Mutex::new(false), Condvar::new())); }
// Thread A
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
use std::sync::{Arc, Mutex, Condvar}; fn main() { let pair = Arc::new((Mutex::new(false), Condvar::new())); // Thread A let (lock, cvar) = &*pair; let mut started = lock.lock().unwrap(); while !*started { started = cvar.wait(started).unwrap(); } }
// Thread B
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
use std::sync::{Arc, Mutex, Condvar}; fn main() { let pair = Arc::new((Mutex::new(false), Condvar::new())); // Thread B let (lock, cvar) = &*pair; let mut started = lock.lock().unwrap(); *started = true; cvar.notify_one(); }
Външни библиотеки
Parking lot
- https://crates.io/crates/parking_lot
- https://docs.rs/parking_lot
- имплементация на
Mutex
,RwLock
,Condvar
,Once
- по-малки от към памет от
std
- по-бързи от
std
(в почти всички случаи)
Атомарни числа
- AtomicBool, AtomicUsize, AtomicIsize, AtomicPtr
Атомарни числа
- AtomicBool, AtomicUsize, AtomicIsize, AtomicPtr
- AtomicU8, AtomicU16, …
Атомарни числа
- AtomicBool, AtomicUsize, AtomicIsize, AtomicPtr
- AtomicU8, AtomicU16, …
- имплементират се чрез специални инструкции на процесора
Атомарни числа
- AtomicBool, AtomicUsize, AtomicIsize, AtomicPtr
- AtomicU8, AtomicU16, …
- имплементират се чрез специални инструкции на процесора
- стоят в основата на много алгоритми
Атомарни числа
- AtomicBool, AtomicUsize, AtomicIsize, AtomicPtr
- AtomicU8, AtomicU16, …
- имплементират се чрез специални инструкции на процесора
- стоят в основата на много алгоритми
- удобни са за създаване на различни броячи и подобни
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
- интерфейса наподобява
Cell<{integer}>
, но саSend + Sync
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
- интерфейса наподобява
Cell<{integer}>
, но саSend + Sync
- т.е. модифицират се през
&T
и връщат копие на числото
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
- интерфейса наподобява
Cell<{integer}>
, но саSend + Sync
- т.е. модифицират се през
&T
и връщат копие на числото - аритметични операции:
fetch_add
,fetch_xor
, …
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
- интерфейса наподобява
Cell<{integer}>
, но саSend + Sync
- т.е. модифицират се през
&T
и връщат копие на числото - аритметични операции:
fetch_add
,fetch_xor
, … - oперации по паметта:
load
,store
,compare_and_swap
, …
Глобални променливи
Глобални променливи
Immutable static
Искаме да имаме глобална променлива, която се инициализира веднъж и повече не се модифицира
static MY_HASHMAP: HashMap<String, String> = /* ??? */;
Глобални променливи
Immutable static
Ако стойността може да се сметне по време на компилация сме готови
static MY_VEC: Vec<String> = Vec::new();
static MY_VEC: Vec= Vec::new(); fn main() {}
Глобални променливи
Immutable static
Ако стойността може да се сметне по време на компилация сме готови
static MY_VEC: Vec<String> = Vec::new();
static MY_VEC: Vec= Vec::new(); fn main() {}
Но това рядко се случва
static MY_VEC: Vec<String> = {
let mut vec = Vec::new();
vec.push(String::new());
vec
};
error[E0764]: mutable references are not allowed in statics --> src/bin/main_6d890bc65513ef5d48a4d4a6c21499639c0709df.rs:3:5 | 3 | vec.push(String::new()); | ^^^ `&mut` is only allowed in `const fn` error[E0015]: calls in statics are limited to constant functions, tuple structs and tuple variants --> src/bin/main_6d890bc65513ef5d48a4d4a6c21499639c0709df.rs:3:5 | 3 | vec.push(String::new()); | ^^^^^^^^^^^^^^^^^^^^^^^ error[E0493]: destructors cannot be evaluated at compile-time --> src/bin/main_6d890bc65513ef5d48a4d4a6c21499639c0709df.rs:2:9 | 2 | let mut vec = Vec::new(); | ^^^^^^^ statics cannot evaluate destructors ... 5 | }; | - value is dropped here
static MY_VEC: Vec= { let mut vec = Vec::new(); vec.push(String::new()); vec }; fn main() {}
Глобални променливи
Immutable static
За целта можем да използваме библиотеката lazy_static
use std::collections::HashMap;
use lazy_static::lazy_static;
lazy_static! {
static ref FOOS: HashMap<u32, &'static str> = {
let mut m = HashMap::new();
m.insert(0, "foo");
m.insert(1, "bar");
m.insert(2, "baz");
m
};
}
fn main() {
println!("len: {}", FOOS.len()); // инициализира се при първи достъп
println!("map: {:?}", *FOOS); // deref magic
}
error[E0432]: unresolved import `lazy_static` --> src/bin/main_2ba49c35864d5b670a584634af87e855eb49d110.rs:2:5 | 2 | use lazy_static::lazy_static; | ^^^^^^^^^^^ maybe a missing crate `lazy_static`? error: cannot determine resolution for the macro `lazy_static` --> src/bin/main_2ba49c35864d5b670a584634af87e855eb49d110.rs:4:1 | 4 | lazy_static! { | ^^^^^^^^^^^ | = note: import resolution is stuck, try simplifying macro imports error[E0425]: cannot find value `FOOS` in this scope --> src/bin/main_2ba49c35864d5b670a584634af87e855eb49d110.rs:15:25 | 15 | println!("len: {}", FOOS.len()); // инициализира се при първи достъп | ^^^^ not found in this scope error[E0425]: cannot find value `FOOS` in this scope --> src/bin/main_2ba49c35864d5b670a584634af87e855eb49d110.rs:16:28 | 16 | println!("map: {:?}", *FOOS); // deref magic | ^^^^ not found in this scope warning: unused import: `std::collections::HashMap` --> src/bin/main_2ba49c35864d5b670a584634af87e855eb49d110.rs:1:5 | 1 | use std::collections::HashMap; | ^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default
use std::collections::HashMap; use lazy_static::lazy_static; lazy_static! { static ref FOOS: HashMap= { let mut m = HashMap::new(); m.insert(0, "foo"); m.insert(1, "bar"); m.insert(2, "baz"); m }; } fn main() { println!("len: {}", FOOS.len()); // инициализира се при първи достъп println!("map: {:?}", *FOOS); // deref magic }
Отзаде се използва std::sync::Once
Глобални променливи
Mutable static
Искаме да имаме глобална променлива, чиято стойност се променя по време на изпълнение на програмата
Глобални променливи
Mutable static
Искаме да имаме глобална променлива, чиято стойност се променя по време на изпълнение на програмата
- можем да използваме
static mut
иunsafe
код, но това обикновенно е лоша идея
Глобални променливи
Mutable static
Искаме да имаме глобална променлива, чиято стойност се променя по време на изпълнение на програмата
- можем да използваме
static mut
иunsafe
код, но това обикновенно е лоша идея - просто трябва да се погрижим, че достъпването и в многонишков контекст е безопасно
Глобални променливи
Mutable static
use std::sync::Mutex;
use std::collections::HashMap;
use lazy_static::lazy_static;
// `Mutex::new` не е константна функция
lazy_static! {
static ref FOOS: Mutex<HashMap<u32, String>> = Mutex::new(HashMap::new());
}
fn main() {
FOOS.lock().unwrap().insert(0, "foo".to_string());
FOOS.lock().unwrap().insert(1, "bar".to_string());
FOOS.lock().unwrap().insert(2, "baz".to_string());
println!("{:?}", FOOS.lock().unwrap());
}
error[E0432]: unresolved import `lazy_static` --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:3:5 | 3 | use lazy_static::lazy_static; | ^^^^^^^^^^^ maybe a missing crate `lazy_static`? error: cannot determine resolution for the macro `lazy_static` --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:6:1 | 6 | lazy_static! { | ^^^^^^^^^^^ | = note: import resolution is stuck, try simplifying macro imports error[E0425]: cannot find value `FOOS` in this scope --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:11:5 | 11 | FOOS.lock().unwrap().insert(0, "foo".to_string()); | ^^^^ not found in this scope error[E0425]: cannot find value `FOOS` in this scope --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:12:5 | 12 | FOOS.lock().unwrap().insert(1, "bar".to_string()); | ^^^^ not found in this scope error[E0425]: cannot find value `FOOS` in this scope --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:13:5 | 13 | FOOS.lock().unwrap().insert(2, "baz".to_string()); | ^^^^ not found in this scope error[E0425]: cannot find value `FOOS` in this scope --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:15:22 | 15 | println!("{:?}", FOOS.lock().unwrap()); | ^^^^ not found in this scope warning: unused import: `std::sync::Mutex` --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:1:5 | 1 | use std::sync::Mutex; | ^^^^^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default warning: unused import: `std::collections::HashMap` --> src/bin/main_e9d73fa9a5c7c13d541bbd3f972b59ecd2346e9d.rs:2:5 | 2 | use std::collections::HashMap; | ^^^^^^^^^^^^^^^^^^^^^^^^^
use std::sync::Mutex; use std::collections::HashMap; use lazy_static::lazy_static; // `Mutex::new` не е константна функция lazy_static! { static ref FOOS: Mutex> = Mutex::new(HashMap::new()); } fn main() { FOOS.lock().unwrap().insert(0, "foo".to_string()); FOOS.lock().unwrap().insert(1, "bar".to_string()); FOOS.lock().unwrap().insert(2, "baz".to_string()); println!("{:?}", FOOS.lock().unwrap()); }
Глобални променливи
Thread local
Искаме да имаме отделна глобална променлива за всяка нишка
Глобални променливи
Thread local
Искаме да имаме отделна глобална променлива за всяка нишка
thread_local! {
static FOOS: RefCell<HashMap<u32, String>> = RefCell::new(HashMap::new());
}
fn main() {
FOOS.with(|foos| foos.borrow_mut().insert(0, "foo".to_string()));
std::thread::spawn(|| {
FOOS.with(|foos| {
foos.borrow_mut().insert(1, "bar".to_string());
foos.borrow_mut().insert(2, "baz".to_string());
println!("thread 1: {:?}", foos.borrow());
});
})
.join()
.unwrap();
FOOS.with(|foos| {
println!("thread main: {:?}", foos.borrow());
});
}
thread 1: {2: "baz", 1: "bar"} thread main: {0: "foo"}
use std::collections::HashMap; use std::cell::RefCell; thread_local! { static FOOS: RefCell> = RefCell::new(HashMap::new()); } fn main() { FOOS.with(|foos| foos.borrow_mut().insert(0, "foo".to_string())); std::thread::spawn(|| { FOOS.with(|foos| { foos.borrow_mut().insert(1, "bar".to_string()); foos.borrow_mut().insert(2, "baz".to_string()); println!("thread 1: {:?}", foos.borrow()); }); }) .join() .unwrap(); FOOS.with(|foos| { println!("thread main: {:?}", foos.borrow()); }); }
Канали
Канали
Go-lang motto
Don't communicate by sharing memory,
share memory by communicating
Канали в стандартната библиотека
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(10).unwrap();
});
println!("received {}", receiver.recv().unwrap());
}
received 10
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); thread::spawn(move || { sender.send(10).unwrap(); }); println!("received {}", receiver.recv().unwrap()); }
Типове канали
Неограничен канал
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()
(Sender, Receiver)
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()
(Sender, Receiver)
- изпращане на съобщение никога не блокира
Типове канали
Неограничен канал
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); thread::spawn(move || { sender.send(1).unwrap(); sender.send(2).unwrap(); sender.send(3).unwrap(); }); assert_eq!(receiver.recv().unwrap(), 1); assert_eq!(receiver.recv().unwrap(), 2); assert_eq!(receiver.recv().unwrap(), 3); }
Типове канали
Oграничен канал
- bounded / "synchronous"
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
(SyncSender, Receiver)
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
(SyncSender, Receiver)
- има буфер за
k
съобщения
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
(SyncSender, Receiver)
- има буфер за
k
съобщения - изпращане на съобщения ще блокира ако буфера е пълен
Типове канали
Ограничен канал
let (sender, receiver) = mpsc::sync_channel(1);
thread::spawn(move || {
// записва съобщението и връща веднага
sender.send(1).unwrap();
// ще блокира докато главната нишка не извика `receiver.recv()`
sender.send(2).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::sync_channel(1); thread::spawn(move || { // записва съобщението и връща веднага sender.send(1).unwrap(); // ще блокира докато главната нишка не извика `receiver.recv()` sender.send(2).unwrap(); }); assert_eq!(receiver.recv().unwrap(), 1); assert_eq!(receiver.recv().unwrap(), 2); }
Множество изпращачи
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
});
thread::spawn(move || {
sender2.send(3).unwrap();
sender2.send(4).unwrap();
});
println!("{} {} {} {}",
receiver.recv().unwrap(), receiver.recv().unwrap(),
receiver.recv().unwrap(), receiver.recv().unwrap());
1 2 3 4
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); let sender2 = sender.clone(); thread::spawn(move || { sender.send(1).unwrap(); sender.send(2).unwrap(); }); thread::spawn(move || { sender2.send(3).unwrap(); sender2.send(4).unwrap(); }); println!("{} {} {} {}", receiver.recv().unwrap(), receiver.recv().unwrap(), receiver.recv().unwrap(), receiver.recv().unwrap()); }
Sender
Методи
// изпраща `t`
// връща грешка ако получателят е бил унищожен
fn send(&self, t: T) -> Result<(), SendError<T>>
Sender
Методи
let (sender, receiver) = mpsc::channel();
assert_eq!(sender.send(12), Ok(()));
// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);
// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
use std::mem; use std::sync::mpsc::{self, SendError}; fn main() { let (sender, receiver) = mpsc::channel(); assert_eq!(sender.send(12), Ok(())); // унищожаваме получателя // съобщението `12` никога няма да бъде получено mem::drop(receiver); // грешка - получателя е унищожен // можем да си върнем съобщението `23` от грешката assert_eq!(sender.send(23), Err(SendError(23))); }
SyncSender
Методи
// блокира ако буфера е пълен
fn send(&self, t: T) -> Result<(), SendError<T>>
// връща грешка ако буфера е пълен или получателят е бил унищожен
fn try_send(&self, t: T) -> Result<(), TrySendError<T>>
SyncSender
Методи
let (sender, receiver) = mpsc::sync_channel(1);
assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));
mem::drop(receiver);
assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
use std::mem; use std::sync::mpsc::{self, TrySendError}; fn main() { let (sender, receiver) = mpsc::sync_channel(1); assert_eq!(sender.try_send(12), Ok(())); assert_eq!(sender.try_send(23), Err(TrySendError::Full(23))); mem::drop(receiver); assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23))); }
Множество получатели
Множество получатели
- не може - каналите са multi-producer, single-consumer
Множество получатели
- не може - каналите са multi-producer, single-consumer
Receiver
не може да се клонира
Множество получатели
- не може - каналите са multi-producer, single-consumer
Receiver
не може да се клонираReceiver
eSend
, но не еSync
Receiver
Методи
// блокира докато не получи съобщение
// връща грешка ако всички изпращачи са унищожени
fn recv(&self) -> Result<T, RecvError>
// не блокира
// връща грешка ако всички изпращачи са унищожени или няма съобщение в опашката
fn try_recv(&self) -> Result<T, TryRecvError>
// блокира за определено време
// връща грешка ако всички изпращачи са унищожени или е изтекло времето
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>
Receiver
Методи
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
while let Ok(msg) = receiver.recv() {
println!("received {}", msg);
}
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); thread::spawn(move || { for i in (0..50).rev() { sender.send(i).unwrap(); } }); while let Ok(msg) = receiver.recv() { println!("received {}", msg); } }
Receiver
Итератори
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
for msg in receiver.iter() {
println!("received {}", msg);
}
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); thread::spawn(move || { for i in (0..50).rev() { sender.send(i).unwrap(); } }); for msg in receiver.iter() { println!("received {}", msg); } }
Външни библиотеки
Crossbeam channel
- https://crates.io/crates/crossbeam-channel
- https://docs.rs/crossbeam-channel/
- Multi-producer multi-consumer (mpmc) channel
- с опция за select по няколко канала
Външни библиотеки
Crossbeam
- https://crates.io/crates/crossbeam
- https://docs.rs/crossbeam/
- колекция от алгоритми и структури от данни
- lock-free структури от данни - опашка, стек, deque
- crossbeam_channel
- scoped threads
- и доста utilities
Външни библиотеки
Rayon
- https://crates.io/crates/rayon
- https://docs.rs/rayon/
- библиотека за паралелизъм по данни
- threadpool
- parallel iterators
- split/join