2013-10-29 29 views
4

信號燈在Golang與信道來實現:Golang:如何超時信號量?

一個例子是這樣的: https://sites.google.com/site/gopatterns/concurrency/semaphores

語境:

我們有幾百個服務器,並有共享,我們要限制資源進入。因此,對於給定的資源,我們希望使用信號量來限制對這些服務器只有5個併發訪問的訪問。爲了做到這一點,我們計劃使用鎖服務器。當一臺機器訪問資源時,它將首先向鎖服務器註冊它正在通過密鑰訪問資源。然後,當它完成時,它會向鎖服務器發送另一個請求,表示完成並釋放信號量。這確保我們將對這些資源的訪問限制爲最大數量的併發訪問。

問題:想要優雅地處理這個,如果出現錯誤。

問題

你如何去對信號實現超時?

例子:

比方說,我有5旗語大小有同時存在10個進程試圖獲取信號量的鎖所以在這種情況下,只有5將收購它。

有時候,進程會死而沒有響應(真正的原因有點複雜的解釋,但基本上有時進程可能無法解鎖它),因此信號量中的空間現在永久鎖定,從而導致問題。

所以我想對這個超時。這裏有一些問題:

的過程將從秒2之間的任何地方運行長達60分鐘。

我們有一些競爭條件,因爲如果超時,然後進程試圖解開它,那麼我們已經揭開了信號,而不是一次兩次。反之亦然,我們先解鎖它,然後超時。

如何採取張貼以上,並把它變成與超時線程安全的旗語的建議圖案?

+0

您的要求有多嚴格?您是否想要限制對資源的訪問,或者如果超過5臺服務器同時訪問共享資源,是否存在硬故障模式? –

+0

有多種轉到這裏計數信號的例子:https://github.com/tarndt/sema – voidlogic

回答

1

這是一個有點難以弄清楚你想實現什麼,但我可以告訴你想擁有的併發夠程訪問共享資源,並處理它優雅,如果事情不順利。我對你如何處理這個問題有幾點建議。

1)從同步包使用WaitGroup:http://golang.org/pkg/sync/#example_WaitGroup

使用此策略,你每次調用之前基本上添加到櫃檯到一個新的goroutine並使用延遲,以確保它從櫃檯中刪除(因此,無論它會因爲另一個原因超時或返回,它仍然會從櫃檯上移除)。然後,您使用wg.Wait()命令來確保它在所有返回例程返回之前不會再繼續。下面是一個例子:http://play.golang.org/p/wnm24TcBZg請注意,如果沒有wg.Wait(),它將不會在正在返回主程序和終止之前等待結束程序完成。

2)使用time.Ticker自動超時:http://golang.org/pkg/time/#Ticker

這種方法基本上都會設置一個計時器,這將在一定的時間間隔斷火。您可以使用此計時器來控制基於時間的事件。基本上這必須在for循環中運行,等待通道被打勾,如下例所示:http://play.golang.org/p/IHeqmiFBSS

再次,不完全確定您要完成的工作,但您可以考慮將這兩個以便如果您的過程超時並且處於循環中,則股票將捕獲它並在一段時間後返回並調用延遲函數,以便等待它的代碼部分繼續前進。希望這至少有一點幫助。

+0

對不起,我會盡量解釋更好。但是這基本上是一個分佈式信號量服務器。限制在幾百臺機器上同時訪問資源。因此推遲/ wait.Syncgroup不起作用。 time.AfterFunc或time.Ticker是一個好主意,但是,如果超時,然後處理回來,解開它呢? – samol

+0

弗蘭,我只是修改我的問題,我希望現在更有意義? – samol

+0

好的,這裏是我想出來的:http://play.golang.org/p/Q2VX25ov4T 它不是那麼一回事,但我認爲它確實有點你所要求的。評論幾乎可以解釋所發生的一切,但隨時問問你是否有更多問題。代碼有點複雜,所以它並不實際在遊樂場中運行,但它會在我的系統上運行,直至遇到死鎖,但您應該可以根據自己的目的對其進行修改以避免這種情況。 – Verran

0

一些假設:

  • 你需要各地 5個服務器,讓過去在鎖定時間服務器。
  • 訪問該資源的時間較短且類似。

使用配額服務器而不是鎖定服務器。以5倍的平均(平均值,第75等)訪問/鎖定時間補充配額(一個簡單的計數器)。只有在小於最大值的情況下才補充配額。平均來說,您將保持大約5個併發訪問/鎖定。

一些高級功能:

  • 如果共享資源可以檢測到它自己的負載它可以告訴它可能需要更多或更少的併發訪問配額服務器。
  • 當配額服務器完成後,服務器可以對配額服務器執行ping操作。這不是必需的,但是可以更快地釋放資源。
1

由於您正在製作分佈式鎖定服務,我假設您的鎖定服務器在端口上進行偵聽,並且當您接受()您循環的連接時,將等待每個連接的goroutine中的命令。當套接字丟失時,該goroutine退出(即:遠程節點崩潰)

因此,假設這是事實,您可以做幾件事情。

1)創建與深度匹配的溝道多少個併發鎖 2)時鎖定,將消息發送到該信道(它將如果全框) 3)當解鎖,剛讀從一個消息通道 4)你可以「推遲釋放()」(如果你已經鎖定,釋放會消耗一條消息)

這裏是一個粗略的工作示例,除socket之外的所有東西。 希望它是有道理的。 http://play.golang.org/p/DLOX7m8m6q

package main 

import "fmt" 

import "time" 

type Locker struct { 
    ch chan int 
    locked bool 
} 

func (l *Locker) lock(){ 
    l.ch <- 1 
    l.locked=true 
} 
func (l *Locker) unlock() { 
    if l.locked { // called directly or via defer, make sure we don't unlock if we don't have the lock 
     l.locked = false // avoid unlocking twice if socket crashes after unlock 
     <- l.ch 
    } 
} 

func dostuff(name string, locker Locker) { 
    locker.lock() 
    defer locker.unlock() 
    fmt.Println(name,"Doing stuff") 
    time.Sleep(1 * time.Second) 
} 

func main() { 
    ch := make(chan int, 2) 
    go dostuff("1",Locker{ch,false}) 
    go dostuff("2",Locker{ch,false}) 
    go dostuff("3",Locker{ch,false}) 
    go dostuff("4",Locker{ch,false}) 
    time.Sleep(4 * time.Second) 
} 
+0

嘿大衛,謝謝你的回答。然而,dostuff部分實際上是在客戶端。我們需要超時信號量的原因是客戶端可能死機(服務器可能停機)。所以信號量本身必須知道和超時鎖定。不知道這是否有道理? – samol

+0

@samol所以有'dostuff'等待在客戶端告訴說,它的完成*或*客戶端的連接走開,在後一種情況下,你可以假設它的失敗,並釋放資源(或之後釋放額外的超時或在看到客戶端是否重新連接之後,無論如何)。即要求客戶至少在使用資源的時間內保持TCP連接。 –

0

也許這會有所幫助,但我認爲這是實現擴張太
我會感謝有關代碼的任何建議。

package main 

import (
    "fmt" 
    "time" 
    "math/rand" 
    "strconv" 
) 

type Empty interface{} 

type Semaphore struct { 
    dur time.Duration 
    ch chan Empty 
} 

func NewSemaphore(max int, dur time.Duration) (sem *Semaphore) { 
    sem = new(Semaphore) 
    sem.dur = dur 
    sem.ch = make(chan Empty, max) 
    return 
} 

type Timeout struct{} 

type Work struct{} 

var null Empty 
var timeout Timeout 
var work Work 

var global = time.Now() 

func (sem *Semaphore) StartJob(id int, job func()) { 
    sem.ch <- null 
    go func() { 
     ch := make(chan interface{}) 
     go func() { 
      time.Sleep(sem.dur) 
      ch <- timeout 
     }() 
     go func() { 
      fmt.Println("Job ", strconv.Itoa(id), " is started", time.Since(global)) 
      job() 
      ch <- work 
     }() 
     switch (<-ch).(type) { 
     case Timeout: 
      fmt.Println("Timeout for job ", strconv.Itoa(id), time.Since(global)) 
     case Work: 
      fmt.Println("Job ", strconv.Itoa(id), " is finished", time.Since(global)) 
     } 
     <-sem.ch 
    }() 
} 

func main() { 
    rand.Seed(time.Now().Unix()) 
    sem := NewSemaphore(3, 3*time.Second) 
    for i := 0; i < 10; i++ { 
     id := i 
     go sem.StartJob(i, func() { 
      seconds := 2 + rand.Intn(5) 
      fmt.Println("For job ", strconv.Itoa(id), " was allocated ", seconds, " secs") 
      time.Sleep(time.Duration(seconds) * time.Second) 
     }) 
    } 
    time.Sleep(30 * time.Second) 
}