2016-07-21

I am new to Spark and I want to save the output of recommendProductsForUsers to an HBase table. I found an example (https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/) that shows how to save a JavaPairRDD with saveAsNewAPIHadoopDataset. How do I save the recommendProductsForUsers output of a Spark MatrixFactorizationModel to HBase?

How can I convert a JavaRDD&lt;Tuple2&lt;Object, Rating[]&gt;&gt; into a JavaPairRDD&lt;ImmutableBytesWritable, Put&gt; so that I can use saveAsNewAPIHadoopDataset?

// Load the trained model from HDFS
MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(), trainedDataPath);

// Get recommendations for all users
JavaRDD<Tuple2<Object, Rating[]>> ratings3 = sameModel.recommendProductsForUsers(noOfProductsToReturn).toJavaRDD();

Do you want to save the model or the recommendations? – eliasah


@eliasah I want to save the recommendations – Ani

Answers


This is how I solved the problem above; I hope it helps someone.

JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts1 = ratings3
        .mapToPair(new PairFunction<Tuple2<Object, Rating[]>, ImmutableBytesWritable, Put>() {

            @Override
            public Tuple2<ImmutableBytesWritable, Put> call(Tuple2<Object, Rating[]> arg0)
                    throws Exception {
                Rating[] userAndProducts = arg0._2;
                System.out.println("***********" + userAndProducts.length + "**************");
                // The Put constructor takes the HBase row key; here, the user id
                Put put = new Put(Bytes.toBytes(arg0._1.toString()));
                String recommendedProduct = "";
                for (Rating r : userAndProducts) {
                    // Some logic here to convert the Ratings into an appropriate cell value,
                    // e.g. recommendedProduct += r.product();
                }
                put.addColumn(Bytes.toBytes("recommendation"), Bytes.toBytes("product"),
                        Bytes.toBytes(recommendedProduct));
                return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
            }
        });

System.out.println("*********** Number of records in JavaPairRdd: " + hbasePuts1.count() + " **************");
hbasePuts1.saveAsNewAPIHadoopDataset(newApiJobConfig.getConfiguration());
jsc.stop();
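The `newApiJobConfig` used with `saveAsNewAPIHadoopDataset` above is never shown. A minimal sketch of how such a Job is typically configured for HBase's `TableOutputFormat` follows; the table name `"recommendation"` and the ZooKeeper quorum address are placeholders, not values from the question, and this assumes the HBase client jars are on the classpath with a reachable cluster:

```java
// Sketch only: configuration for writing (ImmutableBytesWritable, Put) pairs to HBase.
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set("hbase.zookeeper.quorum", "localhost"); // placeholder quorum
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "recommendation"); // placeholder table

Job newApiJobConfig = Job.getInstance(hbaseConf);
newApiJobConfig.setOutputFormatClass(TableOutputFormat.class);
newApiJobConfig.setOutputKeyClass(ImmutableBytesWritable.class);
newApiJobConfig.setOutputValueClass(Put.class);
```

The target table (and its column family, e.g. "recommendation") must already exist in HBase before the save runs.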

By using mapToPair. From the same source as the example you linked (I changed the types by hand):

JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = javaRDD.mapToPair(
        new PairFunction<Tuple2<Object, Rating[]>, ImmutableBytesWritable, Put>() {
            @Override
            public Tuple2<ImmutableBytesWritable, Put> call(Tuple2<Object, Rating[]> row) throws Exception {

                // Row key: the user id, i.e. the first element of the tuple
                Put put = new Put(Bytes.toBytes(row._1.toString()));
                // One cell per value to store; "columnFamily" must exist in the table.
                // Serialize the Rating[] (the second tuple element) however suits your schema.
                put.addColumn(Bytes.toBytes("columnFamily"), Bytes.toBytes("columnQualifier1"),
                        Bytes.toBytes(Arrays.toString(row._2)));

                return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
            }
        });

It works like this: you create the Put by passing the row key to its constructor, then call addColumn once for each column, and finally return the Put you created.
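The conversion step that both answers leave as a comment (flattening a user's Rating[] into a single cell value) can be sketched in plain Java. The Rating class below is a stand-in for Spark's org.apache.spark.mllib.recommendation.Rating so the snippet runs without a cluster, and the comma-separated format is just one possible choice of serialization:

```java
import java.util.StringJoiner;

// Stand-in for org.apache.spark.mllib.recommendation.Rating (user, product, rating).
class Rating {
    final int user;
    final int product;
    final double rating;

    Rating(int user, int product, double rating) {
        this.user = user;
        this.product = product;
        this.rating = rating;
    }
}

public class RecommendationFormatter {
    // Flatten one user's recommendations into a comma-separated product list,
    // suitable for storing as a single HBase cell value via Bytes.toBytes(...).
    static String toCellValue(Rating[] recommendations) {
        StringJoiner joiner = new StringJoiner(",");
        for (Rating r : recommendations) {
            joiner.add(String.valueOf(r.product));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Rating[] recs = {
            new Rating(1, 101, 4.5),
            new Rating(1, 202, 3.9)
        };
        System.out.println(toCellValue(recs)); // prints "101,202"
    }
}
```

In the actual Spark job you would call the equivalent of `toCellValue(arg0._2)` inside the mapToPair loop and pass the result to `Bytes.toBytes(...)` for `put.addColumn`.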