-2

我在使用Apache Spark處理一組規則時遇到的一個用例需要幫助。需要使用Apache Spark根據一組規則過濾記錄時需要幫助

由於實際數據有太多的領域,例如,你可以把數據如下圖所示(爲簡單起見給出JSON格式的數據),

records : [{ 
      "recordId": 1, 
      "messages": [{"name": "Tom","city": "Mumbai"}, 
         {"name": "Jhon","address": "Chicago"}, .....] 
      },....] 

rules : [{ 
      ruleId: 1, 
      ruleName: "rule1", 
      criterias: { 
          name: "xyz", 
          address: "Chicago, Boston" 
         } 
      }, ....] 

我要匹配所有規則的所有記錄。這裏是僞代碼:

var matchedRecords = [] 
for(record <- records) 
    for(rule <- rules) 
     for(message <- record.message) 
      if(!isMatch(message, rule.criterias)) 
       break; 
     if(allMessagesMatched) // If loop completed without break 
      matchedRecords.put((record.id, ruleId)) 



def isMatch(message, criteria) = 
      for(each field in crieteria) 
       if(field.value contains comma) 
        if(! message.field containsAny field.value) 
         return false 
       else if(!message.field equals field.value) // value doesnt contain comma 
        return false 
      return true // if loop completed that means all criterias are matched 

有成千上萬的記錄包含成千上萬的消息,並且有這樣的規則hundreads。

解決此類問題的方法有哪些?任何特定的模塊都會有幫助,如(SparkSQL,Spark Mlib,Spark GraphX)?我需要使用任何第三方庫嗎?

方法1:

  • 有無列表[規則] & RDD [記錄]

  • 廣播列表[規則]因爲他們人數少。

  • 將每條記錄與所有規則進行匹配。

仍然在這種情況下沒有parallize計算髮生匹配每條消息與標準。

回答

0

我認爲你的建議方法是很好的方向。如果非要解決這個任務,我會從實現通用特質與負責匹配方法入手:

trait FilterRule extends Serializable { 
    def match(record: Record): Boolean 
} 

然後我會實現特定的過濾器如:

class EqualsRule extends FilterRule 
class RegexRule extends FilterRule 

然後,我將實現複合過濾器如:

class AndRule extends FilterRule 
class OrRule extends FilterRule 
... 

然後您可以篩選RDD或數據集有:

// constructing rule - in reality reading json from configuration, parsing json and creating FilterRule object 
val rule = AndRule(EqualsRule(...), EqualsRule(...), ...) 

// applying rule 
rdd.filter(record => rule.match(r)) 

第二種方法是嘗試使用現有的Spark SQL函數和DataFrame進行過濾,您可以在其中使用和或多列構建相當複雜的表達式。這種方法的缺點是它不是安全的,單元測試會更復雜。