2017-04-24 70 views
2

樣品JSON JavaRDD .foreach( 共100條記錄): 數組列表爲空後,火花

{ 「名」: 「開發」, 「工資」:10000, 「職業」:「ENGG 」, 「地址」: 「諾伊達」} { 「名」: 「KARTHIK」, 「工資」:20000, 「職業」: 「ENGG」, 「地址」: 「諾伊達」}

有用的代碼:

final List<Map<String,String>> jsonData = new ArrayList<>(); 

    DataFrame df = sqlContext.read().json("file:///home/dev/data-json/emp.json"); 
    JavaRDD<String> rdd = df.repartition(1).toJSON().toJavaRDD(); 

    rdd.foreach(new VoidFunction<String>() { 
     @Override 
     public void call(String line) { 
      try { 
       jsonData.add (new ObjectMapper().readValue(line, Map.class)); 
       System.out.println(Thread.currentThread().getName()); 
       System.out.println("List size: "+jsonData.size()); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 
     } 
    }); 

    System.out.println(Thread.currentThread().getName()); 
    System.out.println("List size: "+jsonData.size()); 

jsonData最後是空的。

輸出:

Executor task launch worker-1 
List size: 1 
Executor task launch worker-1 
List size: 2 
Executor task launch worker-1 
List size: 3 
. 
. 
. 
Executor task launch worker-1 
List size: 100 

main 
List size: 0 
+1

由於列表在開始時似乎是空的,它可能是對象映射器無法解析它得到的行嗎?你能提供一個[mcve]嗎? – Thomas

+1

什麼是'rdd'? – khelwood

+2

也許'System.out.println'在foreach完成任務之前執行(或者甚至開始)? – freedev

回答

1

我已經測試過這一點也適用 https://github.com/freedev/spark-test

final ObjectMapper objectMapper = new ObjectMapper(); 

List<Map<String, Object>> list = rdd 
     .map(new org.apache.spark.api.java.function.Function<String, Map<String, Object>>() { 
      @Override 
      public Map<String, Object> call(String line) throws Exception { 
       TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>() { 
       }; 
       Map<String, Object> rs = objectMapper.readValue(line, typeRef); 
       return rs; 
      } 
     }).collect(); 

我首選映射Map<String, Object>,因爲這會在你的Json辦案不到哪值部分不是字符串(即"salary":20000)。

+1

這個問題被標記爲'java-7'。 Java 8代碼不太可能有用。 – khelwood

+0

@khelwood謝謝 – freedev

+0

@freedev感謝您的努力。我嘗試了它,但得到'異常在線程「主要」org.apache.spark.SparkException:不可序列化的任務 \t at org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:304)'即使添加'將Serializable'實現到我正在運行的主類中主要方法 –