2013-03-21 45 views
0

I want to store the data emitted by a Storm spout in HDFS. I added Hadoop FS API code to my bolt class, but it throws a compilation error. How can I use the Hadoop FS API inside a Storm bolt in Java?

Here is the Storm bolt class:

package bolts;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class DataNormalizer extends BaseBasicBolt {

    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] process = sentence.split(" "); // parsed fields, not used yet
        try {
            // Local file to copy and its destination in HDFS.
            String source = "/root/data/top_output.csv";
            String dest = "hdfs://localhost:9000/user/root/nishu/top_output/top_output_1.csv";

            Configuration conf = new Configuration();
            // Resolve the file system from the hdfs:// URI so the call does not
            // depend on the default file system configured in conf.
            FileSystem fileSystem = FileSystem.get(URI.create(dest), conf);
            System.out.println(fileSystem);
            Path srcPath = new Path(source);
            Path dstPath = new Path(dest);
            try {
                if (!fileSystem.exists(dstPath)) {
                    // Destination is missing: create it and stream the local file into it.
                    FSDataOutputStream out = fileSystem.create(dstPath, true);
                    InputStream in = new BufferedInputStream(
                            new FileInputStream(new File(source)));
                    byte[] b = new byte[1024];
                    int numBytes = 0;
                    while ((numBytes = in.read(b)) > 0) {
                        out.write(b, 0, numBytes);
                    }
                    in.close();
                    out.close();
                } else {
                    // Destination already exists: copy the local file with the FS API.
                    fileSystem.copyFromLocalFile(srcPath, dstPath);
                }
            } catch (Exception e) {
                System.err.println("Exception caught! :" + e);
                System.exit(1); // note: this stops the whole worker JVM
            } finally {
                fileSystem.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Required by BaseBasicBolt; this bolt emits no tuples.
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

I have also added the Hadoop jars to the CLASSPATH. Here is the classpath value:

$STORM_HOME/storm-0.8.1.jar:$JAVA_HOME/lib/:$HADOOP_HOME/hadoop-core-1.0.4.jar:$HADOOP_HOME/lib/:$STORM_HOME/lib/ 

I also copied the Hadoop libraries hadoop-core-1.0.4.jar, commons-collections-3.2.1.jar and commons-cli-1.2.jar into the Storm/lib directory.

When I build this project, it throws the following error:

3006 [Thread-16] ERROR backtype.storm.daemon.executor - 
java.lang.NoClassDefFoundError: org/apache/commons/configuration/Configuration 
     at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<init>(DefaultMetricsSystem.java:37) 
     at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<clinit>(DefaultMetricsSystem.java:34) 
     at org.apache.hadoop.security.UgiInstrumentation.create(UgiInstrumentation.java:51) 
     at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:216) 
     at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:184) 
     at org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:236) 
     at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:466) 
     at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:452) 
     at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:1494) 
     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1395) 
     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254) 
     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:123) 
     at bolts.DataNormalizer.execute(DataNormalizer.java:67) 
     at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:32) 
     ...................... 

Answers

3

The error message tells you that Apache Commons Configuration is missing. You have to add it to the classpath.

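More generally, you should add all of Hadoop's dependencies to your classpath. You can find them with a dependency manager (Maven, Ivy, Gradle, etc.), or by looking in /usr/lib/hadoop/lib on a machine where Hadoop is installed.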
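To see which classes the JVM can actually load, a small check along these lines may help. It is only a sketch: `ClasspathCheck` and `missingClasses` are names made up for illustration, and the two class names probed in `main` are taken from the stack trace in the question. Run it with the same classpath the topology uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Storm or Hadoop.
class ClasspathCheck {

    /** Returns the names from classNames that cannot be loaded from the current classpath. */
    static List<String> missingClasses(String... classNames) {
        List<String> missing = new ArrayList<>();
        for (String name : classNames) {
            try {
                // initialize=false: only locate the class, do not run its static initializers.
                Class.forName(name, false, ClasspathCheck.class.getClassLoader());
            } catch (ClassNotFoundException | LinkageError e) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace in the question.
        List<String> missing = missingClasses(
                "org.apache.hadoop.fs.FileSystem",
                "org.apache.commons.configuration.Configuration");
        if (missing.isEmpty()) {
            System.out.println("All probed classes are on the classpath.");
        } else {
            System.out.println("Not on the classpath: " + missing);
        }
    }
}
```

If the output lists `org.apache.commons.configuration.Configuration`, the jar that provides Apache Commons Configuration is not on that classpath.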

+0

I have already added all the Hadoop/lib jars to the classpath. You can check the classpath given above.. – 2013-03-21 08:23:40

+0

':$HADOOP_HOME/lib/:' is not a valid classpath entry. It should be ':$HADOOP_HOME/lib/*:', see [Understanding class path wildcards](http://docs.oracle.com/javase/6/docs/technotes/tools/solaris/classpath.html). – 2013-03-21 12:49:00
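As a rough illustration of the difference, the sketch below mimics what the wildcard does: an entry ending in `/*` stands for every `.jar` or `.JAR` file directly inside that directory (subdirectories are not searched), while a plain directory entry is left as it is and does not pull in the jars it contains. Assumptions: `WildcardClasspath` and `expand` are hypothetical names, only Unix-style `/` separators are handled, and the sorting is just for readable output, since the JVM does not promise any order.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that imitates the JVM's classpath wildcard expansion.
class WildcardClasspath {

    /** Expands one classpath entry; entries without a trailing "/*" are returned unchanged. */
    static List<String> expand(String entry) {
        List<String> result = new ArrayList<>();
        if (!entry.endsWith("/*")) {
            result.add(entry); // plain file or directory entry: no jars are added
            return result;
        }
        File dir = new File(entry.substring(0, entry.length() - 2));
        File[] files = dir.listFiles();
        if (files == null) {
            return result; // not a directory, or it cannot be read
        }
        Arrays.sort(files); // deterministic output only
        for (File f : files) {
            String name = f.getName();
            if (f.isFile() && (name.endsWith(".jar") || name.endsWith(".JAR"))) {
                result.add(f.getPath());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Pass classpath entries as arguments, for example: /usr/lib/hadoop/lib/*
        // (quote the argument so the shell does not expand the * itself).
        for (String entry : args) {
            System.out.println(entry + " -> " + expand(entry));
        }
    }
}
```

Running it once with a `lib/` entry and once with the matching `lib/*` entry shows which jars the second form picks up.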