4
我在Go中做了一些流處理,並試圖弄清楚如何在沒有鎖的情況下執行此操作。去併發切片訪問
這個人爲的例子顯示了我面臨的問題。
- 我們一次得到一個
thing
。 - 有一個goroutine將它們緩衝到一個稱爲
things
的切片中。 - 當
things
滿len(things) == 100
然後以某種方式處理和復位 - 有
n
一些需要訪問things
它的全 - 獲得了「不完整的」
things
從其它夠程前併發夠程是不可預測的。 - 無論
doSomethingWithPartial
也不doSomethingWithComplete
需求變異things
代碼:
var m sync.Mutex
var count int64
things := make([]int64, 0, 100)
// slices of data are constantly being generated and used
go func() {
for {
m.Lock()
if len(things) == 100 {
// doSomethingWithComplete does not modify things
doSomethingWithComplete(things)
things = make([]int64, 0, 100)
}
things = append(things, count)
m.Unlock()
count++
}
}()
// doSomethingWithPartial needs to access the things before they're ready
for {
m.Lock()
// doSomethingWithPartial does not modify things
doSomethingWithPartial(things)
m.Unlock()
}
我知道片是不可變這是否意味着我可以刪除互斥鎖,並期望它仍工作(我假設不是)。我該如何重構這個使用通道而不是互斥量。
編輯:這是我想出瞭解決方案不使用互斥
package main
import (
"fmt"
"sync"
"time"
)
func Incrementor() chan int {
ch := make(chan int)
go func() {
count := 0
for {
ch <- count
count++
}
}()
return ch
}
type Foo struct {
things []int
requests chan chan []int
stream chan int
C chan []int
}
func NewFoo() *Foo {
foo := &Foo{
things: make([]int, 0, 100),
requests: make(chan chan []int),
stream: Incrementor(),
C: make(chan []int),
}
go foo.Launch()
return foo
}
func (f *Foo) Launch() {
for {
select {
case ch := <-f.requests:
ch <- f.things
case thing := <-f.stream:
if len(f.things) == 100 {
f.C <- f.things
f.things = make([]int, 0, 100)
}
f.things = append(f.things, thing)
}
}
}
func (f *Foo) Things() []int {
ch := make(chan []int)
f.requests <- ch
return <-ch
}
func main() {
foo := NewFoo()
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Millisecond * time.Duration(i) * 100)
things := foo.Things()
fmt.Println("got things:", len(things))
wg.Done()
}(i)
}
go func() {
for _ = range foo.C {
// do something with things
}
}()
wg.Wait()
}
切片不是不可變的。字符串是。 – fuz
切片是可變的。字符串是不可變的。 – peterSO
@FUZxxl切片指向的數組是可變的,但切片本身不是。 (AFAIK) –