2017-02-12 37 views
2

以下情形:GenStage製作人處理Twitter流(使用Stream APIExTwitter),並向GenStage使用者提供一組推文(最大需求消費者要求)。消費者然後只是打印它們。GenStage:如何處理生產者無法提供事件的情況?

以下問題:我在尋找特定的推文,所以並不總是有新的推文可用。如果GenStage製作者正在返回一個空的事件列表,則消費者將停止詢問。有關更多信息,請參閱this issue and José Valims reply

我不知道如何解決這個問題。任何幫助是極大的讚賞。這是我到目前爲止:

defmodule MyApp.TwitterProducer do 
    use GenStage 
    alias MyApp.TwitterStream 

    def start_link(:ok) do 
    GenStage.start_link(__MODULE__, :ok) 
    end 

    def init(:ok) do 
    # This creates a regular Elixir Stream 
    # I use this as the state so that not every 
    # time the consumer asks for new data 
    # a new stream is initiated 
    stream = TwitterStream.get_stream 
    {:producer, stream} 
    end 

    def handle_demand(demand, stream) do 
    # Take tweets from the stream and 
    # turn them into a list. Then return 
    # them to the consumer 
    chunk = Stream.take(stream, demand) 
    events = Enum.to_list(chunk) 
    {:noreply, events, stream} 
    end 


    def handle_info(_msg, state) do 
    # I as getting an "wrong message" error 
    # before I implemented this function myself 
    # It does nothing special to my case 
    {:noreply, [], state} 
    end 

end 

defmodule MyApp.TwitterConsumer do 
    use GenStage 

    def start_link() do 
    GenStage.start_link(__MODULE__, :ok) 
    end 

    def init(:ok) do 
    {:consumer, :the_state_does_not_matter} 
    end 

    def handle_events(events, _from, state) do 

    Process.sleep(3000) 
    IO.inspect(events) 

    # We are a consumer, so we would never emit items. 
    {:noreply, [], state} 
    end 

end 

# Let's fire this thing up 
{:ok, p} = GenStage.start_link(MyApp.TwitterProducer, :ok, name: MyApp.TwitterProducer) 
{:ok, c} = GenStage.start_link(MyApp.TwitterConsumer, :ok, name: MyApp.TwitterConsumer) 
GenStage.sync_subscribe(c, to: p, max_demand: 3) 

會發生什麼是:它運行一段時間,然後停止。據我瞭解,只要生產者返回一個空的事件列表。

編輯:有趣的是:如果我將需求設置爲1,它會繼續運行。但它比直接查詢Twitter Stream API要慢很多。這意味着我收到的推文減少了十倍。我的理論是,這是由於重複的Stream.take調用,而不是隻爲整個流調用Enum.to_list。但我發現它仍然很混亂。任何想法我失蹤?

+0

如何建立一個特殊的退貨單,表示沒有發現微博? –

+0

問題在於:流是懶惰的。我沒有看到檢查是否存在元素然後返回特殊列表的方法。例如,如果我調用Enum.count/1,代碼會等到實際上有元素。這將導致超時。 –

+0

好吧,如果它停止,只需重新啓動它。 –

回答

5

有文檔中一個顯著(但不幸的是在大膽不表達)的句子上GenStage.handle_demand/2

生產者必須或者存儲需求或返回的事件請求

這就是說,而不是在Stream.take一個塊可能會明確地意識到任務的可能阻止和處理的情況下,收集使用Task.await/2有合理的超時在這種情況下的需求(也許Task.yield/2可以使用在更復雜的檢查,但在這裏它似乎是矯枉過正)

從文檔:

如果你不希望任務失敗,那麼你必須以同樣的方式改變heavy_fun/0代碼你會實現它如果你沒有異步調用。例如,要返回{:ok, val} | :error結果或在更極端的情況下,通過使用try/rescue

雖然這個文檔沒有這個例子。 OTOH,在這裏它可能會更容易剛剛返回空列表,忘了收集需求:

def handle_demand(demand, stream) do 
    try do 
    task = Task.async(fn -> 
     stream 
     |> Stream.take(demand) 
     |> Enum.to_list() 
    end) 
    Task.await(task, 1000) # one sec 
    catch 
    :exit, {:timeout, {Task, :await, [_, 1000]}} -> 
     {:noreply, [], stream} 
    else 
    events when is_list(events) -> 
     {:noreply, events, stream} 
    end 
end 
相關問題