
Scala: Type mismatch MapFunction[Tuple2[Text, Text], NotInferedR]

I am trying to do the following:

env 
    .readHadoopFile(new TeraInputFormat(), classOf[Text], classOf[Text], inputPath) 
    .map(tp => tp) 

But then I get a type mismatch error in my editor:

Expected: MapFunction[Tuple2[Text, Text], NotInferedR], actual: (Nothing) => Nothing 

How can I fix this?

Here is the complete code:

import org.apache.flink.api.common.functions.Partitioner 
import org.apache.flink.api.common.operators.Order 
import org.apache.flink.api.java.ExecutionEnvironment 
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat 
import org.apache.hadoop.fs.Path 
import org.apache.hadoop.io.Text 
import org.apache.hadoop.mapred.JobConf 
import org.apache.hadoop.mapreduce.Job 

class OptimizedFlinkTeraPartitioner(underlying: TotalOrderPartitioner) extends Partitioner[OptimizedText] { 
  def partition(key: OptimizedText, numPartitions: Int): Int = { 
    underlying.getPartition(key.getText()) 
  } 
} 


object FlinkTeraSort { 

  implicit val textOrdering = new Ordering[Text] { 
    override def compare(a: Text, b: Text) = a.compareTo(b) 
  } 

  def main(args: Array[String]) { 
    if (args.size != 4) { 
      println("Usage: FlinkTeraSort hdfs inputPath outputPath #partitions") 
      return 
    } 

    val env = ExecutionEnvironment.getExecutionEnvironment 
    env.getConfig.enableObjectReuse() 

    val hdfs = args(0) 
    val inputPath = hdfs + args(1) 
    val outputPath = hdfs + args(2) 
    val partitions = args(3).toInt 

    val mapredConf = new JobConf() 
    mapredConf.set("fs.defaultFS", hdfs) 
    mapredConf.set("mapreduce.input.fileinputformat.inputdir", inputPath) 
    mapredConf.set("mapreduce.output.fileoutputformat.outputdir", outputPath) 
    mapredConf.setInt("mapreduce.job.reduces", partitions) 

    val partitionFile = new Path(outputPath, TeraInputFormat.PARTITION_FILENAME) 
    val jobContext = Job.getInstance(mapredConf) 
    TeraInputFormat.writePartitionFile(jobContext, partitionFile) 
    val partitioner = new OptimizedFlinkTeraPartitioner(new TotalOrderPartitioner(mapredConf, partitionFile)) 

    val data = env.readHadoopFile(new TeraInputFormat(), classOf[Text], classOf[Text], inputPath) 

    data.map(tp => tp) 

    data.output(new HadoopOutputFormat[Text, Text](new TeraOutputFormat(), jobContext)) 

    env.execute("TeraSort") 
  } 
} 

And here is my build.sbt:

name := "terasort" 

version := "0.0.1" 

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" 

libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.0.3" 

fork in run := true 
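
Note that flink-clients alone does not appear to bring in the Scala DataSet API. If the job is meant to use Scala tuples and closures (as in the accepted fix below), the flink-scala module would likely also be needed; a hedged addition, assuming the version should match flink-clients:

libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.0.3" 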

Where do you get 'TeraInputFormat' from? Can you post the complete program? I just tested a similar program with 'SequenceFileInputFormat' and everything seemed to work. –


Sure :)! It is updated now. –

Answer

data.map{case (t1: Text, t2: Text) => t1} 

solved the problem.
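
Why this works: env here is the Java-API ExecutionEnvironment, so data.map expects an org.apache.flink.api.common.functions.MapFunction and cannot infer the result type from a bare closure like tp => tp, hence the (Nothing) => Nothing in the error. Spelling out the element types gives the compiler something concrete to infer from. Below is a minimal sketch of the explicit-MapFunction alternative, assuming Flink 1.0.x; a fromElements source stands in for readHadoopFile so the example is self-contained:

import org.apache.flink.api.common.functions.MapFunction 
import org.apache.flink.api.java.ExecutionEnvironment 
import org.apache.flink.api.java.tuple.Tuple2 
import org.apache.hadoop.io.Text 

object MapFunctionSketch { 
  def main(args: Array[String]): Unit = { 
    val env = ExecutionEnvironment.getExecutionEnvironment 
    // Stand-in source; the question would use readHadoopFile here. 
    val data = env.fromElements(new Tuple2(new Text("key"), new Text("value"))) 

    // An explicit MapFunction names both the input and output types, 
    // so nothing is left for inference to fail on. 
    val keys = data.map(new MapFunction[Tuple2[Text, Text], Text] { 
      override def map(tp: Tuple2[Text, Text]): Text = tp.f0 
    }) 

    keys.print() 
  } 
} 

Alternatively, with import org.apache.flink.api.scala._ (and the flink-scala dependency noted above), readHadoopFile would return a DataSet of Scala (Text, Text) tuples, and the pattern-matching closure in the answer compiles as-is.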