2017-07-07 32 views
-1

我解釋我的問題與一個假想的表有兩列(COL1,COL2)是這樣的:需要快速的算法來找到的關係記錄萬億互斥的網絡(關係)

col1 col2 
A <--> B 
A <--> C 
E <--> F 
B <--> D 
E <--> G 

此表是在一份文件。在上述情況下,我想將其分解爲互斥關係文件。因此,對於上述表中的結果將是兩個文件(表):

col1 col2 
A <--> B 
A <--> C  
B <--> D 

col1 col2 
E <--> F  
E <--> G 

真正的文件有重複記錄萬億(關係),我希望把它分成相互排斥關係文件。需要任何智能算法的幫助。我正在使用pyspark從鑲木地板文件中讀取表格。所以,任何pyspark代碼都會非常好,但不是必需的(算法更重要)。

+0

你到目前爲止試過了什麼?向我們展示一些代碼! – MrSmith42

+2

我想你正在尋找圖中的* connected components *(你稱之爲網絡)。這是一個深入研究的問題,我相信有線性複雜性的解決方案。從您最喜愛的搜索引擎中找到幫助應該不會有太大困難。 –

+0

謝謝。我試着用谷歌,但不知道搜索的實際條件。 –

回答

1

通過「互斥」你可能意味着沒有一個共同的節點。這個問題被稱爲圖中連接組件的枚舉。你可以用Union-Find技術來解決它。

對於每個節點,您都會將鏈接關聯到屬於同一組件的另一個節點。當考慮一個新的關係時,讓其中一個成員鏈接到另一個成員。

有關詳細信息,請參閱https://en.wikipedia.org/wiki/Disjoint-set_data_structure。 這個過程很快,您可以通過所謂的路徑壓縮技術加速它。

最後,每個組件都有一個沒有鏈接到任何其他節點的單個節點,列出它們包含的所有組件和節點並不是什麼大不了的事情。

步驟中給出的示例的處理可能是

A -> B 
C -> A -> B 
C -> A -> B, E -> F 
C -> A -> B -> D, E -> F 
C -> A -> B -> D, E -> F -> G 

(你得到線性表的事實是純屬意外)。

0

使用Spark,速度非常快。對於那些處理大數據的人來說,它可能是有用的:

from pyspark import SparkContext 
from pyspark.sql.types import * 
from pyspark.sql import Row 
from pyspark.sql import SparkSession 
from pyspark.sql.functions import col, size, length 
from pyspark.sql.functions import * 


spark = SparkSession.builder \ 
    .appName("Python Spark SQL") \ 
    .config("spark.driver.memory", "32g") \ 
    .config("spark.executor.memory", "32g") \ 
    .config("spark.executor.cores", "12") \ 
    .config("spark.local.dir", "~/mytmp") \ 
    .config("spark.jars.packages", "graphframes:graphframes:0.5.0-spark2.1-s_2.11") \ 
    .getOrCreate() 
SparkContext.setSystemProperty('spark.executor.memory', '32g') 
sc = spark.sparkContext 

df = spark.read.parquet("my_table_file.parquet") 
df.registerTempTable("mytable") 
# df has two columns : col1 and col2. Similar to hypothetical table presented in my question 

v = spark.sql("select col1 as id from mytable union select col2 as id from mytable ") 
e = spark.sql("select col1 as src, col2 as dst from mytable") 
sc.setCheckpointDir("~/cc_checkpoint") 

from graphframes import * 
g = GraphFrame(v, e) 
connected = g.connectedComponents() 
connected.registerTempTable("mytable_connected_components") 
connected.select("component").distinct().count()