1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use super::immediate::ImmediateWorker;
use super::{RowData, Worker};
use crate::decoder::MAX_COMPONENTS;
use crate::error::Result;
use std::{
mem,
sync::mpsc::{self, Receiver, Sender},
};
#[allow(dead_code)]
pub fn with_multithreading<T>(f: impl FnOnce(&mut dyn Worker) -> T) -> T {
let mut worker = MpscWorker::default();
f(&mut worker)
}
enum WorkerMsg {
Start(RowData),
AppendRow(Vec<i16>),
GetResult(Sender<Vec<u8>>),
}
#[derive(Default)]
pub struct MpscWorker {
senders: [Option<Sender<WorkerMsg>>; MAX_COMPONENTS],
}
impl MpscWorker {
fn start_with(
&mut self,
row_data: RowData,
spawn_worker: impl FnOnce(usize) -> Result<Sender<WorkerMsg>>,
) -> Result<()> {
let component = row_data.index;
if let None = self.senders[component] {
let sender = spawn_worker(component)?;
self.senders[component] = Some(sender);
}
let sender = self.senders[component].as_mut().unwrap();
sender
.send(WorkerMsg::Start(row_data))
.expect("jpeg-decoder worker thread error");
Ok(())
}
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
let component = row.0;
let sender = self.senders[component].as_mut().unwrap();
sender
.send(WorkerMsg::AppendRow(row.1))
.expect("jpeg-decoder worker thread error");
Ok(())
}
fn get_result_with(
&mut self,
index: usize,
collect: impl FnOnce(Receiver<Vec<u8>>) -> Vec<u8>,
) -> Result<Vec<u8>> {
let (tx, rx) = mpsc::channel();
let sender = mem::take(&mut self.senders[index]).unwrap();
sender
.send(WorkerMsg::GetResult(tx))
.expect("jpeg-decoder worker thread error");
Ok(collect(rx))
}
}
impl Worker for MpscWorker {
fn start(&mut self, row_data: RowData) -> Result<()> {
self.start_with(row_data, spawn_worker_thread)
}
fn append_row(&mut self, row: (usize, Vec<i16>)) -> Result<()> {
MpscWorker::append_row(self, row)
}
fn get_result(&mut self, index: usize) -> Result<Vec<u8>> {
self.get_result_with(index, collect_worker_thread)
}
}
fn create_worker() -> (Sender<WorkerMsg>, impl FnOnce() + 'static) {
let (tx, rx) = mpsc::channel();
let closure = move || {
let mut worker = ImmediateWorker::new_immediate();
while let Ok(message) = rx.recv() {
match message {
WorkerMsg::Start(mut data) => {
data.index = 0;
worker.start_immediate(data);
}
WorkerMsg::AppendRow(row) => {
worker.append_row_immediate((0, row));
}
WorkerMsg::GetResult(chan) => {
let _ = chan.send(worker.get_result_immediate(0));
break;
}
}
}
};
(tx, closure)
}
fn spawn_worker_thread(component: usize) -> Result<Sender<WorkerMsg>> {
let (tx, worker) = create_worker();
let thread_builder =
std::thread::Builder::new().name(format!("worker thread for component {}", component));
thread_builder.spawn(worker)?;
Ok(tx)
}
fn collect_worker_thread(rx: Receiver<Vec<u8>>) -> Vec<u8> {
rx.recv().expect("jpeg-decoder worker thread error")
}