2017-09-17 74 views
0

我有1百萬條記錄,我想試試這個。我有項目列表,並希望使用此列表項目在記錄中執行查找。如何在分佈式環境中運行pyspark代碼

l = ['domestic',"private"] 
text = ["On the domestic front, growth seems to have stalled, private investment and credit off-take is feeble, inflation seems to be bottoming out and turning upward, current account situation is not looking too promising, FPI inflows into debt and equity have slowed, and fiscal deficit situation of states is grim.", "Despite the aforementioned factors, rupee continues to remain strong against the USD and equities continue to outperform.", "This raises the question as to whether the asset prices are diverging from fundamentals and if so when are they expected to fall in line. We examine each of the above factors in a little more detail below.Q1FY18 growth numbers were disappointing with the GVA, or the gross value added, coming in at 5.6 percent. Market participants would be keen to ascertain whether the disappointing growth in Q1 was due to transitory factors such as demonetisation and GST or whether there are structural factors at play. There are silver linings such as a rise in core GVA (GVA excluding agri and public services), a rise in July IIP (at 1.2%), pickup in activity in the cash-intensive sectors, pick up in rail freight and containers handled by ports.However, there is a second school of thought as well, which suggests that growth slowdown could be structural. With demonetisation and rollout of GST, a number of informal industries have now been forced to enter the formal setup."] 
res = {} 
for rec in text: 
    for word in l: 
     if word in rec: 
      res[rec] = 1 
      break 
print res 

這是簡單的Python腳本和同樣的邏輯我想用分佈式的方式pyspark(將相同的代碼工作?),以減少執行時間來執行。

你能指導我如何做到這一點。我很抱歉,因爲我是非常新的火花,你的幫助將會大大增加。

回答

1

instanciating經過spark context和/或spark session,你有你的記錄列表轉換爲dataframe

df = spark.createDataFrame(
    sc.parallelize(
     [[rec] for rec in text] 
    ), 
    ["text"] 
) 
df.show() 

    +--------------------+ 
    |    text| 
    +--------------------+ 
    |On the domestic f...| 
    |Despite the afore...| 
    |This raises the q...| 
    +--------------------+ 

現在你可以檢查每行如果l話存在或不:

sc.broadcast(l) 
res = df.withColumn("res", df.text.rlike('|'.join(l)).cast("int")) 
res.show() 

    +--------------------+---+ 
    |    text|res| 
    +--------------------+---+ 
    |On the domestic f...| 1| 
    |Despite the afore...| 0| 
    |This raises the q...| 0| 
    +--------------------+---+ 
  • rlike是用於執行正則表達式匹配
  • sc.broadcast是用於複製對象l到每一個節點,使他們不必去得到它的驅動程序

希望這有助於

相關問題