1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use gateway::Shard;
use internal::prelude::*;
use parking_lot::Mutex as ParkingLotMutex;
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use super::super::super::EventHandler;
use super::{
ShardId,
ShardManagerMessage,
ShardQueuerMessage,
ShardRunner,
ShardRunnerInfo,
};
use typemap::ShareMap;
#[cfg(feature = "framework")]
use framework::Framework;
/// Queue worker that starts shards one at a time in response to
/// [`ShardQueuerMessage`]s received over a channel.
///
/// Each `Start` message results in a new `Shard` being created, wrapped in a
/// `ShardRunner`, and run on its own thread; consecutive start attempts are
/// spaced at least five seconds apart (see `check_last_start`).
pub struct ShardQueuer<H>
where
    H: EventHandler + Send + Sync + 'static,
{
    /// Shared data map; a clone of this handle is given to every runner
    /// that the queuer creates.
    pub data: Arc<ParkingLotMutex<ShareMap>>,
    /// Shared event handler; a clone of this handle is given to every runner
    /// that the queuer creates.
    pub event_handler: Arc<H>,
    /// Shared, optional framework; a clone of this handle is given to every
    /// runner. Only present when the `framework` feature is enabled.
    #[cfg(feature = "framework")]
    pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
    /// When the most recent start attempt finished, or `None` if no start
    /// has been attempted yet. Updated by `run` after every `Start` message,
    /// whether or not the start succeeded.
    pub last_start: Option<Instant>,
    /// Sender for `ShardManagerMessage`s; cloned into every runner.
    pub manager_tx: Sender<ShardManagerMessage>,
    /// Map of started runners keyed by shard ID. `start` inserts an entry
    /// here after spawning the runner's thread.
    pub runners: Arc<ParkingLotMutex<HashMap<ShardId, ShardRunnerInfo>>>,
    /// Receiving end of the channel from which `run` reads its messages.
    pub rx: Receiver<ShardQueuerMessage>,
    /// Shared token string passed to `Shard::new` for each new shard.
    pub token: Arc<Mutex<String>>,
    /// Shared websocket URL string passed to `Shard::new` for each new shard.
    pub ws_url: Arc<Mutex<String>>,
}
impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
    /// Processes queued messages until told to stop.
    ///
    /// Blocks on the receiver and handles one message at a time. The loop
    /// ends when a `Shutdown` message arrives or when receiving fails (the
    /// channel's sending side is gone).
    ///
    /// For every `Start` message the queuer first waits out the remainder of
    /// the spacing interval, then attempts to start the shard. A failed start
    /// is logged with `warn!` and otherwise ignored. `last_start` is refreshed
    /// after every attempt, successful or not.
    pub fn run(&mut self) {
        while let Ok(message) = self.rx.recv() {
            match message {
                ShardQueuerMessage::Shutdown => break,
                ShardQueuerMessage::Start(shard_id, shard_total) => {
                    self.check_last_start();

                    if let Err(why) = self.start(shard_id, shard_total) {
                        warn!("Err starting shard {}: {:?}", shard_id, why);
                    }

                    // Recorded even on failure so the next attempt is still
                    // spaced out from this one.
                    self.last_start = Some(Instant::now());
                },
            }
        }
    }

    /// Sleeps the current thread until at least five seconds have passed
    /// since `last_start`.
    ///
    /// Returns immediately if no start has been recorded yet or if the
    /// interval has already elapsed.
    // NOTE(review): the 5 second spacing presumably mirrors a gateway
    // identify rate limit -- confirm against the API documentation.
    fn check_last_start(&mut self) {
        if let Some(previous) = self.last_start {
            let cooldown = Duration::from_secs(5);
            let since = previous.elapsed();

            // Only the unexpired part of the interval is slept.
            if since < cooldown {
                thread::sleep(cooldown - since);
            }
        }
    }

    /// Creates shard `shard_id` of `shard_total`, spawns a thread running its
    /// `ShardRunner`, and registers the runner in `runners`.
    ///
    /// # Errors
    ///
    /// Returns the error from `Shard::new` if the shard cannot be created; in
    /// that case no thread is spawned and `runners` is left untouched.
    fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> {
        let shard = Shard::new(
            Arc::clone(&self.ws_url),
            Arc::clone(&self.token),
            [shard_id.0, shard_total.0],
        )?;
        let shared_shard = Arc::new(ParkingLotMutex::new(shard));

        let mut runner = feature_framework! {{
            ShardRunner::new(
                Arc::clone(&shared_shard),
                self.manager_tx.clone(),
                Arc::clone(&self.framework),
                Arc::clone(&self.data),
                Arc::clone(&self.event_handler),
            )
        } else {
            ShardRunner::new(
                Arc::clone(&shared_shard),
                self.manager_tx.clone(),
                Arc::clone(&self.data),
                Arc::clone(&self.event_handler),
            )
        }};

        // The runner's sender must be taken before the runner itself is
        // moved into the spawned thread.
        let info = ShardRunnerInfo {
            runner_tx: runner.runner_tx(),
            shard: shared_shard,
        };

        // The thread is detached; whatever `run` returns is discarded.
        thread::spawn(move || {
            let _ = runner.run();
        });

        self.runners.lock().insert(shard_id, info);

        Ok(())
    }
}