怎么使用go带缓冲chan实现消息队列功能

其他教程   发布日期:2023年07月17日   浏览次数:559

本篇内容介绍了“怎么使用go带缓冲chan实现消息队列功能”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

1、Channels 定义

通道是一种支持多类型的管道,您可以通过它使用通道运算符 <- 发送和接收值。

数据沿箭头方向流动。

  1. ch <- v // Send v to channel ch.
  2. v := <-ch // Receive from ch, and
  3. // assign value to v.

与 maps 和 slices 一样,通道必须在使用前创建:

  1. ch := make(chan int)

默认情况下,发送和接收阻塞,直到另一方准备就绪。
这允许 goroutines 在没有显式锁或条件变量的情况下进行同步。

  1. package main
  2. import "fmt"
  3. func sum(s []int, c chan int) {
  4. sum := 0
  5. for _, v := range s {
  6. sum += v
  7. }
  8. c <- sum // send sum to c
  9. }
  10. func main() {
  11. s := []int{7, 2, 8, -9, 4, 0}
  12. c := make(chan int)
  13. go sum(s[:len(s)/2], c)
  14. go sum(s[len(s)/2:], c)
  15. x, y := <-c, <-c // receive from c
  16. fmt.Println(x, y, x+y)
  17. }

2、chan 常用操作

  • 无缓冲区: 存入读取一次,存入后未取,再存入就会堵塞,同样未存,就取也会堵塞。

  • 有缓冲区: 只有当缓冲区满了,才会堵塞存;只有缓冲区空时,才会堵塞取。

  • len(channel) 返回缓冲区现有数据长度

  • cap(channel) 返回缓冲区的大小

  • close(channel) 关闭 channel,关闭后,读取不到数据。如下,如果其他协程关掉 channel 则会跳出循环

3、带缓冲chan实现消息队列功能

  1. // 监测数据结构体
  2. type Msg struct {
  3. Timestamp int64
  4. Content string
  5. }
  6. // 用 chan 模拟队列,队列的元素为 Msg 类型
  7. var SyncQueen chan Msg
  8. // 必须初始化才能使用。初始化一个容量为1024的 chan。chan 满时会阻塞
  9. func init() {
  10. SyncQueen = make(chan Msg, 1024)
  11. }
  1. // 队列消费者
  2. func Consumer() {
  3. defer func() {
  4. if err := recover(); err != nil {
  5. fmt.Println(err)
  6. }
  7. }()
  8. for {
  9. // chan 内无消息则阻塞
  10. msg := <-SyncQueen
  11. fmt.Println(msg.Content)
  12. }
  13. }
  1. // 队列生产者
  2. func Producer() {
  3. for {
  4. msg := Msg(time.now().Unix(), "hello")
  5. // 发送消息到 chan
  6. SyncQueen <- msg
  7. time.Sleep(2 time.Second)
  8. }
  9. }

重点

多协程使用chan是并发安全的,以下展示一个简单的例子:

  1. // 定义类型为 int 的 chan
  2. var chanNums chan int
  3. // chan 的消费者,用户后续多协程
  4. // 目的:数组里存储了10000个数字,多个协程并行计算后,把和加起来
  5. func consumer(sum *int) int {
  6. for {
  7. v := <-chanNums
  8. *sum += v
  9. }
  10. }
  11. //-------------------------------------
  12. func main() {
  13. var a [10000]int
  14. for i := 0; i < 10000; i++ {
  15. a[i] = i + 1
  16. }
  17. chanNums = make(chan int, 10000)
  18. for i := 0; i < 10000; i++ {
  19. chanNums <- (i + 1)
  20. }
  21. var s1, s2, s3, s4, s5 int = 0, 0, 0, 0, 0
  22. go consumer(&s1)
  23. go consumer(&s2)
  24. go consumer(&s3)
  25. go consumer(&s4)
  26. go consumer(&s5)
  27. for {
  28. time.Sleep(5 * time.Second)
  29. break
  30. }
  31. fmt.Println("s1=", s1, "s2=", s2, "s3=", s3, "s4=", s4, "s5=", s5)
  32. fmt.Println("sum=", s1+s2+s3+s4+s5)
  33. }
  34. // 输出
  35. s1= 10818438 s2= 12073966 s3= 9044041 s4= 11509634 s5= 6558921
  36. sum= 50005000

以上就是怎么使用go带缓冲chan实现消息队列功能的详细内容,更多关于怎么使用go带缓冲chan实现消息队列功能的资料请关注九品源码其它相关文章!