Channel 是 Go 语言内建的 first-class 类型,也是 Go 与众不同的特性之一。它不是通过库提供的——而是直接内置在语言规范中,地位之高在编程语言中比较罕见。Channel 的设计源自 CSP(Communicating Sequential Process)模型:不要通过共享内存来通信,要通过通信来共享内存。

一、Channel 基本用法

声明和初始化

// 声明
var ch chan int          // 双向 channel
var sendCh chan<- int    // 只发送
var recvCh <-chan int    // 只接收

// 初始化(必须用 make)
ch = make(chan int)      // 无缓冲
ch = make(chan int, 10)  // 有缓冲,容量 10

三种操作

┌──────────────────────────────────────────────────────────────┐
│                   Channel 的三种操作                          │
├──────────────────────────────────────────────────────────────┤
│  ch <- value     发送:把 value 放入 channel                 │
│  value := <-ch   接收:从 channel 取出值                     │
│  close(ch)       关闭:关闭 channel                          │
└──────────────────────────────────────────────────────────────┘

无缓冲 vs 有缓冲

无缓冲 channel (make(chan int)):

  发送者              channel              接收者
      │                 │                     │
      │── send ────→    │                     │
      │   (阻塞...)     │                     │
      │                 │    ←── recv ────────│
      │   (解除阻塞) ──→│──→ (收到值)         │
      │                 │                     │

  发送和接收必须同时就绪,类似"面对面交接"。


有缓冲 channel (make(chan int, 3)):

  发送者              buffer [_ _ _]         接收者
      │                 │                     │
      │── send ────→ [v _ _] (不阻塞)        │
      │── send ────→ [v v _] (不阻塞)        │
      │── send ────→ [v v v] (不阻塞)        │
      │── send ────→ [v v v] (阻塞! 满了)    │
      │                 │    ←── recv ────────│
      │   (解除阻塞) ──→│──→ (取出一个值)     │
      │                 │                     │

  缓冲未满时发送不阻塞,缓冲为空时接收阻塞。

二、Channel 的行为表

对 channel 执行不同操作,在不同状态下的行为:

操作nil channel已关闭 channel正常 channel
ch <-永久阻塞panic阻塞或成功发送
<-ch永久阻塞返回零值 + false阻塞或成功接收
close()panicpanic成功关闭
len()0缓冲中剩余元素数缓冲中当前元素数
cap()0缓冲容量缓冲容量

三个 panic 要牢记

┌───────────────────────────────────────────────────────┐
│  会 panic 的操作                                      │
├───────────────────────────────────────────────────────┤
│  1. 向已关闭的 channel 发送数据                       │
│  2. close 一个 nil channel                            │
│  3. close 一个已经关闭的 channel                      │
└───────────────────────────────────────────────────────┘

判断 channel 是否关闭

// 方式 1:comma-ok 模式
v, ok := <-ch
if !ok {
    // channel 已关闭
}

// 方式 2:for range(channel 关闭时自动退出循环)
for v := range ch {
    fmt.Println(v)
}
// 到这里说明 ch 已关闭

三、select 多路复用

select 可以同时等待多个 channel 操作,哪个就绪就执行哪个:

select {
case v := <-ch1:
    fmt.Println("从 ch1 收到:", v)
case v := <-ch2:
    fmt.Println("从 ch2 收到:", v)
case ch3 <- 42:
    fmt.Println("向 ch3 发送成功")
default:
    fmt.Println("没有 channel 就绪")
}

select 的规则:

┌────────────────────────────────────────────────────────────┐
│  select 的行为规则                                         │
├────────────────────────────────────────────────────────────┤
│  • 多个 case 就绪 → 随机选择一个执行(伪随机)            │
│  • 没有 case 就绪 + 有 default → 执行 default             │
│  • 没有 case 就绪 + 无 default → 阻塞等待                 │
│  • 空 select{} → 永久阻塞                                 │
└────────────────────────────────────────────────────────────┘

超时控制

select {
case v := <-ch:
    fmt.Println("收到:", v)
case <-time.After(3 * time.Second):
    fmt.Println("超时")
}

使用反射动态 select

当 channel 数量在运行时才确定(比如处理 N 个 channel),可以用 reflect.Select

cases := make([]reflect.SelectCase, len(channels))
for i, ch := range channels {
    cases[i] = reflect.SelectCase{
        Dir:  reflect.SelectRecv,
        Chan: reflect.ValueOf(ch),
    }
}

chosen, value, ok := reflect.Select(cases)
fmt.Printf("从 channel %d 收到: %v (ok=%v)\n", chosen, value, ok)

这种方式可以动态处理几百几千个 channel,解决了编译时 case 数量不确定的问题。

四、实现原理

hchan 结构

Channel 在运行时对应的数据结构是 hchan

┌──────────────────────────────────────────────────────────┐
│                      hchan 结构                           │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  buf      unsafe.Pointer  ← 环形缓冲区                  │
│  ┌────┬────┬────┬────┬────┐                              │
│  │ v0 │ v1 │ v2 │    │    │  dataqsiz = 5               │
│  └────┴────┴────┴────┴────┘                              │
│        ↑              ↑                                   │
│      recvx          sendx                                │
│     (读位置)       (写位置)                               │
│                                                          │
│  qcount    uint    ← 当前缓冲区中的元素数                │
│  dataqsiz  uint    ← 缓冲区容量(make 的第二个参数)     │
│  elemsize  uint16  ← 元素大小                            │
│  closed    uint32  ← 是否已关闭                          │
│                                                          │
│  sendq   waitq    ← 等待发送的 goroutine 队列           │
│  recvq   waitq    ← 等待接收的 goroutine 队列           │
│                                                          │
│  lock    mutex    ← 保护 hchan 的锁                     │
│                                                          │
└──────────────────────────────────────────────────────────┘

发送流程

ch <- value 的执行路径:

  ┌─ recvq 中有等待的接收者?
  │    ├─ 是 ──→ 直接把值拷贝给接收者,唤醒它(无需经过 buf)
  │    │
  │    └─ 否
  │         ├─ 缓冲区未满?
  │         │    ├─ 是 ──→ 值拷贝到 buf[sendx],sendx++
  │         │    │
  │         │    └─ 否
  │         │         └─ 当前 goroutine 加入 sendq,休眠
  │         │
  └─────────┘

接收流程

value := <-ch 的执行路径:

  ┌─ sendq 中有等待的发送者?
  │    ├─ 是 + 无缓冲 ──→ 直接从发送者拷贝值
  │    ├─ 是 + 有缓冲 ──→ 从 buf[recvx] 取值,
  │    │                   同时把发送者的值放入 buf[sendx]
  │    │
  │    └─ 否
  │         ├─ 缓冲区非空?
  │         │    ├─ 是 ──→ 从 buf[recvx] 取值,recvx++
  │         │    │
  │         │    └─ 否
  │         │         └─ 当前 goroutine 加入 recvq,休眠
  │         │
  └─────────┘

关键优化:当有等待的接收者时,发送者直接把值拷贝给接收者,跳过缓冲区,减少一次拷贝。

五、应用模式

Channel 的强大不只是"发送和接收",而在于它可以组合出丰富的并发模式。

模式 1:信号通知

1-to-N 广播——利用 close 通知所有 goroutine:

func main() {
    quit := make(chan struct{})

    for i := 0; i < 5; i++ {
        go func(id int) {
            <-quit // 阻塞等待
            fmt.Printf("Worker %d: 收到退出信号\n", id)
        }(i)
    }

    time.Sleep(time.Second)
    close(quit) // ✅ 一次 close 通知所有 worker
    time.Sleep(time.Second)
}
close(quit) 的广播效果:

  close(quit) ──→ Worker 0 <-quit 立即返回 ✅
               ──→ Worker 1 <-quit 立即返回 ✅
               ──→ Worker 2 <-quit 立即返回 ✅
               ──→ Worker 3 <-quit 立即返回 ✅
               ──→ Worker 4 <-quit 立即返回 ✅

模式 2:数据流水线(Pipeline)

将数据处理拆分为多个阶段,每个阶段是一个 goroutine,通过 Channel 连接:

  生成器 ──→ ch1 ──→ 过滤器 ──→ ch2 ──→ 平方器 ──→ ch3 ──→ 消费者
// 阶段 1:生成数据
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// 阶段 2:过滤偶数
func filterEven(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
    }()
    return out
}

// 阶段 3:平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // 组装流水线
    ch := generate(1, 2, 3, 4, 5, 6, 7, 8)
    ch = filterEven(ch)
    ch = square(ch)

    for v := range ch {
        fmt.Println(v) // 4, 16, 36, 64
    }
}

模式 3:扇入扇出(Fan-In / Fan-Out)

Fan-Out(一个输入分发给多个 worker):

                     ┌──→ Worker 1 ──→ out1
  in ──→ channel ────┼──→ Worker 2 ──→ out2
                     └──→ Worker 3 ──→ out3

Fan-In(多个输出汇聚成一个):

  out1 ──┐
  out2 ──┼──→ merged channel ──→ 消费者
  out3 ──┘
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()
    return merged
}

模式 4:信号量(限制并发度)

用带缓冲的 channel 限制同时运行的 goroutine 数量:

var sem = make(chan struct{}, 10) // 最多 10 个并发

func doWork(id int) {
    sem <- struct{}{}        // 获取信号量(缓冲满了就阻塞)
    defer func() { <-sem }() // 释放信号量

    fmt.Printf("Worker %d: 开始\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d: 完成\n", id)
}
信号量原理:

  缓冲容量 = 10

  Worker 1 : sem <- ✅ (1/10)
  Worker 2 : sem <- ✅ (2/10)
  ...
  Worker 10: sem <- ✅ (10/10)
  Worker 11: sem <- 阻塞... (满了)
  Worker 1 完成: <-sem (9/10)
  Worker 11: sem <- ✅ (10/10,进入)

提示:Go 官方扩展库提供了 golang.org/x/sync/semaphore,支持加权信号量。

模式 5:Or-Done

等待多个 channel,任意一个完成就返回。可以用 reflect.Select 实现动态版本:

func orDone(channels ...<-chan struct{}) <-chan struct{} {
    done := make(chan struct{})
    go func() {
        defer close(done)
        cases := make([]reflect.SelectCase, len(channels))
        for i, ch := range channels {
            cases[i] = reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(ch),
            }
        }
        reflect.Select(cases) // 任意一个就绪就返回
    }()
    return done
}

六、使用 Channel 最常踩的 5 个坑

坑 1:向已关闭的 channel 发送

ch := make(chan int)
close(ch)
ch <- 1 // panic: send on closed channel 💀

原则:只由发送方关闭 channel,接收方永远不要关闭。

坑 2:goroutine 泄漏

channel 操作阻塞时,如果没有对应的接收/发送方,goroutine 会永远挂起:

// ❌ goroutine 泄漏
func leaky() <-chan int {
    ch := make(chan int)
    go func() {
        val := longComputation()
        ch <- val // 如果没人接收,这个 goroutine 永远阻塞 💀
    }()
    return ch
}

解决方案:用 context 控制生命周期:

// ✅ 用 context 控制
func safe(ctx context.Context) <-chan int {
    ch := make(chan int)
    go func() {
        val := longComputation()
        select {
        case ch <- val:
        case <-ctx.Done(): // 被取消时退出
        }
    }()
    return ch
}

坑 3:nil channel 的永久阻塞

var ch chan int // nil channel
ch <- 1        // 永久阻塞,不是 panic!
<-ch           // 永久阻塞,不是 panic!

nil channel 在 select 中可以被利用——动态禁用某个 case:

ch1 = nil // select 中 ch1 的 case 再也不会被选中

坑 4:for range channel 忘记 close

// ❌ 忘记 close,for range 永远不会结束
ch := make(chan int)
go func() {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    // 忘记 close(ch) 💀
}()

for v := range ch {
    fmt.Println(v) // 打印 0-4 后永久阻塞
}

坑 5:缓冲区大小选择不当

同步交接(需要确认对方收到)      ──→ 无缓冲(0)
解耦生产者和消费者速度差异        ──→ 小缓冲(1-100)
已知的固定任务数                  ──→ 等于任务数(避免阻塞)

七、Channel 的关闭原则

┌──────────────────────────────────────────────────────────────┐
│  Channel 关闭原则                                            │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  1. 只由发送方关闭,接收方不关闭                             │
│                                                              │
│  2. 有多个发送方时,不要直接 close                           │
│     → 用 sync.Once 保证只关闭一次                            │
│     → 或者引入一个额外的 done channel                        │
│                                                              │
│  3. 如果不确定是否还有发送方,不要 close                     │
│     → 让 GC 回收(channel 不一定要 close)                   │
│                                                              │
└──────────────────────────────────────────────────────────────┘

重要认知:close 不是释放资源,而是发信号。 不 close 的 channel 不会泄漏——只要没有 goroutine 持有引用,GC 会回收它。

八、Channel 应用模式速查

模式场景Channel 类型
信号通知通知事件发生chan struct{}
数据传递goroutine 间传数据chan T
Pipeline多阶段数据处理<-chan T 链式
Fan-Out/Fan-In并行处理 + 汇总多读一写 / 多写一读
信号量限制并发度chan struct{}(有缓冲)
Or-Done任意一个完成就返回chan struct{}

九、实战建议

  1. 优先使用 Channel 而不是 Mutex——当你需要在 goroutine 之间传递数据或信号时
  2. 牢记行为表——nil/关闭/正常三种状态下的 send/recv/close 行为
  3. 用 context 管理 goroutine 生命周期——防止 goroutine 泄漏
  4. 只由发送方关闭——多发送方场景用 sync.Once 或 done channel
  5. 信号通知用 chan struct{}——零内存开销,语义清晰
  6. Pipeline 每个阶段负责 close 自己的输出 channel——谁创建谁关闭
# 两个应该加入 CI 的命令
go vet ./...
go test -race ./...

Channel 是 Go 并发编程的核心——它既是数据传输的管道,也是同步协调的信号。理解 hchan 的环形缓冲区和等待队列机制,牢记三种状态下的行为表,掌握 Pipeline、Fan-In/Fan-Out 等并发模式,你就能用 Go 的方式优雅地解决各种并发编排问题。