Don't let the PubSub internal offsets overflow + fix some warnings.
This commit is contained in:
@@ -3,16 +3,28 @@
|
||||
use serde::export::PhantomData;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
/// The position of a subscriber on a pub-sub queue.
|
||||
/// A permanent subscription to a pub-sub queue.
|
||||
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
|
||||
pub struct PubSubCursor<T> {
|
||||
// Index of the next message to read.
|
||||
pub struct Subscription<T> {
|
||||
// Position on the cursor array.
|
||||
id: u32,
|
||||
next: u32,
|
||||
_phantom: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T> PubSubCursor<T> {
|
||||
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
|
||||
struct PubSubCursor {
|
||||
// Position on the offset array.
|
||||
id: u32,
|
||||
// Index of the next message to read.
|
||||
// NOTE: Having this here is not actually necessary because
|
||||
// this value is supposed to be equal to `offsets[self.id]`.
|
||||
// However, we keep it because it lets us avoid one lookup
|
||||
// on the `offsets` array inside of message-polling loops
|
||||
// based on `read_ith`.
|
||||
next: u32,
|
||||
}
|
||||
|
||||
impl PubSubCursor {
|
||||
fn id(&self, num_deleted: u32) -> usize {
|
||||
(self.id - num_deleted) as usize
|
||||
}
|
||||
@@ -29,6 +41,7 @@ pub struct PubSub<T> {
|
||||
deleted_offsets: u32,
|
||||
messages: VecDeque<T>,
|
||||
offsets: VecDeque<u32>,
|
||||
cursors: Vec<PubSubCursor>,
|
||||
}
|
||||
|
||||
impl<T> PubSub<T> {
|
||||
@@ -39,9 +52,24 @@ impl<T> PubSub<T> {
|
||||
deleted_messages: 0,
|
||||
messages: VecDeque::new(),
|
||||
offsets: VecDeque::new(),
|
||||
cursors: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn reset_shifts(&mut self) {
|
||||
for offset in &mut self.offsets {
|
||||
*offset -= self.deleted_messages;
|
||||
}
|
||||
|
||||
for cursor in &mut self.cursors {
|
||||
cursor.id -= self.deleted_offsets;
|
||||
cursor.next -= self.deleted_messages;
|
||||
}
|
||||
|
||||
self.deleted_offsets = 0;
|
||||
self.deleted_messages = 0;
|
||||
}
|
||||
|
||||
/// Publish a new message.
|
||||
pub fn publish(&mut self, message: T) {
|
||||
if self.offsets.is_empty() {
|
||||
@@ -55,25 +83,33 @@ impl<T> PubSub<T> {
|
||||
/// Subscribe to the queue.
|
||||
///
|
||||
/// A subscription cannot be cancelled.
|
||||
pub fn subscribe(&mut self) -> PubSubCursor<T> {
|
||||
#[must_use]
|
||||
pub fn subscribe(&mut self) -> Subscription<T> {
|
||||
let cursor = PubSubCursor {
|
||||
next: self.messages.len() as u32 + self.deleted_messages,
|
||||
id: self.offsets.len() as u32 + self.deleted_offsets,
|
||||
};
|
||||
|
||||
let subscription = Subscription {
|
||||
id: self.cursors.len() as u32,
|
||||
_phantom: PhantomData,
|
||||
};
|
||||
|
||||
self.offsets.push_back(cursor.next);
|
||||
cursor
|
||||
self.cursors.push(cursor);
|
||||
subscription
|
||||
}
|
||||
|
||||
/// Read the i-th message not yet read by the given subsciber.
|
||||
pub fn read_ith(&self, cursor: &PubSubCursor<T>, i: usize) -> Option<&T> {
|
||||
pub fn read_ith(&self, sub: &Subscription<T>, i: usize) -> Option<&T> {
|
||||
let cursor = &self.cursors[sub.id as usize];
|
||||
self.messages
|
||||
.get(cursor.next(self.deleted_messages) as usize + i)
|
||||
}
|
||||
|
||||
/// Get the messages not yet read by the given subscriber.
|
||||
pub fn read(&self, cursor: &PubSubCursor<T>) -> impl Iterator<Item = &T> {
|
||||
pub fn read(&self, sub: &Subscription<T>) -> impl Iterator<Item = &T> {
|
||||
let cursor = &self.cursors[sub.id as usize];
|
||||
let next = cursor.next(self.deleted_messages);
|
||||
|
||||
// TODO: use self.queue.range(next..) once it is stabilised.
|
||||
@@ -86,11 +122,14 @@ impl<T> PubSub<T> {
|
||||
/// Makes the given subscribe acknowledge all the messages in the queue.
|
||||
///
|
||||
/// A subscriber cannot read acknowledged messages any more.
|
||||
pub fn ack(&mut self, cursor: &mut PubSubCursor<T>) {
|
||||
pub fn ack(&mut self, sub: &Subscription<T>) {
|
||||
// Update the cursor.
|
||||
cursor.next = self.messages.len() as u32 + self.deleted_messages;
|
||||
let cursor = &mut self.cursors[sub.id as usize];
|
||||
|
||||
self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX;
|
||||
cursor.id = self.offsets.len() as u32 + self.deleted_offsets;
|
||||
|
||||
cursor.next = self.messages.len() as u32 + self.deleted_messages;
|
||||
self.offsets.push_back(cursor.next);
|
||||
|
||||
// Now clear the messages we don't need to
|
||||
@@ -110,6 +149,12 @@ impl<T> PubSub<T> {
|
||||
}
|
||||
|
||||
self.deleted_messages += num_to_delete;
|
||||
|
||||
if self.deleted_messages > u32::MAX / 2 || self.deleted_offsets > u32::MAX / 2 {
|
||||
// Don't let the deleted_* shifts grow indefinitely otherwise
|
||||
// they will end up overflowing, breaking everything.
|
||||
self.reset_shifts();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user