2011-08-25 111 views
27

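Clustering (fkmeans) with Mahout using Clojure

I am trying to write a short script that clusters my data from Clojure (by calling the Mahout classes). My input data, which is the output of a PHP script, is in this format: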

format: (tag) (image) (frequency) 
tag_sit image_a 0 
tag_sit image_b 1 
tag_lorem image_a 1 
tag_lorem image_b 0 
tag_dolor image_a 0 
tag_dolor image_b 1 
tag_ipsum image_a 1 
tag_ipsum image_b 1 
tag_amit image_a 1 
tag_amit image_b 0 
... (more) 
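
For reference, the format above can be turned into one frequency vector per tag in plain Clojure before any Hadoop classes are involved. This is only a sketch under assumptions: the names `parse-line` and `tag-vectors` are hypothetical, and ordering the vector positions by sorted image name is my choice, not something the original script does.

```clojure
(require '[clojure.string :as str])

(defn parse-line
  "Split one 'tag image frequency' line into [tag image frequency-as-int]."
  [line]
  (let [[tag image freq] (str/split (str/trim line) #"\s+")]
    [tag image (Integer/parseInt freq)]))

(defn tag-vectors
  "Return {tag [freq ...]}, where position i holds the frequency for the
  i-th image in sorted order. Missing (tag, image) pairs default to 0."
  [lines]
  (let [rows   (map parse-line lines)
        images (vec (sort (distinct (map second rows))))
        index  (zipmap images (range))
        zeros  (vec (repeat (count images) 0))]
    (reduce (fn [acc [tag image freq]]
              (assoc-in acc [tag (index image)] freq))
            (zipmap (distinct (map first rows)) (repeat zeros))
            rows)))
```

Calling `(tag-vectors (line-seq reader))` on the sample rows gives every tag a vector with the same image-to-position mapping, which makes the resulting vectors comparable across tags.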

I then use this Clojure script to write the data into a SequenceFile:

#!./bin/clj 
(ns sensei.sequence.core) 

(require 'clojure.string) 
(require 'clojure.java.io) 

(import org.apache.hadoop.conf.Configuration) 
(import org.apache.hadoop.fs.FileSystem) 
(import org.apache.hadoop.fs.Path) 
(import org.apache.hadoop.io.SequenceFile) 
(import org.apache.hadoop.io.Text) 

(import org.apache.mahout.math.VectorWritable) 
(import org.apache.mahout.math.SequentialAccessSparseVector) 

(with-open [reader (clojure.java.io/reader *in*)]
  (let [hadoop_configuration ((fn []
                                (let [conf (new Configuration)]
                                  (. conf set "fs.default.name" "hdfs://localhost:9000/")
                                  conf)))
        hadoop_fs (FileSystem/get hadoop_configuration)]
    (reduce
      (fn [writer [index value]]
        (. writer append index value)
        writer)
      (SequenceFile/createWriter
        hadoop_fs
        hadoop_configuration
        (new Path "test/sensei")
        Text
        VectorWritable)
      (map
        (fn [[tag row_vector]]
          (let [input_index (new Text tag)
                input_vector (new VectorWritable)]
            (. input_vector set row_vector)
            [input_index input_vector]))
        (map
          (fn [[tag photo_list]]
            (let [photo_map (apply hash-map photo_list)
                  input_vector (new SequentialAccessSparseVector (count (vals photo_map)))]
              (loop [frequency_list (vals photo_map)]
                (if (zero? (count frequency_list))
                  [tag input_vector]
                  (when-not (zero? (count frequency_list))
                    (. input_vector set
                       (mod (count frequency_list) (count (vals photo_map)))
                       (Integer/parseInt (first frequency_list)))
                    (recur (rest frequency_list)))))))
          (reduce
            (fn [result next_line]
              (let [[tag photo frequency] (clojure.string/split next_line #" ")]
                (update-in result [tag]
                           #(if (nil? %)
                              [photo frequency]
                              (conj % photo frequency)))))
            {}
            (line-seq reader)))))))

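Basically, it turns the input into a sequence file in this format:

key (Text): $tag_uri, value (VectorWritable): a vector (cardinality = number of documents) with numeric indices and the corresponding frequencies <0:1 1:0 2:0 3:1 4:0 ...>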
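
As a side note on how the indices in that vector come about: the loop in the script computes each index as `(mod (count frequency_list) n)` while the list shrinks. The sketch below reproduces only that arithmetic in plain Clojure. `loop-indices` and `indexed-frequencies` are hypothetical helper names, and the `map-indexed` variant is just one possible alternative, not what the script does.

```clojure
(defn loop-indices
  "Indices produced by (mod remaining n) as remaining counts down from n to 1,
  listed in the order the frequencies are visited."
  [n]
  (map #(mod % n) (range n 0 -1)))

(defn indexed-frequencies
  "Alternative: pair each frequency with its position in visiting order."
  [freqs]
  (map-indexed vector freqs))
```

For four frequencies, `(loop-indices 4)` yields `(0 3 2 1)`: the first value lands at index 0 and the rest are filled from the last index backwards. Every slot is still filled exactly once, so this does not explain the exception, but it is worth knowing when reading the dumped vectors.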

Then I do the actual clustering with this script (following this blog post):

#!./bin/clj 

(ns sensei.clustering.fkmeans) 

(import org.apache.hadoop.conf.Configuration) 
(import org.apache.hadoop.fs.Path) 

(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver) 
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure) 
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator) 

(let [hadoop_configuration ((fn []
                              (let [conf (new Configuration)]
                                (. conf set "fs.default.name" "hdfs://127.0.0.1:9000/")
                                conf)))
      input_path (new Path "test/sensei")
      output_path (new Path "test/clusters")
      clusters_in_path (new Path "test/clusters/cluster-0")]
  (FuzzyKMeansDriver/run
    hadoop_configuration
    input_path
    (RandomSeedGenerator/buildRandom
      hadoop_configuration
      input_path
      clusters_in_path
      (int 2)
      (new EuclideanDistanceMeasure))
    output_path
    (new EuclideanDistanceMeasure)
    (double 0.5)
    (int 10)
    (float 5.0)
    true
    false
    (double 0.0)
    false)) ; runSequential

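However, I get output like the following. The first log comes from the MapReduce code path; the second one, which ends in "Clusters is empty!", is what I get when runSequential is set to true.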

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". 
SLF4J: Defaulting to no-operation (NOP) logger implementation 
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 
11/08/25 15:20:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new compressor 
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new decompressor 
11/08/25 15:20:17 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
11/08/25 15:20:17 INFO input.FileInputFormat: Total input paths to process : 1 
11/08/25 15:20:17 INFO mapred.JobClient: Running job: job_local_0001 
11/08/25 15:20:17 INFO mapred.MapTask: io.sort.mb = 100 
11/08/25 15:20:17 INFO mapred.MapTask: data buffer = 79691776/99614720 
11/08/25 15:20:17 INFO mapred.MapTask: record buffer = 262144/327680 
11/08/25 15:20:17 WARN mapred.LocalJobRunner: job_local_0001 
java.lang.IllegalStateException: No clusters found. Check your -c path. 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.setup(FuzzyKMeansMapper.java:62) 
     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142) 
     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) 
     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369) 
     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210) 
11/08/25 15:20:18 INFO mapred.JobClient: map 0% reduce 0% 
11/08/25 15:20:18 INFO mapred.JobClient: Job complete: job_local_0001 
11/08/25 15:20:18 INFO mapred.JobClient: Counters: 0 
Exception in thread "main" java.lang.RuntimeException: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed 
     at clojure.lang.Util.runtimeException(Util.java:153) 
     at clojure.lang.Compiler.eval(Compiler.java:6417) 
     at clojure.lang.Compiler.load(Compiler.java:6843) 
     at clojure.lang.Compiler.loadFile(Compiler.java:6804) 
     at clojure.main$load_script.invoke(main.clj:282) 
     at clojure.main$script_opt.invoke(main.clj:342) 
     at clojure.main$main.doInvoke(main.clj:426) 
     at clojure.lang.RestFn.invoke(RestFn.java:436) 
     at clojure.lang.Var.invoke(Var.java:409) 
     at clojure.lang.AFn.applyToHelper(AFn.java:167) 
     at clojure.lang.Var.applyTo(Var.java:518) 
     at clojure.main.main(main.java:37) 
Caused by: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.runIteration(FuzzyKMeansDriver.java:252) 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersMR(FuzzyKMeansDriver.java:421) 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:345) 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295) 
     at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35) 
     at clojure.lang.Compiler.eval(Compiler.java:6406) 
     ... 10 more 

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". 
SLF4J: Defaulting to no-operation (NOP) logger implementation 
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 
11/09/07 14:32:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new compressor 
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new decompressor 
Exception in thread "main" java.lang.IllegalStateException: Clusters is empty! 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersSeq(FuzzyKMeansDriver.java:361) 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:343) 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295) 
     at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35) 
     at clojure.lang.Compiler.eval(Compiler.java:6465) 
     at clojure.lang.Compiler.load(Compiler.java:6902) 
     at clojure.lang.Compiler.loadFile(Compiler.java:6863) 
     at clojure.main$load_script.invoke(main.clj:282) 
     at clojure.main$script_opt.invoke(main.clj:342) 
     at clojure.main$main.doInvoke(main.clj:426) 
     at clojure.lang.RestFn.invoke(RestFn.java:436) 
     at clojure.lang.Var.invoke(Var.java:409) 
     at clojure.lang.AFn.applyToHelper(AFn.java:167) 
     at clojure.lang.Var.applyTo(Var.java:518) 
     at clojure.main.main(main.java:37) 

I have also rewritten the fkmeans script into this form:

#!./bin/clj 

(ns sensei.clustering.fkmeans) 

(import org.apache.hadoop.conf.Configuration) 
(import org.apache.hadoop.fs.Path) 

(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver) 
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure) 
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator) 

(let [hadoop_configuration ((fn []
                              (let [conf (new Configuration)]
                                (. conf set "fs.default.name" "hdfs://localhost:9000/")
                                conf)))
      driver (new FuzzyKMeansDriver)]
  (. driver setConf hadoop_configuration)
  (. driver
     run
     (into-array String ["--input" "test/sensei"
                         "--output" "test/clusters"
                         "--clusters" "test/clusters/clusters-0"
                         "--clustering"
                         "--overwrite"
                         "--emitMostLikely" "false"
                         "--numClusters" "3"
                         "--maxIter" "10"
                         "--m" "5"])))

but I still get the same error as with the initial version :/

The command-line tool, on the other hand, runs fine:

$ bin/mahout fkmeans --input test/sensei --output test/clusters --clusters test/clusters/clusters-0 --clustering --overwrite --emitMostLikely false --numClusters 10 --maxIter 10 --m 5 
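
One way to make sure a script passes exactly the same flags as a command line like this one is to build the `String[]` from a single data structure. The following is a minimal sketch in plain Clojure. `cli-args` and `fkmeans-options` are hypothetical names, and the flag names are copied from the command above rather than checked against the Mahout option parser.

```clojure
(defn cli-args
  "Flatten [flag] or [flag value] entries into a String array.
  An entry without a value is emitted as a bare flag."
  [entries]
  (into-array String
              (mapcat (fn [[flag value]]
                        (if (nil? value)
                          [flag]
                          [flag (str value)]))
                      entries)))

;; Same options as the bin/mahout invocation above (values are assumptions
;; taken from that command line).
(def fkmeans-options
  [["--input" "test/sensei"]
   ["--output" "test/clusters"]
   ["--clusters" "test/clusters/clusters-0"]
   ["--clustering"]
   ["--overwrite"]
   ["--emitMostLikely" false]
   ["--numClusters" 10]
   ["--maxIter" 10]
   ["--m" 5]])
```

`(cli-args fkmeans-options)` can then be handed to the driver's `run` call in place of the literal `into-array` form, which makes differences between the two invocations (for example `--numClusters`) easy to spot.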

However, clusterdumper does not return any points when I try it, even though the --clustering option was present in the previous command and --pointsDir is defined here:

$ ./bin/mahout clusterdump --seqFileDir test/clusters/clusters-1 --pointsDir test/clusters/clusteredPoints --output sensei.txt 

Mahout version used: 0.6-SNAPSHOT; Clojure 1.3.0-SNAPSHOT.

Please let me know if I have missed anything.

+1

Can you check whether the initial clusters are actually generated? –

+1

Any update on this? – AlphaMale

+1

Yes, did it get solved? If so, you could post an answer. –

Answers

2

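My guess is that the Mahout implementation of fuzzy c-means needs initial clusters to start with, which you perhaps did not supply?

It also sounds a bit as if you are running on a single node. Note that for single-node systems you should avoid all of the Mahout/Hadoop overhead and use a regular clustering algorithm. Hadoop/Mahout comes at quite a cost, which only pays off once you can no longer process the data on a single system. It is not really "map reduce" unless you run it on a large number of systems.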
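
To illustrate the single-node route, here is a minimal in-memory sketch of fuzzy c-means in plain Clojure. It is not Mahout's implementation, and all names (`distance`, `memberships`, `update-centers`, `fuzzy-c-means`) are hypothetical. It assumes points are equal-length numeric vectors, a fuzziness `m` greater than 1, and initial centers supplied by the caller, for example by picking a few of the input points.

```clojure
(defn distance
  "Euclidean distance between two equal-length numeric vectors."
  [a b]
  (Math/sqrt (reduce + (map (fn [x y] (let [d (- x y)] (* d d))) a b))))

(defn memberships
  "Degree of membership of point x in each center (the values sum to 1)."
  [centers m x]
  (let [ds (map #(distance x %) centers)]
    (if (some zero? ds)
      ;; x sits exactly on one or more centers: share membership among them.
      (let [hits (count (filter zero? ds))]
        (mapv #(if (zero? %) (/ 1.0 hits) 0.0) ds))
      (let [e (/ 2.0 (- m 1.0))]
        (mapv (fn [dj]
                (/ 1.0 (reduce + (map (fn [dk] (Math/pow (/ dj dk) e)) ds))))
              ds)))))

(defn update-centers
  "Recompute each center as the mean of all points weighted by membership^m."
  [points us m k]
  (vec (for [j (range k)]
         (let [ws    (map #(Math/pow (nth % j) m) us)
               total (reduce + ws)]
           ;; apply mapv walks the points column by column (one dimension at a time)
           (apply mapv (fn [& col] (/ (reduce + (map * ws col)) total)) points)))))

(defn fuzzy-c-means
  "Run max-iter update rounds; return the final centers and membership rows."
  [points initial-centers m max-iter]
  (loop [centers initial-centers
         i       0]
    (let [us (mapv #(memberships centers m %) points)]
      (if (= i max-iter)
        {:centers centers :memberships us}
        (recur (update-centers points us m (count centers)) (inc i))))))
```

A call such as `(fuzzy-c-means points [(first points) (last points)] 2.0 10)` returns a map with `:centers` and one membership row per point. Note that the initial centers are an explicit argument, which mirrors the point made above: the algorithm has nothing to iterate on if no initial clusters are provided.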