2016-12-29 24 views
2

我們正在研究一個應用程序消耗使用火花流kafka消息..運行在hadoop /紗火花羣....我有log4j屬性配置驅動程序和工人的......但我仍然沒有看到日誌foreachRDD ..我內部消息確實看到了「開始每個RDD」和如何在流媒體應用程序中登錄foreachRDD?

val broadcaseLme=sc.broadcast(lme) 
logInfo("start for each rdd: ") 
val lines: DStream[MetricTypes.InputStreamType] = myConsumer.createDefaultStream() 
      lines.foreachRDD(rdd => { 
      if ((rdd != null) && (rdd.count() > 0) && (!rdd.isEmpty())) { 
       **logInfo("filteredLines: " + rdd.count())** 
       **logInfo("start loop")** 
       rdd.foreach{x => 
       val lme = broadcastLme.value  
       lme.aParser(x).get 
        } 
       logInfo("end loop") 
      } }) 

     logInfo("end of for each rdd ") 

       lines.print(10) 

我使用這個

spark-submit --verbose --class DevMain --master yarn-cluster --deploy-mode cluster --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.p‌​roperties" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j‌​.properties" --files "hdfs://hdfs-name-node:8020/user/hadoopuser/log4j.properties‌​" hdfs://hdfs-name-node:8020/user/hadoopuser/streaming_2.10-1.‌​0.0-SNAPSHOT.jar hdfs://hdfs-name-node:8020/user/hadoopuser/enriched.properti‌​es 

運行在集羣上的應用「的每個RDD結束」我是新來的spark可能有人請幫助爲什麼我沒有看到foreachrdd裏面的日誌消息這是log4j.properties

log4j.rootLogger=WARN, rolling 

log4j.appender.rolling=org.apache.log4j.RollingFileAppender 
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout 
log4j.appender.rolling.layout.conversionPattern=[%p] %d %c %M - %m%n 
log4j.appender.rolling.maxFileSize=100MB 
log4j.appender.rolling.maxBackupIndex=10 
log4j.appender.rolling.file=${spark.yarn.app.container.log.dir}/titanium-spark-enriched.log 
#log4j.appender.rolling.encoding=URF-8 

log4j.logger.org.apache,spark=WARN 
log4j.logger.org.eclipse.jetty=WARN 

log4j.logger.com.x1.projectname=INFO 

#log4j.appender.console=org.apache.log4j.ConsoleAppender 
#log4j.appender.console.target=System.err 
#log4j.appender.console.layout=org.apache.log4j.PatternLayout 
#log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n 

# Settings to quiet third party logs that are too verbose 
#log4j.logger.org.spark-project.jetty=WARN 
#log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR 
#log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO 
#log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO 

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support 
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL 
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR 

#log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender 
#log4j.appender.RollingAppender.File=./logs/spark/enriched.log 
#log4j.appender.RollingAppender.DatePattern='.'yyyy-MM-dd 
#log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout 
#log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - %m%n 


#log4j.rootLogger=INFO, RollingAppender, console 
+0

部署模式是集羣,因此驅動程序的日誌將位於集羣節點之一。你檢查過節點的日誌嗎? –

+0

我正在使用紗線來聚合日誌,我沒有看到裏面的日誌消息foreachRdd – user2359997

+0

你是如何創建myConsumer的?線DStream呢?顯示你後來發佈的代碼。 –

回答

0

使用Spark流應用中的問題似乎是一個丟失了start你建立你的流計算後,即

ssc.start() 

引述scaladoc of StreamingContext

創建後並且轉換DS流,則流式計算可以分別使用context.start()context.stop()來開始和停止。

context.awaitTermination()允許當前線程通過stop()或異常等待上下文的終止。

+0

謝謝你的回答Jacek ...但我有ssc.start沒有它我不會得到日誌消息的我已經越來越....... – user2359997

+0

所以我現在可以拿出原因的唯一原因(rdd!= null)&&(rdd.count()> 0)&&(!rdd.isEmpty()'你爲什麼要這樣做? –

+0

這就是爲了確保我不要爲空或空rdd ...因爲...有時我們在流式傳輸時會得到空批次,如果我在我的假設中不正確,請糾正我 – user2359997

相關問題