ACE_Task

ACE_Task是ACE中的任務或主動對象“處理結構”的基類。

概述

ACE_Task是ACE中的任務或主動對象“處理結構”的基類。ACE使用此類來實現主動對象模式。所有希望成為“主動對象”的對象都必須由此類派生。同時可將它看作是更高級的、更為面向對象的執行緒。

ACE_Task處理的是對象,因此更有利於構造OO程式,產生更好的OO軟體,而且,它還包括了一種用於

與其他任務通信的易於使用的機制。

ACE_Task可用作:

<1>更高級的執行緒(常稱其為任務)

<2>主動對象模式中的主動對象

PS.ACE任務:

每個任務都含有一或多個執行緒,以及一個底層訊息佇列。各個任務通過訊息佇列進行通信。至於訊息佇列實現的內在細節程式設計師不必關注。傳送任務用putq() 將訊息插入到另一任務的訊息佇列中,接收任務通過使用getq()將訊息提取出來。這樣的體系結構大大簡化了多執行緒程式的編程模型。

步驟

從ACE_Task類派生的子類應實現以下業務邏輯:

<1>實現服務初始化和終止方法。

open()方法應該包含所有專屬於任務的初始化代碼。其中可能包括諸如連線控制塊、鎖和記憶體這樣的資源。close()方法用於終止。

<2>調用啟用(Activation)方法。

在主動對象實例化後,必須通過調用activate()啟用它。要在主動對象中創建的執行緒數目及其它參數,被傳遞給activate()方法。它將使svc()方法成為所有它生成的執行緒的啟動點。

<3>實現服務專有的處理方法。

在主動對象被啟用後,各個新執行緒在svc() 方法中啟動。程式設計師必須在子類中定義此方法。

實例

//消費者類定義

#i nclude "ace/OS.h"

#i nclude "ace/Task.h"

#i nclude "ace/Message_Block.h"

//The Consumer Task.

class Consumer :

public ACE_Task<ACE_MT_SYNCH>

{

public:

int open(void*)

{

ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));

//Activate the Task

activate(THR_NEW_LWP,1);

return 0;

}

//The Service Processing routine

int svc(void)

{

//Get ready to receive message from Producer

ACE_Message_Block * mb = 0;

do

{

mb = 0;

//Get message of underlying queue

getq(mb);

ACE_DEBUG((LM_DEBUG,

"(%t) Got message: %d from remote task\n", *mb->rd_ptr()));

}while(*mb->rd_ptr()<10);

return 0;

}

int close(u_long)

{

ACE_DEBUG((LM_DEBUG, "Consumer closes down \n"));

return 0;

}

};

//生產者類定義

class Producer :

public ACE_Task<ACE_MT_SYNCH>

{

public:

Producer(Consumer * consumer) :

consumer_(consumer), data_(0)

{

mb = new ACE_Message_Block((char *)&data_, sizeof(data_));

}

int open(void *)

{

ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));

//Activate the Task

activate(THR_NEW_LWP, 1);

return 0;

}

//The Service Processing routine

int svc(void)

{

while(data_ < 11)

{

//Send message to consumer

ACE_DEBUG((LM_DEBUG,

"(%t) Sending message: %d to remote task\n", data_));

consumer_->putq(mb_);

//Go to sleep for a sec.

ACE_OS::sleep(1);

data_++;

}

return 0;

}

int close(void)

{

ACE_DEBUG((LM_DEBUG, "Producer closes down \n"));

return 0;

}

private:

char data_;

Consumer * consumer_;

ACE_Message_Block * mb_;

};

//main()函式

int main(int argc, char *argv[])

{

Consumer * consumer = new Consumer;

Producer * producer = new Producer(consumer);

producer->open(0);

consumer->open(0);

//Wait for all the tasks to exit. ACE_Thread_Manager::instance()->wait();

ACE_OS::exit(0);

}

分析:

以上為經典的生產者-消費者例子,演示了兩個任務如何使用底層的訊息佇列進行通信。我們可以將生產者和消費者看作是不同的ACE_Task類型的對象。方案十分簡單,但卻是面向對象的,在編寫面向對象的多執行緒程式或主動對象的實例時,我們可採用此方案,它提供了比低級執行緒API更好的方法。

框架

ACE的ACE_Task框架提供了一種基於訊息的編程模式,可以Windows編程的訊息循環進行類比。

ACE_Task Windows 訊息循環 說明
訊息類型 ACE_Message_Block* MSG Windows 訊息用MSG結構表示,ACE_Task中因為不能預計各種套用中訊息的類型,所以ACE_Message_Block基本上可以理解為是對一個指針的封裝,這個指針指向實際的一塊記憶體或是一個對象等等。在創建ACE_Message_Block時,可以指定是由ACE_Message_Block來管理記憶體(構造函式中指定一個size_t類型的大小),還是由我們自己管理記憶體(構造函式中指定一個指針)。而一個ACE_Message_Block類型的指針,就是一個訊息,我們通過傳遞它來進行邏輯的業務處理。
傳送訊息 ACE_Task::putq SendMessage 事實上,到底用SendMessage還是PostMessage與ACE_Task::putq來進行類比,我很為難,PostMessage傳送一個訊息後立刻返回,這與通常的ACE_Task::putq行為非常類似,因為ACE_Task是運行在另外一個執行緒上,ACE_Task::putq只是完成將訊息插入到訊息佇列的工作,理論上它應該立刻返回,但實際上,ACE_Task的訊息佇列有容量大小限制,這個限制由我們自己限定,噹噹前訊息佇列滿時,ACE_Task::putq將阻塞一直到可以插入,這時候就比較類似與SendMessage,
取出訊息 ACE_Task::getq GetMessage GetMessage和ACE_Task::getq在當前訊息佇列沒有訊息時都會阻塞到有訊息可用為止,所以對於它們的使用比較類似,通常會寫一個訊息循環函式 BOOL bRet; MSG msg; while( (bRet = GetMessage( &msg, NULL, 0, 0 )) != 0) { if (bRet == -1) { // handle the error and possibly exit } else { TranslateMessage(&msg); DispatchMessage(&msg); } } ACE_Message_Block * msg; while(getq(msg) != -1) // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0); { // process msg here }
訊息處理函式 默認沒有提供 WNDPROC 通過TranslateMessage和DispatchMessage,Windows將訊息投遞到相映視窗的WNDPROC上,ACE_Task沒有提供類似與WNDPROC的回調函式,如果願意,我們可以在ACE_Task上寫出類似的結構,而通常,我們直接在訊息循環中編寫處理訊息的代碼

儘管看起來ACE_Task提供的訊息系統與WIndows的訊息系統很象,但實際上,它們還是有比較大的區別,要搭架一個基於ACE_Task的訊息系統,通常要做如下的步驟:

編寫一個派生自ACE_Task的類,指定它的同步模式

ACE_Task的訊息佇列可以由多個處理執行緒共享使用,所以需要提供同步模式,例如ACE_MT_SYNCH和ACE_NULL_SYNCH分別表示基於多執行緒的同步和不使用同步,這個參數是ACE_Task的一個模板參數。 class My_Task : public ACE_Task<ACE_MT_SYNCH>

{

public:

virtual int svc();

}

重載ACE_Task的svc 方法,編寫訊息循環相關的代碼 int My_Task::svc()

{

ACE_Message_Block * msg;

while(getq(msg) != -1) // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0);

{

// process msg here

}

}

svc 方法相當與處理執行緒的入口方法。

假設My_Task是一個基於ACE_Task的類,創建一個唯一的My_Task實例,這個可以通過

typedef ACE_Singleton<MyTask, SYNCH_METHOD> MYTASK;

然後總是使用MYTASK::instance方法來獲取一個My_Task的指針來完成。 在適當位置(一般是程式開始的時候),讓My_Task開始工作

MYTASK::intance()->activate(

THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , // 執行緒創建的屬性

n_threads = 1, // 執行緒的數目,即有多少處理執行緒

...)在有訊息發生的時候傳送訊息 ACE_Message_Block * msg;

// fill the msg

...

MYTASK::intance()->putq(msg);

最後考慮一個使用ACE_Task的實例,在一個編寫WEB伺服器的項目中,類Request_Handler負責處理HTTP請求,Request_Hanlder派生自ACE_Task,當有請求時,其他的代碼將Http請求構造成一個ACE_Message_Block,並調用Request_Handler的putq方法將請求插入訊息佇列,Request_Handler 配置為根據CPU的數目創建處理執行緒,Request_Handler的svc方法從佇列中獲取請求進行處理,然後將處理的結果構造成為一個ACE_Message_Block,插入到Response_Handler的訊息佇列,Response_Handler也派生自ACE_Task,但它只有一個處理執行緒,它僅僅將相應的數據寫回給客戶端。

相關詞條

熱門詞條

聯絡我們