go协程

go 使用协程很简单,直接前边加关键字go即可

什么是channel

channel是有类型的管道,可以用channel操作符 <- 对其发送或者接收值。

创建channel

c := make(chan int)

注意:管道必须指定类型,比如这里是int,即往管道里传送的数据只能是int类型。当然可以是interface{}这种空接口方式,这样就可以传送各种数据类型了

如何确定管道方向

ch <- v 
v := <-ch 

“箭头”就是数据流的方向

注意接受时候<-和chan变量之间没有空格,虽然有空格也不会报错,但是ide会提示,因此还是依照规范不空格比较好

管道和协程使用

默认情况下,在另一端准备好之前,发送和接收都会阻塞。这使得 goroutine 可以在没有明确的锁或竞态变量的情况下进行同步。

发动和接收数据应当在并行线上,而不能是串行的,因为发送和接收都会阻塞,如果串行,就会死锁(就是一个一直阻塞在那等对端),但不用为此操心,因为go在执行时候(编译会通过)会报错。

演示没有管道与有管道协程区别

不实用管道

func add(a int, b int) {
	fmt.Println(a + b)
}

func main() {
	go add(1, 8)
}

使用管道

func add_channel(a int, b int, c chan int) {
	c <- (a + b)
}

func main() {
	c := make(chan int)
	go add_channel(1, 23, c)
	res := <-c
	fmt.Println(res)
}

备注:数据哪去了,为啥没有使用管道的没有输出结果!

buffer

channel可以带buffer:

make(chan int, 2)

表示创建一个length为2的管道,注意这里的length表示数量,而非具体的内存容积,比如

make(chan string, 2)
  • 表示可以容纳2个string的内容,比如”abcdefg”和”h”这样2个string,而不是指只能存”a”和”b”。
  • 当不设置length时候即为buffer=0

说明:

  • 往chan发送数据时候,若buffer没满的时候,则发送数据即刻成功,不会被阻塞。当buffer满了就会被阻塞
  • 从chan接受数据时候,若buffer是空的则会被阻塞,当buffer不是空的时候则即刻完成,不会被阻塞

这个是重点,想一想你开始需要设置多少容量呢?如果不设置是不是没有缓存,容量进入出来有顺序吗?我就是不讲,大家实验下吧!

close


close(ch)

有且只有发送者可以关闭管道,接收者不能关闭管道

接收者可以通过v, ok := <-ch这种方式来测试管道是否关闭,若ok为false则表示管道已关闭

结合range


可以用for range来循环取出管道里的数据,range当遇到管道关闭时候就会自动结束循环,例子:

package main

import (
    "fmt"
)

func foo(c chan string) {
    c <- "a"
    c <- "b"
    close(c)
}

func main() {
    c := make(chan string)
    go foo(c)
    for i := range c {
        fmt.Println(i)
    }
}

输出

a
b

Closing is only necessary when the receiver must be told there are no more values coming, such as to terminate a range loop.

只有必须告知接收者没有数据要接收了才需要关闭管道,例如上面例子中需要告知range循环该结束了。如果不告知就会造成死锁

生产者close()结合消费者<- 就可以实现类似多进程、多线程的wait、join这种等待效果

channel结合select和case

select和case的组合可以使哪个管道就绪(对端已阻塞),就读取该管道数据并执行相应case的代码块。

官网译: select 会阻塞,直到条件分支中的某个可以继续执行,这时就会执行那个条件分支。当多个都准备好的时候,会随机选择一个。

package main

import (
    "fmt"
)

func receive(ch1, ch2, ch3, quit chan int) {
    for i := 0; i < 2; i++ {
        fmt.Printf("receive %d from ch1\n", <-ch1)
        fmt.Printf("receive %d from ch2\n", <-ch2)
        fmt.Printf("receive %d from ch3\n", <-ch3)
    }
    quit <- 0
}

func send(ch1, ch2, ch3, quit chan int) {
    for i := 0; i < 10; i++ {
        select {
        case ch1 <- i:
            fmt.Printf("send %d to ch1\n", i)
        case ch2 <- i:
            fmt.Printf("send %d to ch2\n", i)
        case ch3 <- i:
            fmt.Printf("send %d to ch3\n", i)
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    ch3 := make(chan int)
    quit := make(chan int)
    go receive(ch1, ch2, ch3, quit)
    send(ch1, ch2, ch3, quit)
}

输出:输出不固定,因为是并行的,大概100次执行结果中有3-7次和其他不一样,大部分结果是

send 0 to ch1
receive 0 from ch1
receive 1 from ch2
send 1 to ch2
send 2 to ch3
receive 2 from ch3
receive 3 from ch1
send 3 to ch1
send 4 to ch2
receive 4 from ch2
receive 5 from ch3
send 5 to ch3
quit

select的随机性

package main

import (
    "fmt"
    "time"
)

func receive(ch chan int) {
    for {
        <-ch
    }
}

func send(ch1, ch2, ch3 chan int) {
    for i := 0; i < 10; i++ {
        // sleep是为了保证所有的管道receiver都已阻塞等待数据
        time.Sleep(1000 * time.Millisecond)
        select {
        case ch1 <- i:
            fmt.Printf("send %d to ch1\n", i)
        case ch2 <- i:
            fmt.Printf("send %d to ch2\n", i)
        case ch3 <- i:
            fmt.Printf("send %d to ch3\n", i)
        }
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    ch3 := make(chan int)
    go receive(ch1)
    go receive(ch2)
    go receive(ch3)
    send(ch1, ch2, ch3)
}

每次输出都不一样,且找不到规律,说明是随机性的

select支持default


当所有case都阻塞的时候,执行default

select {
case ch1 <- i:
    fmt.Printf("send %d to ch1\n", i)
case ch2 <- i:
    fmt.Printf("send %d to ch2\n", i)
case ch3 <- i:
    fmt.Printf("send %d to ch3\n", i)
default:
    fmt.Println("default")
}

可以在channel里使用select的default来告知channe已满

读写类型

channel支持3种类型(通过%T看到的):

  • 读写类型:chan int
  • 只读类型:<-chan int,叫做receive-only
  • 只写类型:chan<- int,叫做send-only

通过函数参数传递时候,若原来是读写,则可以转成只读或只写,但如果已经是只读或只写,则只能保持类型,无法转为其他类型(比如原来是只读,则只能是只读,无法转成只写,也无法转为读写),例子如下:

package main

import (
    "fmt"
    "time"
)

func send(c chan<- int) {
    fmt.Printf("send: %T\n", c)
    c <- 1
}

func recv(c <-chan int) {
    fmt.Printf("recv: %T\n", c)
    fmt.Println(<-c)
}

func main() {
    c := make(chan int)
    fmt.Printf("%T\n", c)
    go send(c)
    go recv(c)
    time.Sleep(1 * time.Second)
}

输出

chan int
send: chan<- int
recv: <-chan int
1

互斥锁: sync.Mutex

用于主动控制Mutex类型的变量或者将Mutex类型作为struct的元素的变量在同一时间只被一个routine访问(即执行Lock()方法的代码块),这个Mutex带有2个方法:Lock()和Unlock()。互斥锁不区分读和写,即无论是print打印还是写操作都是互斥的

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mutex sync.Mutex
    fmt.Printf("%+v\n", mutex)

    mutex.Lock()
    fmt.Printf("%+v\n", mutex)

    mutex.Unlock()
    fmt.Printf("%+v\n", mutex)
}

可以看出当Lock()时,state为1,Unlock()时,state为0。

这段代码其实是为后面Lock()使用做了一个工作原理铺垫(个人猜测):Lock()时候会判断state,若为0,则将state改为1;若为1,则阻塞等待,直到state变为0,然后再将state改为1。sync.Mutex是并发安全的。

演示互斥

// 结构体默认是不安全的,我们增加锁字段来确保安全
type SafeCounter struct {
	v   map[string]int
	mux sync.Mutex
}

// 给给定键值增加计数
func (c *SafeCounter) Inc(key string, id int, num chan int) {
	c.mux.Lock()
	fmt.Printf("%d. Inc lock.\n", id)
	// 获取锁定后,一次只能有一个操作
	c.v[key]++
	c.mux.Unlock()
	num <- 1
	fmt.Printf("%d. Inc unlock.\n", id)
}

// 返回值的结果
func (c *SafeCounter) Value(key string) int {
	c.mux.Lock()
	fmt.Println("Value lock.")
	// 获取锁定后,一次只能有一个操作
	defer fmt.Println("Value unlock.")
	defer c.mux.Unlock()
	return c.v[key]
}

func main() {
	chanel := make(chan int, 11)
	c := SafeCounter{v: make(map[string]int)}
	for i := 0; i < 10; i++ {
		go c.Inc("somekey", i, chanel)
		<-chanel
	}
	fmt.Println(c.Value("somekey"))
}
  • 我们一般会在锁定互斥锁之后紧接着就用defer语句来保证该互斥锁的及时解锁,因为defer是先进后出
  • Inc和Value方法的receiver是指针,这是必须的,因为Mutex锁对应的就是一个具体的变量(可以是struct,也可以是其他普通类型)
  • Lock只是一种人为的互斥,是一种协议,并不是强制。不调用既不会排斥

sync.Map

Go语言在 1.9 版本中提供了一种效率较高的并发安全的 sync.Map,sync.Map 和 map 不同,不是以语言原生形态提供,而是在 sync 包下的特殊结构。

sync.Map 有以下特性:

  • 无须初始化,直接声明即可。
  • sync.Map 不能使用 map 的方式进行取值和设置等操作,而是使用 sync.Map 的方法进行调用,Store 表示存储,Load 表示获取,Delete 表示删除。
  • 使用 Range 配合一个回调函数进行遍历操作,通过回调函数返回内部遍历出来的值,Range 参数中回调函数的返回值在需要继续迭代遍历时,返回 true,终止迭代遍历时,返回 false。

    // 给给定键值增加计数
    func (c *SafeCounter) Inc(key string, id int, num chan int) {
    	fmt.Printf("%d. Inc lock.\n", id)
    	// 获取锁定后,一次只能有一个操作
    	// c.v[key]++
    	tempKeyValue, isBe := c.v.Load(key)
    	if isBe {
    		c.v.Store(key, tempKeyValue.(int)+1)
    	} else {
    		c.v.Store(key, 1)
    	}
    	num <- 1
    	fmt.Printf("%d. Inc unlock.\n", id)
    }
    
    // 返回值的结果
    func (c *SafeCounter) Value(key string) int {
    	fmt.Println("Value lock.")
    	// 获取锁定后,一次只能有一个操作
    	defer fmt.Println("Value unlock.")
    	tempKeyValue, _ := c.v.Load(key)
    	return tempKeyValue.(int)
    }
    
    func main() {
    	chanel := make(chan int, 11)
    	c := SafeCounter{}
    	for i := 0; i < 10; i++ {
    		go c.Inc("somekey", i, chanel)
    		<-chanel
    	}
    	fmt.Println(c.Value("somekey"))
    }
    

RWMutex(读写锁)

  • RWMutex 是单写多读锁,该锁可以加多个读锁或者一个写锁
  • 读锁占用的情况下会阻止写,不会阻止读,多个 goroutine 可以同时获取读锁
  • 写锁会阻止其他 goroutine(无论读和写)进来,整个锁由该 goroutine 独占
  • 适用于读多写少的场景

Lock() 和 Unlock()

  • Lock() 加写锁,Unlock() 解写锁
  • 如果在加写锁之前已经有其他的读锁和写锁,则 Lock() 会阻塞直到该锁可用,为确保该锁可用,已经阻塞的 Lock() 调用会从获得的锁中排除新的读取器,即写锁权限高于读锁,有写锁时优先进行写锁定
  • 在 Lock() 之前使用 Unlock() 会导致 panic 异常

RLock() 和 RUnlock()

  • RLock() 加读锁,RUnlock() 解读锁
  • RLock() 加读锁时,如果存在写锁,则无法加读锁;当只有读锁或者没有锁时,可以加读锁,读锁可以加载多个
  • RUnlock() 解读锁,RUnlock() 撤销单词 RLock() 调用,对于其他同时存在的读锁则没有效果
  • 在没有读锁的情况下调用 RUnlock() 会导致 panic 错误
  • RUnlock() 的个数不得多余 RLock(),否则会导致 panic 错误

示例

Lock() 和 Unlock()

package main

import (
    "sync"
    "fmt"
    "time"
)

func main() {
    var mutex *sync.RWMutex
    mutex = new(sync.RWMutex)
    fmt.Println("Lock the lock")
    mutex.Lock()
    fmt.Println("The lock is locked")

    channels := make([]chan int, 4)
    for i := 0; i < 4; i++ {
        channels[i] = make(chan int)
        go func(i int, c chan int) {
            fmt.Println("Not lock: ", i)
            mutex.Lock()
            fmt.Println("Locked: ", i)
            fmt.Println("Unlock the lock: ", i)
            mutex.Unlock()
            c <- i
        }(i, channels[i])
    }
    time.Sleep(time.Second)
    fmt.Println("Unlock the lock")
    mutex.Unlock()
    time.Sleep(time.Second)

    for _, c := range channels {
        <-c
    }
}

程序输出:

Lock the lock
The lock is locked
Not lock:  0
Not lock:  1
Not lock:  2
Not lock:  3
Unlock the lock
Locked:  0
Unlock the lock:  0
Locked:  2
Unlock the lock:  2
Locked:  3
Unlock the lock:  3
Locked:  1
Unlock the lock:  1

Lock() 和 RLock()

package main

import (
    "sync"
    "fmt"
    "time"
)

func main() {
    var mutex *sync.RWMutex
    mutex = new(sync.RWMutex)
    fmt.Println("Lock the lock")
    mutex.Lock()
    fmt.Println("The lock is locked")

    channels := make([]chan int, 4)
    for i := 0; i < 4; i++ {
        channels[i] = make(chan int)
        go func(i int, c chan int) {
            fmt.Println("Not read lock: ", i)
            mutex.RLock()
            fmt.Println("Read Locked: ", i)
            fmt.Println("Unlock the read lock: ", i)
            time.Sleep(time.Second)
            mutex.RUnlock()
            c <- i
        }(i, channels[i])
    }
    time.Sleep(time.Second)
    fmt.Println("Unlock the lock")
    mutex.Unlock()
    time.Sleep(time.Second)

    for _, c := range channels {
        <-c
    }
}

程序输出:

Lock the lock
The lock is locked
Not read lock:  2
Not read lock:  3
Not read lock:  1
Not read lock:  0
Unlock the lock
Read Locked:  2
Read Locked:  1
Unlock the read lock:  2
Unlock the read lock:  1
Read Locked:  0
Read Locked:  3
Unlock the read lock:  0
Unlock the read lock:  3

Unlock() 使用之前不存在 Lock()

package main

import (
    "sync"
)

func main(){
    var rwmutex *sync.RWMutex
    rwmutex = new(sync.RWMutex)
    rwmutex.Unlock()
}

程序输出:

panic: sync: Unlock of unlocked RWMutex

RWMutex 使用不当导致的死锁

示例1:

package main

import (
    "sync"
)

func main(){
    var rwmutex *sync.RWMutex
    rwmutex = new(sync.RWMutex)
    rwmutex.Lock()
    rwmutex.Lock()
}

程序输出:

fatal error: all goroutines are asleep - deadlock!

示例2:

package main

import (
    "sync"
)

func main(){
    var rwmutex *sync.RWMutex
    rwmutex = new(sync.RWMutex)
    rwmutex.Lock()
    rwmutex.RLock()
}

程序输出:

fatal error: all goroutines are asleep - deadlock!

RUnlock() 之前不存在 RLock()

package main

import (
    "sync"
)

func main(){
    var rwmutex *sync.RWMutex
    rwmutex = new(sync.RWMutex)
    rwmutex.RUnlock()
}

程序输出:

panic: sync: RUnlock of unlocked RWMutex

RUnlock() 个数多于 RLock()

package main

import (
    "sync"
)

func main(){
    var rwmutex *sync.RWMutex
    rwmutex = new(sync.RWMutex)
    rwmutex.RLock()
    rwmutex.RLock()
    rwmutex.RUnlock()
    rwmutex.RUnlock()
    rwmutex.RUnlock()
}

程序输出:

panic: sync: RUnlock of unlocked RWMutex

##WaitGroup

WaitGroup 用于等待一组 goroutine 结束,用法很简单。它有三个方法:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
  • Add 用来添加 goroutine 的个数
  • Done 执行一次数量减 1
  • Wait 用来等待结束

例子

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    fmt.Printf("init:             %+v\n", wg)

    for i := 1; i < 10; i++ {
        // 计数加 1
        wg.Add(1)
        go func(i int) {
            fmt.Printf("goroutine%d start: %+v\n", i, wg)
            time.Sleep(11 * time.Second)
            // 计数减 1
            wg.Done()
            fmt.Printf("goroutine%d end:   %+v\n", i, wg)
        }(i)
        time.Sleep(time.Second)
    }

    // 等待执行结束
    wg.Wait()
    fmt.Printf("over:             %+v\n", wg)
}

##条件变量(Cond)

与互斥量不同,条件变量的作用并不是保证在同一时刻仅有一个线程访问某一个共享数据,而是在对应的共享数据的状态发生变化时,通知其他因此而被阻塞的线程。条件变量总是与互斥量组合使用。互斥量为共享数据的访问提供互斥支持,而条件变量可以就共享数据的状态的变化向相关线程发出通知。

声明

lock := new(sync.Mutex)
cond := sync.NewCond(lock)

也可以写成一行

cond := sync.NewCond(new(sync.Mutex))

####方法

cond.L.Lock()
cond.L.Unlock()
cond.Wait()
cond.Signal()
cond.Broadcast()

Note

  • cond.L.Lock()cond.L.Unlock():也可以使用lock.Lock()lock.Unlock(),完全一样,因为是指针转递
  • cond.Wait():Unlock()->*阻塞等待通知(即等待Signal()或Broadcast()的通知)->收到通知*->Lock()
  • cond.Signal():通知一个Wait()了的,若没有Wait(),也不会报错。Signal()通知的顺序是根据原来加入通知列表(Wait())的先入先出
  • cond.Broadcast(): 通知所有Wait()了的,若没有Wait(),也不会报错

    var locker = new(sync.Mutex)
    var cond = sync.NewCond(locker)
    
    func test(x int) {
    	cond.L.Lock() //获取锁
    	fmt.Println("in: ", x)
    	cond.Wait() //等待通知  暂时阻塞
    	fmt.Println("do: ", x)
    	time.Sleep(time.Second * 2)
    	cond.L.Unlock()//释放锁
    	fmt.Println("out: ", x)
    }
    
    func main() {
    	for i := 0; i < 5; i++ {
    		go test(i) //所有线程都获取锁
    	}
    	fmt.Println("start all")
    	time.Sleep(time.Second * 10)
    	fmt.Println("Signal") // 下发一个通知给已经获取锁的goroutine
    	cond.Signal()
    	time.Sleep(time.Second * 5)
    	fmt.Println("Signal") // 下发一个通知给已经获取锁的goroutine
    	cond.Signal()
    	time.Sleep(time.Second * 5)
    	fmt.Println("broadcast")
    	cond.Broadcast()
    	time.Sleep(time.Second * 10)
    	fmt.Println("finish all")
    }
    

一般不建议用,还是用channel比较方便

临时对象池

临时对象池是一些可以分别存储和取出的临时对象。池中的对象会在没有任何通知的情况下被移出(释放或者重新取出使用)。如果 pool 中持有某个对象的唯一引用,则该对象很可能会被回收。Pool 在多 goroutine 使用环境中是安全的。Pool 是用来缓存已经申请了的 目前未使用的 接下来可能会使用的 内存,以此缓解 GC 压力。使用它可以方便高效的构建线程安全的 free list(一种用于动态内存申请的数据结构)。然而,它并不适合所有场景的 free list。在同一 package 中独立运行的多个独立线程之间静默共享一组临时元素才是 pool 的合理使用场景。Pool 提供在多个独立 client 之间共享临时元素的机制。在 fmt 包中有一个使用 Pool 的例子,它维护了一个动态大小的输出 buffer。另外,一些短生命周期的对象不适合使用 pool 来维护,这种情况下使用 pool 不划算。这是应该使用它们自己的 free list(这里可能指的是 go 内存模型中用于缓存 <32k小对象的 free list) 更高效。Pool 一旦使用,不能被复制。

对比1(起步阶段):


package main

import (
    "fmt"
    "sync"
    "time"
)

// 一个[]byte的对象池,每个对象为一个[]byte
var bytePool = sync.Pool{
    New: func() interface{} {
        b := make([]byte, 1)
        return &b
    },
}

func main() {
    a := time.Now().Unix()
    // 不使用对象池
    for i := 0; i < 1000000000; i++ {
        obj := make([]byte, 1)
        _ = obj
    }
    b := time.Now().Unix()
    // 使用对象池
    for i := 0; i < 1000000000; i++ {
        obj := bytePool.Get().(*[]byte)
        bytePool.Put(obj)
    }
    c := time.Now().Unix()
    fmt.Println("without pool ", b-a, "s")
    fmt.Println("with    pool ", c-b, "s")
}

输出

without pool  0 s
with    pool  17 s

可以看到,当[]byte只有1个元素时候,用pool性能反而更差

对比2(追赶阶段):

package main

import (
    "fmt"
    "sync"
    "time"
)

// 一个[]byte的对象池,每个对象为一个[]byte
var bytePool = sync.Pool{
    New: func() interface{} {
        b := make([]byte, 800)
        return &b
    },
}

func main() {
    a := time.Now().Unix()
    // 不使用对象池
    for i := 0; i < 1000000000; i++ {
        obj := make([]byte, 800)
        _ = obj
    }
    b := time.Now().Unix()
    // 使用对象池
    for i := 0; i < 1000000000; i++ {
        obj := bytePool.Get().(*[]byte)
        bytePool.Put(obj)
    }
    c := time.Now().Unix()
    fmt.Println("without pool ", b-a, "s")
    fmt.Println("with    pool ", c-b, "s")
}

输出

without pool  16 s
with    pool  17 s

可以看到,对象池优势了

对比3(超越阶段):

package main

import (
    "fmt"
    "sync"
    "time"
)

// 一个[]byte的对象池,每个对象为一个[]byte
var bytePool = sync.Pool{
    New: func() interface{} {
        b := make([]byte, 8000)
        return &b
    },
}

func main() {
    a := time.Now().Unix()
    // 不使用对象池
    for i := 0; i < 1000000000; i++ {
        obj := make([]byte, 8000)
        _ = obj
    }
    b := time.Now().Unix()
    // 使用对象池
    for i := 0; i < 1000000000; i++ {
        obj := bytePool.Get().(*[]byte)
        bytePool.Put(obj)
    }
    c := time.Now().Unix()
    fmt.Println("without pool ", b-a, "s")
    fmt.Println("with    pool ", c-b, "s")
}

输出

without pool  128 s
with    pool  17 s
  1. 当每个对象的内存小于一定量的时候,不使用pool的性能秒杀使用pool;当内存处于某个量的时候,不使用pool和使用pool性能相当;当内存大于某个量的时候,使用pool的优势就显现出来了
  2. 不使用pool,那么对象占用内存越大,性能下降越厉害;使用pool,无论对象占用内存大还是小,性能都保持不变。可以看到pool有点像飞机,虽然起步比跑车慢,但后劲十足。

即:pool适合占用内存大且并发量大的场景。当内存小并发量少的时候,使用pool适得其反

Pool的正确用法

在Put之前重置,在Get之后重置