lace.events: Add event sequence to ensure events are processed in order.

This commit is contained in:
Rod Kay
2024-10-16 19:37:49 +11:00
parent ad2f92b791
commit 2a17b624c1
14 changed files with 129 additions and 79 deletions

View File

@@ -33,7 +33,7 @@ is
private
use ada.Strings.unbounded;
pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
-- pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
package Observer is new event.make_Observer (Any.limited_item);

View File

@@ -32,7 +32,7 @@ is
private
use ada.Strings.unbounded;
pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
-- pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
package Observer is new event.make_Observer (Any.limited_item);

View File

@@ -36,7 +36,7 @@ private
use ada.Strings.unbounded;
pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
-- pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
package Subject is new event.make_Subject (Any.limited_item);

View File

@@ -39,7 +39,7 @@ is
private
use ada.Strings.unbounded;
pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
-- pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
package Subject is new event.make_Subject (Any.limited_item);

View File

@@ -36,7 +36,7 @@ is
private
use ada.Strings.unbounded;
pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
-- pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
package Subject is new event.make_Subject (Any.limited_item);

View File

@@ -29,6 +29,11 @@ is
is
begin
Self.Responses.add (Self, the_Response, to_Kind, from_Subject);
if not Self.sequence_Id_Map.contains (from_Subject)
then
Self.sequence_Id_Map.insert (from_Subject, 0);
end if;
end add;

View File

@@ -4,6 +4,7 @@ with
private
with
lace.event.Containers,
ada.Containers.indefinite_hashed_Maps,
ada.Strings.Hash;
@@ -57,7 +58,7 @@ is
private
pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
-- pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
----------------------
@@ -135,7 +136,8 @@ private
and Observer.item
with
record
Responses : safe_Responses;
Responses : safe_Responses;
sequence_Id_Map : Containers.name_Map_of_sequence_Id; -- Contains the next expected sequence ID from each subject.
end record;
end lace.event.make_Observer;

View File

@@ -85,7 +85,7 @@ is
private
pragma suppress (container_Checks); -- Suppress expensive tamper checks.
-- pragma suppress (container_Checks); -- Suppress expensive tamper checks.
--------------------------
@@ -144,7 +144,7 @@ private
with
record
safe_Observers : make_Subject.safe_Observers;
sequence_Id_Map : Containers.safe_sequence_Id_Map;
sequence_Id_Map : Containers.safe_sequence_Id_Map; -- Contains the next send sequence ID for each observer.
Emitter : event_Emitter_view;
Sender : event_Sender_view;
end record;

View File

@@ -34,6 +34,14 @@ is
next_Id := next_Id + 1;
end get_Next;
procedure decrement (for_Name : in String)
is
next_Id : name_Maps_of_sequence_Id.Reference_type renames the_Map (for_Name);
begin
next_Id := next_Id - 1;
end decrement;
end safe_sequence_Id_Map;

View File

@@ -1,9 +1,10 @@
with
ada.Strings.Hash,
ada.Containers.indefinite_hashed_Maps;
ada.Containers.indefinite_hashed_Maps,
ada.Containers.indefinite_Holders;
private
-- private
package lace.event.Containers
--
-- Common containers.
@@ -13,6 +14,13 @@ is
pragma suppress (container_Checks); -- Suppress expensive tamper checks.
----------------
-- Event holder.
--
package event_Holders is new ada.Containers.indefinite_Holders (Event.item'Class);
subtype event_Holder is event_Holders.Holder;
---------------------------
-- Name map of sequence Id.
--
@@ -32,8 +40,10 @@ is
procedure add (Name : in String);
procedure rid (Name : in String);
procedure get_Next (Id : out event.sequence_Id;
for_Name : in String);
procedure get_Next (Id : out event.sequence_Id;
for_Name : in String);
procedure decrement (for_Name : in String);
private
the_Map : name_Map_of_sequence_Id;
end safe_sequence_Id_Map;

View File

@@ -1,10 +1,12 @@
with
lace.Observer,
lace.Event.Containers,
lace.Event.utility,
ada.Text_IO,
ada.Exceptions,
ada.unchecked_Deallocation;
ada.unchecked_Deallocation,
ada.Containers.Vectors;
package body lace.event_Emitter
@@ -64,7 +66,7 @@ is
is
Myself : Emitter_view;
s_Id : event.sequence_Id;
Event : event_Holder;
Event : lace.event.Containers.event_Holder;
the_Observer : lace.Observer.view;
subject_Name : string_Holder;
emitter_Pool : safe_Emitters_view;

View File

@@ -5,9 +5,9 @@ with
private
with
lace.Subject,
lace.event.Containers,
ada.Containers.indefinite_Holders,
ada.Containers.indefinite_Vectors,
ada.Containers.Vectors;
ada.Containers.indefinite_Vectors;
private
@@ -39,22 +39,13 @@ private
---------------
--- Containers.
--
use type Event.item'Class;
package event_Holders is new ada.Containers.Indefinite_Holders (Event.item'Class);
subtype event_Holder is event_Holders.Holder;
-- type event_Details is
-- record
-- Sequence : event.sequence_Id;
-- Event : event_Holder;
-- end record;
-- use type Event.item'Class;
-- package event_Holders is new ada.Containers.Indefinite_Holders (Event.item'Class);
-- subtype event_Holder is event_Holders.Holder;
use type lace.Event.item'Class;
package event_Vectors is new ada.Containers.indefinite_Vectors (Positive,
lace.Event.item'Class);
-- package event_Vectors is new ada.Containers.Vectors (Positive,
-- event_Details);
subtype event_Vector is event_Vectors.Vector;
@@ -67,7 +58,6 @@ private
type safe_Events
is
procedure add (new_Event : in lace.Event.item'Class);
-- Sequence : in event.sequence_Id);
procedure get (the_Events : out event_Vector);
function is_Empty return Boolean;

View File

@@ -3,6 +3,8 @@ with
lace.Event.utility,
ada.unchecked_Deallocation;
with ada.Text_IO; use ada.Text_IO;
package body lace.event.make_Observer.deferred
is
@@ -28,10 +30,13 @@ is
Sequence : in sequence_Id)
is
begin
Self.pending_Events.add (the_Event, from_Subject);
Self.pending_Events.add (the_Event,
Sequence,
from_Subject);
end receive;
overriding
procedure respond (Self : access Item)
is
@@ -44,7 +49,8 @@ is
the_Events : in Event_Vector;
from_subject_Name : in Event.subject_Name)
is
Cursor : Event_Vectors.Cursor := the_Events.First;
expected_Sequence : Containers.name_Maps_of_sequence_Id.Reference_type renames Self.sequence_Id_Map (from_subject_Name);
Cursor : Event_Vectors.Cursor := the_Events.First;
begin
while has_Element (Cursor)
loop
@@ -54,45 +60,59 @@ is
ada.Containers;
use type Observer.view;
the_Event : constant Event.item'Class := Element (Cursor);
Response : constant event_response_Maps.Cursor := the_Responses.find (to_Kind (the_Event'Tag));
the_Event : constant Event.item'Class := Element (Cursor).Event.Element;
the_Sequence : constant sequence_Id := Element (Cursor).Sequence;
Response : constant event_response_Maps.Cursor := the_Responses.find (to_Kind (the_Event'Tag));
begin
if has_Element (Response)
-- put_Line ("observer " & my_Name & " from " & from_subject_Name & " seq" & the_Sequence'Image & " exp seq " & sequence_Id' (expected_Sequence)'Image);
if the_Sequence = expected_Sequence
then
Element (Response).respond (the_Event);
expected_Sequence := expected_Sequence + 1;
if Observer.Logger /= null
if has_Element (Response)
then
Observer.Logger.log_Response (Element (Response),
Observer.view (Self),
the_Event,
from_subject_Name);
end if;
Element (Response).respond (the_Event);
elsif Self.Responses.relay_Target /= null
then
-- Self.relay_Target.notify (the_Event, from_Subject_Name); -- todo: Re-enable relayed events.
if Observer.Logger /= null
then
Observer.Logger.log_Response (Element (Response),
Observer.view (Self),
the_Event,
from_subject_Name);
end if;
if Observer.Logger /= null
elsif Self.Responses.relay_Target /= null
then
Observer.Logger.log ("[Warning] ~ Relayed events are currently disabled.");
-- Self.relay_Target.notify (the_Event, from_Subject_Name); -- todo: Re-enable relayed events.
if Observer.Logger /= null
then
Observer.Logger.log ("[Warning] ~ Relayed events are currently disabled.");
else
raise program_Error with "Event relaying is currently disabled.";
end if;
else
raise program_Error with "Event relaying is currently disabled.";
if Observer.Logger /= null
then
Observer.Logger.log ("[Warning] ~ Observer "
& my_Name
& " has no response to " & Name_of (the_Event)
& " from " & from_subject_Name & ".");
Observer.Logger.log (" Count of responses =>"
& the_Responses.Length'Image);
else
raise program_Error with "Observer " & my_Name & " has no response to " & Name_of (the_Event)
& " from " & from_subject_Name & ".";
end if;
end if;
else
if Observer.Logger /= null
then
Observer.Logger.log ("[Warning] ~ Observer "
& my_Name
& " has no response to " & Name_of (the_Event)
& " from " & from_subject_Name & ".");
Observer.Logger.log (" Count of responses =>"
& the_Responses.Length'Image);
else
raise program_Error with "Observer " & my_Name & " has no response to " & Name_of (the_Event)
& " from " & from_subject_Name & ".";
end if;
Self.receive (the_Event => the_Event, -- Return the event to the event queue for later processing,
from_Subject => from_subject_Name, -- after the missing sequence event has arrived.
Sequence => the_Sequence);
end if;
end;
@@ -103,26 +123,23 @@ is
the_subject_Events : constant subject_events_Pairs := Self.pending_Events.fetch;
-- the_subject_Events : subject_events_Pairs (1 .. 5_000);
-- Count : Natural;
begin
-- Self.pending_Events.fetch (the_subject_Events, Count);
-- for i in 1 .. Count
for i in the_subject_Events'Range
loop
declare
function Less_than (L, R : in event_sequence_Pair) return Boolean is (L.Sequence < R.Sequence);
package Sorter is new event_Vectors.generic_Sorting ("<" => Less_than);
procedure deallocate is new ada.unchecked_Deallocation (String, String_view);
subject_Name : String_view := the_subject_Events (i).Subject;
the_Events : Event_vector renames the_subject_Events (i).Events;
subject_Name : String_view := the_subject_Events (i).Subject;
the_Events : Event_vector := the_subject_Events (i).Events;
begin
if Self.Responses.contains (subject_Name.all)
then
actuate (Self.Responses.Element (subject_Name.all),
the_Events,
subject_Name.all);
Sorter.sort (the_Events);
actuate (Self.Responses.Element (subject_Name.all),
the_Events,
subject_Name.all);
else
declare
Message : constant String := "*** Warning *** ~ " & my_Name & " has no responses for events from " & subject_Name.all & ".";
@@ -149,10 +166,13 @@ is
protected
body safe_Events
is
procedure add (the_Event : in Event.item'Class)
procedure add (the_Event : in Event.item'Class;
Sequence : in sequence_Id)
is
use Containers.event_Holders;
begin
the_Events.append (the_Event);
the_Events.append (event_sequence_Pair' (to_Holder (the_Event),
Sequence));
end add;
@@ -172,6 +192,7 @@ is
body safe_subject_Map_of_safe_events
is
procedure add (the_Event : in Event.item'Class;
Sequence : in sequence_Id;
from_Subject : in String)
is
begin
@@ -181,7 +202,8 @@ is
new safe_Events);
end if;
the_Map.Element (from_Subject).add (the_Event);
the_Map.Element (from_Subject).add (the_Event,
Sequence);
end add;

View File

@@ -1,6 +1,7 @@
private
with
ada.Containers.indefinite_Vectors,
lace.Event.Containers,
ada.Containers.Vectors,
ada.Containers.indefinite_hashed_Maps,
ada.Strings.Hash;
@@ -38,12 +39,20 @@ is
private
pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
-- pragma Suppress (Container_Checks); -- Suppress expensive tamper checks.
type event_sequence_Pair is
record
Event : Containers.event_Holder;
Sequence : sequence_Id;
end record;
----------------
-- Event Vectors
--
package event_Vectors is new ada.Containers.indefinite_Vectors (Positive, Event.item'Class);
package event_Vectors is new ada.Containers.Vectors (Positive, event_sequence_Pair);
subtype event_Vector is event_Vectors.Vector;
type event_Vector_view is access all event_Vector;
@@ -54,7 +63,8 @@ private
protected
type safe_Events
is
procedure add (the_Event : in Event.item'Class);
procedure add (the_Event : in Event.item'Class;
Sequence : in sequence_Id);
procedure fetch (all_Events : out event_Vector);
private
the_Events : event_Vector;
@@ -95,6 +105,7 @@ private
type safe_subject_Map_of_safe_events
is
procedure add (the_Event : in Event.item'Class;
Sequence : in sequence_Id;
from_Subject : in String);
function fetch return subject_events_Pairs;