IBM MQ - PHP Π±ΠΈΠ±Π»ΠΈΠΎΡΠ΅ΠΊΠ° Π΄Π»Ρ Π²Π·Π°ΠΈΠΌΠΎΠ΄Π΅ΠΉΡΡΠ²ΠΈΡ Ρ IBM WebSphere MQ
Π£ΡΡΠ°Π½ΠΎΠ²ΠΊΠ°
ΠΠ»Ρ ΡΠ°Π±ΠΎΡΡ ΡΡΠ΅Π±ΡΠ΅ΡΡΡ Π΄ΠΎΡΠ°Π±ΠΎΡΠ°Π½Π½ΠΎΠ΅ Π΄Π»Ρ PHP7 pecl ΡΠ°ΡΡΠΈΡΠ΅Π½ΠΈΠ΅ mqseries.
composer require rstmpw/ibmmq
ΠΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΠ΅
Π€ΡΠ½ΠΊΡΠΈΠΈ ΠΌΠΎΠ³ΡΡ Π²ΡΠ±ΡΠ°ΡΡΠ²Π°ΡΡ \RuntimeException ΠΈΠ»ΠΈ \InvalidArgumentException
ΠΠΎΠ»ΡΡΠ΅Π½ΠΈΠ΅/ΠΎΡΠΏΡΠ°Π²ΠΊΠ° ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠΉ ΠΈΠ· ΠΎΡΠ΅ΡΠ΅Π΄ΠΈ
<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;
// Π€ΡΠ½ΠΊΡΠΈΡ-ΠΏΠΎΠΌΠΎΡΠ½ΠΈΠΊ Π΄Π»Ρ ΡΠΎΡΠΌΠΈΡΠΎΠ²Π°Π½ΠΈΡ ΠΏΡΠ°Π²ΠΈΠ»ΡΠ½ΠΎΠΉ ΡΡΡΡΠΊΡΡΡΡ ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΠΎΠ² ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΡ.
// ΠΡΠ»ΠΈ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΡΡΡ Π²Π½Π΅ΡΠ½Π΅Π΅ Ρ
ΡΠ°Π½ΠΈΠ»ΠΈΡΠ΅ ΠΏΠ°ΡΠ°ΠΌΠ΅ΡΡΠΎΠ² ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠ΅Π½ΠΈΠΉ, ΡΠΎ ΡΠ°ΠΌ ΡΠ»Π΅Π΄ΡΠ΅Ρ Ρ
ΡΠ°Π½ΠΈΡΡ ΡΠΆΠ΅
// ΡΡΠΎΡΠΌΠΈΡΠΎΠ²Π°Π½Π½ΡΡ ΡΡΡΡΠΊΡΡΡΡ.
$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
// Π‘ΠΎΠ΅Π΄ΠΈΠ½ΡΠ΅ΠΌΡΡ Ρ ΡΠ΅ΡΠ²Π΅ΡΠΎΠΌ
$mqServer = new MQClient($mqConnOpts);
// ΠΡΠΊΡΡΠ²Π°Π΅ΠΌ ΠΎΡΠ΅ΡΠ΅Π΄Ρ
$mqQueue = $mqServer->openQueue('QUEUE.NAME');
// ΠΠ»Ρ ΡΠ΄Π°Π»Π΅Π½Π½ΡΡ
ΠΎΡΠ΅ΡΠ΅Π΄Π΅ΠΉ:
// $mqOueue = $mqServer->openQueue('REMOTE.QUEUE.NAME', [MQSERIES_MQOO_FAIL_IF_QUIESCING, MQSERIES_MQOO_OUTPUT]);
// Π‘ΠΎΠ·Π΄Π°Π΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ ΠΈ ΠΎΡΠΏΡΠ°Π²Π»ΡΠ΅ΠΌ Π΅Π³ΠΎ Π² ΠΎΡΠ΅ΡΠ΅Π΄Ρ
$outMessage = new MQMessage('Somebody message '.time());
echo "\n Data: ".$outMessage->data();
$mqQueue->put($outMessage);
echo "\n msgId: ".$outMessage->property('MsgId');
// ΠΠΎΠ»ΡΡΠ°Π΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ ΠΈΠ· ΠΎΡΠ΅ΡΠ΅Π΄ΠΈ
$inMessage = $mqQueue->get();
echo "\n Data: ".$inMessage->data();
echo "\n msgId: ".$inMessage->property('MsgId');
// ΠΠ°ΠΊΡΡΠ²Π°Π΅ΠΌ ΠΎΠ±ΡΠ΅ΠΊΡ ΠΈ ΡΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠ΅ Ρ ΡΠ΅ΡΠ²ΡΠΎΠΌ
// Π―Π²Π½ΡΠΌ ΠΎΠ±ΡΠ°Π·ΠΎΠΌ ΠΌΠΎΠΆΠ½ΠΎ Π½Π΅ Π·Π°ΠΊΡΡΠ²Π°ΡΡ, Π΄Π΅ΡΡΡΡΠΊΡΠΎΡΡ ΡΠ΄Π΅Π»Π°ΡΡ Π²ΡΠ΅ Π°Π²ΡΠΎΠΌΠ°ΡΠΈΡΠ΅ΡΠΊΠΈ
$mqQueue->close();
unset($mqServer);
ΠΡΠ»ΠΈ Π² ΠΎΡΠ΅ΡΠ΅Π΄Ρ Π½ΡΠΆΠ½ΠΎ ΠΏΠΎΠ»ΠΎΠΆΠΈΡΡ ΡΠΎΠ»ΡΠΊΠΎ ΠΎΠ΄Π½ΠΎ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅, ΡΠΎ ΠΎΡΠ΅ΡΠ΅Π΄Ρ ΠΌΠΎΠΆΠ½ΠΎ ΡΠ²Π½ΠΎ Π½Π΅ ΠΎΡΠΊΡΡΠ²Π°ΡΡ ΠΈ Π²ΠΎΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΡΡΡ ΡΡΠ½ΠΊΡΠΈΠ΅ΠΉ put1()
<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;
$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);
// Π‘ΠΎΠ·Π΄Π°Π΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅
$outMessage = new MQMessage('Somebody message '.time());
$mqServer->put1($outMessage, 'QUEUE.NAME');
echo "\n msgId: ".$outMessage->property('MsgId');
ΠΡΠΏΡΠ°Π²ΠΊΠ° ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠΉ Π² Π΄ΡΡΠ³ΠΎΠΉ ΠΌΠ΅Π½Π΅ΠΆΠ΄Π΅Ρ ΠΎΡΠ΅ΡΠ΅Π΄Π΅ΠΉ
ΠΠ°Ρ QM Π΄ΠΎΠ»ΠΆΠ΅Π½ Π·Π½Π°ΡΡ ΠΊΠ°ΠΊ ΠΎΡΠΏΡΠ°Π²Π»ΡΡΡ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ Π² Π΄ΡΡΠ³ΠΎΠΉ ΠΌΠ΅Π½Π΅Π΄ΠΆΠ΅Ρ (ΡΠ΅ΡΠ΅Π· Π²ΡΠ΄Π΅Π»Π΅Π½Π½ΡΠ΅ ΡΠ΄Π°Π»Π΅Π½Π½ΡΡ ΠΈΠ»ΠΈ ΡΡΠ°Π½ΡΠΏΠΎΡΡΠ½ΡΡ ΠΎΡΠ΅ΡΠ΅Π΄Ρ)
<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;
$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);
// Π‘ΠΎΠ·Π΄Π°Π΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅
$outMessage = new MQMessage('Somebody message '.time());
$mqServer->put1($outMessage, 'QM.REMOT//QUEUE.NAME');
echo "\n msgId: ".$outMessage->property('MsgId');
ΠΠΎΠ»ΡΡΠ΅Π½ΠΈΠ΅ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠΉ ΠΈΠ· ΠΎΡΠ΅ΡΠ΅Π΄ΠΈ Ρ ΠΎΠΆΠΈΠ΄Π°Π½ΠΈΠ΅ΠΌ ΠΈ Π²ΡΠ±ΠΎΡΠΊΠΎΠΉ ΠΏΠΎ CorrelId
<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;
$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);
$mqQueue = $mqServer->openQueue('QUEUE.NAME');
// Π£ΡΡΠ°Π½Π°Π²Π»ΠΈΠ²Π°Π΅ΠΌ ΠΎΠΏΡΠΈΠΈ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ ΠΈ Π²ΡΠ±ΠΎΡΠΊΠΈ
$getOpts = [
'Version' => MQSERIES_MQGMO_VERSION_2,
'Options' => [MQSERIES_MQGMO_FAIL_IF_QUIESCING, MQSERIES_MQGMO_WAIT],
'WaitInterval' => 60*1000, //60 sec
'MatchOptions' => [MQSERIES_MQMO_MATCH_CORREL_ID]
];
$MQMD = [
'MsgId' => MQSERIES_MQMI_NONE,
'CorrelId' => 'NeedCorrelId'
];
// ΠΠΎΠ»ΡΡΠ°Π΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ ΠΈΠ· ΠΎΡΠ΅ΡΠ΅Π΄ΠΈ
$inMessage = $mqQueue->get($getOpts, $MQMD);
echo "\n Data: ".$inMessage->data();
echo "\n msgId: ".$inMessage->property('MsgId');
echo "\n correlId: ".$inMessage->property('CorrelId');
$mqQueue->close();
unset($mqServer);
ΠΠ°ΠΏΠΈΡΡ/ΡΡΠ΅Π½ΠΈΠ΅ ΡΠ²ΠΎΠΉΡΡΠ² ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠΉ
<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;
$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);
// Π‘ΠΎΠ·Π΄Π°Π΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ ΠΈ ΡΡΡΠ½Π°Π²Π»ΠΈΠ²Π°Π΅ΠΌ ΡΠ²ΠΎΠΉΡΡΠ²Π°
$outMessage = new MQMessage('Somebody message '.time());
$outMessage->property('Priority', 7);
$outMessage->property('ReplyToQ', 'RESPONSE.QUEUE');
$outMessage->property('ReplyToQMgr', 'RESPONSE.QM');
// ΠΡΠΏΡΠ°Π²Π»ΡΠ΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅
$mqServer->put1($outMessage, 'QUEUE.NAME');
// ΠΠΎΠ»ΡΡΠ°Π΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ ΠΈ ΡΠΌΠΎΡΡΠΈΠΌ ΡΠ²ΠΎΠΉΡΡΠ²Π°
$mqQueue = $mqServer->openQueue('QUEUE.NAME');
$inMessage = $mqQueue->get();
echo "\n Data: ".$inMessage->data();
echo "\n msgId: ".$inMessage->property('MsgId');
echo "\n Priority: ".$inMessage->property('Priority');
echo "\n ReplyToQ: ".$inMessage->property('ReplyToQ');
echo "\n ReplyToQMgr: ".$inMessage->property('ReplyToQMgr');
ΠΡΠ±Π»ΠΈΠΊΠ°ΡΠΈΡ ΡΠΎΠΏΠΈΠΊΠΎΠ²
ΠΠΎΠ»Π½ΠΎΡΡΡΡ ΠΏΠΎΡ ΠΎΠΆΠ° Π½Π° ΠΎΡΠΏΡΠ°Π²ΠΊΡ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ, Π·Π° ΠΈΡΠΊΠ»ΡΡΠ΅Π½ΠΈΠ΅ΠΌ ΡΠΎΠ³ΠΎ ΡΡΠΎ Π²ΠΌΠ΅ΡΡΠΎ ΠΈΠΌΠ΅Π½ΠΈ ΠΎΡΠ΅ΡΠ΅Π΄ΠΈ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΡΡΡ ΠΈΠΌΡ ΡΠΎΠΏΠΈΠΊΠ° ΠΈ ΠΈΠ· ΡΠΎΠΏΠΈΠΊΠ° Π½Π΅Π»ΡΠ·Ρ ΡΠΈΡΠ°ΡΡ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ. ΠΠΌΡ ΡΠΎΠΏΠΈΠΊΠ° ΠΏΡΠ΅Π΄ΡΡΠ²Π΄ΡΠ΅Ρ ΡΠΎΠ±ΠΎΠΉ ΡΡΡΠΎΠΊΡ ΡΠ°Π·Π΄Π΅Π»Π΅Π½Π½ΡΡ '/', Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ 'news/sport/nascar'.
<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;
$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);
// ΠΡΠΊΡΡΠ²Π°Π΅ΠΌ ΡΠΎΠΏΠΈΠΊ
$mqQueue = $mqServer->openTopic('news/sport/nascar');
// Π‘ΠΎΠ·Π΄Π°Π΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ ΠΈ ΠΏΡΠ±Π»ΠΈΠΊΡΠ΅ΠΌ Π΅Π³ΠΎ Π² ΡΠΎΠΏΠΈΠΊ
$outMessage = new MQMessage('Latest news about nascar racing');
$mqQueue->put($outMessage);
Π’Π°ΠΊΠΆΠ΅ ΠΏΠΎΠ΄Π΄Π΅ΡΠΆΠΈΠ²Π°Π΅ΡΡΡ ΡΠΎΠΊΡΠ°ΡΠ΅Π½Π½ΡΠΉ Π²ΡΠ·ΠΎΠ² ΡΠ΅ΡΠ΅Π· put1, Π·Π° ΠΈΡΠΊΠ»ΡΡΠ΅Π½ΠΈΠ΅ΠΌ ΡΠΎΠ³ΠΎ ΡΡΠΎ ΠΈΠΌΡ ΡΠΎΠΏΠΈΠΊΠ° Π΄ΠΎΠ»ΠΆΠ½ΠΎ Π½Π°ΡΠΈΠ½Π°ΡΡΡΡ Ρ 'TOPIC::'
<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;
$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);
// Π‘ΠΎΠ·Π΄Π°Π΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ ΠΈ ΠΏΡΠ±Π»ΠΈΠΊΡΠ΅ΠΌ Π΅Π³ΠΎ Π² ΡΠΎΠΏΠΈΠΊ
$outMessage = new MQMessage('Latest news about nascar racing');
$mqServer->put1($outMessage, 'TOPIC::news/sport/nascar/winners/top/1');
Π’ΡΠ°Π½Π·Π°ΠΊΡΠΈΠΈ
Π Π°Π±ΠΎΡΠ°ΡΡ Π°Π½Π°Π»ΠΎΠ³ΠΈΡΠ½ΠΎ ΡΡΠ°Π½Π·Π°ΠΊΡΠΈΡΠΌ Π² Π‘Π£ΠΠ
<?php
use rstmpw\ibmmq\MQClient;
use rstmpw\ibmmq\MQMessage;
$mqConnOpts = MQClient::makeConnOpts('QM.NAME', '172.18.0.7', '1414');
$mqServer = new MQClient($mqConnOpts);
// ΠΡΠΏΡΠ°Π²Π»ΡΠ΅ΠΌ 2 ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ Π²Π½Π΅ ΡΡΠ°Π½Π·Π°ΠΊΡΠΈΠΈ
$outMessage = new MQMessage('Put message w/o transaction');
$mqServer->put1($outMessage, 'OTHER.QUEUE.NAME');
$mqServer->put1($outMessage, 'OTHER.QUEUE.NAME');
// ΠΡΠΊΡΡΠ²Π΅ΠΌ ΡΡΠ°Π½Π·Π°ΠΊΡΠΈΡ
$mqServer->begin();
// ΠΠΎΠ»ΡΡΠ°Π΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ ΠΈΠ· ΠΎΡΠ΅ΡΠ΅Π΄ΠΈ OTHER.QUEUE.NAME
$mqQueue = $mqServer->openQueue('OTHER.QUEUE.NAME');
$mqQueue->get();
// ΠΡΠΏΡΠ°Π²Π»ΡΠ΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ Π² ΠΎΡΠ΅ΡΠ΅Π΄Ρ QUEUE.NAME Ρ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΠ΅ΠΌ put1
$mqServer->put1(new MQMessage('MSG1'), 'QUEUE.NAME');
// ΠΡΠΏΡΠ°Π²Π»ΡΠ΅ΠΌ Π΅ΡΠ΅ 2 ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ Π² ΠΎΡΠ΅ΡΠ΅Π΄Ρ QUEUE.NAME ΠΈ ΠΎΠ΄Π½ΠΎ ΠΎΡΡΡΠ΄Π° Π·Π°Π±ΠΈΡΠ°Π΅ΠΌ ΡΠ΅ΡΠ΅Π· ΡΠ²Π½ΠΎΠ΅ ΠΎΡΠΊΡΡΡΠΈΠ΅ ΠΎΡΠ΅ΡΠ΅Π΄ΠΈ
$mqQueueOut = $mqServer->openQueue('QUEUE.NAME');
$mqQueueOut->put(new MQMessage('MSG2'));
$mqQueueOut->put(new MQMessage('MSG2'));
$mqQueueOut->get();
// Π€ΠΈΠΊΡΠΈΡΡΠ΅ΠΌ ΡΡΠ°Π½Π·Π°ΠΊΡΠΈΡ
$mqServer->commit();
// ΠΡΠ»ΠΈ ΡΡΠΈΡΠ°ΡΡ, ΡΡΠΎ ΠΏΠ΅ΡΠ΅Π΄ Π·Π°ΠΏΡΡΠΊΠΎΠΌ ΡΠΊΡΠΈΠΏΡΠ° ΠΎΡΠ΅ΡΠ΅Π΄ΠΈ Π±ΡΠ»ΠΈ ΠΏΡΡΡΠ΅, ΡΠΎ Π² ΠΈΡΠΎΠ³Π΅ Π² ΠΎΡΠ΅ΡΠ΅Π΄ΠΈ OTHER.QUEUE.NAME Π±ΡΠ΄Π΅Ρ ΠΎΠ΄Π½ΠΎ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅,
// a Π² ΠΎΡΠ΅ΡΠ΅Π΄ΠΈ QUEUE.NAME - 2 ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ.
// ΠΡΠ»ΠΈ ΡΠ²Π½ΡΠΌ ΠΎΠ±ΡΠ°Π·ΠΎΠΌ ΠΎΡΠΊΠ°ΡΠΈΡΡ ΡΡΠ°Π½Π·Π°ΠΊΡΠΈΡ
// $mqServer->rollback();
// ΠΈΠ»ΠΈ ΡΠ²Π½ΠΎ Π΅Π΅ Π½Π΅ Π·Π°ΠΊΠΎΠΌΠΌΠΈΡΠΈΡΡ, ΡΠΎ Π² ΠΎΡΠ΅ΡΠ΅Π΄ΠΈ OTHER.QUEUE.NAME Π±ΡΠ΄Π΅Ρ 2 ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ, Π° QUEUE.NAME ΠΎΡΡΠ°Π½Π΅ΡΡΡ ΠΏΡΡΡΠΎΠΉ.
ΠΠ°Π²ΠΈΡΠΈΠΌΠΎΡΡΠΈ
- ΠΠΈΠ±Π»ΠΈΠΎΡΠ΅ΠΊΠ° ΡΠ°Π±ΠΎΡΠ°Π΅Ρ Ρ PHP 7.0 ΠΈ Π²ΡΡΠ΅.
- ΠΠ»Ρ ΡΠ°Π±ΠΎΡΡ ΡΡΠ΅Π±ΡΠ΅ΡΡΡ Π΄ΠΎΡΠ°Π±ΠΎΡΠ°Π½Π½ΠΎΠ΅ pecl ΡΠ°ΡΡΠΈΡΠ΅Π½ΠΈΠ΅ mqseries.