2012-09-06 116 views
1

假設我有大量的整數(比如說有50,000,000個)。如何在Erlang的大列表中找到「最接近」的值

我想寫一個函數,它返回集合中最大的整數,它不超過作爲函數參數傳遞的值。例如。如果值分別爲:

Values = [ 10, 20, 30, 40, 50, 60] 

然後find(Values, 25)應該返回20

該函數將被調用很多次第二和收集大。假設蠻力搜索的執行速度太慢,那麼執行它的有效方法是什麼?這些整數很少會改變,所以它們可以存儲在一個數據結構中,從而實現最快的訪問。

我看過gb_trees,但我不認爲你可以獲得「插入點」,然後獲取以前的條目。

我意識到我可以通過構建我自己的樹結構或二元斬波排序的數組從頭開始這樣做,但是有沒有一些內置的方法可以做到這一點,我忽略了?

+0

如果值是*不*排序,這將如果樣本數據是*不*排序,則更清楚。 (排序的方法絕對是最簡單的,並且具有很好的時間複雜性,尤其是在多次調用時分期付款。) – 2012-09-06 17:45:56

回答

2

這是另一個使用ets的代碼示例。我相信,查找會在大約恆定的時間進行:

1> ets:new(tab,[named_table, ordered_set, public]). 
2> lists:foreach(fun(N) -> ets:insert(tab,{N,[]}) end, lists:seq(1,50000000)). 
3> timer:tc(fun() -> ets:prev(tab, 500000) end). 
{21,499999} 
4> timer:tc(fun() -> ets:prev(tab, 41230000) end). 
{26,41229999} 

代碼周圍會比這當然有點多但比較整齊

+0

這很有趣,我認爲ets:prev/2要求鑰匙是現有的鑰匙,我會試試 - 謝謝! –

+0

太簡單了!正如保羅所說,我記得鑰匙你應該添加一個小的值來得到正確的答案ets:prev(tab,500000.1):o) – Pascal

+0

@PaulCager不需要鍵不存在** IFF **表的類型是* ordered_set *。對於* set *和* bag *鍵必須存在。 – rvirding

1

因此,如果輸入未排序,你可以通過執行獲得線性的版本:

closest(Target, [Hd | Tl ]) -> 
     closest(Target, Tl, Hd). 

closest(_Target, [], Best) -> Best; 
closest(Target, [ Target | _ ], _) -> Target; 
closest(Target, [ N | Rest ], Best) -> 
    CurEps = erlang:abs(Target - Best), 
    NewEps = erlang:abs(Target - N), 
    if NewEps < CurEps -> 
      closest(Target, Rest, N); 
     true -> 
      closest(Target, Rest, Best) 
    end. 

你應該能夠在輸入排序做的更好。

我在這裏發明了我自己的「最接近」指標,因爲我允許最接近的值高於目標值 - 如果您喜歡,您可以將其改爲「最接近但不大於」。

4

要查找大量無序列表,我建議你使用分而治之的策略最近的價值 - 並行處理列表中的不同部分。但足夠的列表的小部分可能會按順序處理

這裏是你的代碼:

-module(finder). 
-export([ nearest/2 ]). 

-define(THRESHOLD, 1000). 

%% 
%% sequential finding of nearest value 
%% 
%% if nearest value doesn't exists - return null 
%% 
nearest(Val, List) when length(List) =< ?THRESHOLD -> 
     lists:foldl(
       fun 
       (X, null) when X < Val -> 
         X; 
       (_X, null) -> 
         null; 
       (X, Nearest) when X < Val, X > Nearest -> 
         X; 
       (_X, Nearest) -> 
         Nearest 
       end, 
       null, 
       List); 
%% 
%% split large lists and process each part in parallel 
%% 
nearest(Val, List) -> 
     { Left, Right } = lists:split(length(List) div 2, List), 
     Ref1 = spawn_nearest(Val, Left), 
     Ref2 = spawn_nearest(Val, Right), 
     Nearest1 = receive_nearest(Ref1), 
     Nearest2 = receive_nearest(Ref2), 
     %% 
     %% compare nearest values from each part 
     %% 
     case { Nearest1, Nearest2 } of 
       { null, null } -> 
         null; 
       { null, Nearest2 } -> 
         Nearest2; 
       { Nearest1, null } -> 
         Nearest1; 
       { Nearest1, Nearest2 } when Nearest2 > Nearest1 -> 
         Nearest2; 
       { Nearest1, Nearest2 } when Nearest2 =< Nearest1 -> 
         Nearest1 
     end. 

spawn_nearest(Val, List) -> 
     Ref = make_ref(), 
     SelfPid = self(), 
     spawn(
       fun() -> 
         SelfPid ! { Ref, nearest(Val, List) } 
       end), 
     Ref. 

receive_nearest(Ref) -> 
     receive 
       { Ref, Nearest } -> Nearest 
     end. 

enter image description here

測試殼:

1> c(finder). 
{ok,finder} 
2> 
2> List = [ random:uniform(1000) || _X <- lists:seq(1,100000) ]. 
[444,724,946,502,312,598,916,667,478,597,143,210,698,160, 
559,215,458,422,6,563,476,401,310,59,579,990,331,184,203|...] 
3> 
3> finder:nearest(500, List). 
499 
4> 
4> finder:nearest(-100, lists:seq(1,100000)). 
null 
5> 
5> finder:nearest(40000, lists:seq(1,100000)). 
39999 
6> 
6> finder:nearest(4000000, lists:seq(1,100000)). 
100000 

性能:(單節點)

7> 
7> timer:tc(finder, nearest, [ 40000, lists:seq(1,10000) ]). 
{3434,10000} 
8> 
8> timer:tc(finder, nearest, [ 40000, lists:seq(1,100000) ]). 
{21736,39999} 
9> 
9> timer:tc(finder, nearest, [ 40000, lists:seq(1,1000000) ]). 
{314399,39999} 

對戰平原迭代:

1> 
1> timer:tc(lists, foldl, [ fun(_X, Acc) -> Acc end, null, lists:seq(1,10000) ]). 
{14994,null} 
2> 
2> timer:tc(lists, foldl, [ fun(_X, Acc) -> Acc end, null, lists:seq(1,100000) ]). 
{141951,null} 
3> 
3> timer:tc(lists, foldl, [ fun(_X, Acc) -> Acc end, null, lists:seq(1,1000000) ]). 
{1374426,null} 

所以,喲可以看到,與百萬元素列表,功能finder:nearest不是通過列表中以純迭代與lists:foldl

對於您的情況,您可能會發現THRESHOLD的最佳值。

此外,如果在不同節點上生成進程,您可以提高性能。

+0

+1。偉大的,完整的迴應。非常好。 –

1

在我看來,如果你有大量的數據不經常改變,你應該考慮組織它。 我寫了一個簡單的基於有序列表,包括插入刪除功能。它爲插入和搜索提供了很好的結果。

-module(finder). 

-export([test/1,find/2,insert/2,remove/2,new/0]). 

-compile(export_all). 

new() -> []. 

insert(V,L) -> 
    {R,P} = locate(V,L,undefined,-1), 
    insert(V,R,P,L). 

find(V,L) -> 
    locate(V,L,undefined,-1). 

remove(V,L) -> 
    {R,P} = locate(V,L,undefined,-1), 
    remove(V,R,P,L). 

test(Max) -> 
    {A,B,C} = erlang:now(), 
    random:seed(A,B,C), 
    L = lists:seq(0,100*Max,100), 
    S = random:uniform(100000000), 
    I = random:uniform(100000000), 
    io:format("start insert at ~p~n",[erlang:now()]), 
    L1 = insert(I,L), 
    io:format("start find at ~p~n",[erlang:now()]), 
    R = find(S,L1), 
    io:format("end at ~p~n result is ~p~n",[erlang:now(),R]). 

remove(_,_,-1,L) -> L; 
remove(V,V,P,L) -> 
    {L1,[V|L2]} = lists:split(P,L), 
    L1 ++ L2; 
remove(_,_,_,L) ->L. 

insert(V,V,_,L) -> L; 
insert(V,_,-1,L) -> [V|L]; 
insert(V,_,P,L) -> 
    {L1,L2} = lists:split(P+1,L), 
    L1 ++ [V] ++ L2. 

locate(_,[],R,P) -> {R,P}; 
locate (V,L,R,P) -> 
    %% io:format("locate, value = ~p, liste = ~p, current result = ~p, current pos = ~p~n",[V,L,R,P]), 
    {L1,[M|L2]} = lists:split(Le1 = (length(L) div 2), L), 
    locate(V,R,P,Le1+1,L1,M,L2). 

locate(V,_,P,Le,_,V,_) -> {V,P+Le}; 
locate(V,_,P,Le,_,M,L2) when V > M -> locate(V,L2,M,P+Le); 
locate(V,R,P,_,L1,_,_) -> locate(V,L1,R,P). 

其中得到下列結果

(EXEC @ WXFRB1824L)6>取景器:試驗(10000000)。在{} 1347,28177,618000

開始

開始插入找到{} 1347,28178,322000

末在{} 1347,28178,728000

結果是{72983500, 729836}

即704ms在10萬個元素列表中插入一個新值並在406ms中插入一個新值以在同一列表中找到最接近的值。

+0

但是,在我的筆記本電腦上,當我嘗試生成50 000 000個整數列表時(eheap_alloc:無法分配298930300個字節的內存),卡住了。所以它不是正確的存儲使用:o(。 – Pascal

+0

謝謝。這是我想到的那種 - 如何構造數據以便於查找。我正在考慮固定數組和二進制搜索 –

0

我試圖得到一個關於我上面提出的算法性能的更準確的信息,閱讀Stemm非常有趣的解決方案,我決定使用tc:timer/3函數。大欺騙:o)。在我的筆記本電腦上,我的時間精確度很差。所以我決定離開我的corei5(2核心* 2線程)+ 2Gb DDR3 + Windows XP 32位以使用我的家用PC:Phantom(6核心)+ 8Gb + Linux 64bit。

現在tc:計時器按預期工作,我可以操縱100 000 000個整數列表。我能看到,我失去了很多時間打電話在每一步的長度的功能,所以我重新分解代碼一點,以避免它:

-module(finder). 

-export([test/2,find/2,insert/2,remove/2,new/0]). 

%% interface 

new() -> {0,[]}. 

insert(V,{S,L}) -> 
    {R,P} = locate(V,L,S,undefined,-1), 
    insert(V,R,P,L,S). 

find(V,{S,L}) -> 
    locate(V,L,S,undefined,-1). 

remove(V,{S,L}) -> 
    {R,P} = locate(V,L,S,undefined,-1), 
    remove(V,R,P,L,S). 

remove(_,_,-1,L,S) -> {S,L}; 
remove(V,V,P,L,S) -> 
    {L1,[V|L2]} = lists:split(P,L), 
    {S-1,L1 ++ L2}; 
remove(_,_,_,L,S) ->{S,L}. 

%% local 

insert(V,V,_,L,S) -> {S,L}; 
insert(V,_,-1,L,S) -> {S+1,[V|L]}; 
insert(V,_,P,L,S) -> 
    {L1,L2} = lists:split(P+1,L), 
    {S+1,L1 ++ [V] ++ L2}. 

locate(_,[],_,R,P) -> {R,P}; 
locate (V,L,S,R,P) -> 
    S1 = S div 2, 
    S2 = S - S1 -1, 
    {L1,[M|L2]} = lists:split(S1, L), 
    locate(V,R,P,S1+1,L1,S1,M,L2,S2). 

locate(V,_,P,Le,_,_,V,_,_) -> {V,P+Le}; 
locate(V,_,P,Le,_,_,M,L2,S2) when V > M -> locate(V,L2,S2,M,P+Le); 
locate(V,R,P,_,L1,S1,_,_,_) -> locate(V,L1,S1,R,P). 

%% test 

test(Max,Iter) -> 
    {A,B,C} = erlang:now(), 
    random:seed(A,B,C), 
    L = {Max+1,lists:seq(0,100*Max,100)}, 
    Ins = test_insert(L,Iter,[]), 
    io:format("insert:~n~s~n",[stat(Ins,Iter)]), 
    Fin = test_find(L,Iter,[]), 
    io:format("find:~n ~s~n",[stat(Fin,Iter)]). 

test_insert(_L,0,Res) -> Res; 
test_insert(L,I,Res) -> 
    V = random:uniform(1000000000), 
    {T,_} = timer:tc(finder,insert,[V,L]), 
    test_insert(L,I-1,[T|Res]). 

test_find(_L,0,Res) -> Res; 
test_find(L,I,Res) -> 
    V = random:uniform(1000000000), 
    {T,_} = timer:tc(finder,find,[V,L]), 
    test_find(L,I-1,[T|Res]). 

stat(L,N) -> 
    Aver = lists:sum(L)/N, 
    {Min,Max,Var} = lists:foldl(fun (X,{Mi,Ma,Va}) -> {min(X,Mi),max(X,Ma),Va+(X-Aver)*(X-Aver)} end, {999999999999999999999999999,0,0}, L), 
    Sig = math:sqrt(Var/N), 
    io_lib:format(" average: ~p,~n minimum: ~p,~n maximum: ~p,~n sigma : ~p.~n",[Aver,Min,Max,Sig]). 

這裏有一些結果。

1> finder:test(1000,10)。 刀片:

平均:266.7,

最小:216,

最大:324,

西格瑪:36.98121144581393。

發現:

average: 136.1, 

最小:105,

最大:162,

西格瑪:15.378231367748375。

ok

2> finder:test(100000,10)。

插入:

平均值:10096。5,

最小:9541,

最大:12222,

西格瑪:762.5642595873478。

發現:

average: 5077.4, 

最低:4666,

最大:6937,

西格瑪:627.126494417195。

ok

3> finder:test(1000000,10)。

刀片:

平均:109871.1,

最小:94747,

最大:139916,

西格瑪:13852.211285206417。

發現: 平均:40428.0,

最低:31297,

最大:56965,

西格瑪:7797.425562325042。

ok

4> finder:test(100000000,10)。

刀片:

平均:8067547.8,

最小:6265625,

最大:16590349,

西格瑪:3199868.809140206。

發現:

average: 8484876.4, 

最低:5158504,

最大:15950944,

西格瑪:4044848.707872872。

確定

在100 000 000列表,它是緩慢的,並且,多進程的解決方案在這種二分法算法不能幫助...這是該解決方案的不足之處,但如果你有幾個並行處理請求找到最接近的值,無論如何,它將能夠使用多核。

帕斯卡。

相關問題