2016-12-14 39 views
0

我的CSV文件:如何在火花的Java上應用的地圖功能RDD操作

YEAR,UTILITY_ID,UTILITY_NAME,OWNERSHIP,STATE_CODE,AMR_METERING_RESIDENTIAL,AMR_METERING_COMMERCIAL,AMR_METERING_INDUSTRIAL,AMR_METERING_TRANS,AMR_METERING_TOTAL,AMI_METERING_RESIDENTIAL,AMI_METERING_COMMERCIAL,AMI_METERING_INDUSTRIAL,AMI_METERING_TRANS,AMI_METERING_TOTAL,ENERGY_SERVED_RESIDENTIAL,ENERGY_SERVED_COMMERCIAL,ENERGY_SERVED_INDUSTRIAL,ENERGY_SERVED_TRANS,ENERGY_SERVED_TOTAL 
2011,34,City of Abbeville - (SC),M,SC,880,14,,,894,,,,,,,,,, 
2011,84,A & N Electric Coop,C,MD,135,25,,,160,,,,,,,,,, 
2011,84,A & N Electric Coop,C,VA,31893,2107,0,,34000,,,,,,,,,, 
2011,97,Adams Electric Coop,C,IL,8334,190,,,8524,,,,,0,,,,,0 
2011,108,Adams-Columbia Electric Coop,C,WI,33524,1788,709,,36021,,,,,,,,,, 
2011,118,Adams Rural Electric Coop, Inc,C,OH,7457,20,,,7477,,,,,,,,,, 
2011,122,Village of Arcade,M,NY,3560,498,100,,4158,,,,,,,,,, 
2011,155,Agralite Electric Coop,C,MN,4383,227,315,,4925,,,,,,,,,, 

這兒星火代碼讀取CSV文件:

import org.apache.spark.api.java.JavaSparkContext; 

public class RddCsv 
{ 
    public static void main(String[] args) 
    { 
    SparkConf conf = new SparkConf().setAppName("CSV Reader").setMaster("local"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> allRows = sc.textFile("file:///home/kumar/Desktop/Eletricaldata/file8_2011.csv");//read csv file 
    System.out.println(allRows.take(5)); 
    } 
} 

我新學sparkJava, 如何從該CsvDataset中選擇Perticuler字段值以及如何執行聚合操作,以及如何使用給定數據集的轉換和操作。以及如何選擇合適的字段值

+0

[如何使用Apache Spark解析CSV或JSON文件]可能的副本(http://stackoverflow.com/questions/25362942/how-to-parsing-csv-or-json-file-with-apache-spark ) – Jobin

回答

0
public static void main(String[] args) 
{ 
    SparkConf conf = new SparkConf().setAppName("CSV Reader").setMaster("local"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> allRows = sc.textFile("file:///home/abhishek/Desktop/file8_2011.csv"); 
    System.out.println(allRows.take(5)); 
    List<String> headers= Arrays.asList(allRows.take(1).get(0).split(",")); 
    String field="YEAR"; 
    //Skip Header 
    JavaRDD<String>dataWithoutHeaders=allRows.filter(x -> !(x.split(",")[headers.indexOf(field)]).equals(field)); 
    //Take one field as integer 
    JavaRDD<Integer> years=dataWithoutHeaders.map(x -> Integer.valueOf(x.split(",")[headers.indexOf(field)])); 
    //Aggregate operation getTotal aggregate() arguments are initial value for a partition,aggregating function for a partition 
    //and aggregating function for results from different partition 
    int total=years.aggregate(0,RddCsv::sum,RddCsv::sum); 
    for (Integer i:years.collect()){ 
     System.out.println("year :: "+i); 
    } 
    System.out.println(total); 
} 

private static int sum(int a,int b){ 
    return a+b; 
} 

這是一個基本的程序。您應該閱讀spark的java apis瞭解詳細信息。

+0

不工作,compitime錯誤正在提供此行.aggregate(0,RddCsv :: sum,RddCsv :: sum); – kumar

+0

我剛剛運行它。 –

+0

輸出:16/12/14 16:26:31 INFO TaskSetManager:在階段3.0(TID 3)中完成的任務0.0在本地主機上的25 ms(1/1) 16/12/14 16:26:31信息TaskSchedulerImpl:已從池中刪除已完成任務的TaskSet 3.0 16/12/14 16:26:31 INFO DAGScheduler:Job 3 finished:collect at RDDCsv.java:32,took 0.049729 s year :: 2011 year :: 2011年 :: 2011年 :: 2011年 :: 2011年 :: 2011年 :: 2011年 :: 2011 16/12/14十六點26分31秒INFO SparkContext:調用停止( )從關機鉤 –