EDIT2的恆定延遲:最後,我已經用Java做了我自己製作,而且運作良好,因此的問題是在卡夫卡的控制檯製片。卡夫卡控制檯消費者運作良好。卡夫卡+星火流:1秒
編輯:我已經嘗試過版本0.9.0.1,並具有相同的行爲。
我正在研究我的單身漢最終項目,Spark Streaming和Flink之間的比較。在兩個框架之前,我使用Kafka和一個腳本來生成數據(如下所述)。我的第一個測試是比較兩種框架與簡單工作負載之間的延遲,而Kafka給我一個非常高的延遲(持續1秒)。爲了簡單起見,目前我只用一臺機器運行Kafka和Spark。
我已經尋找並發現了類似的問題,並嘗試了他們給出的解決方案,但沒有任何改變。我還檢查了所有的卡夫卡配置的官方文檔中,並把importants在我的配置文件的等待時間,這是我的配置:
卡夫卡0.10.2.1 - 星火2.1.0
server.properties :
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=2
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=1000
log.flush.interval.ms=50
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
flush.messages=100
flush.ms=10
producer.properties:
compression.type=none
max.block.ms=200
linger.ms=50
batch.size=0
火花流程序:(其打印接收到的數據,並將所創建的數據,並且當正在爲函數處理時之間的差)
package com.tfg.spark1.spark1;
import java.util.Map;
import java.util.HashMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.apache.spark.streaming.kafka.*;
public final class Timestamp {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: Timestamp <topics> <numThreads>");
System.exit(1);
}
SparkConf conf = new SparkConf().setMaster("spark://192.168.0.155:7077").setAppName("Timestamp");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(100));
Map<String, Integer> topicMap = new HashMap<String, Integer>();
int numThreads = Integer.parseInt(args[1]);
topicMap.put(args[0], numThreads);
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "192.168.0.155:2181", "grupo-spark", topicMap); //Map<"test", 2>
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;
public String call (Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<String> newLine = lines.map(new Function<String, String>() {
private static final long serialVersionUID = 1L;
public String call(String line) {
String[] tuple = line.split(" ");
String totalTime = String.valueOf(System.currentTimeMillis() - Long.valueOf(tuple[1]));
//String newLine = line.concat(" " + String.valueOf(System.currentTimeMillis()) + " " + totalTime);
return totalTime;
}
});
lines.print();
newLine.print();
jssc.start();
jssc.awaitTermination();
}
}
的生成的數據具有以下格式:
"Random bits" + " " + "current time in ms"
01 1496421618634
11 1496421619044
00 1496421619451
00 1496421618836
10 1496421619247
最後,當我運行火花流程序和腳本生成器,它在本例中生成每200ms的數據,火花(批次間隔= 100毫秒)打印9個空批次,和每一秒(總900毫秒的時刻,如:Tim E:1496421619 MS)這導致:
-------------------------------------------
Time: 1496421619900 ms
-------------------------------------------
01 1496421618634
11 1496421619044
00 1496421619451
00 1496421618836
10 1496421619247
-------------------------------------------
Time: 1496421619900 ms
-------------------------------------------
1416
1006
599
1214
803
另外,如果我運行一個卡夫卡的命令行,製片人和另一個命令行的消費,它總是需要一些時間來打印生產數據消費者。
在此先感謝您的幫助!
嘗試簡單的消費者首先看看它的火花具體或卡夫卡具體。幾乎沒有帖子(也來自linkedin)報告30毫秒的延遲。 –
你的意思是卡夫卡控制檯消費者?我已經嘗試過,它也會延遲收到元素。我也從幾個網站上看到它可以實現這種延遲。我也會嘗試使用較舊的Kafka版本。謝謝! :D – Franmoti
它也可能取決於您的硬件(例如線程數)。也嘗試看到系統處於穩定狀態(不只是一兩個消息)也許需要時間來「預熱」 –