DBサーバの負荷を軽減するため、キューサーバを利用し、非同期処理をすることになった。
そこで、Zend_Queueを使い、ActiveMQに対してキューの出し入れをする。
キューを送信するのはマニュアルに従って、
$activeMq = new Zend_Queue( 'Activemq', array( 'name' => '/queue/hoge' ) );
$activeMq -> send( $message );
のような感じに書いたら、とりあえず問題なく動いてくれた。
こいつは予想外に簡単♪と思い今度は受信するために
$activeMq = new Zend_Queue( 'Activemq', array( 'name' => '/queue/hoge' ) );
$msgQueue = $activeMq -> receive( 1 );
とやってみたところ、どうにも挙動がおかしい。
ActiveMQにキューが1件以上ある場合はすぐにレスポンスが返ってくるのだけど、キューが1件もないと5秒程度レスポンスが返ってこない。
Zend_Queueのコンストラクタのパラメータに"timeout"というものが設定できるのだが、その値を変更しても挙動は何も変わらない。
というわけで、Zendの中身を見てみることにした。
Zend_QueueでAcitveMQと接続する流れとしては、
$msgQueue = $activeMq -> receive( 1 );
↓
public function receive($maxMessages=null, $timeout=null)
{
if (($maxMessages !== null) && !is_integer($maxMessages)) {
require_once 'Zend/Queue/Exception.php';
throw new Zend_Queue_Exception('$maxMessages must be an integer or null');
}
if (($timeout !== null) && !is_integer($timeout)) {
require_once 'Zend/Queue/Exception.php';
throw new Zend_Queue_Exception('$timeout must be an integer or null');
}
// Default to returning only one message
if ($maxMessages === null) {
$maxMessages = 1;
}
// Default to standard timeout
if ($timeout === null) {
$timeout = $this->getOption(self::TIMEOUT);
}
return $this->getAdapter()->receive($maxMessages, $timeout);
↓
public function receive($maxMessages=null, $timeout=null, Zend_Queue $queue=null)
{
if ($maxMessages === null) {
$maxMessages = 1;
}
if ($timeout === null) {
$timeout = self::RECEIVE_TIMEOUT_DEFAULT;
}
if ($queue === null) {
$queue = $this->_queue;
}
// read
$data = array();
// signal that we are reading
if (!$this->_isSubscribed($queue)){
$this->_subscribe($queue);
}
if ($maxMessages > 0) {
if ($this->_client->canRead()) {
↓
public function canRead()
{
return $this->getConnection()->canRead();
↓
public function canRead()
{
$read = array($this->_socket);
$write = null;
$except = null;
return stream_select(
$read,
$write,
$except,
$this->_options['timeout_sec'],
$this->_options['timeout_usec']
) == 1;
// see http://us.php.net/manual/en/function.stream-select.php
}
と、ここまで追って、やっとPHPの標準メソッドに到着。
ふぅ。
調べてみると、このstream_selectというメソッドの第四、第五引数で接続の待ち時間を設定するらしい。
というわけで、この値の初期値を調べてみると、
const READ_TIMEOUT_DEFAULT_USEC = 0; // 0 microseconds
const READ_TIMEOUT_DEFAULT_SEC = 5; // 5 seconds
となっていて、これが原因で5秒間レスポンスが返ってこないことが判明。
というわけで、後はこの値を変えてやればいいのだけど、これが絶妙に難しい。
Connection.php側では、
public function open($scheme, $host, $port, array $options = array())
{
$str = $scheme . '://' . $host;
$this->_socket = fsockopen($str, $port, $errno, $errstr);
if ($this->_socket === false) {
// aparently there is some reason that fsockopen will return false
// but it normally throws an error.
require_once 'Zend/Queue/Exception.php';
throw new Zend_Queue_Exception("Unable to connect to $str; error = $errstr ( errno = $errno )");
}
stream_set_blocking($this->_socket, 0); // non blocking
if (!isset($options['timeout_sec'])) {
$options['timeout_sec'] = self::READ_TIMEOUT_DEFAULT_SEC;
}
if (! isset($options['timeout_usec'])) {
$options['timeout_usec'] = self::READ_TIMEOUT_DEFAULT_USEC;
}
$this->_options = $options;
return true;
}
となっているので、このメソッドの第四引数で設定できるようになっているのだけど、呼び出し元が
public function addConnection($scheme, $host, $port, $class = 'Zend_Queue_Stomp_Client_Connection')
{
if (!class_exists($class)) {
require_once 'Zend/Loader.php';
Zend_Loader::loadClass($class);
}
$connection = new $class();
if ($connection->open($scheme, $host, $port)) {
となっていて、そもそも第四引数が存在しない。
そして、Connection.php側では上記メソッド以外で該当の変数を変更する手段が用意されていない・・・
対応方法としては
- Connection.phpのREAD_TIMEOUT_DEFAULT_SECの値を書き換える。
- 継承したクラスを作って頑張る
の2つ。
簡単なのは前者なのだけど、Zendのソースを書き換えて使うというのはかなり抵抗があるので、後者にチャレンジすることに。
継承したクラスを作るのは簡単で、
require_once 'Zend/Queue/Stomp/Client.php';
class My_Queue_Stomp_Client extends Zend_Queue_Stomp_Client {
/**
* (non-PHPdoc)
* @see Zend_Queue_Stomp_Client::addConnection()
*/
public function addConnection($scheme, $host, $port, $class = 'Zend_Queue_Stomp_Client_Connection') {
OutputLog::outLog( OutputLog::INFO, __METHOD__, __LINE__, 'START' );
if (!class_exists($class)) {
require_once 'Zend/Loader.php';
Zend_Loader::loadClass($class);
}
$connection = new $class();
// とりあえず0.2秒レスポンスを待つことに
if ($connection->open($scheme, $host, $port, array( 'timeout_sec' => 0, 'timeout_usec' => 200000 ) ) ) {
$this->setConnection($connection);
return true;
}
$connection->close();
return false;
}
}
のようにしてやればOK。
問題はどうやってこいつを使うようにするか。
かなり長くなってきたので、続きはまた今度。