MQBindlib 1.0.4

dotnet add package MQBindlib --version 1.0.4                
NuGet\Install-Package MQBindlib -Version 1.0.4                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="MQBindlib" Version="1.0.4" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add MQBindlib --version 1.0.4                
#r "nuget: MQBindlib, 1.0.4"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install MQBindlib as a Cake Addin
#addin nuget:?package=MQBindlib&version=1.0.4

// Install MQBindlib as a Cake Tool
#tool nuget:?package=MQBindlib&version=1.0.4                

ZmqBindlib

zmq常用封装

使用方法

基本使用

1.简单请求回复

 ZmqRequest request = new ZmqRequest();
            request.RemoteAddress = "tcp://127.0.0.1:5566";
            request.ClientFlage = "A";
            int num = 0;
            while (true)
            {
                //   Thread.Sleep(1000);
                //string msg = request.Request("hi");
                Person p=  request.Request<Person,Person>(new Person { Name = "jin", Description = "请求", Id = num++, Title = "rr" });
                Console.WriteLine(p.Description+p.Name);
            }
    ZmqResponse rep = new ZmqResponse();
            rep.LocalAddress = "tcp://127.0.0.1:5566";
            rep.Start();
            int num = 0;
            //rep.ByteReceived += (sender, e) =>
            //{
            //    Console.WriteLine(System.Text.Encoding.Default.GetString(e));
            //    rep.Response("word"+num++);
            //};
            rep.StringReceived += (sender, e) =>
            {
                Console.WriteLine(e);
                if (e == "hi")
                {
                    Thread.Sleep(1000);
                }
                rep.Response("word" + num++);
            };

2.异步下的请求回复,类似TCP,支持多请求

  server =new EhoServer();
            server.RouterAddress = "tcp://127.0.0.1:6666";//服务地址,请求的远端地址
            //  server.ByteReceived += Server_ByteReceived;
          // server.StringReceived += Server_StringReceived1; 
            server.Start();
 private static void Server_StringReceived1(object? sender, RspSocket<string> e)
        {
            Console.WriteLine(e.Message);
            if (e.Message == "hi")
            {
              //  Thread.Sleep(4000);
                e.Response("jinyu");
                return;
            }
            e.Response("word");
        }
		 private static  void recvice()
        {
            while (true)
            {
                var ss = server.GetMsg<Person>();
                ss.Message.Description = "回复"+ss.Message.Id;
                ss.Response(ss.Message);
            }
        }

3.订阅发布

	````
	  ZmqSubscriber sub = new ZmqSubscriber();
        sub.Address = new string[] { localaddes };
        sub.Subscribe("A");
       // sub.ByteReceived += Sub_ByteReceived;
        sub.StringReceived += Sub_StringReceived;
		  ZmqPublisher pub = new ZmqPublisher();
        pub.LocalAddress =localaddes;
       // pub.IsProxy = true; 是否使用中间代理
        int num = 0;
        while (true)
        {
           // Thread.Sleep(1000);
            pub.Publish("A", "ssss"+num++);
        }
		static void Proxy()
    {
	//中间代理
        ZmqDDSProxy.PubAddress = "tcp://127.0.0.1:7771";//注意,客户端订阅此地址
        ZmqDDSProxy.SubAddress = "tcp://127.0.0.1:7772";//客户端发布此地址
        ZmqDDSProxy.Start();
    }
    ``````````

4.仿照kafka订阅(pull模式)

	````
	  ZmqSubscriberGroup zmqSubscriber=new ZmqSubscriberGroup();
        zmqSubscriber.Address = "tcp://192.168.237.55:6666";
       // zmqSubscriber.IsDDS= true;//高可用
        zmqSubscriber.DataOffset = DataModel.Earliest;
       // zmqSubscriber.Indenty = "test";//订阅在不同分组
        zmqSubscriber.Subscribe("A");
        zmqSubscriber.StringReceived += ZmqSubscriber_StringReceived;
	 ZmqPublisher pub = new ZmqPublisher();
        pub.LocalAddress =localaddes;
       // pub.IsProxy = true; 是否使用中间代理
        int num = 0;
        while (true)
        {
           // Thread.Sleep(1000);
            pub.Publish("A", "ssss"+num++);
        }
	 static void TestClusterSub()
    {
        ZmqPullProxy.PubAddress = "tcp://192.168.237.55:6666";
        ZmqPullProxy.SubAddress = "tcp://192.168.237.55:6667";
        //ZmqPullProxy.IsCluster = true;//高可用
        // ZmqPullProxy.IsStorage = true;//是否存储数据
        bool isret = false;
        do
        {
            isret = ZmqPullProxy.Start(); 
            Thread.Sleep(1000);
        }
        while (!isret);
    }
    ``````````

中心高可用部署

1.推荐方式 使用IP漂移:

  1. windows
    使用DNS+VLS;Panguha软件 2.Linux 使用keppalive

2.使用封装 该功能前提是可以使用广播,可以允许少量数据丢失;
(1)请求返回模式
中心节点:

   ```````````
      EhoServer eho = new EhoServer();
        eho.IsCluster = true;//高可用
        eho.DealerAddress = "inproc://server";
        eho.RouterAddress = "tcp://127.0.0.1:5550";
        eho.StringReceived += EhoServer_StringReceived;
        eho.Start();

  `````````````

客户端:与单个一致

(2)订阅发布

中心:

      ZmqDDSProxy.PubAddress = "tcp://127.0.0.1:2222";
            ZmqDDSProxy.SubAddress = "tcp://127.0.0.1:4444";
            ZmqDDSProxy.IsCluster=true;
         ZmqDDSProxy.Start();

发布端:

            ZmqPublisher pub = new ZmqPublisher();
            pub.Address = "tcp://127.0.0.1:5678";
            pub.IsProxy = true; //是否使用中间代理
            pub.IsDDS = true;//高可用启动
            int num = 0;
            while (true)
            {
                Thread.Sleep(1000);
                try
                {
                    pub.Publish("A", "ssss" + num++);
                }catch(Exception ex)
                {
                    Console.WriteLine(ex.ToString());
                }

            }



订阅端:

              ZmqSubscriber sub = new ZmqSubscriber();
            sub.Address = new string[] { "tcp://127.0.0.1:1234" };
            sub.IsDDS = true;//高可用启动
            sub.Subscribe("");
           // sub.ByteReceived += Sub_ByteReceived;
            sub.StringReceived += Sub_StringReceived;

对于发布订阅,中心何发布订阅端都需要启动高可用,会刷新地址

(3)负载均衡式订阅发布(该模式是仿照kafka功能的,Pull模式) 中心:

            ZmqPullProxy.PubAddress = "tcp://127.0.0.1:2222";
            ZmqPullProxy.SubAddress = "tcp://127.0.0.1:4444";
            ZmqPullProxy.IsCluster = true;//高可用
            ZmqPullProxy.Start(); //注意方法,启动和另外发布订阅方法不同

发布端:和前面一样
订阅端:

           ZmqSubscriberGroup zmqSubscriber=new ZmqSubscriberGroup();
            zmqSubscriber.Address = "tcp://127.0.0.1:1234";
            zmqSubscriber.IsDDS= true;//高可用
           // zmqSubscriber.Indenty = "test";//订阅在不同分组
            zmqSubscriber.Subscribe("A");
            zmqSubscriber.StringReceived += ZmqSubscriber_StringReceived;

(4)kafka封装

             KafkaPublisher kafkaPublisher = new KafkaPublisher();
                int num = 0;
                while (true)
                {
                    Thread.Sleep(1000);
                    kafkaPublisher.Push("A", "SSSSS"+num++);
                }
                KafkaSubscriber  kafkaSubscriber = new KafkaSubscriber();
                kafkaSubscriber.Subscriber("A");
                kafkaSubscriber.Consume(p =>
                {
                    if(p==null)
                    {
                        return;
                    }
                    Console.WriteLine(string.Format("Received message at {0}:{1}", p.Topic, p.Value));
                
                });

说明 1.接收数据一端,定义了2个事件一个方法,顺序是ByteReceived、StringReceived、GetMsg<T>()方法。一旦前一个实现,后面就无效 2.pull模式订阅增加了数据存储

Product Compatible and additional computed target framework versions.
.NET net7.0 is compatible.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
1.0.4 258 11/13/2023
1.0.3 140 10/30/2023
1.0.2 146 10/28/2023
1.0.1 138 10/24/2023
1.0.0 136 10/23/2023

优化传输