go实现rocket全流程操作:发送/消费普通消息、发送延时/事务消息

go实现rocket全流程操作:发送/消费普通消息、发送延时/事务消息


发送普通消息:

package main

import (
   "context"
   "fmt"
   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "github.com/apache/rocketmq-client-go/v2/producer"
   "time"
)

func main() {
   p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.0.104:9876"}))
   if err != nil {
      panic(err)
      return
   }

   err = p.Start()
   if err != nil {
      panic(err)
      return
   }
   for i := 0; i < 1000; i++ {

      res, err := p.SendSync(context.Background(), primitive.NewMessage("imook", []byte("hello000000")))
      if err != nil {
         panic(err)
         return
      }
      fmt.Println(res)
      time.Sleep(1 * time.Second)
   }

   err = p.Shutdown()
   if err != nil {
      panic(err)
      return
   }

}

消费普通消息:

package main

import (
   "context"
   "fmt"
   "time"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/consumer"
   "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
   c, err := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"192.168.0.104:9876"}), consumer.WithGroupName("mxshop"))
   if err != nil {
      panic(err)
      return
   }

   err = c.Start()
   if err != nil {
      panic(err)
      return
   }

   if err = c.Subscribe("imook", consumer.MessageSelector{}, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
      for i := range ext {
         fmt.Printf("获取到:%+v\n", string(ext[i].Body))
      }
      return consumer.ConsumeSuccess, nil
   }); err != nil {
      panic(err)
      return
   }

   time.Sleep(time.Hour)
}

发送延时消息:

package main

import (
   "context"
   "fmt"
   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "github.com/apache/rocketmq-client-go/v2/producer"
   "time"
)

func main() {
   p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.0.104:9876"}))
   if err != nil {
      panic(err)
      return
   }

   err = p.Start()
   if err != nil {
      panic(err)
      return
   }
   for i := 0; i < 1000; i++ {

      msg := primitive.NewMessage("imook", []byte("hello012345678900000"))
      msg.WithDelayTimeLevel(2)
      res, err := p.SendSync(context.Background(), msg)
      if err != nil {
         panic(err)
         return
      }
      fmt.Println(res)
      time.Sleep(1 * time.Second)
   }

   err = p.Shutdown()
   if err != nil {
      panic(err)
      return
   }

}

发送/处理事务消息:

package main

import (
   "context"
   "fmt"
   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "github.com/apache/rocketmq-client-go/v2/producer"
   "time"
)

type OrderListen struct {
}

func (o *OrderListen) ExecuteLocalTransaction(message *primitive.Message) primitive.LocalTransactionState {
   fmt.Println("本地逻辑执行了")
   time.Sleep(3 * time.Second)
   fmt.Println("本地逻辑执行完了")
   return primitive.UnknowState
}

func (o *OrderListen) CheckLocalTransaction(ext *primitive.MessageExt) primitive.LocalTransactionState {
   fmt.Println("回查执行了")
   return primitive.CommitMessageState
}

func main() {
   p, err := rocketmq.NewTransactionProducer(&OrderListen{}, producer.WithNameServer([]string{"192.168.0.104:9876"}))
   if err != nil {
      panic(err)
      return
   }

   err = p.Start()
   if err != nil {
      panic(err)
      return
   }

   res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("imook", []byte("1111111111111")))
   if err != nil {
      panic(err)
      return
   }
   fmt.Println(res)

   time.Sleep(time.Hour)
   err = p.Shutdown()
   if err != nil {
      panic(err)
      return
   }
}


最后编辑于:2022/02/26作者: 牛逼PHP

发表评论