4

從數據幀獲得一個重複計數我有一個看起來像這樣使用Apache星火

+--------------+---------+-------+---------+ 
|  dataOne|OtherData|dataTwo|dataThree| 
+--------------+---------|-------+---------+ 
|   Best|  tree|  5|  533| 
|   OK|  bush|  e|  3535| 
|   MEH|  cow|  -|  3353| 
|   MEH|  oak| none|  12| 
+--------------+---------+-------+---------+ 

數據,我想進入它的

+--------------+---------+ 
|  dataOne| Count| 
+--------------+---------| 
|   Best|  1| 
|   OK|  1| 
|   Meh|  2| 
+--------------+---------+ 

輸出我沒問題將dataOne自己獲取到數據框中並顯示它的內容以確保我只抓取dataOne列 但是,我似乎無法找到將該sql查詢轉換爲數據的正確語法我需要。我試圖創建從整個數據創建臨時鑑於這種下列數據框設置

Dataset<Row> dataOneCount = spark.sql("select dataOne, count(*) from 
dataFrame group by dataOne"); 
dataOneCount.show(); 

但引發 我能找到這只是演示瞭如何做這類聚集的火花1.6和以前這樣的文檔任何幫助,將不勝感激。

這是我得到的錯誤消息,但是我檢查了數據,並且在那裏沒有索引錯誤。

java.lang.ArrayIndexOutOfBoundsException: 11 

我也嘗試應用功能()方法countDistinct

Column countNum = countDistinct(dataFrame.col("dataOne")); 
Dataset<Row> result = dataOneDataFrame.withColumn("count",countNum); 
result.show(); 

其中dataOneDataFrame是從運行

select dataOne from dataFrame 

創建的數據幀,但它返回一個分析異常,我米仍然是新的火花,所以我不知道如何/當我正在評估countDistinct方法有錯誤

編輯:爲了澄清,顯示的第一個表是我從閱讀的文本文件,並應用自定義模式,以它所創建的數據幀的結果(他們仍然是所有的字符串)

Dataset<Row> dataFrame 

這裏是我的完整代碼

public static void main(String[] args) { 


    SparkSession spark = SparkSession 
      .builder() 
      .appName("Log File Reader") 
      .getOrCreate(); 

    //args[0] is the textfile location 
    JavaRDD<String> logsRDD = spark.sparkContext() 
      .textFile(args[0],1) 
      .toJavaRDD(); 

    String schemaString = "dataOne OtherData dataTwo dataThree"; 

    List<StructField> fields = new ArrayList<>(); 
    String[] fieldName = schemaString.split(" "); 


    for (String field : fieldName){ 
     fields.add(DataTypes.createStructField(field, DataTypes.StringType, true)); 
    } 
    StructType schema = DataTypes.createStructType(fields); 

    JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> { 
     String[] attributes = record.split(" "); 
     return RowFactory.create(attributes[0],attributes[1],attributes[2],attributes[3]); 
    }); 


    Dataset<Row> dF = spark.createDataFrame(rowRDD, schema); 

    //first attempt 
    dF.groupBy(col("dataOne")).count().show(); 

    //Trying with a sql statement 
    dF.createOrReplaceTempView("view"); 
    dF.sparkSession().sql("select command, count(*) from view group by command").show(); 

想到的最可能的事情是使用RowFactory返回行的lambda函數?這個想法聽起來很合理,但我不確定它是如何堅持下去的,或者有另一種方式可以做到。除此之外,我挺納悶的

樣本數據

best tree 5 533 
OK bush e 3535 
MEH cow - 3353 
MEH oak none 12 

回答

2

使用Scala的語法方便。這是非常類似於Java的語法:

// Input data 
val df = { 
    import org.apache.spark.sql._ 
    import org.apache.spark.sql.types._ 
    import scala.collection.JavaConverters._ 

    val simpleSchema = StructType(
    StructField("dataOne", StringType) :: 
    StructField("OtherData", StringType) :: 
    StructField("dataTwo", StringType) :: 
    StructField("dataThree", IntegerType) :: Nil) 

    val data = List(
    Row("Best", "tree", "5", 533), 
    Row("OK", "bush", "e", 3535), 
    Row("MEH", "cow", "-", 3353), 
    Row("MEH", "oak", "none", 12) 
) 

    spark.createDataFrame(data.asJava, simpleSchema) 
} 

df.show 
+-------+---------+-------+---------+ 
|dataOne|OtherData|dataTwo|dataThree| 
+-------+---------+-------+---------+ 
| Best|  tree|  5|  533| 
|  OK|  bush|  e|  3535| 
| MEH|  cow|  -|  3353| 
| MEH|  oak| none|  12| 
+-------+---------+-------+---------+ 
df.groupBy(col("dataOne")).count().show() 
+-------+-----+ 
|dataOne|count| 
+-------+-----+ 
| MEH| 2| 
| Best| 1| 
|  OK| 1| 
+-------+-----+ 

我可以提交以上S3的四列數據文件如下給出的Java代碼它工作正常:

$SPARK_HOME/bin/spark-submit \ 
    --class sparktest.FromStackOverflow \ 
    --packages "org.apache.hadoop:hadoop-aws:2.7.3" \ 
    target/scala-2.11/sparktest_2.11-1.0.0-SNAPSHOT.jar "s3a://my-bucket-name/sample.txt" 
+0

我已經在我的java程序中嘗試了這種方法,並且它返回java.lang.ArrayIndexOutOfBoundsException:11 – Sentinel

+0

你能用一個小的隔離示例來重現它嗎?您使用的是什麼版本的Spark?你嘗試過別人嗎? – clay

+0

使用spark 2.1添加了完整的代碼,我不想退回到以前版本的火花,儘管 – Sentinel