2012-10-25 51 views
4

我們怎樣才能使我們的MapReduce的查詢速度更快?如何使Riak MapReduce查詢更快?

我們已經建立了使用了五點了Riak數據庫集羣的應用程序。 我們的數據模型由三個桶組成:比賽,聯賽和球隊。

匹配包含鏈接到聯賽和球隊:

型號

var match = { 
     id: matchId, 
     leagueId: meta.leagueId, 
     homeTeamId: meta.homeTeamId, 
     awayTeamId: meta.awayTeamId, 
     startTime: m.match.startTime, 
     firstHalfStartTime: m.match.firstHalfStartTime, 
     secondHalfStartTime: m.match.secondHalfStartTime, 
     score: { 
      goals: { 
       a: 1*safeGet(m.match, 'score.goals.a'), 
       b: 1*safeGet(m.match, 'score.goals.b') 
      }, 
      corners: { 
       a: 1*safeGet(m.match, 'score.corners.a'), 
       b: 1*safeGet(m.match, 'score.corners.b') 
      } 
     } 
    }; 

var options = { 
     index: { 
      leagueId: match.leagueId, 
      teamId: [match.homeTeamId, match.awayTeamId], 
      startTime: match.startTime || match.firstHalfStartTime || match.secondHalfStartTime 
     }, 
     links: [ 
      { bucket: 'leagues', key: match.leagueId, tag: 'league' }, 
      { bucket: 'teams', key: match.homeTeamId, tag: 'home' }, 
      { bucket: 'teams', key: match.awayTeamId, tag: 'away' } 
     ] 
    }; 
    match.model = 'match'; 
    modelCache.save('matches', match.id, match, options, callback); 

查詢

我們寫了幾個水桶返回結果的查詢,一個辦法就是查詢每個桶分開。另一種方法是使用鏈接來合併單個查詢的結果。查詢

兩個版本,我們都嘗試接管第二,無論多麼小的我們的桶大小。 第一個版本使用了兩個地圖階段,我們在這篇文章之後建模(Practical Map-Reduce: Forwarding and Collecting)。

#!/bin/bash 
curl -X POST \ 
-H "content-type: application/json" \ 
-d @- \ 
http://localhost:8091/mapred \ 
<<EOF 
{ 
    "inputs":{ 
     "bucket":"matches", 
     "index":"startTime_bin", 
     "start":"2012-10-22T23:00:00", 
     "end":"2012-10-24T23:35:00" 
    }, 
    "query": [ 
     {"map":{"language": "javascript", "source":" 
       function(value, keydata, arg){ 
        var match = Riak.mapValuesJson(value)[0]; 
        var links = value.values[0].metadata.Links; 
        var result = links.map(function(l) { 
         return [l[0], l[1], match]; 
        }); 
        return result; 
       } 
      "} 
     }, 
     {"map":{"language": "javascript", "source": " 
       function(value, keydata, arg) { 
        var doc = Riak.mapValuesJson(value)[0]; 
        return [doc, keydata]; 
       } 
      "} 
     }, 
     {"reduce":{ 
      "language": "javascript", 
       "source":" 
        function(values) { 
         var merged = {}; 
         values.forEach(function(v) { 
          if(!merged[v.id]) { 
           merged[v.id] = v; 
          } 
         }); 
         var results = []; 
         for(key in merged) { 
          results.push(merged[key]); 
         } 
         return results; 
        } 
       " 
      } 
     } 
    ] 
} 
EOF 

在第二個版本中,我們做四個單獨的map-reduce查詢來獲得來自三個桶的對象:

async.series([ 
     //First get all matches 
     function(callback) { 
      db.mapreduce 
       .add(inputs) 
       .map(function (val, key, arg) { 
        var data = Riak.mapValuesJson(val)[0]; 
        if(arg.leagueId && arg.leagueId != data.leagueId) { 
         return []; 
        } 
        var d = new Date(); 
        var date = data.startTime || data.firstHalfStartTime || data.secondHalfStartTime; 
        d.setFullYear(date.substring(0, 4)); 
        d.setMonth(date.substring(5, 7) - 1); 
        d.setDate(date.substring(8, 10)); 
        d.setHours(date.substring(11, 13)); 
        d.setMinutes(date.substring(14, 16)); 
        d.setSeconds(date.substring(17, 19)); 
        d.setMilliseconds(0); 
        startTimestamp = d.getTime(); 
        var short = { 
         id: data.id, 
         l: data.leagueId, 
         h: data.homeTeamId, 
         a: data.awayTeamId, 
         t: startTimestamp, 
         s: data.score, 
         c: startTimestamp 
        }; 
        return [short]; 
       }, {leagueId: query.leagueId, page: query.page}).reduce(function (val, key) { 
        return val; 
       }).run(function (err, matches) { 
        matches.forEach(function(match) { 
         result.match[match.id] = match; //Should maybe filter this 
         leagueIds.push(match.l); 
         teamIds.push(match.h); 
         teamIds.push(match.a); 
        }); 
        callback(); 
       }); 
     }, 
     //Then get all leagues, teams and lines in parallel 
     function(callback) { 
      async.parallel([ 
       //Leagues 
       function(callback) { 
        db.getMany('leagues', leagueIds, function(err, leagues) { 
         if (err) { callback(err); return; } 
         leagues.forEach(function(league) { 
          visibleLeagueIds[league.id] = true; 
          result.league[league.id] = { 
           r: league.regionId, 
           n: league.name, 
           s: league.name 
          }; 
         }); 
         callback(); 
        }); 
       }, 
       //Teams 
       function(callback) { 
        db.getMany('teams', teamIds, function(err, teams) { 
         if (err) { callback(err); return; } 
         teams.forEach(function(team) { 
          result.team[team.id] = { 
           n: team.name, 
           h: team.name, 
           s: team.stats 
          }; 
         }); 
         callback(); 
        }); 
       } 
      ], callback); 
     } 
    ], function(err) { 
     if (err) { callback(err); return; } 
     _.each(regionModel.getAll(), function(region) { 
      result.region[region.id] = { 
       id: region.id, 
       c: 'https://d1goqbu19rcwi8.cloudfront.net/icons/silk-flags/' + region.icon + '.png', 
       n: region.name 
      }; 
     }); 
     var response = { 
      success: true, 
      result: { 
       modelRecords: result, 
       paging: { 
        page: query.page, 
        pageSize: 50, 
        total: result.match.length 
       }, 
       time: moment().diff(a)/1000.00, 
       visibleLeagueIds: visibleLeagueIds 
      } 
     }; 
     callback(null, JSON.stringify(response, null, '\t')); 
    }); 

我們如何讓這些查詢速度更快?

附加信息:

我們使用了Riak-js和node.js中來運行我們的查詢。

回答

7

的一種方式,使其至少快一點將是JavaScript的MapReduce函數部署到服務器上,而不是將它們通過作爲工作的一部分。 (見的js_source_dir參數here描述)。如果您有重複運行的JavaScript函數,通常建議使用這種方法。

由於存在相比於二郎山實現原生的,使用非JavaScript函數在可能情況下也可以幫助運行的JavaScript MapReduce函數相關的一些開銷。

在你第一次查詢中的兩個地圖相功能似乎被設計來解決的限制,即一個正常的鏈接階段(我認爲是更有效的),不通過被處理的記錄(比賽記錄) 。第一個函數包含所有鏈接,並以JSON形式傳遞匹配數據作爲附加數據,而第二個函數傳遞匹配數據以及JSON形式的鏈接記錄。

我寫了一個簡單的二郎山功能,包括所有的鏈接,以及在通過記錄的ID這可能連同本地二郎功能riak_kv_mapreduce使用:map_object_value,以取代在兩個地圖相功能的第一個例子,刪除一些JavaScript的用法。正如在現有的解決方案中,我希望你會收到一些重複的數據,因爲幾場比賽可能會鏈接到同一個聯賽/球隊。

-module(riak_mapreduce_example). 

-export([map_link/3]). 

%% @spec map_link(riak_object:riak_object(), term(), term()) -> 
%%     [{{Bucket :: binary(), Key :: binary()}, Props :: term()}] 
%% @doc map phase function for adding linked records to result set 
map_link({error, notfound}, _, _) -> 
    []; 
map_link(RiakObject, Props, _) -> 
    Bucket = riak_object:bucket(RiakObject), 
    Key = riak_object:key(RiakObject), 
    Meta = riak_object:get_metadata(RiakObject), 
    Current = [{{Bucket, Key}, Props}], 
    Links = case dict:find(<<"Links">>, Meta) of 
     {ok, List} -> 
      [{{B, K}, Props} || {{B, K}, _Tag} <- List]; 
     error -> 
      [] 
    end, 
    lists:append([Current, Links]). 

這些結果可以發送回客戶端進行聚合或傳遞到縮減階段函數,如您提供的示例。

的例子功能將需要編譯和安裝所有節點上,可能需要重新啓動。

另一種方式來提高性能(這非常可能不適合你的選擇),或許應該改變的數據模型,以避免乾脆使用MapReduce的查詢性能關鍵查詢。