2016-03-03 96 views
0

我最近開始使用spark-kernel。使用spark-kernel comm API

正如在教程和示例代碼中給出的那樣,我能夠設置客戶端並使用它在spark-kernel上執行代碼片段並獲取example code中給出的結果。

現在,我需要使用與spark-kernel一起提供的comm API。我試過這個tutorial,但我無法使它工作。事實上,我不瞭解如何做這項工作。

我試過下面的代碼,但是當我運行這段代碼時,在內核上出現了這個錯誤:「接收到通信打開的無效目標:my_target」。

package examples 
import scala.runtime.ScalaRunTime._ 
import scala.collection.mutable.ListBuffer 
import com.ibm.spark.kernel.protocol.v5.MIMEType 
import com.ibm.spark.kernel.protocol.v5.client.boot.ClientBootstrap 
import com.ibm.spark.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization} 
import com.ibm.spark.kernel.protocol.v5.content._ 
import com.typesafe.config.{Config, ConfigFactory} 
import Array._ 

object commclient extends App{ 
val profileJSON: String = """ 
{ 
     "stdin_port" : 48691, 
     "control_port" : 44808, 
     "hb_port" : 49691, 
     "shell_port" : 40544, 
     "iopub_port" : 43462, 
     "ip" : "127.0.0.1", 
     "transport" : "tcp", 
     "signature_scheme" : "hmac-sha256", 
     "key" : "" 
} 
""".stripMargin 

val config: Config = ConfigFactory.parseString(profileJSON) 
val client = (new ClientBootstrap(config) 
    with StandardSystemInitialization 
    with StandardHandlerInitialization).createClient() 

def printResult(result: ExecuteResult) = { 
    println(s"${result.data.get(MIMEType.PlainText).get}") 
} 
def printStreamContent(content:StreamContent) = { 
    println(s"${content.text}") 
} 
def printError(reply:ExecuteReplyError) = { 
    println(s"Error was: ${reply.ename.get}") 
} 

client.comm.register("my_target").addMsgHandler { 
(commWriter, commId, data) => 
    println(data) 
    commWriter.close() 
} 

// Initiate the Comm connection 
client.comm.open("my_target") 

} 

有人能告訴我如何將我運行這段代碼:

// Register the callback to respond to being opened from the client 
kernel.comm.register("my target").addOpenHandler { 
    (commWriter, commId, targetName, data) => 
     commWriter.writeMsg(Map("response" -> "Hello World!")) 
} 

我會很感激,如果有人可以點我完成COMM API的使用工作的例子。

任何幫助將不勝感激。謝謝

回答

0

您可以使用您的客戶端在一個程序中運行此服務器(內核)端註冊一旦。然後你的其他程序可以使用這個通道與內核通信。 這是我在上面提到的第一個程序中執行註冊的一種方式:

client.execute(
""" 
// Register the callback to respond to being opened from the client 
kernel.comm.register("my target"). 
    addOpenHandler { 
     (commWriter, commId, targetName, data) => 
      commWriter.writeMsg(org.apache.toree.kernel.protocol.v5.MsgData("response" -> "Toree Hello World!")) 
    }. 
    addMsgHandler { 
     (commWriter, _, data) => 
      if (!data.toString.contains("closing")) { 
       commWriter.writeMsg(data) 
      } else { 
       commWriter.writeMsg(org.apache.toree.kernel.protocol.v5.MsgData("closing" -> "done")) 
      } 
    } 
""".stripMargin 
) 
相關問題