2013-04-18 42 views
4

我在Go中做了一些流處理,並試圖弄清楚如何在沒有鎖的情況下執行此操作。去併發切片訪問

這個人爲的例子顯示了我面臨的問題。

  • 我們一次得到一個thing
  • 有一個goroutine將它們緩衝到一個稱爲things的切片中。
  • things滿len(things) == 100然後以某種方式處理和復位
  • n一些需要訪問things它的全
  • 獲得了「不完整的」 things從其它夠程前併發夠程是不可預測的。
  • 無論doSomethingWithPartial也不doSomethingWithComplete需求變異things

代碼:

var m sync.Mutex 
var count int64 
things := make([]int64, 0, 100) 

// slices of data are constantly being generated and used 
go func() { 
    for { 
    m.Lock() 
    if len(things) == 100 { 
     // doSomethingWithComplete does not modify things 
     doSomethingWithComplete(things) 
     things = make([]int64, 0, 100) 
    } 
    things = append(things, count) 
    m.Unlock() 
    count++ 
    } 
}() 

// doSomethingWithPartial needs to access the things before they're ready 
for { 
    m.Lock() 
    // doSomethingWithPartial does not modify things 
    doSomethingWithPartial(things) 
    m.Unlock() 
} 
  1. 我知道片是不可變這是否意味着我可以刪除互斥鎖,並期望它仍工作(我假設不是)

  2. 我該如何重構這個使用通道而不是互斥量。

編輯:這是我想出瞭解決方案不使用互斥

package main 

import (
    "fmt" 
    "sync" 
    "time" 
) 

func Incrementor() chan int { 
    ch := make(chan int) 
    go func() { 
     count := 0 
     for { 
      ch <- count 
      count++ 
     } 
    }() 
    return ch 
} 

type Foo struct { 
    things []int 
    requests chan chan []int 
    stream chan int 
    C  chan []int 
} 

func NewFoo() *Foo { 
    foo := &Foo{ 
     things: make([]int, 0, 100), 
     requests: make(chan chan []int), 
     stream: Incrementor(), 
     C:  make(chan []int), 
    } 
    go foo.Launch() 
    return foo 
} 

func (f *Foo) Launch() { 
    for { 
     select { 
     case ch := <-f.requests: 
      ch <- f.things 
     case thing := <-f.stream: 
      if len(f.things) == 100 { 
       f.C <- f.things 
       f.things = make([]int, 0, 100) 
      } 
      f.things = append(f.things, thing) 
     } 
    } 
} 

func (f *Foo) Things() []int { 
    ch := make(chan []int) 
    f.requests <- ch 
    return <-ch 
} 

func main() { 

    foo := NewFoo() 

    var wg sync.WaitGroup 
    wg.Add(10) 

    for i := 0; i < 10; i++ { 
     go func(i int) { 
      time.Sleep(time.Millisecond * time.Duration(i) * 100) 
      things := foo.Things() 
      fmt.Println("got things:", len(things)) 
      wg.Done() 
     }(i) 
    } 

    go func() { 
     for _ = range foo.C { 
      // do something with things 
     } 
    }() 

    wg.Wait() 
} 
+1

切片不是不可變的。字符串是。 – fuz

+1

切片是可變的。字符串是不可變的。 – peterSO

+0

@FUZxxl切片指向的數組是可變的,但切片本身不是。 (AFAIK) –

回答

1

應該指出的是,「去的方式」可能只是使用互斥這個。研究如何使用通道來實現它是很有趣的,但互斥對於這個特定的問題可能更簡單也更容易推理。