2017-10-29 117 views

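TypeInformation in Apache Flink 1.3.2 BatchTableEnvironment's translate method

I tried a couple of Flink translate() functions in Apache Flink 1.3.2: one from BatchTableEnvironment, the other from StreamTableEnvironment. The program gave me errors, and I am lost. The program below throws an exception at runtime (I don't know what TypeInformation it needs). It simply passes the type information that should match what the result requires, but it does not work. Any help? I think I have provided all the details.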

import org.apache.flink.api.common.typeinfo.TypeInformation; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.table.api.BatchTableEnvironment; 
import org.apache.flink.table.api.Table; 
import org.apache.flink.table.api.TableEnvironment; 

/** 
* 
* @author Paul Z. Wu Oct 28, 2017 
*/ 
public class TableEnv { 

    public static void main(String s[]) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataSet<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("Ciao", 1),
                new WC("Hello", 1));

        tEnv.registerDataSetInternal("abc", input);
        Table table = tEnv.scan("abc");

        Table wordCounts = table
                .groupBy("word")
                .select("word, count.sum as count");
        System.out.println(wordCounts.getSchema());
        DataSet<WC> a = tEnv.translate(wordCounts, TypeInformation.of(WC.class));
        a.print();
    }

    public static class WC {

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }

        public WC() {
        } // empty constructor to satisfy POJO requirements

        public String word;
        public int count;
    }

} 

The exception is:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/scala/typeutils/CaseClassTypeInfo 
    at org.apache.flink.table.codegen.CodeGenUtils$.fieldAccessorFor(CodeGenUtils.scala:236) 
    at org.apache.flink.table.codegen.CodeGenerator.org$apache$flink$table$codegen$CodeGenerator$$generateFieldAccess(CodeGenerator.scala:1654) 
    at org.apache.flink.table.codegen.CodeGenerator.org$apache$flink$table$codegen$CodeGenerator$$generateInputAccess(CodeGenerator.scala:1602) 
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$23.apply(CodeGenerator.scala:875) 
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$23.apply(CodeGenerator.scala:874) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) 
    at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234) 
    at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:874) 
    at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57) 
    at org.apache.flink.table.plan.nodes.dataset.DataSetScan.generatedConversionFunction(DataSetScan.scala:36) 
    at org.apache.flink.table.plan.nodes.dataset.BatchScan$class.convertToInternalRow(BatchScan.scala:48) 
    at org.apache.flink.table.plan.nodes.dataset.DataSetScan.convertToInternalRow(DataSetScan.scala:36) 
    at org.apache.flink.table.plan.nodes.dataset.DataSetScan.translateToPlan(DataSetScan.scala:65) 
    at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:350) 
    at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:329) 
    at com.att.ariso.TableEnv.main(TableEnv.java:39) 
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.scala.typeutils.CaseClassTypeInfo 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    ... 20 more 
------------------------ 

Answer


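The flink-scala library is needed at runtime. The NoClassDefFoundError names org.apache.flink.api.scala.typeutils.CaseClassTypeInfo, a class that ships in flink-scala, so that jar has to be on the classpath when the program runs.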
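
One way to confirm the diagnosis before changing the build is to ask the JVM whether the class from the stack trace can be loaded at all. The snippet below is only a minimal sketch: the class name ClasspathCheck and the helper isOnClasspath are hypothetical names made up for this illustration, not part of Flink. It uses nothing beyond the standard Class.forName call.

```java
public class ClasspathCheck {

    /**
     * Returns true if the named class can be loaded by this class's
     * class loader. Hypothetical helper for illustration only.
     */
    public static boolean isOnClasspath(String className) {
        try {
            // initialize = false: only locate and load the class,
            // do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // ClassNotFoundException: the class itself is missing.
            // LinkageError: the class was found but something it depends on is not.
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the NoClassDefFoundError above.
        String missing = "org.apache.flink.api.scala.typeutils.CaseClassTypeInfo";
        System.out.println(missing + " loadable: " + isOnClasspath(missing));
    }
}
```

If this prints false when run with the same classpath as TableEnv, the flink-scala jar is not being picked up. With Maven that would typically mean adding the flink-scala artifact in the same version as the other Flink dependencies; the exact artifact suffix (for example _2.11) depends on the Scala version of the Flink build in use and is an assumption here.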