2017-04-21 28 views
-1

我有一個scala代碼,它將csv作爲輸入,讀取每一行,執行每一行的文檔分類並將預測的文檔標籤存儲到MySQL數據庫中。轉換斯卡拉序列化代碼來執行並行操作

問題與片段是,有時csv有3200行,它需要很多時間來完成整個操作。我需要將這些代碼轉換爲執行者之間分發的csv,執行文檔預測並存儲標籤。

以下是代碼片段 -

val reader = new CSVReader(new FileReader(args(4))) 
    var readFirstLine = false; 

    for (row <- reader.readAll) { 
     if(readFirstLine) { 
      var date = row(1).split(" "); 
      var split_date = date(0).split('-').toList; 
      val documentTransformed = tf.transform(row(2).split(" ")) 
      val emotionPredicted = model.predict(documentTransformed) 
      val emotionMapped = emotionMaps(emotionPredicted);   

      //Insert Emotions    
      var query = "insert into emotions_values(user_id, year, month, day, emotion)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ emotionMapped +"')"; 
      statement.executeUpdate(query) 

      val polarityPredicted = polarityModel.predict(documentTransformed) 
      val polarityMapped = polarityMaps(polarityPredicted); 

      //Insert Polarity 
      var polarityQuery = "insert into polarity_values(user_id, year, month, day, polarity)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ polarityMapped +"')"; 
      statement.executeUpdate(polarityQuery) 
     } 
     else { 
      readFirstLine = true; 
     } 
    } 
+1

這是一個相當廣泛的問題 - 您似乎在要求我們爲您編寫整個Spark工作。如果您可以縮小到需要幫助的特定Spark問題,您可能會得到更好的答案。 – DNA

+0

我需要做什麼來將我的行從csv分發給執行程序,執行文檔標記並將預測標記插入到mySQL中?我已經在做文檔標籤並將數據插入到mysql中。我只需要了解如何將csv行分發給執行者? – user2738965

+1

有一個火花的csv閱讀器:https://github.com/databricks/spark-csv 雖然我不確定它是否過時。 – vefthym

回答

0

所有你需要做的是使用內置的CSV功能在星火:

sparkSession.read 
    .option("header", "true") 
    .option("inferSchema", "true") //Maybe 
    .csv(args(4)) 
    .rdd { row => 
     ... 
    } 

這將打開您的CSV的內容爲RDD,然後您可以根據需要進行操作。請注意,只需將header選項設置爲true即可忽略第一行。

我會建議尋找到你是否能由csv方法返回的DataFrame工作 - 這將使你採取星火Catalyst optimizations的優勢 - 而不是由rdd方法返回的RDD