我發現了一種方法,可以隨時從JVM中隨時停止啓動自定義接收器。它是原型的,但沒有經過壓力測試。它並沒有關閉接收器管理員等的層次結構,但是在自定義接收器重新啓動之前,它們基本上變爲空閒狀態。
似乎要尊重星火API
- 創建一個擁有單
HashMap
對象的意圖。在地圖中保持自定義接收器及其所需的狀態(啓用或禁用)。
;;;
case class IKodaTextSocketReceiverStatus(receiver:IKodaTextSocketReceiver,enabled:Boolean)
{
}
object IKodaTextSocketReceiver extends Logging
{
val receiverMap:mutable.HashMap[String,IKodaTextSocketReceiverStatus]= mutable.HashMap[String,IKodaTextSocketReceiverStatus]()
def restartReceiver(recId:String):Boolean=
{
if(receiverMap.get(recId).isDefined)
{
logger.info("Found existing receiver")
receiverMap.put(recId,new IKodaTextSocketReceiverStatus(receiverMap.get(recId).get.receiver,true))
receiverMap.get(recId).get.receiver.onStart()
true
}
else
{
false
}
}
}
啓用的布爾值可以從任何地方設置。從接收器的內部或外部。我從receive
方法中設置。方法onStart
檢查接收器的啓用狀態。如果是假的,它不會重新啓動一個新的線程接收方法.....並且有沉默:)
class IKodaTextSocketReceiver(host: String, port: Int, receiverId:String)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
IKodaTextSocketReceiver.receiverMap.put(receiverId,new IKodaTextSocketReceiverStatus(this,true))
def isEnabled(): Boolean =
{
val bo = IKodaTextSocketReceiver.receiverMap.get(receiverId)
if(bo.isDefined)
{
bo.get.enabled
}
else
{
false
}
}
def onStart() {
// Start the thread that receives data over a connection
if(isEnabled())
{
logger.info("Starting IKodaTextSocketReceiver")
if(!super.isStopped()) {
new Thread("IKodaTextSocketReceiver") {
override def run() {
receive()
}
}.start()
}
else
{
logger.info("Receiver disabled")
}
}
else
{
logger.warn("Restarting after stop set")
}
}
def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
var socket: Socket = null
var userInput: String = null
var keepReceiving=true;
try {
logger.info(s"Connecting to $host : $port")
socket = new Socket(host, port)
logger.info(s"Connected to $host : $port")
val reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
userInput = reader.readLine()
if(userInput.contains(StreamingConstants.IKODA_END_STREAM))
{
logger.info(s"${StreamingConstants.IKODA_END_STREAM}: Calling disabling receiver")
IKodaTextSocketReceiver.receiverMap.put(receiverId,new IKodaTextSocketReceiverStatus(this,false))
//stop("Exiting, hopefully with elegance and dignity.")
}
}
reader.close()
socket.close()
logger.info("Stopped receiving")
restart("Trying to connect again",keepReceiving)
} catch {
case e: java.net.ConnectException =>
restart(s"Error connecting to $host : $port"+e.getMessage,keepReceiving)
case t: Throwable =>
restart("Error receiving data"+t.getMessage,keepReceiving)
}
}
通過簡單地檢查它是否在HashMap
註冊開始自定義接收器。如果不是,則創建一個新的。如果是,只需設置爲啓用。
def doStream(ip: String, port: Int): Unit = {
try {
if(!IKodaTextSocketReceiver.restartReceiver("MyFirstReceiver"))
{
val ssc = getSparkStreamingContext(fieldVariables)
ssc.checkpoint("./ikoda/cp")
logger.info("open stream to " + ip + " " + port)
val ikReceiver = new IKodaTextSocketReceiver(ip, port, "MyFirstReceiver")
val lines: DStream[String] = ssc.receiverStream(ikReceiver)
etc etc etc
不幸的是,現在不推薦使用 – Jake