1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
//! The `event_bus` module provides an [EventBus] that supports publish-subscribe-style
//! communication between different tasks.

use alloc::{boxed::Box, sync::Arc, vec::Vec};
use core::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use bitflags::bitflags;

use crate::sync::Mutex;

bitflags! {
    #[derive(Default, Copy, Clone)]
    /// The `Event` struct represents events that can be subscribed to on [EventBus].
    ///
    /// It is a set of bit flags backed by a `u32`; each constant below occupies one bit.
  pub struct Event: u32 {
    /// Indicates that a child process has quit (bit 0).
    const CHILD_PROCESS_QUIT = 1 << 0;
  }
}

/// A boxed subscriber callback stored by [EventBus].
///
/// It is called with the [Event] being published and returns a `bool`. The callback registered
/// by `EventBusFuture` returns `true` exactly when the published event contained the event that
/// future was waiting for, and `false` otherwise. Every callback must be `Send`.
type EventCallback = Box<dyn Fn(Event) -> bool + Send>;

/// The `EventBus` struct supports publish-subscribe-style communication between different tasks.
#[derive(Default)]
pub struct EventBus {
    /// Flags that have been published and not yet cleared.
    event: Event,
    /// Callbacks registered through `subscribe`, in registration order.
    callback_list: Vec<EventCallback>,
}

impl EventBus {
    /// Creates a new, empty event bus wrapped in `Arc<Mutex<_>>` so that it can be shared.
    pub fn new() -> Arc<Mutex<Self>> {
        Arc::new(Mutex::new(Self::default()))
    }

    /// Publishes an event on the event bus.
    ///
    /// The flags in `event` are added to the bus's pending set, then every subscribed callback
    /// is invoked once, in subscription order, with `event`. A callback that returns `true` is
    /// considered finished and is unsubscribed; a callback that returns `false` stays subscribed
    /// for later publications.
    pub fn push(&mut self, event: Event) {
        self.event.insert(event);
        // Honor the callbacks' return value. Without pruning, one-shot subscribers (such as the
        // waker callbacks that `EventBusFuture` registers on every pending poll) would pile up
        // forever and be re-invoked on every later publication.
        self.callback_list.retain(|callback| !callback(event));
    }

    /// Clears an event from the event bus.
    ///
    /// Only the pending flags are affected; subscribed callbacks are left untouched.
    pub fn clear(&mut self, event: Event) {
        self.event.remove(event);
    }

    /// Subscribes to events on the event bus and executes the given callback function when an event
    /// is published.
    ///
    /// The callback receives the event passed to [`EventBus::push`]. It should return `true` once
    /// it has handled the event it was waiting for (it is then dropped), or `false` to remain
    /// subscribed.
    pub fn subscribe(&mut self, callback: EventCallback) {
        self.callback_list.push(callback);
    }
}

/// The `EventBusFuture` struct is a future that completes when a specified event is published on
/// an [EventBus].
struct EventBusFuture {
    /// The shared bus that is locked and inspected on every poll.
    event_bus: Arc<Mutex<EventBus>>,
    /// The flags that must all be present on the bus for the future to complete.
    subscribed_event: Event,
}

impl Future for EventBusFuture {
    type Output = Event;

    /// Checks the bus for the awaited flags.
    ///
    /// Resolves to the bus's complete set of pending flags once it contains every flag in
    /// `subscribed_event`. Otherwise a callback holding a clone of the current waker is
    /// subscribed to the bus and `Poll::Pending` is returned.
    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let wanted = self.subscribed_event;
        let mut bus = self.event_bus.lock();

        if !bus.event.contains(wanted) {
            // Not there yet: arrange to be woken by a publication that carries the awaited flags.
            let waker = context.waker().clone();
            bus.subscribe(Box::new(move |published| {
                let matched = published.contains(wanted);
                if matched {
                    waker.wake_by_ref();
                }
                matched
            }));
            return Poll::Pending;
        }

        Poll::Ready(bus.event)
    }
}

/// Returns a future that completes when a specified event is published on an [EventBus].
///
/// The future is ready as soon as the bus's event flags contain every flag in `subscribed_event`
/// (including flags already present when it is polled), and it resolves to the bus's full set of
/// event flags at that moment, not only the subscribed ones.
pub fn wait_for_event(
    event_bus: Arc<Mutex<EventBus>>,
    subscribed_event: Event,
) -> impl Future<Output = Event> {
    EventBusFuture {
        event_bus,
        subscribed_event,
    }
}