2017-12-27
2

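Upgrading from Flink 1.3.2 to 1.4.0: Hadoop FileSystem and Path issues

I recently tried to upgrade from Flink 1.3.2 to 1.4.0 and ran into a problem: I can no longer import org.apache.hadoop.fs.{FileSystem, Path}. The problem occurs in two places: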

ParquetWriter:

import org.apache.avro.Schema 
import org.apache.avro.generic.GenericRecord 
import org.apache.hadoop.fs.{FileSystem, Path} 
import org.apache.flink.streaming.connectors.fs.Writer 
import org.apache.parquet.avro.AvroParquetWriter 
import org.apache.parquet.hadoop.ParquetWriter 
import org.apache.parquet.hadoop.metadata.CompressionCodecName 

class AvroWriter[T <: GenericRecord]() extends Writer[T] {

    @transient private var writer: ParquetWriter[T] = _
    @transient private var schema: Schema = _

    override def write(element: T): Unit = {
        schema = element.getSchema
        writer.write(element)
    }

    override def duplicate(): AvroWriter[T] = new AvroWriter[T]()

    override def close(): Unit = writer.close()

    override def getPos: Long = writer.getDataSize

    override def flush(): Long = writer.getDataSize

    override def open(fs: FileSystem, path: Path): Unit = {
        writer = AvroParquetWriter.builder[T](path)
            .withSchema(schema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .build()
    }

}

CustomBucketer:

import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer 
import org.apache.flink.streaming.connectors.fs.Clock 
import org.apache.hadoop.fs.{FileSystem, Path} 
import java.io.ObjectInputStream 
import java.text.SimpleDateFormat 
import java.util.Date 

import org.apache.avro.generic.GenericRecord 

import scala.reflect.ClassTag 

class RecordFieldBucketer[T <: GenericRecord: ClassTag](dateField: String = null, dateFieldFormat: String = null, bucketOrder: Seq[String]) extends Bucketer[T] {

    @transient var dateFormatter: SimpleDateFormat = _

    private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        if (dateField != null && dateFieldFormat != null) {
            dateFormatter = new SimpleDateFormat(dateFieldFormat)
        }
    }

    override def getBucketPath(clock: Clock, basePath: Path, element: T): Path = {
        val partitions = bucketOrder.map(field => {
            if (field == dateField) {
                field + "=" + dateFormatter.format(new Date(element.get(field).asInstanceOf[Long]))
            } else {
                field + "=" + element.get(field)
            }
        }).mkString("/")
        new Path(basePath + "/" + partitions)
    }

}

I noticed that Flink now has:

import org.apache.flink.core.fs.{FileSystem, Path} 

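But the new Path does not seem to work with AvroParquetWriter or with the getBucketPath method. I know there were some changes to Flink's FileSystem and its Hadoop dependencies; I'm just not sure what I need to import to get my code running again.

Do I even need the Hadoop dependencies, or is there now a different way to write Parquet files and partition them to s3?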

build.sbt:

val flinkVersion = "1.4.0" 

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % flinkVersion % Provided, 
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided, 
    "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion, 
    "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion, 
    "org.apache.flink" % "flink-metrics-core" % flinkVersion, 
    "org.apache.flink" % "flink-metrics-graphite" % flinkVersion, 
    "org.apache.kafka" %% "kafka" % "0.10.0.1", 
    "org.apache.avro" % "avro" % "1.7.7", 
    "org.apache.parquet" % "parquet-hadoop" % "1.8.1", 
    "org.apache.parquet" % "parquet-avro" % "1.8.1", 
    "io.confluent" % "kafka-avro-serializer" % "3.2.2", 
    "com.fasterxml.jackson.core" % "jackson-core" % "2.9.2" 
) 

Answers

0

The required org.apache.hadoop.fs.{FileSystem, Path} classes are found in the hadoop-common project.

1

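Building a "Hadoop-free Flink" was a major feature of the 1.4 release. All you need to do is include the Hadoop dependencies on your classpath, or, quoting the changelog:

... This also means that in cases where you use connectors to HDFS, such as the BucketingSink or RollingSink, you now have to ensure that you either use a Flink distribution with bundled Hadoop dependencies or make sure to include Hadoop dependencies when building a jar file for your application.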
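As a rough illustration of what the two answers above suggest, the build.sbt from the question could be extended along these lines. This is only a sketch: the `hadoopVersion` value is a placeholder assumption and should match the Hadoop version of your cluster or Flink distribution, and the `hadoop-aws` entry is an assumption that applies only if you write to `s3a://` paths through Hadoop's S3 connector.

```scala
// build.sbt (sketch): add the Hadoop libraries that Flink 1.4 no longer bundles.
// ASSUMPTION: hadoopVersion is a placeholder; align it with your environment.
val hadoopVersion = "2.7.3"

libraryDependencies ++= Seq(
  // Provides org.apache.hadoop.fs.{FileSystem, Path}
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
  // ASSUMPTION: only relevant when writing to s3a:// via Hadoop's S3 connector
  "org.apache.hadoop" % "hadoop-aws" % hadoopVersion
)
```

The dependencies are deliberately not marked `Provided` here, so they end up in the application jar. If you run on a Flink distribution that already bundles Hadoop, marking them `Provided` is probably the better choice, since it avoids shipping a second copy.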

+0

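OK, that makes sense. I'll try to track down the dependencies I need to include, unless you happen to know them offhand. What I'd also like to know is whether I even need those dependencies to write to s3, or whether there is another way to do it in Flink 1.4 now? – moku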