U E D R S I H C RSS
ID
Password
Join
전쟁에서 이등상이란 없다. ―오마 브레들리



Contents

1 메세지 큐
1.1 메세지 블럭
1.1.1 메세지 블럭을 생성하기
1.1.1.1 ACE_Message_Block이 데이타 메모리를 할당하고 관리하는 경우
1.1.1.2 사용자가 직접 메세지 메모리를 할당하고 관리하는 경우
1.1.2 메세지 블럭안에 데이타를 삽입하고 관리하기
1.1.3 복사하기와 중복시키기(Copying and Duplicating)
1.1.4 메세지 블럭을 해제하기(Releasing Message Blocks)
1.2 ACE에서의 메세지 큐
1.3 워터마크
1.4 Using Message Queue Iterators
1.5 Dynamic or Real-Time Message Queues
1.5.1 Creating Message Queues

1 메세지 큐 #

현재의 실시간 어플리케이션들은 일반적으로 통신가능한, 그러나 독립적인 작업단위 - 테스크(task)들의 집합으로 구축된다. 이 작업단위는 서로서로 몇몇 체계를 통하여 통신할 수 있다 - 체계들중 일반적으로 사용되는 것은 메세지 큐를 들 수 있다. 이 경우의 통신 기본 모드는 송신자(또는 producer) task가 메세지를 메세지 큐에 넣는 것(enqueue)과 수신자(또는 consumer)가 큐로부터 메세지를 꺼내는 것(dequeue)이 있다. 물론 이것은 메세지 큐를 사용하는 방법들 중 한가지일 뿐이다. 우리는 메세지 큐의 각각 다른 예를 이어지는 글에서 보게 될 것이다.

ACE의 메세지 큐는 UNIX System V 메세지 큐를 모델삼아 만들어졌다. 그러므로 만약 System V에 익숙하다면 금방 습득할 수 있을 것이다. ACE에서 사용가능한 메세지 큐는 몇가지 다른 형태로써 존재한다. 각각의 메세지 큐들은 큐에서 메시지를 넣고 꺼내는데 있어 서로 다른 스케쥴링 알고리즘을 사용한다.

1.1 메세지 블럭 #

ACE에서 메세지는 메세지 블럭의 형태로서 메세지 큐에 적재된다. 메세지 블럭은 저장되는 실제 메세지 데이타를 포장하며, 몇가지 데이타 삽입 및 관리명령들을 제공한다. 각 메세지 블럭은 헤더와 데이터 블럭을 "담고" 있다. 여기서 담고 있다는 의미는 느슨한 의미의 뜻이라는 것에 주의하기 바란다. 메세지 블럭은 언제나 데이타 블럭 또는 메세지 헤더와 관련된 메모리를 관리하지는 않는다. (although you can have it do this for you) 단지 그것의 역할은 포인터를 잡고 있는 것뿐이다. 적재되어있다는 것은 개념적인 의미일 뿐이다. 데이타 블럭은 차례대로 자신이 가리키는 실제 데이타 버퍼 포인터를 담고 있다. 이것은 아래 그림에 나타난 것처럼 여러 메세지 블럭들 간의 (데이타 버퍼에 대한) 융통성있는 공유를 가능하게 한다. 그림에서는 한개의 데이타 블럭을 두개의 메세지 블럭이 공유하고 있다는 것에 주목해라. 이것은 데이타 복사에 따른 오버헤드없이 서로다른 데이타 큐에 같은 데이타를 push하는 것을 가능하게 해준다. 메세지 블럭 클래스는 ACE_Message_Block으로 선언되어있으며, 데이타 블럭 클래스는 ACE_Data_Block으로 선언되어있다. 데이타 블럭과 메세지 블럭을 실제로 생성하려면 ACE_Message_Block의 생성자를 사용하라.

1.1.1 메세지 블럭을 생성하기 #

ACE_Message_Block 클래스는 몇가지 다른 형태의 생성자를 가지고 있다. 이 생성자들은 메세지/데이타 블럭뒤에 숨겨져있는 메세지 데이타를 다루는데 사용될 수 있다. The ACE_Message_Block class can be used to completely hide the ACE_Data_Block and manage the message data for you or if you want you can create and manage data blocks and message data on your own. The next section goes over how you can use ACE_Message_Block to manage message memory and data blocks for you. We then go over how you can manage this on your own without relying on ACE_Message_Block’s management features.

1.1.1.1 ACE_Message_Block이 데이타 메모리를 할당하고 관리하는 경우 #

메세지 블럭을 생성하는 가장 간단한 방법은 ACE_Message_Block의 생성자의 인자로 넘겨줄 데이타 조각의 크기를 넘겨주는 것이다. 이것은 메세지 데이타를 위한 텅빈 메모리 영역의 할당과 ACE_Data_Block의 생성작업을 내부적으로 수행한다. 메세지 블럭을 생성한 후에는 데이타의 추가 삭제작업을 위해 rd_ptr()과 wd_ptr() 관리 메소드를 사용할 수 있다. ACE_Message_Block로 하여금 데이타와 데이타 블럭을 생성하도록 하는 데 있어 주된 잇점은 이 클래스가 사용할 메모리를 정확하게 관리를 해준다는 데 있다. 다시말하면 메모리관리에 있어서 쉽게 겪을 수 있는 메모리 누수에 대한 골칫거리를 미연에 방지해주는 역할을 한다.

ACE_Message_Block 생성자는 또한 프로그래머로 하여금 내부적으로 메모리를 할당하는데 항상 사용하는 할당자(allocator)를 정의하는 것을 허용한다. 만약 할당자를 ACE_Message_Block를 생성할 때 인자로 넘겨준다면, 메세지 블록은 그것을 내부 데이타 블럭과 메세지 데이타를 할당하는데 사용하게 된다. 생성자는 다음과 같다:

ACE_Message_Block (size_t size,
    ACE_Message_Type type = MB_DATA,
    ACE_Message_Block *cont = 0,
    const char *data = 0,
    ACE_Allocator *allocator_strategy = 0,
    ACE_Lock *locking_strategy = 0,
    u_long priority = 0,
    const ACE_Time_Value & execution_time = ACE_Time_Value::zero,
    const ACE_Time_Value & deadline_time = ACE_Time_Value::max_time);

위 생성자의 정의는 다음과 같은 인자들을 가진다 :
  1. size 변수는 메세지 블럭에 대응되는 데이타 버퍼의 크기를 나타낸다. 메세지 블럭의 크기는 정해지더라도 길이(length)는 wr_ptr()이 설정될 때까지 0이란 점에 주의해라. 이것에 대해서는 차후 설명할 것이다.
  2. type 변수는 메세지의 타입을 정의한다. (ACE_Message_Type 열거형 타입으로 지정가능한 몇가지 타입값이 존재한다. 기본적으로 데이타 메세지형(MB_DATA)로 지정된다.)
  3. cont 변수는 "조각 연결들(fragment chain)"안에서 다음 메세지 블럭으로의 포인터를 나타낸다. 메세지 블럭은 실제로 연결 리스트 형태로 연결되어있는 것이 가능하다. 이 연결리스트의 각각의 노드는 메세지큐에 마치 하나의 메세지 블럭처럼 간주되어 적재(enqueue)할 수 있다. 기본값은 0이며, 블럭에 대한 연결리스트는 고려하지 않는다는 뜻이 된다.
  4. data 변수는 이 메세지 블럭에 담겨질 자료의 시작 포인터를 의미한다. 만약 이 변수가 0으로 지정되면, 첫번째 인자에 지정한 크기의 버퍼공간을 생성하게되고, 이것은 메세지 블럭에 의해 관리된다. 메세지블럭이 지워지면, 이 버퍼도 같이 소멸한다. 반면에, 만약 이 변수에 다른 값을 지정했다면(다시말해서, null이 아니면) 차후 데이타 블럭이 지워지더라도 이 포인터에 할당된 버퍼는 소멸되지 않는다! 이것은 중요한 차이점이며 잘 노트해놓을 필요가 있다.
  5. allocator_strategy 변수는 (만약 필요하다면) 데이타 버퍼를 메모리할당하는데 사용될 수 있다. to be used to allocate the data buffer (if needed), i.e. if the 4th parameter was null (as explained above). Any of the ACE_Allocator subclasses can be used as this argument. (See the chapter on Memory Management for more on ACE_Allocators).
  6. If locking_strategy is non-zero, then this is used to protect regions of code that access shared state (e.g. reference counting) from race conditions.
  7. This and the next two parameters are used for scheduling for the real-time message queues which are available with ACE, and should be left at their default for now.

1.1.1.2 사용자가 직접 메세지 메모리를 할당하고 관리하는 경우 #

만약 ACE_Message_Block를 사용하고 있다면, (If you are using ACE_Message_Block you are not tied down to using it to allocate memory for you.) 메세지 블럭의 생성자는 다음과 같은 일을 할 수 있게 해준다.
  • 직접 작성한 데이타 블럭을 생성하고 인자로 넘겨서 메세지 데이타로서 지정되도록 할 수 있다.
  • 메세지데이타의 포인터를 넘겨 줌으로서 메세지 블럭으로 하여금 내장된 데이타 블럭을 생성하고 설정하도록 할 수 있다. 메세지 블럭은 데이타 블럭을 위한 메세지를 관리할 것이지만 메세지 데이타에 대해서는 하지 않는다.

아래 제시한 예제는 어떻게 메세지 블럭이 메세지 데이타의 포인터를 넘겨받는지와 ACE_Message_Block이 내장되어있는 ACE_Data_Block을 어떻게 생성하고 관리하는 지를 설명하기 위한 것이다.

//실제 데이타
char data[size];
data = "This is my data";
//데이타를 담고 있을 메세지 블럭을 생성한다.
ACE_Message_Block *mb = new ACE_Message_Block (data, // 신규로 생성될 데이타 블럭에 담겨질 데이타
                                               blocksize); //저장될 블럭의 크기

This constructor creates an underlying data block and sets it up to point to the beginning of the data that is passed to it. The message block that is created does not make a copy of the data nor does it assume ownership of it. This means that when the message block mb is destroyed, the associated data buffer data will NOT be destroyed (i.e. this memory will not be deallocated). This makes sense, the message block didn’t make a copy therefore the memory was not allocated by the message block, so it shouldn’t be responsible for its deallocation.

1.1.2 메세지 블럭안에 데이타를 삽입하고 관리하기 #

In addition to the constructors, ACE_Message_Block offers several methods to insert data into a message block directly. Additional methods are also available to manipulate the data that is already present in a message block. Each ACE_Message_Block has two underlying pointers that are used to read and write data to and from a message block, aptly named the rd_ptr and wr_ptr. They are accessible directly by calling the rd_ptr() and wr_ptr() methods. The rd_ptr points to the location where data is to be read from next, and the wr_ptr points to the location where data is to be written to next. The programmer must carefully manage these pointers to insure that both of them always point to the correct location. When data is read or written using these pointers they must be advanced by the programmer, they do not update automagically. Most internal message block methods also make use of these two pointers therefore making it possible for them to change state when you call a method on the message block. It is the programmer's responsibility to make sure that he/she is aware of what is going on with these pointers.

1.1.3 복사하기와 중복시키기(Copying and Duplicating) #

Data can be copied into a message block by using the copy() method on ACE_Message_Block.

int copy(const char *buf, size_t n);

The copy method takes a pointer to the buffer that is to be copied into the message block and the size of the data to be copied as arguments. This method uses the wr_ptr and begins writing from this point onwards till it reaches the end of the data buffer as specified by the size argument. copy() will also ensure that the wr_ptr is updated so that is points to the new end of the buffer. Note that this method will actually perform a physical copy, and thus should be used with caution. The base() and length() methods can be used in conjunction to copy out the entire data buffer from a message block. base() returns a pointer that points to the first data item on the data block and length() returns the total size of the enqueued data. Adding the base and length gets you a pointer to the end of the data block. Using these methods together you can write a routinue that takes the data from the message block and makes an external copy. The duplicate() and clone() methods are used to make a “copy” of a message block. The clone() method as the name suggests actually creates a fresh copy of the entire message block including its data blocks and continuations, i.e. a deep copy. The duplicate() method, on the other hand, uses the ACE_Message_Block’s reference counting mechanism. It returns a pointer to the message block that is to be duplicated and internally increments an internal reference count.

1.1.4 메세지 블럭을 해제하기(Releasing Message Blocks) #

Once done with a message block the programmer can call the release() method on it. If the message data memory was allocated by the message block then calling the release() method will also de-allocate that memory. If the message block was reference counted, then the release () will cause the count to decrement until the count reaches zero, after which the message block and its associated data blocks are removed from memory.

1.2 ACE에서의 메세지 큐 #

As mentioned earlier, ACE has several different types of message queues, which in general can be divided into two categories, static and dynamic. The static queue is a general purpose message queue named ACE_Message_Queue (as if you couldn't guess) whereas the dynamic message queues (ACE_Dynamic_Message_Queue) are real-time message queues. The major difference between these two types of queues is that messages on static queues have static priority, i.e. once the priority is set it does not change. On the other hand, in the dynamic message queues, the priority of messages may change dynamically, based on parameters such as execution time and deadline. The following example illustrates how to create a simple static message queue and then how to enqueue and dequeue message blocks onto it.

//Example 1a
#ifndef MQ_EG1_H_
#define MQ_EG1_H_
#include "ace/Message_Queue.h"
class QTest
{
public:
    //Constructor creates a message queue with no synchronization
    QTest(int num_msgs);
    //Enqueue the num of messages required onto the message mq.
    int enq_msgs();
    //Dequeue all the messages previously enqueued.
    int deq_msgs ();
private:
    //Underlying message queue
    ACE_Message_Queue<ACE_NULL_SYNCH> *mq_;
    //Number of messages to enqueue.
    int no_msgs_;
};
#endif /*MQ_EG1.H_*/
//Example 1b
#include "mq_eg1.h"
QTest::QTest(int num_msgs):no_msgs_(num_msgs)
{
    ACE_TRACE("QTest::QTest");
    //First create a message queue of default size.
    if(!(this->mq_=new ACE_Message_Queue<ACE_NULL_SYNCH> ()))
    ACE_DEBUG((LM_ERROR,"Error in message queue initialization \n"));
}
int QTest::enq_msgs()
{
    ACE_TRACE("QTest::enq_msg");
    for(int i=0; i<no_msgs_;i++)
    {
        //create a new message block specifying exactly how large
        //an underlying data block should be created.
        ACE_Message_Block *mb;
        ACE_NEW_RETURN(mb, ACE_Message_Block(ACE_OS::strlen("This is message 1\n")), -1);
        //Insert data into the message block using the wr_ptr
        ACE_OS::sprintf(mb->wr_ptr(), "This is message %d\n", i);
        //Be careful to advance the wr_ptr by the necessary amount.
        //Note that the argument is of type "size_t" that is mapped to bytes.
        mb->wr_ptr(ACE_OS::strlen("This is message 1\n"));
        //Enqueue the message block onto the message queue
        if(this->mq_->enqueue_prio(mb)==-1)
        {
            ACE_DEBUG((LM_ERROR,"\nCould not enqueue on to mq!!\n"));
            return -1;
        }
        ACE_DEBUG((LM_INFO,"EQ'd data: %s\n", mb->rd_ptr() ));
    } //end for
    //Now dequeue all the messages
    this->deq_msgs();
    return 0;
}

int QTest::deq_msgs()
{
    ACE_TRACE("QTest::dequeue_all");
    ACE_DEBUG((LM_INFO,"No. of Messages on Q:%d Bytes on Q:%d \n", mq_->message_count(), mq_->message_bytes()));
    ACE_Message_Block *mb;
    //dequeue the head of the message queue until no more messages are
    //left. Note that I am overwriting the message block mb and I since
    //I am using the dequeue_head() method I dont have to worry about
    //resetting the rd_ptr() as I did for the wrt_ptr()
    for(int i=0;i <no_msgs_; i++)
    {
        mq_->dequeue_head(mb);
        ACE_DEBUG((LM_INFO,"DQ'd data %s\n", mb->rd_ptr() ));
        //Release the memory associated with the mb
        mb->release();
    }
    return 0;
}

int main(int argc, char* argv[])
{
    if(argc <2)
        ACE_ERROR_RETURN((LM_ERROR, "Usage %s num_msgs", argv[0]), -1);
    QTest test(ACE_OS::atoi(argv[1]));
    if(test.enq_msgs() == -1)
        ACE_ERROR_RETURN( (LM_ERROR,"Program failure \n"), -1);
}

The above example illustrates several methods of the message queue class. The example consists of a single QTest class which instantiates a message queue of default size with ACE_NULL_SYNCH locking. The locks (a mutex and a condition variable) are used by the message queue to

  • Ensure the safety of the reference count maintained by message blocks against race conditions when accessed by multiple threads.
  • To "wake up" all threads that are sleeping because the message queue was empty or full.

In this example, since there is just a single thread, the template synchronization parameter for the message queue is set to null (ACE_NULL_SYNCH which means use ACE_Null_Mutex and ACE_Null_Condition). The enq_msgs() method of QTest is then called, which enters a loop that creates and enqueues messages onto the message queue. The constructor of ACE_Message_Block is passed the size of the message data. Using this constructor causes the memory to be managed automatically (i.e. the memory will be released when the message block is deleted i.e. release()'d). The wr_ptr is then obtained (using the wr_ptr() accessor method) and data is copied into the message block. After this the wr_ptr is advanced forward. The enqueue_prio() method of the message queue is then used to actually enqueue the message block onto the underlying message queue.

After no_msgs_ message blocks have been created, initialized and inserted onto the message queue, enq_msgs() calls the deq_msgs () method. This method dequeues each of the messages from the message queue using the dequeue_head() method of ACE_Message_Queue. After dequeing a message its data is displayed and then the message is release()'d.

1.3 워터마크 #

워터마크는 그 메시지 큐에 너무 많은 데이타를 가지고 있을 때(메세지 큐가 high 워터마크에 도달했을때)나 충분하지 않은 양의 데이타를 가지고 있을 때 (메세지 큐가 low 워터마크에 도달했을 때)를 알아내기 위해 사용한다. 두 워터마크 모두 흐름 제어에 사용된다. - 예를 들면 low 워터마크는 TCP 상에서 Silly Window Syndrome와 같은 상황을 피하기 위해 사용되며, high 워터마크는 데이타 송신이나 생성을 막거나 느리게 만들기 위해 사용될 수 있다.

ACE상의 메세지 큐들은 이 기능을 큐에 적재되는 전체 데이타 바이트 수를 관리함으로서 구현하고 있다. 따라서, 메세지큐에 새로운 메세지 블럭이 적재될때마다, 먼저 블럭의 길이를 진단하고 메세지 블럭이 큐에 들어갈 수 있는지 여부를 검사한다(다시말하면, 새로운 메세지 블럭이 적재된다면 새로운 메세지 블럭은 high 워터마크를 초과하지 않는다는 것이 확실해야한다) 만약 메세지 큐가 데이타를 적재할 수 없고 잠금을 소유했다면(다시말하면, 메세지 큐의 템플릿 인자에 ACE_NULL_SYNCH가 아닌 ACE_SYNC가 사용되었다면), 충분한 양의 공간이 확보되거나 enqueue 메소드가 타임아웃상태가 될 때까지 호출자를 블럭상태로 만들게 된다. 타임아웃상태가 되거나 null 잠금을 소유하는 경우하면, enqueue 메소드는 -1 값을 반환하게 된다.(이것은 메세지를 적재할 수 없다는 의미)

비슷한 방법으로, ACE_Message_Queue의 dequeue_head 메소드가 실행될 때에는 큐에서 데이타를 빼낸이후에 low 워터마크보다 더 많이 남아있을지 여부를 확인한다. 만일 그렇지 않다면, 큐가 잠금을 획득하게 될때까지 블럭되거나 -1을 반환할 것이다. (이것은 enqueue 메소드와 같은 방법으로 동작한다) 다음은 워터마크를 설정하고 값을 읽는데 사용하기위한 메소드들이다.

// high 워터마크값을 얻는다.
size_t high_water_mark(void)
// high 워터마크값을 설정한다.
void high_water_mark(size_t hwm);
// low 워터마크값을 얻는다.
size_t low_water_mark(void)
// low 워터마크값을 설정한다.
void low_water_mark(size_t lwm)

1.4 Using Message Queue Iterators #

As is common with other container classes, forward and reverse iterators are available for message queues in ACE. These iterators are named ACE_Message_Queue_Iterator and ACE_Message_Queue_Reverse_Iterator. Each of these require a template parameter which is used for synchronization while traversing the message queue. If multiple threads are using the message queue, then this should be set to ACE_SYNCH - otherwise it may be set to ACE_NULL_SYNCH. When an iterator object is created, its constructor must be passed a reference to the message queue we wish it to iterate over. The following example illustrates the water marks and the iterators:
//Example 2 
#include "ace/Message_Queue.h" 
#include "ace/Get_Opt.h" 
#include "ace/Malloc_T.h" 
#define SIZE_BLOCK 1 
 
class Args{ 
public: 
    Args(int argc, char*argv[],int& no_msgs, ACE_Message_Queue<ACE_NULL_SYNCH>* &mq){ 
        ACE_Get_Opt get_opts(argc,argv,"h:l:t:n:xsd"); 
        while((opt=get_opts())!=-1) 
            switch(opt){ 
            case 'n': 
                //set the number of messages we wish to enqueue and dequeue 
                no_msgs=ACE_OS::atoi(get_opts.optarg); 
                ACE_DEBUG((LM_INFO,"Number of Messages %d \n",no_msgs)); 
                break; 
            case 'h': 
                //set the high water mark 
                hwm=ACE_OS::atoi(get_opts.optarg); 
                mq->high_water_mark(hwm); 
                ACE_DEBUG((LM_INFO,"High Water Mark %d msgs \n",hwm)); 
                break; 
            case 'l': 
                //set the low water mark 
                lwm=ACE_OS::atoi(get_opts.optarg); 
                mq->low_water_mark(lwm); 
                ACE_DEBUG((LM_INFO,"Low Water Mark %d msgs \n",lwm)); 
                break; 
            default: 
                ACE_DEBUG((LM_ERROR, "Usage -n<no. messages> -h<hwm> -l<lwm>\n")); 
                break; 
            } 
    } 
private: 
    int opt; 
    int hwm; 
    int lwm; 
}; 
 
class QTest{ 
public: 
    QTest(int argc, char*argv[]){ 
        //First create a message queue of default size. 
    if(!(this->mq_=new ACE_Message_Queue<ACE_NULL_SYNCH> ())) 
        ACE_DEBUG((LM_ERROR,"Error in message queue initialization \n")); 
    //Use the arguments to set the water marks and the no of messages 
    args_ = new Args(argc,argv,no_msgs_,mq_); 
    } 
    int start_test(){ 
        for(int i=0; i<no_msgs_;i++){ 
            //Create a new message block of data buffer size 1 
            ACE_Message_Block * mb= new ACE_Message_Block(SIZE_BLOCK); 
            //Insert data into the message block using the rd_ptr 
            *mb->wr_ptr()=i; 
            //Be careful to advance the wr_ptr 
            mb->wr_ptr(1); 
            //Enqueue the message block onto the message queue 
            if(this->mq_->enqueue_prio(mb)==-1){ 
                ACE_DEBUG((LM_ERROR,"\nCould not enqueue on to mq!!\n")); 
                return -1; 
            } 
            ACE_DEBUG((LM_INFO,"EQ’d data: %d\n",*mb->rd_ptr())); 
        } 
        //Use the iterators to read 
        this->read_all(); 
        //Dequeue all the messages 
        this->dequeue_all(); 
        return 0; 
    } 
    void read_all(){ 
        ACE_DEBUG((LM_INFO,"No. of Messages on Q:%d Bytes on Q:%d \n",mq_->message_count(),mq_->message_bytes())); 
        ACE_Message_Block *mb; 
        //Use the forward iterator 
        ACE_DEBUG((LM_INFO,"\n\nBeginning Forward Read \n")); 
        ACE_Message_Queue_Iterator<ACE_NULL_SYNCH> mq_iter_(*mq_); 
        while(mq_iter_.next(mb)){ 
            mq_iter_.advance(); 
            ACE_DEBUG((LM_INFO,"Read data %d\n",*mb->rd_ptr())); 
        } 
        //Use the reverse iterator 
        ACE_DEBUG((LM_INFO,"\n\nBeginning Reverse Read \n")); 
        ACE_Message_Queue_Reverse_Iterator<ACE_NULL_SYNCH> 
        mq_rev_iter_(*mq_); 
        while(mq_rev_iter_.next(mb)){ 
            mq_rev_iter_.advance(); 
            ACE_DEBUG((LM_INFO,"Read data %d\n",*mb->rd_ptr())); 
        } 
    } 
    void dequeue_all(){ 
        ACE_DEBUG((LM_INFO,"\n\nBeginning DQ \n")); 
        ACE_DEBUG((LM_INFO,"No. of Messages on Q:%d Bytes on Q:%d \n", mq_->message_count(),mq_->message_bytes())); 
        ACE_Message_Block *mb; 
        //dequeue the head of the message queue until no more messages are left 
        for(int i=0;i<no_msgs_;i++){ 
            mq_->dequeue_head(mb); 
            ACE_DEBUG((LM_INFO,"DQ’d data %d\n",*mb->rd_ptr())); 
        } 
    } 
private: 
    Args *args_; 
    ACE_Message_Queue<ACE_NULL_SYNCH> *mq_; 
    int no_msgs_; 
}; 
 
int main(int argc, char* argv[]){ 
    QTest test(argc,argv); 
    if(test.start_test()<0) 
        ACE_DEBUG((LM_ERROR,"Program failure \n")); 
} 

This example uses the ACE_Get_Opt class (See Appendix for more on this utility class) to obtain the low and high water marks (in the Args class). The low and high water marks are set using the low_water_mark() and high_water_mark() accessor functions. Besides this, there is a read_all() method which uses the ACE_Message_Queue_Iterator and ACE_Message_Queue_Reverse_Iterator to first read in the forward and then in the reverse direction.

1.5 Dynamic or Real-Time Message Queues #

As was mentioned above, dynamic message queues are queues in which the priority of the messages enqueued change with time. Such message queues are thus inherently more useful in real-time applications, where such kind of behavior is desirable. ACE currently provides for two types of dynamic message queues, deadline-based and laxity-based (see IX ). The deadline-based message queues use the deadlines of each of the messages to set their priority. The message block on the queue which has the earliest deadline will be dequeued first when the dequeue_head() method is called using the earliest deadline first algorithm. The laxity-based message queues, however, use both execution time and deadline together to calculate the laxity, which is then used to prioritize each message block. The laxity is useful, as when scheduling by deadline it may be possible that a task is scheduled which has the earliest deadline, but has such a long execution time that it will not complete even if it is scheduled immediately. This negatively affects other tasks, since it may block out tasks which are schedulable. The laxity will take into account this long execution time and make sure that if the task will not complete, that it is not scheduled. The scheduling in laxity queues is based on the minimum laxity first algorithm. Both laxity-based message queues and deadline-based message queues are implemented as ACE_Dynamic_Message_Queue’s. ACE uses the STRATEGY pattern to provide for dynamic queues with different scheduling characteristics. Each message queue uses a different “strategy” object to dynamically set priorities of the messages on the message queue. These "strategy" objects each encapsulate a different algorithm to calculate priorities based on execution time, deadlines, etc., and are called to do so whenever messages are enqueued or removed from the message queue. (For more on the STRATEGY pattern please see the reference "Design Patterns"). The message strategy patterns derive from ACE_Dynamic_Message_Strategy and currently there are two strategies available: ACE_Laxity_Message_Strategy and ACE_Deadline_Message_Strategy. Therefore, to create a "laxity-based" dynamic message queue, an ACE_Laxity_Message_Strategy object must be created first. Subsequently, an ACE_Dynamic_Message_Queue object should be instantiated, which is passed the new strategy object as one of the parameters to its constructor.

1.5.1 Creating Message Queues #

To simplify the creation of these different types of message queues, ACE provides for a concrete message queue factory named ACE_Message_Queue_Factory, which creates message queues of the appropriate type using a variant of the FACTORY METHOD pattern. (For more on the FACTORY METHOD pattern please see reference "Design Patterns"). The message queue factory has three static factory methods to create three different types of message queues:
static ACE_Message_Queue<ACE_SYNCH_USE> * create_static_message_queue ();
static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * create_deadline_message_queue ();
static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * create_laxity_message_queue ();

Each of these methods returns a pointer to the message queue it has just created. Notice that all methods are static and that the create_static_message_queue() method returns an ACE_Message_Queue whereas the other two methods return an ACE_Dynamic_Message_Queue.

This simple example illustrates the creation and use of dynamic and static message queues.
//Example 3
#include "ace/Message_Queue.h"
#include "ace/Get_Opt.h"
#include "ace/OS.h"
class Args{
public:
Args(int argc, char*argv[],int& no_msgs, int& time,ACE_Message_Queue<ACE_NULL_SYNCH>*
&mq){
ACE_Get_Opt get_opts(argc,argv,”h:l:t:n:xsd”);
while((opt=get_opts())!=-1)
switch(opt){
case ’t’:
time=ACE_OS::atoi(get_opts.optarg);
ACE_DEBUG((LM_INFO,”Time: %d \n”,time));
break;
case ’n’:
no_msgs=ACE_OS::atoi(get_opts.optarg);
ACE_DEBUG((LM_INFO,”Number of Messages %d \n”,no_msgs));
break;
case ’x’:
mq=ACE_Message_Queue_Factory<ACE_NULL_SYNCH>::
create_laxity_message_queue();
ACE_DEBUG((LM_DEBUG,”Creating laxity q\n”));
break;
case ’d’:
mq=ACE_Message_Queue_Factory<ACE_NULL_SYNCH>::
create_deadline_message_queue();
ACE_DEBUG((LM_DEBUG,”Creating deadline q\n”));
break;
case ’s’:
mq=ACE_Message_Queue_Factory<ACE_NULL_SYNCH>::
create_static_message_queue();
ACE_DEBUG((LM_DEBUG,”Creating static q\n”));
break;
case ’h’:
hwm=ACE_OS::atoi(get_opts.optarg);
mq->high_water_mark(hwm);
ACE_DEBUG((LM_INFO,”High Water Mark %d msgs \n”,hwm));
break;
case ’l’:
lwm=ACE_OS::atoi(get_opts.optarg);
mq->low_water_mark(lwm);
ACE_DEBUG((LM_INFO,”Low Water Mark %d msgs \n”,lwm));
break;
default:
ACE_DEBUG((LM_ERROR,”Usage specify queue type\n”));
break;
}
}
private:
int opt;
int hwm;
int lwm;
};

class QTest{
public:
QTest(int argc, char*argv[]){
args_ = new Args(argc,argv,no_msgs_,time_,mq_);
array_ =new ACE_Message_Block*[no_msgs_];
}
int start_test(){
for(int i=0; i<no_msgs_;i++){
ACE_NEW_RETURN (array_[i], ACE_Message_Block (1), -1);
set_deadline(i);
set_execution_time(i);
enqueue(i);
}
this->dequeue_all();
return 0;
}
//Call the underlying ACE_Message_Block method msg_deadline_time() to
//set the deadline of the message.
void set_deadline(int msg_no){
float temp=(float) time_/(msg_no+1);
ACE_Time_Value tv;
tv.set(temp);
ACE_Time_Value deadline(ACE_OS::gettimeofday()+tv);
array_[msg_no]->msg_deadline_time(deadline);
ACE_DEBUG((LM_INFO,”EQ’d with DLine %d:%d,”, deadline.sec(),deadline.usec()));
}
//Call the underlying ACE_Message_Block method to set the execution time
void set_execution_time(int msg_no){
float temp=(float) time_/10*msg_no;
ACE_Time_Value tv;
tv.set(temp);
ACE_Time_Value xtime(ACE_OS::gettimeofday()+tv);
array_[msg_no]->msg_execution_time (xtime);
ACE_DEBUG((LM_INFO,”Xtime %d:%d,”,xtime.sec(),xtime.usec()));
}
void enqueue(int msg_no){
//Set the value of data at the read position
*array_[msg_no]->rd_ptr()=msg_no;
//Advance write pointer
array_[msg_no]->wr_ptr(1);
//Enqueue on the message queue
if(mq_->enqueue_prio(array_[msg_no])==-1){
ACE_DEBUG((LM_ERROR,”\nCould not enqueue on to mq!!\n”));
return;
}
ACE_DEBUG((LM_INFO,”Data %d\n”,*array_[msg_no]->rd_ptr()));
}
void dequeue_all(){
ACE_DEBUG((LM_INFO,”Beginning DQ \n”));
ACE_DEBUG((LM_INFO,”No. of Messages on Q:%d Bytes on Q:%d \n”,
mq_->message_count(),mq_->message_bytes()));
for(int i=0;i<no_msgs_ ;i++){
ACE_Message_Block *mb;
if(mq_->dequeue_head(mb)==-1){
ACE_DEBUG((LM_ERROR,”\nCould not dequeue from mq!!\n”));
return;
}
ACE_DEBUG((LM_INFO,”DQ’d data %d\n”,*mb->rd_ptr()));
}
}
private:
Args *args_;
ACE_Message_Block **array_;
ACE_Message_Queue<ACE_NULL_SYNCH> *mq_;
int no_msgs_;
int time_;
};
int main(int argc, char* argv[]){
QTest test(argc,argv);
if(test.start_test()<0)
ACE_DEBUG((LM_ERROR,”Program failure \n”));
}
The above example is very similar to the previous examples, but adds the dynamic message queues into the picture. In the Args class, we have added options to create all the different types of message queues using the ACE_Message_Queue_Factory. Furthermore, two new methods have been added to the QTest class to set the deadlines and execution times of each of the message blocks as they are created (set_deadline()and set_execution_time()). These methods use the ACE_Message_Block methods msg_execution_time() and msg_deadline_time(). Note that these methods take the absolute and NOT the relative time, which is why they are used in conjunction with the ACE_OS::gettimeofday() method. The deadlines and execution times are set with the help of a time parameter. The deadline is set such that the first message will have the latest deadline and should be scheduled last in the case of deadline message queues. Both the execution time and deadline are taken into account when using the laxity queues, however.

Valid XHTML 1.0! Valid CSS! powered by MoniWiki
last modified 2010-10-28 12:42:52
Processing time 0.6699 sec