1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use gateway::Shard;
use internal::prelude::*;
use parking_lot::Mutex as ParkingLotMutex;
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use super::super::super::EventHandler;
use super::{
    ShardId,
    ShardManagerMessage,
    ShardQueuerMessage,
    ShardRunner,
    ShardRunnerInfo,
};
use typemap::ShareMap;

#[cfg(feature = "framework")]
use framework::Framework;

/// The shard queuer is a simple loop that manages the startup of shards,
/// running until it is told to shut down or its message channel can no
/// longer deliver messages.
///
/// A shard queuer instance _should_ be run in its own thread, due to the
/// blocking nature of the loop itself as well as the thread sleep of up to
/// 5 seconds that is used to space out shard starts.
pub struct ShardQueuer<H: EventHandler + Send + Sync + 'static> {
    /// Shared data map; a clone of this handle is passed to every shard
    /// runner created by the queuer.
    pub data: Arc<ParkingLotMutex<ShareMap>>,
    /// The event handler; a clone of this handle is passed to every shard
    /// runner created by the queuer.
    pub event_handler: Arc<H>,
    /// The optional framework; a clone of this handle is passed to every
    /// shard runner created by the queuer.
    ///
    /// Only present when the `framework` feature is enabled.
    #[cfg(feature = "framework")]
    pub framework: Arc<Mutex<Option<Box<Framework + Send>>>>,
    /// When the most recent start attempt finished, or `None` if no start
    /// has been attempted yet.
    ///
    /// This is updated after every start attempt, including failed ones,
    /// and is used to keep start attempts at least 5 seconds apart.
    pub last_start: Option<Instant>,
    /// Sender of messages to the shard manager; a clone is passed to every
    /// shard runner created by the queuer.
    pub manager_tx: Sender<ShardManagerMessage>,
    /// Map of shard IDs to information about their runners. An entry is
    /// inserted for each shard that the queuer successfully starts.
    pub runners: Arc<ParkingLotMutex<HashMap<ShardId, ShardRunnerInfo>>>,
    /// Receiver of the messages that drive the queuer's loop.
    pub rx: Receiver<ShardQueuerMessage>,
    /// The token handed to each new shard on creation.
    pub token: Arc<Mutex<String>>,
    /// The websocket URL handed to each new shard on creation.
    pub ws_url: Arc<Mutex<String>>,
}

impl<H: EventHandler + Send + Sync + 'static> ShardQueuer<H> {
    /// Runs the queuer loop on the calling thread.
    ///
    /// The loop blocks waiting for the next [`ShardQueuerMessage`] and ends
    /// when either a `Shutdown` message arrives or receiving from the
    /// channel fails.
    ///
    /// For each `Start` message the queuer first waits out the remainder of
    /// the 5 second gap since the previous start attempt, then attempts to
    /// start the requested shard. A failed start is logged as a warning and
    /// does not end the loop.
    pub fn run(&mut self) {
        while let Ok(message) = self.rx.recv() {
            let (shard_id, shard_total) = match message {
                ShardQueuerMessage::Shutdown => return,
                ShardQueuerMessage::Start(id, total) => (id, total),
            };

            self.check_last_start();

            if let Err(why) = self.start(shard_id, shard_total) {
                warn!("Err starting shard {}: {:?}", shard_id, why);
            }

            // Recorded whether or not the start succeeded, so that the next
            // attempt is spaced out from this one either way.
            self.last_start = Some(Instant::now());
        }
    }

    /// Sleeps the current thread until at least 5 seconds have passed since
    /// the last start attempt.
    ///
    /// Returns immediately if no start has been attempted yet, or if the
    /// 5 seconds have already elapsed.
    fn check_last_start(&mut self) {
        // We must wait 5 seconds between IDENTIFYs to avoid session
        // invalidations.
        let minimum_gap = Duration::from_secs(5);

        if let Some(previous) = self.last_start {
            let since_previous = previous.elapsed();

            // Only the part of the gap that has not passed yet is slept.
            if since_previous < minimum_gap {
                thread::sleep(minimum_gap - since_previous);
            }
        }
    }

    /// Creates the shard with the given ID, spawns a thread running a
    /// [`ShardRunner`] for it, and registers the runner's info in
    /// `self.runners`.
    ///
    /// `shard_total` is passed on to the shard alongside `shard_id` as its
    /// shard information.
    ///
    /// # Errors
    ///
    /// Returns the error from [`Shard::new`] if the shard could not be
    /// created. In that case no thread is spawned and nothing is registered.
    fn start(&mut self, shard_id: ShardId, shard_total: ShardId) -> Result<()> {
        let shard = Shard::new(
            Arc::clone(&self.ws_url),
            Arc::clone(&self.token),
            [shard_id.0, shard_total.0],
        )?;
        let shared_shard = Arc::new(ParkingLotMutex::new(shard));

        // The runner takes an extra framework handle when the `framework`
        // feature is compiled in.
        let mut runner = feature_framework! {{
            ShardRunner::new(
                Arc::clone(&shared_shard),
                self.manager_tx.clone(),
                Arc::clone(&self.framework),
                Arc::clone(&self.data),
                Arc::clone(&self.event_handler),
            )
        } else {
            ShardRunner::new(
                Arc::clone(&shared_shard),
                self.manager_tx.clone(),
                Arc::clone(&self.data),
                Arc::clone(&self.event_handler),
            )
        }};

        // The runner's sender has to be taken here, before the runner is
        // moved into its own thread below.
        let info = ShardRunnerInfo {
            runner_tx: runner.runner_tx(),
            shard: shared_shard,
        };

        thread::spawn(move || {
            // The runner's result is intentionally discarded.
            let _ = runner.run();
        });

        self.runners.lock().insert(shard_id, info);

        Ok(())
    }
}