go实现rocket全流程操作:发送/消费普通消息、发送延时/事务消息
发送普通消息:
package main import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "time" ) func main() { p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.0.104:9876"})) if err != nil { panic(err) return } err = p.Start() if err != nil { panic(err) return } for i := 0; i < 1000; i++ { res, err := p.SendSync(context.Background(), primitive.NewMessage("imook", []byte("hello000000"))) if err != nil { panic(err) return } fmt.Println(res) time.Sleep(1 * time.Second) } err = p.Shutdown() if err != nil { panic(err) return } }
消费普通消息:
package main import ( "context" "fmt" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, err := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"192.168.0.104:9876"}), consumer.WithGroupName("mxshop")) if err != nil { panic(err) return } err = c.Start() if err != nil { panic(err) return } if err = c.Subscribe("imook", consumer.MessageSelector{}, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for i := range ext { fmt.Printf("获取到:%+v\n", string(ext[i].Body)) } return consumer.ConsumeSuccess, nil }); err != nil { panic(err) return } time.Sleep(time.Hour) }
发送延时消息:
package main import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "time" ) func main() { p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.0.104:9876"})) if err != nil { panic(err) return } err = p.Start() if err != nil { panic(err) return } for i := 0; i < 1000; i++ { msg := primitive.NewMessage("imook", []byte("hello012345678900000")) msg.WithDelayTimeLevel(2) res, err := p.SendSync(context.Background(), msg) if err != nil { panic(err) return } fmt.Println(res) time.Sleep(1 * time.Second) } err = p.Shutdown() if err != nil { panic(err) return } }
发送/处理事务消息:
package main import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "time" ) type OrderListen struct { } func (o *OrderListen) ExecuteLocalTransaction(message *primitive.Message) primitive.LocalTransactionState { fmt.Println("本地逻辑执行了") time.Sleep(3 * time.Second) fmt.Println("本地逻辑执行完了") return primitive.UnknowState } func (o *OrderListen) CheckLocalTransaction(ext *primitive.MessageExt) primitive.LocalTransactionState { fmt.Println("回查执行了") return primitive.CommitMessageState } func main() { p, err := rocketmq.NewTransactionProducer(&OrderListen{}, producer.WithNameServer([]string{"192.168.0.104:9876"})) if err != nil { panic(err) return } err = p.Start() if err != nil { panic(err) return } res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("imook", []byte("1111111111111"))) if err != nil { panic(err) return } fmt.Println(res) time.Sleep(time.Hour) err = p.Shutdown() if err != nil { panic(err) return } }
发表评论