2014-12-04 64 views
0

我正在從紅寶石背景中學習Erlang,並且難以理解思維過程。我試圖解決的問題如下:我需要對api發出同樣的請求,每次我在響應中收到一個唯一的ID,我需要傳入下一個請求,直到沒有返回ID。從每個響應中我需要提取某些數據並將其用於其他事情。erlang,從元組列表中提取值連續

首先得到該迭代器:

ShardIteratorResponse = kinetic:get_shard_iterator(GetShardIteratorPayload). 
{ok,[{<<"ShardIterator">>, 
     <<"AAAAAAAAAAGU+v0fDvpmu/02z5Q5OJZhPo/tU7fjftFF/H9M7J9niRJB8MIZiB9E1ntZGL90dIj3TW6MUWMUX67NEj4GO89D"...>>}]} 

解析出shard_iterator ..

{_, [{_, ShardIterator}]} = ShardIteratorResponse. 

請對室壁運動的流記錄的請求......

GetRecordsPayload = [{<<"ShardIterator">>, <<ShardIterator/binary>>}]. 
[{<<"ShardIterator">>, 
    <<"AAAAAAAAAAGU+v0fDvpmu/02z5Q5OJZhPo/tU7fjftFF/H9M7J9niRJB8MIZiB9E1ntZGL90dIj3TW6MUWMUX67NEj4GO89DETABlwVV"...>>}] 
14> RecordsResponse = kinetic:get_records(GetRecordsPayload). 
{ok,[{<<"NextShardIterator">>, 
     <<"AAAAAAAAAAFy3dnTJYkWr3gq0CGo3hkj1t47ccUS10f5nADQXWkBZaJvVgTMcY+nZ9p4AZCdUYVmr3dmygWjcMdugHLQEg6x"...>>}, 
    {<<"Records">>, 
     [{[{<<"Data">>,<<"Zmlyc3QgcmVjb3JkISEh">>}, 
     {<<"PartitionKey">>,<<"BlanePartitionKey">>}, 
     {<<"SequenceNumber">>, 
      <<"49545722516689138064543799042897648239478878787235479554">>}]}]}]} 

我我正在努力的是如何編寫一個循環,不斷打擊該流的kinesis端點,直到沒有mor e分片迭代器...又名我想要所有記錄。因爲我不能重新分配變量,因爲我會在紅寶石...任何幫助指導讚賞!

回答

1

警告:我的代碼可能會被竊聽,但它是「close 」。我從來沒有跑過它,也沒有看到最後一個迭代器的樣子。

我看到你正在嘗試完全在shell中完成你的工作。這是可能的,但很難。您可以使用命名函數和遞歸(since release 17.0 it's easier),例如:

F = fun (ShardIteratorPayload) -> 
    {_, [{_, ShardIterator}]} = kinetic:get_shard_iterator(ShardIteratorPayload), 
    FunLoop = 
     fun Loop(<<>>, Accumulator) -> % no clue how last iterator can look like 
       lists:reverse(Accumulator); 
      Loop(ShardIterator, Accumulator) -> 
       {ok, [{_, NextShardIterator}, {<<"Records">>, Records}]} = 
        kinetic:get_records([{<<"ShardIterator">>, <<ShardIterator/binary>>}]), 
       Loop(NextShardIterator, [Records | Accumulator]) 
     end, 
    FunLoop(ShardIterator, []) 
end. 
AllRecords = F(GetShardIteratorPayload). 

但它太複雜,請在外殼...

這是很容易把它模塊代碼。 erlang中的常見模式是產生另一個進程或進程來獲取您的數據。爲了保持簡單,您可以撥打spawn or spawn_link來產生另一個進程,但現在不要打擾鏈接,只使用spawn/3。 讓我們編譯簡單的消費模塊:

-module(kinetic_simple_consumer). 

-export([start/1]). 

start(GetShardIteratorPayload) -> 
    Pid = spawn(kinetic_simple_fetcher, start, [self(), GetShardIteratorPayload]), 
    consumer_loop(Pid). 

consumer_loop(FetcherPid) -> 
    receive 
     {FetcherPid, finished} -> 
      ok; 
     {FetcherPid, {records, Records}} -> 
      consume(Records), 
      consumer_loop(FetcherPid); 
     UnexpectedMsg -> 
      io:format("DROPPING:~n~p~n", [UnexpectedMsg]), 
      consumer_loop(FetcherPid) 
    end. 

consume(Records) -> 
    io:format("RECEIVED:~n~p~n",[Records]). 

而且提取程序:

-module(kinetic_simple_fetcher). 

-export([start/2]). 

start(ConsumerPid, GetShardIteratorPayload) -> 
    {ok, [ShardIterator]} = kinetic:get_shard_iterator(GetShardIteratorPayload), 
    fetcher_loop(ConsumerPid, ShardIterator). 

fetcher_loop(ConsumerPid, {_, <<>>}) -> % no clue how last iterator can look like 
    ConsumerPid ! {self(), finished}; 

fetcher_loop(ConsumerPid, ShardIterator) -> 
    {ok, [NextShardIterator, {<<"Records">>, Records}]} = 
     kinetic:get_records(shard_iterator(ShardIterator)), 
    ConsumerPid ! {self(), {records, Records}}, 
    fetcher_loop(ConsumerPid, NextShardIterator). 

shard_iterator({_, ShardIterator}) -> 
    [{<<"ShardIterator">>, <<ShardIterator/binary>>}]. 

正如你可以看到這兩個進程可以同時做他們的工作。從shell 嘗試:

kinetic_simple_consumer:start(GetShardIteratorPayload). 

現在你看到你的shell進程變成消費者,你將有你的shell提取程序後回將發送{ItsPid, finished}

接下來的時間,而不是

kinetic_simple_consumer:start(GetShardIteratorPayload). 

運行:

spawn(kinetic_simple_consumer, start, [GetShardIteratorPayload]). 

你應該產卵過程中發揮 - 這是二郎神的主要力量。

0

在Erlang中,您可以使用尾遞歸函數編寫循環。我不知道動態API,所以爲了簡單起見,我只是假設,當沒有更多的碎片時,kinetic:next_iterator/1返回{ok, NextIterator}{error, Reason}

loop({error, Reason}) -> 
    ok; 
loop({ok, Iterator}) -> 
    do_something_with(Iterator), 
    Result = kinetic:next_iterator(Iterator), 
    loop(Result). 

您正在用迭代替換循環。第一個條款處理的情況下,沒有更多的碎片離開(總是開始遞歸與結束條件)。第二個條款處理案例,我們得到了一些迭代器,我們用它做了一些事情,然後再調用。

遞歸調用是函數體中的最後一條指令,稱爲尾遞歸。 Erlang優化了這種調用 - 它們不使用調用堆棧,因此它們可以在常量內存中無限運行(您不會得到類似「堆棧級別太深」的任何內容)