-1
我有一個scala代碼,它將csv作爲輸入,讀取每一行,執行每一行的文檔分類並將預測的文檔標籤存儲到MySQL數據庫中。轉換斯卡拉序列化代碼來執行並行操作
問題與片段是,有時csv有3200行,它需要很多時間來完成整個操作。我需要將這些代碼轉換爲執行者之間分發的csv,執行文檔預測並存儲標籤。
以下是代碼片段 -
val reader = new CSVReader(new FileReader(args(4)))
var readFirstLine = false;
for (row <- reader.readAll) {
if(readFirstLine) {
var date = row(1).split(" ");
var split_date = date(0).split('-').toList;
val documentTransformed = tf.transform(row(2).split(" "))
val emotionPredicted = model.predict(documentTransformed)
val emotionMapped = emotionMaps(emotionPredicted);
//Insert Emotions
var query = "insert into emotions_values(user_id, year, month, day, emotion)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ emotionMapped +"')";
statement.executeUpdate(query)
val polarityPredicted = polarityModel.predict(documentTransformed)
val polarityMapped = polarityMaps(polarityPredicted);
//Insert Polarity
var polarityQuery = "insert into polarity_values(user_id, year, month, day, polarity)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ polarityMapped +"')";
statement.executeUpdate(polarityQuery)
}
else {
readFirstLine = true;
}
}
這是一個相當廣泛的問題 - 您似乎在要求我們爲您編寫整個Spark工作。如果您可以縮小到需要幫助的特定Spark問題,您可能會得到更好的答案。 – DNA
我需要做什麼來將我的行從csv分發給執行程序,執行文檔標記並將預測標記插入到mySQL中?我已經在做文檔標籤並將數據插入到mysql中。我只需要了解如何將csv行分發給執行者? – user2738965
有一個火花的csv閱讀器:https://github.com/databricks/spark-csv 雖然我不確定它是否過時。 – vefthym