Use the publish-subscribe mechanism to handle collider removals across pipelines.

This commit is contained in:
Crozet Sébastien
2020-10-05 19:04:18 +02:00
parent 2d0a888484
commit 93aa7b6e1e
11 changed files with 236 additions and 174 deletions

View File

@@ -2,3 +2,4 @@
pub mod arena;
pub(crate) mod graph;
pub mod pubsub;

View File

@@ -1,16 +1,18 @@
//! Publish-subscribe mechanism for internal events.
use serde::export::PhantomData;
use std::collections::VecDeque;
/// The position of a subscriber on a pub-sub queue.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
pub struct PubSubCursor {
pub struct PubSubCursor<T> {
// Index of the next message to read.
id: u32,
next: u32,
_phantom: PhantomData<T>,
}
impl PubSubCursor {
impl<T> PubSubCursor<T> {
fn id(&self, num_deleted: u32) -> usize {
(self.id - num_deleted) as usize
}
@@ -53,18 +55,25 @@ impl<T> PubSub<T> {
/// Subscribe to the queue.
///
/// A subscription cannot be cancelled.
pub fn subscribe(&mut self) -> PubSubCursor {
pub fn subscribe(&mut self) -> PubSubCursor<T> {
let cursor = PubSubCursor {
next: self.messages.len() as u32 + self.deleted_messages,
id: self.offsets.len() as u32 + self.deleted_offsets,
_phantom: PhantomData,
};
self.offsets.push_back(cursor.next);
cursor
}
/// Read the i-th message not yet read by the given subsciber.
pub fn read_ith(&self, cursor: &PubSubCursor<T>, i: usize) -> Option<&T> {
self.messages
.get(cursor.next(self.deleted_messages) as usize + i)
}
/// Get the messages not yet read by the given subscriber.
pub fn read(&self, cursor: &PubSubCursor) -> impl Iterator<Item = &T> {
pub fn read(&self, cursor: &PubSubCursor<T>) -> impl Iterator<Item = &T> {
let next = cursor.next(self.deleted_messages);
// TODO: use self.queue.range(next..) once it is stabilised.
@@ -77,7 +86,7 @@ impl<T> PubSub<T> {
/// Makes the given subscribe acknowledge all the messages in the queue.
///
/// A subscriber cannot read acknowledged messages any more.
pub fn ack(&mut self, cursor: &mut PubSubCursor) {
pub fn ack(&mut self, cursor: &mut PubSubCursor<T>) {
// Update the cursor.
cursor.next = self.messages.len() as u32 + self.deleted_messages;
self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX;