我試圖將數十億數據值的MultiMap轉換爲Spark DataFrame來運行計算,然後將結果寫入cassandra表。Spark - MultiMap可以在JAVA中轉換爲DataFrame
我從下面的cassandra查詢和循環生成multimap。如果能有更好的方式來獲取和處理這些數據到DataFrame中,就像我使用循環一樣,我會很樂意接受建議。
代碼更新了與答案:
//Build ResultSet from cassandra query for data manipulation.
Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
//Statement stmt = new SimpleStatement("SELECT power, bandwidth, start_frequency FROM model.reports;");
stmt.setFetchSize(1000);
ResultSet results = session.execute(stmt);
// Get the Variables from each Row of Cassandra Data
Multimap<Double, Float> data = LinkedListMultimap.create();
for (Row row : results){
// Column Names in Cassandra (Case Sensitive)
start_frequency = row.getDouble("Start_Frequency");
power = row.getFloat("Power");
bandwidth = row.getDouble("Bandwidth");
// Create Channel Power Buckets, place information into prepared statement binding, write to cassandra.
for(channel = 1.6000E8; channel <= channel_end; ){
if((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) {
data.put(channel, power);
} // end if
channel+=increment;
} // end for
} // end "row" for
// Create Spark List for DataFrame
List<Value> values = data.asMap().entrySet()
.stream()
.flatMap(x -> x.getValue()
.stream()
.map(y -> new Value(x.getKey(), y)))
.collect(Collectors.toList());
// Create DataFrame and Calculate Results
sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();
} // end session
} // End Compute
public class Value implements Serializable {
public Value(Double channel, Float power) {
this.channel = channel;
this.power = power;
}
Double channel;
Float power;
public void setChannel(Double channel) {
this.channel = channel;
}
public void setPower(Float power) {
this.power = power;
}
public Double getChannel() {
return channel;
}
public Float getPower() {
return power;
}
@Override
public String toString() {
return "[" +channel +","+power+"]";
}
}
樣品多重映射具有類型{雙} = [浮點]其中可以存在多個浮動項對於每個雙
例
{1.50E8=[10, 20], 1.51E8=[-10, -13, -14, -15], 1.52E8=[-10, -11]
我需要使用火花來獲得每個這些的最小值,最大值和平均值。例如,對於第一個1.50ED將分10,最高20,平均15
我已經有,我可以用一次,我可以在一個不是Temptable得到它和操作上的數據幀代碼:
queryMV.groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();
我將不勝感激關於如何使用JAVA將multimap轉換爲DataFrame的一些技巧。我一直無法找到任何有關使用multimaps火花的文檔。
我目前正在使用一個解決方案,執行初始查詢並使用for循環將原始數據寫入新表,我可以直接映射到一個臨時/數據框,但這需要很長時間,因爲我必須寫入計算之前數十億行到cassandra。我想使用一個multimap或類似的東西,並直接轉換爲火花進行計算。
我想使用火花,因爲這個計算將被處理超過十億個不同的值。該表格將如下所示:'key:value,value,value'我需要獲取關鍵值並獲取值的最小值,最大值和平均值。例如,如果我的密鑰是1.50E8,我的值是10,20我的輸出應該是1.50E8最小10,最大20,平均15 – mithrix