2013年1月31日木曜日

Zend_QueueでActiveMQを使う_その2

さて、その1の続き。

ひたすらZendのソースを追ってみたところ、その1で作ったMy_Queue_Stomp_Clientクラスを使うようにするにはZend_Queueのインスを作成する際のパラメータで、オブジェクトを渡してやれば良いことが判明。
$stompClient = new My_Queue_Stomp_Client( Zend_Queue_Adapter_Activemq::DEFAULT_SCHEME );
$options = array(
    'name'          => $name
  , 'driverOptions' => array( 'stompClient' => $stompClient )
);
$activeMq = new Zend_Queue( 'Activemq', $options );
後は今までどおり
$msgQueue = $activeMq -> receive( 1 );
とやれば、キューを一つ取り出すことができた。

とりあえずこれで最低限使える状態にはなったのだけど、このやり方だとキューを1つしか取り出すことができない。
マニュアルを読んだところ
$msgQueue = $activeMq -> receive( 5 );
のようにすると、キューを5つ取り出すことができるらしい。
というわけで早速試してみたところ、またも微妙に意図した動きをしてくれない。
キューが5件以上ある時は問題なく5件取得できるし、キューが1件もない時はすぐに終了してくれるのだけど、キューが5件未満だった場合、いつまで待ってもレスポンスが返ってこない。
またもZendのソースを追ってみたところ、今度はかなり難解だった。
というわけで今度は素直に断念して、以下のようなコードで対応することに。
$insertDataCount = 0;
$queueArray = array();
while ( 1 ) {
  
  // ActiveMQから一件ずつ取得
  $msgQueue = $activeMq -> receive( 1 );
  $queue    = $msgQueue -> current();

  // 取得できなければ終了
  if ( is_null( $queue ) === TRUE ) {
    break;
  } else {
    ;
  }
    
  // Bodyを抽出
  $message = $queue -> body;
    
  // 内部変数にプール
  $queueArray[] = $message;
    
  // キューを削除
  $activeMq -> deleteMessage( $queue );
    
  // 制限数を超えたら終了
  $insertDataCount++;
  if ( $insertDataCount >= 10000 ) {
    break;
  } else {
    ;
  }
    
}


久々にちゃんとZendのソースを読むのはかなり疲れる作業だったけど、色々と仕組みがわかってとても勉強になりましたとさ。

2013年1月30日水曜日

Zend_QueueでActiveMQを使う_その1

DBサーバの負荷を軽減するため、キューサーバを利用し、非同期処理をすることになった。
そこで、Zend_Queueを使い、ActiveMQに対してキューの出し入れをする。

キューを送信するのはマニュアルに従って、
$activeMq = new Zend_Queue( 'Activemq', array( 'name' => '/queue/hoge' ) );
$activeMq -> send( $message );
のような感じに書いたら、とりあえず問題なく動いてくれた。

こいつは予想外に簡単♪と思い今度は受信するために
$activeMq = new Zend_Queue( 'Activemq', array( 'name' => '/queue/hoge' ) );
$msgQueue = $activeMq -> receive( 1 );
とやってみたところ、どうにも挙動がおかしい。
ActiveMQにキューが1件以上ある場合はすぐにレスポンスが返ってくるのだけど、キューが1件もないと5秒程度レスポンスが返ってこない。

Zend_Queueのコンストラクタのパラメータに"timeout"というものが設定できるのだが、その値を変更しても挙動は何も変わらない。
というわけで、Zendの中身を見てみることにした。

Zend_QueueでAcitveMQと接続する流れとしては、
$msgQueue = $activeMq -> receive( 1 );
    public function receive($maxMessages=null, $timeout=null)
    {
        if (($maxMessages !== null) && !is_integer($maxMessages)) {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('$maxMessages must be an integer or null');
        }

        if (($timeout !== null) && !is_integer($timeout)) {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('$timeout must be an integer or null');
        }

        // Default to returning only one message
        if ($maxMessages === null) {
            $maxMessages = 1;
        }

        // Default to standard timeout
        if ($timeout === null) {
            $timeout = $this->getOption(self::TIMEOUT);
        }

        return $this->getAdapter()->receive($maxMessages, $timeout);
    public function receive($maxMessages=null, $timeout=null, Zend_Queue $queue=null)
    {
        if ($maxMessages === null) {
            $maxMessages = 1;
        }
        if ($timeout === null) {
            $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
        }
        if ($queue === null) {
            $queue = $this->_queue;
        }

        // read
        $data = array();

        // signal that we are reading
        if (!$this->_isSubscribed($queue)){
            $this->_subscribe($queue);
        }

        if ($maxMessages > 0) {
            if ($this->_client->canRead()) {
    public function canRead()
    {
        return $this->getConnection()->canRead();
 public function canRead()
    {
        $read   = array($this->_socket);
        $write  = null;
        $except = null;

        return stream_select(
            $read,
            $write,
            $except,
            $this->_options['timeout_sec'],
            $this->_options['timeout_usec']
        ) == 1;
        // see http://us.php.net/manual/en/function.stream-select.php
    }
と、ここまで追って、やっとPHPの標準メソッドに到着。
ふぅ。

調べてみると、このstream_selectというメソッドの第四、第五引数で接続の待ち時間を設定するらしい。
というわけで、この値の初期値を調べてみると、
    const READ_TIMEOUT_DEFAULT_USEC = 0; // 0 microseconds
    const READ_TIMEOUT_DEFAULT_SEC = 5; // 5 seconds
となっていて、これが原因で5秒間レスポンスが返ってこないことが判明。
というわけで、後はこの値を変えてやればいいのだけど、これが絶妙に難しい。
Connection.php側では、
    public function open($scheme, $host, $port, array $options = array())
    {
        $str = $scheme . '://' . $host;
        $this->_socket = fsockopen($str, $port, $errno, $errstr);

        if ($this->_socket === false) {
            // aparently there is some reason that fsockopen will return false
            // but it normally throws an error.
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception("Unable to connect to $str; error = $errstr ( errno = $errno )");
        }

        stream_set_blocking($this->_socket, 0); // non blocking

        if (!isset($options['timeout_sec'])) {
            $options['timeout_sec'] = self::READ_TIMEOUT_DEFAULT_SEC;
        }
        if (! isset($options['timeout_usec'])) {
            $options['timeout_usec'] = self::READ_TIMEOUT_DEFAULT_USEC;
        }

        $this->_options = $options;

        return true;
    }
となっているので、このメソッドの第四引数で設定できるようになっているのだけど、呼び出し元が
    public function addConnection($scheme, $host, $port, $class = 'Zend_Queue_Stomp_Client_Connection')
    {
        if (!class_exists($class)) {
            require_once 'Zend/Loader.php';
            Zend_Loader::loadClass($class);
        }

        $connection = new $class();

        if ($connection->open($scheme, $host, $port)) {

となっていて、そもそも第四引数が存在しない。
そして、Connection.php側では上記メソッド以外で該当の変数を変更する手段が用意されていない・・・

対応方法としては
  • Connection.phpのREAD_TIMEOUT_DEFAULT_SECの値を書き換える。
  • 継承したクラスを作って頑張る
の2つ。
簡単なのは前者なのだけど、Zendのソースを書き換えて使うというのはかなり抵抗があるので、後者にチャレンジすることに。

継承したクラスを作るのは簡単で、
require_once 'Zend/Queue/Stomp/Client.php';

class My_Queue_Stomp_Client extends Zend_Queue_Stomp_Client {
 
 /**
  * (non-PHPdoc)
  * @see Zend_Queue_Stomp_Client::addConnection()
  */
 public function addConnection($scheme, $host, $port, $class = 'Zend_Queue_Stomp_Client_Connection') {
  
  OutputLog::outLog( OutputLog::INFO, __METHOD__, __LINE__, 'START' );
  
        if (!class_exists($class)) {
            require_once 'Zend/Loader.php';
            Zend_Loader::loadClass($class);
        }

        $connection = new $class();

        // とりあえず0.2秒レスポンスを待つことに
        if ($connection->open($scheme, $host, $port, array( 'timeout_sec' => 0, 'timeout_usec' => 200000 ) ) ) {
            $this->setConnection($connection);
            return true;
        }

        $connection->close();
        return false;
    }
}
のようにしてやればOK。
問題はどうやってこいつを使うようにするか。

かなり長くなってきたので、続きはまた今度。