/*
   Deferred Messaging Module

   jhaley 01/19/2011

   This set of classes can be used to pass messages between arbitrary objects.
   The messages are deferrable, with an optional maximum number of dispatches.
   When a destination responds to a message, either by accepting or refusing
   it, the source of the message will be notified.
*/

#include "tcpip_messaging.h"
#include "deferred_messages.h"

//
// classDeferredMessageListener Constructor / Destructor
//
// The listener starts with an empty set of queues. The destructor does
// nothing by itself; ShutDown is the routine that unregisters the listener
// from its queues.
//
classDeferredMessageListener::classDeferredMessageListener() : queues()
{
}

classDeferredMessageListener::~classDeferredMessageListener()
{
}

//
// classDeferredMessageListener::ShutDown
//
// Cancel all messages for and from this listener, since this listener is
// being deleted. Every queue this listener is tracking is asked to remove
// the listener.
//
void classDeferredMessageListener::ShutDown()
{
   while (!queues.empty())
   {
      classDeferredMessageQueue *queue = *queues.begin();

      queue->RemoveListener(this);

      // RemoveListener only calls back into RemoveQueue when it finds this
      // listener under its current name. Erase the entry here as well so
      // that the loop always makes progress instead of spinning forever on
      // a queue that did not recognize us.
      queues.erase(queue);
   }
}

//
// classDeferredMessageListener::AddQueue
//
// Keep track of the queues this listener can have messages on.
// void classDeferredMessageListener::AddQueue(classDeferredMessageQueue *queue) { if (queues.find(queue) == queues.end()) queues.insert(queue); } // // classDeferredMessageListener::RemoveQueue // // Remove a queue from the listener's list // void classDeferredMessageListener::RemoveQueue(classDeferredMessageQueue *queue) { set::iterator i_que = queues.find(queue); if (i_que != queues.end()) queues.erase(i_que); } classDeferredMessage::classDeferredMessage() : recipient(NULL), source(NULL), message_name(""), message_reason(""), dispatch_count(0), max_dispatch_count(0) { } classDeferredMessage::classDeferredMessage(classDeferredMessageListener *src, classDeferredMessageListener *dest, const string &msg_name, const string &msg_reason, int max_dispatches) : source(src), recipient(dest), message_name(msg_name), message_reason(msg_reason), dispatch_count(0), max_dispatch_count(max_dispatches) { } classDeferredMessage::classDeferredMessage(const classDeferredMessage &other) { recipient = other.recipient; source = other.source; message_name = other.message_name; message_reason = other.message_reason; dispatch_count = other.dispatch_count; max_dispatch_count = other.max_dispatch_count; } // // classDeferredMessage::AttemptDispatch // // Try to send the message to the recipient. If this fails, the dispatch_count is // incremented. // int classDeferredMessage::AttemptDispatch() { int res; ++dispatch_count; if (recipient->CanAcceptMessage(*this)) { recipient->AcceptMessage(*this); res = RES_DISPATCHED; } else { if (max_dispatch_count != -1 && dispatch_count >= max_dispatch_count) res = RES_FAILED; // Too many attempts else res = RES_REFUSED; // Can't accept it yet } // notify source of message dispatch attempt's result source->MessageResult(*this, res); return res; } // // classDeferredMessage::Cancel // // Cancel is called when the recipient cancels messages, in order to notify the // source. 
// void classDeferredMessage::Cancel() { int res = RES_CANCELED; source->MessageResult(*this, res); } string classDeferredMessage::GetName() { return message_name; } string classDeferredMessage::GetReason() { return message_reason; } classDeferredMessageListener *classDeferredMessage::GetRecipient() { return recipient; } classDeferredMessageListener *classDeferredMessage::GetSource() { return source; } int classDeferredMessage::GetDispatchCount() { return dispatch_count; } int classDeferredMessage::GetMaxDispatchCount() { return max_dispatch_count; } void classDeferredMessage::SetMaxDispatchCount(int ct) { max_dispatch_count = ct; } // // classDeferredMessageQueue Constructor // // Create a timer for the queue message dispatch. // classDeferredMessageQueue::classDeferredMessageQueue(bool p_use_timer, bool enabled, int interval) { use_timer = p_use_timer; if (use_timer) { try { timer = new TTimer(NULL); timer->OnTimer = timerEvent; timer->Interval = (Cardinal)interval; timer->Enabled = enabled; } catch(...) { // ???? } } } // // classDeferredMessageQueue Destructor // // Shut down the queue, and destroy the queue message dispatch timer. // classDeferredMessageQueue::~classDeferredMessageQueue() { if (timer) delete timer; timer = NULL; } // // classDeferredMessageQueue::AddListener // // Registers a listener object with the queue. This is necessary if you want // proper protection against receiving messages after your listener has been // deleted. // void classDeferredMessageQueue::AddListener(classDeferredMessageListener *listener) { string str_name = listener->GetName(); if (listeners.find(str_name) == listeners.end()) { listeners[str_name] = listener; listener->AddQueue(this); } } // // classDeferredMessageQueue::RemoveListener // // Remove a listener from the queue's list of registered listeners. 
// void classDeferredMessageQueue::RemoveListener(classDeferredMessageListener *listener) { string str_name = listener->GetName(); map::iterator i_listener = listeners.find(str_name); if (i_listener != listeners.end()) { CancelMessagesFrom(listener); // outgoing CancelMessagesFor(listener); // incoming listener->RemoveQueue(this); listeners.erase(i_listener); } } // // classDeferredMessageQueue::RemoveAllListeners // // Unregister all listeners from this queue. Sources will receive notification of // cancellation for any pending messages. // void classDeferredMessageQueue::RemoveAllListeners() { map::iterator i_listener = listeners.begin(); while (i_listener != listeners.end()) { CancelMessagesFrom(i_listener->second); CancelMessagesFor(i_listener->second); (i_listener->second)->RemoveQueue(this); listeners.erase(i_listener); i_listener = listeners.begin(); } } // // classDeferredMessageQueue::FindListener // // Looks for a listener by name. It must have been registered with the queue // already for this to succeed. NULL is returned if no such listener exists. // classDeferredMessageListener *classDeferredMessageQueue::FindListener(const string &name) { classDeferredMessageListener *retptr = NULL; map::iterator i_listener = listeners.find(name); if (i_listener != listeners.end()) retptr = i_listener->second; return retptr; } // // classDeferredMessageQueue::CreateMessage // // Create a message between named source and destination listeners. // classDeferredMessage classDeferredMessageQueue::CreateMessage( const string &source, const string &dest, const string &name, const string &reason, int max_dispatch_count) { classDeferredMessageListener *pSource = FindListener(source); classDeferredMessageListener *pDest = FindListener(dest); if (!pSource || !pDest) throw classNoSuchListener(); return classDeferredMessage(pSource, pDest, name, reason, max_dispatch_count); } // // classDeferredMessageQueue::ScheduleMessage // // Places a message on the queue. 
// bool classDeferredMessageQueue::ScheduleMessage(classDeferredMessage &msg) { // add both source and recipient as registered listeners AddListener(msg.GetSource()); AddListener(msg.GetRecipient()); if (!IsLocked()) { deferred_messages.push_back(msg); return true; } else return false; } // // classDeferredMessageQueue::CancelMessagesFor // // Remove all messages for a given recipient. // void classDeferredMessageQueue::CancelMessagesFor(classDeferredMessageListener *recipient) { list::iterator i_msg = deferred_messages.begin(); while(i_msg != deferred_messages.end()) { if (i_msg->GetRecipient() == recipient) { i_msg->Cancel(); i_msg = deferred_messages.erase(i_msg); } else ++i_msg; } } // // classDeferredMessageQueue::CancelMessagesFrom // // Remove all messages from a given source. // void classDeferredMessageQueue::CancelMessagesFrom(classDeferredMessageListener *source) { list::iterator i_msg = deferred_messages.begin(); while(i_msg != deferred_messages.end()) { if (i_msg->GetSource() == source) i_msg = deferred_messages.erase(i_msg); else ++i_msg; } } // // classDeferredMessageQueue::CancelAllMessages // // Empties the message queue. // void classDeferredMessageQueue::CancelAllMessages() { list::iterator i_msg = deferred_messages.begin(); while(i_msg != deferred_messages.end()) i_msg = deferred_messages.erase(i_msg); } // // classDeferredMessageQueue::DispatchMessages // // Attempts to dispatch all messages to their recipients. The messages may be // refused, in which case they may be tried again. The messages may also // expire, via exceeding a determined maximum number of dispatch attempts. // The source of the message is notified of the result of this operation // via classDeferredMessage::AttemptDispatch. 
// void classDeferredMessageQueue::DispatchMessages() { list::iterator i_msg = deferred_messages.begin(); while(i_msg != deferred_messages.end()) { int res = i_msg->AttemptDispatch(); // REFUSED means to try again later; any other code means the message // should be removed from the queue. if (res != classDeferredMessage::RES_REFUSED) i_msg = deferred_messages.erase(i_msg); else ++i_msg; } } // // classDeferredMessageQueue::timerEvent // // Handles the regularly occurring VCL timer event that drives the message // dispatch. // void __fastcall classDeferredMessageQueue::timerEvent(TObject *Sender) { DispatchMessages(); } void classDeferredMessageQueue::LockQueue() { ++lock_count; } void classDeferredMessageQueue::UnlockQueue() { if(lock_count > 0) --lock_count; } bool classDeferredMessageQueue::IsLocked() { return (lock_count > 0); } bool classDeferredMessageQueue::IsTimerEnabled() { return timer ? timer->Enabled : false; } void classDeferredMessageQueue::SetTimerEnabled(bool enabled) { if(timer) timer->Enabled = enabled; } Cardinal classDeferredMessageQueue::GetTimerInterval() { return timer ? timer->Interval : 0; } void classDeferredMessageQueue::SetTimerInterval(Cardinal interval) { if(timer) timer->Interval = (Cardinal)interval; } // EOF