以下情形:GenStage製作人處理Twitter流(使用Stream API和ExTwitter),並向GenStage使用者提供一組推文(最大需求消費者要求)。消費者然後只是打印它們。GenStage:如何處理生產者無法提供事件的情況?
以下問題:我在尋找特定的推文,所以並不總是有新的推文可用。如果GenStage製作者正在返回一個空的事件列表,則消費者將停止詢問。有關更多信息,請參閱this issue and José Valims reply。
我不知道如何解決這個問題。任何幫助是極大的讚賞。這是我到目前爲止:
defmodule MyApp.TwitterProducer do
use GenStage
alias MyApp.TwitterStream
def start_link(:ok) do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
# This creates a regular Elixir Stream
# I use this as the state so that not every
# time the consumer asks for new data
# a new stream is initiated
stream = TwitterStream.get_stream
{:producer, stream}
end
def handle_demand(demand, stream) do
# Take tweets from the stream and
# turn them into a list. Then return
# them to the consumer
chunk = Stream.take(stream, demand)
events = Enum.to_list(chunk)
{:noreply, events, stream}
end
def handle_info(_msg, state) do
# I as getting an "wrong message" error
# before I implemented this function myself
# It does nothing special to my case
{:noreply, [], state}
end
end
defmodule MyApp.TwitterConsumer do
use GenStage
def start_link() do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
{:consumer, :the_state_does_not_matter}
end
def handle_events(events, _from, state) do
Process.sleep(3000)
IO.inspect(events)
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
# Let's fire this thing up
{:ok, p} = GenStage.start_link(MyApp.TwitterProducer, :ok, name: MyApp.TwitterProducer)
{:ok, c} = GenStage.start_link(MyApp.TwitterConsumer, :ok, name: MyApp.TwitterConsumer)
GenStage.sync_subscribe(c, to: p, max_demand: 3)
會發生什麼是:它運行一段時間,然後停止。據我瞭解,只要生產者返回一個空的事件列表。
編輯:有趣的是:如果我將需求設置爲1,它會繼續運行。但它比直接查詢Twitter Stream API要慢很多。這意味着我收到的推文減少了十倍。我的理論是,這是由於重複的Stream.take
調用,而不是隻爲整個流調用Enum.to_list
。但我發現它仍然很混亂。任何想法我失蹤?
如何建立一個特殊的退貨單,表示沒有發現微博? –
問題在於:流是懶惰的。我沒有看到檢查是否存在元素然後返回特殊列表的方法。例如,如果我調用Enum.count/1,代碼會等到實際上有元素。這將導致超時。 –
好吧,如果它停止,只需重新啓動它。 –