Scala user-defined function not working in Spark SQL

I wrote a UDF that basically checks whether a given IP address falls inside a list of CIDR ranges. I can call my UDF from Scala and it works fine, but when I call the UDF from Spark SQL it throws the error below. Please help.
%spark
def isinlist = (ip: String) => {
  import org.apache.commons.net.util.SubnetUtils
  def checkipinrange = (cidr: String, ip: String) => {
    val utils = new SubnetUtils(cidr)
    val isInRange = utils.getInfo().isInRange(ip)
    if (isInRange) {
      true
    } else {
      false
    }
  }
  sqlContext.udf.register("checkipinrange", checkipinrange)
  val query = s"""select *
                  from tag_ip
                  where checkipinrange(tag_ip.cidr, '$ip') """
  val validrange = sqlContext.sql(query)
  if (validrange.count > 0) {
    true
  } else {
    false
  }
}

isinlist("5.9.29.73")
sqlContext.udf.register("isinlist", isinlist)
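For reference, the CIDR containment logic itself (what checkipinrange does with SubnetUtils) can be sketched outside of Spark. Here is a minimal, self-contained equivalent using Python's standard ipaddress module; the function names mirror mine but are illustrative only:

```python
import ipaddress

def check_ip_in_range(cidr: str, ip: str) -> bool:
    # True if the IP address falls inside the given CIDR block,
    # e.g. "5.9.29.73" is inside "5.9.29.0/24".
    return ipaddress.ip_address(ip) in ipaddress.ip_network(cidr, strict=False)

def is_in_list(ip: str, cidr_list) -> bool:
    # True if the IP matches any CIDR range in the list
    # (the role tag_ip plays in the query above).
    return any(check_ip_in_range(cidr, ip) for cidr in cidr_list)

print(is_in_list("5.9.29.73", ["5.9.29.0/24", "10.0.0.0/8"]))  # → True
print(is_in_list("8.8.8.8", ["5.9.29.0/24", "10.0.0.0/8"]))    # → False
```

This standalone version behaves the way I expect my UDF to behave when called from Scala.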
tag_ip is a table of CIDR IP ranges. The isinlist function works fine when called directly, but when I call isinlist from Spark SQL it shows the error below.
java.lang.NullPointerException
at $line926276415525.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$$$3baf9f919752f0ab1f5a31ad94af9f4$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$isinlist$1.apply(<console>:198)
at $line926276415525.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$$$3baf9f919752f0ab1f5a31ad94af9f4$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$isinlist$1.apply(<console>:184)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
Can anyone help me figure out what the problem is?
Does the code above work as posted? How are you calling isinlist from the SQL query? –