備忘録

備忘録

C#でActiveMQを利用する方法

Ⅰ. はじめに

タイトルの通り「C#ActiveMQを利用する方法」です。

CentOSActiveMQをインストールする方法
https://kagasu.hatenablog.com/entry/2019/04/08/195932

Ⅱ. やり方

1. NuGet からライブラリをインストールする
Install-Package Apache.NMS.ActiveMQ.NetStd
2. サンプルプログラムを書く

Consumer.cs

using Apache.NMS;
using Apache.NMS.Util;
using System;

namespace ActiveMQTest
{
  class Program
  {
    public static void Main(string[] args)
    {
      var connecturi = new Uri("activemq:tcp://127.0.0.1:61616");
      var factory = new NMSConnectionFactory(connecturi);

      using (var connection = factory.CreateConnection("user", "password2"))
      using (var session = connection.CreateSession())
      {
        var destination = SessionUtil.GetDestination(session, "queue://test001");

        using (var consumer = session.CreateConsumer(destination))
        {
          consumer.Listener += new MessageListener(OnMessage);
          Thread.Sleep(100); // すぐに connection.Start するとうまく動作しない場合がある
          connection.Start();

          Console.ReadLine();
        }
      }
    }

    static void OnMessage(IMessage receivedMsg)
    {
      var msg = (ITextMessage)receivedMsg;

      // Console.WriteLine(msg.NMSMessageId);
      Console.WriteLine(msg.Text);
    }
  }
}

Producer.cs

using Apache.NMS;
using Apache.NMS.Util;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ActiveMQTest
{
  class Program
  {
    public static void Main(string[] args)
    {
      var connecturi = new Uri("activemq:tcp://127.0.0.1:61616");
      var factory = new NMSConnectionFactory(connecturi);

      using (var connection = factory.CreateConnection("user", "password2"))
      using (var session = connection.CreateSession())
      {
        var destination = SessionUtil.GetDestination(session, "queue://test001");

        using (var producer = session.CreateProducer(destination))
        {
          connection.Start();
          producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
          producer.RequestTimeout = TimeSpan.FromSeconds(10);
          
          for(var i = 0; i < int.MaxValue; i++)
          {
            // 1秒ごとにメッセージを送信する
            var request = session.CreateTextMessage($"Hello {i}");
            // request.Properties["myHeader"] = "hoge";
            producer.Send(request);
            Console.WriteLine("send ok");

            Thread.Sleep(TimeSpan.FromSeconds(1));
          }
        }
      }
    }
  }
}
実行結果

f:id:kagasu:20190408224459p:plain