funccreateWorker(id int)chan<- int { //发数据 c := make(chanint) gofunc(){ for{ fmt.Printf("Worked %d received %c\n", id, <-c) } }() return c } funcchanDemo() { //创建10个channel分发给10个worker var channels [10]chan<- int for i := 0; i < 10; i++ { channels[i] = createWorker(i) } //给10个channel分发数据 for i := 0; i < 10; i++ { channels[i] <- 'a' + i }
for i := 0; i < 10; i++ { channels[i] <- 'A' + i }
funcworker(id int, c chanint) { //读完channel内的数据就退出的两种方法 for n := range c { fmt.Printf("Worker %d received %c\n", id, n) } /* for { n, ok := <-c //如果没有值了ok为false if !ok { break } fmt.Printf("Worker %d received %c\n", id, n) } */ }
funcchannelClose() { c := make(chanint) go worker(0, c) c <- 'a' c <- 'b' c <- 'c' c <- 'd' //发送方可以close //接收方有两种判断方法 ok,range close(c) //结束后依旧会接收到数据——(channel具体类型的零值) time.Sleep(time.Millisecond) }
Go语言并发执行理论基础:Communication Sequential Process (CSP)
type worker struct { in chanint done chanbool } funcdoWork(id int, c chanint, done chanbool) { for { fmt.Printf("Worker %d received %c\n", id, <-c) done <- true } } funccreateWorker(id int) worker { w := worker{ in: make(chanint), done: make(chanbool), } go doWork(id, w.in, w.done) return w } //所有channel的发送的都是阻塞式的 funcchanDemo() { var workers [10]worker for i := 0; i < 10; i++ { workers[i] = createWorker(i) } for i, worker := range workers{ worker.in <- 'a' + i } for i, worker := range workers { worker.in <- 'A' + i } for _, worker := range workers { <-worker.done <-worker.done } }
解决方法: 再开一个goroutine并行
1 2 3 4 5 6
funcdoWork(id int, c chanint, done chanbool) { for { fmt.Printf("Worker %d received %c\n", id, <-c) gofunc() { done <- true }() } }
funcdoWork(id int, w worker) { for n := range w.in { fmt.Printf("Worker %d received %c\n", id, n) //函数式编程,只调用done方法,具体执行什么函数由外面的createWorker来控制 w.done() } }
funccreateWorker(id int, wg *sync.WaitGroup) worker { w := worker{ in: make(chanint), done: func() { wg.Done() }, } go doWork(id, w) return w }
//所有channel的发送的都是阻塞式的 funcchanDemo() { var wg sync.WaitGroup
var workers [10]worker for i := 0; i < 10; i++ { workers[i] = createWorker(i, &wg) }
wg.Add(20) for i, worker := range workers { worker.in <- 'a' + i }
for i, worker := range workers { worker.in <- 'A' + i } wg.Wait() }
三、Select
select 是 Go 中的一个控制结构,类似于用于通信的 switch 语句。每个 case 必须是一个通信操作,要么是发送要么是接收。 select 随机执行一个可运行的 case。如果没有 case 可运行,它将阻塞,直到有 case 可运行。一个默认的子句应该总是可运行的。
funcgenerator()chanint { out := make(chanint) gofunc() { i := 0 for { time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond) out <- i i++ } }() return out } funcworker(id int, c chanint) { for n := range c { time.Sleep(time.Second) fmt.Printf("Worker %d received %d\n", id, n) } } funccreateWorker(id int)chan<- int { c := make(chanint) go worker(id, c) return c } funcmain() { var c1, c2 chanint = generator(), generator() worker := createWorker(0) var values []int tm := time.After(10 * time.Second) tick := time.Tick(time.Second) for { var activeWorker chan<- int var activeValue int
//values中存有数据,对activeWorker初始化 iflen(values) > 0 { activeWorker = worker activeValue = values[0] } select { case n := <-c1: values = append(values, n) case n := <-c2: values = append(values, n)
//activeWorker没有值的时候为nil,此时阻塞不会执行 case activeWorker <- activeValue: values = values[1:]
//相邻两个请求之间超过800ms即select阻塞时间超过800ms,则输出timeout case <-time.After(800 * time.Millisecond): fmt.Println("timeout")
//每隔一秒输出长度 case <-tick: fmt.Println("queue len =", len(values))
//总时间10s后退出 case <-tm: fmt.Println("bye") return } } }
funcmsgGen(name string)chanstring { c := make(chanstring) gofunc() { i := 0 for { time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) c <- fmt.Sprintf("service %s message %d", name, i) i++ } }() return c }
//不知道有多少个channel的时候用这种fanIn funcfanIn(chs ...chanstring)chanstring { c := make(chanstring) for _, ch := range chs { //如果直接用ch的话,ch只有一个值,只会取最后一个channel的值送给c //所以要通过一个参数拷贝ch进行传递 gofunc(in chanstring) { for { c <- <-in } }(ch) } /* 变量chCopy在全局有两份,通过chCopy拷贝ch for _, ch := range chs { chCopy := ch go func() { for { c <- <-chCopy } }() } */ return c }
//明确channel个数时用select funcfanInBySelect(c1, c2 chanstring)chanstring { c := make(chanstring) gofunc() { for { select { case m := <-c1: c <- m case m := <-c2: c <- m } } }() return c }