BrokerService.php 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. <?php
  2. namespace App\Services\Broker;
  3. use Nats\ConnectionOptions;
  4. use Nats\EncodedConnection;
  5. use Nats\Encoders\JSONEncoder;
  /**
   * Thin facade over a JSON-encoded NATS connection.
   *
   * The connection is opened eagerly in the constructor using the host and
   * port read from the `services.micro` configuration group; every public
   * method simply forwards to the underlying client.
   */
  class BrokerService
  {
      /**
       * Underlying NATS client; payloads pass through the JSON encoder.
       *
       * @var EncodedConnection
       */
      protected $client;

      /**
       * Build the client from application configuration and connect immediately.
       *
       * NOTE(review): connecting here means merely instantiating the service
       * performs network I/O — confirm that is intended for every consumer.
       */
      public function __construct()
      {
          $encoder = new JSONEncoder();
          $options = new ConnectionOptions([
              'host' => config('services.micro.broker_host'),
              'port' => config('services.micro.broker_port'),
          ]);

          $this->client = new EncodedConnection($options, $encoder);
          $this->client->connect();
      }

      /**
       * Subscribe to messages on a topic.
       *
       * @param string   $topic    Subject to subscribe to.
       * @param \Closure $callback Handler passed to the client for incoming messages.
       *
       * @return mixed Whatever the client's subscribe() returns (the NATS client
       *               documents this as the subscription id). Previously this
       *               value was discarded, leaving callers no handle on the
       *               subscription they had just created.
       */
      public function subscribe($topic, \Closure $callback)
      {
          return $this->client->subscribe($topic, $callback);
      }

      /**
       * Publish a message to a topic.
       *
       * @param string $topic   Subject to publish on.
       * @param mixed  $message Payload handed to the encoded connection.
       */
      public function publish($topic, $message)
      {
          $this->client->publish($topic, $message);
      }

      /**
       * Send a request on a topic and hand the reply to a callback.
       *
       * @param string   $topic    Subject to send the request on.
       * @param mixed    $message  Request payload handed to the encoded connection.
       * @param \Closure $callback Handler passed to the client for the reply.
       */
      public function request($topic, $message, \Closure $callback)
      {
          $this->client->request($topic, $message, $callback);
      }

      /**
       * Wait for incoming messages.
       *
       * @param int $number Forwarded unchanged to the client's wait(); presumably
       *                    the number of messages to wait for, with 0 meaning
       *                    "no limit" — confirm against the client library.
       */
      public function wait($number = 0)
      {
          $this->client->wait($number);
      }
  }