spark-submit --class org.dia.red.ctakes.spark.CtakesSparkMain target/spark-ctakes-0.1-job.jar
被拋出的錯誤是:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/uima/cas/FSIndex
at org.dia.red.ctakes.spark.CtakesSparkMain.main(CtakesSparkMain.java:50)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.uima.cas.FSIndex
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 10 more
下面的CtakesSparkMain類調用了CtakesFunction類:
package org.dia.red.ctakes.spark;
import java.util.List;
import java.io.PrintWriter;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.uima.jcas.cas.FSArray;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.json.JSONObject;
/**
 * Driver program: maps every line of a text file through {@link CtakesFunction}
 * and writes the first resulting JSON string to an output file.
 */
public class CtakesSparkMain {
    /**
     * Runs the Spark job.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if the input file yields no lines
     * @throws Exception if the Spark job fails or the output file cannot be written
     */
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("ctakes");
        // try-with-resources guarantees the context is stopped even when the job fails.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> lines = sc
                    .textFile("/mnt/d/metistream/ctakes-streaming/SparkStreamingCTK/testdata100.txt")
                    .map(new CtakesFunction());

            // Only the first result is written, so only one record is requested.
            List<String> head = lines.take(1);
            if (head.isEmpty()) {
                throw new IllegalStateException("Input file produced no lines to annotate");
            }
            String first = head.get(0);

            try (PrintWriter out = new PrintWriter(
                    "/mnt/d/metistream/ctakes-streaming/SparkStreamingCTK/test_outputs/output.txt")) {
                out.println(first);
                // PrintWriter never throws on write errors; surface them explicitly.
                if (out.checkError()) {
                    throw new java.io.IOException("Failed to write annotation output file");
                }
            }
        }
    }
}
CtakesFunction:
package org.dia.red.ctakes.spark;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.ctakes.typesystem.type.refsem.OntologyConcept;
import org.apache.ctakes.typesystem.type.textsem.*;
import org.apache.uima.UIMAException;
import org.apache.uima.cas.FSIndex;
import org.apache.uima.cas.Type;
import org.apache.uima.UIMAException;
import org.apache.uima.jcas.JCas;
import org.apache.uima.analysis_engine.AnalysisEngineDescription;
import org.apache.uima.cas.impl.XmiCasSerializer;
import org.apache.uima.fit.factory.JCasFactory;
import org.apache.uima.fit.pipeline.SimplePipeline;
import org.apache.uima.jcas.cas.FSArray;
import org.apache.uima.util.XMLSerializer;
import org.apache.spark.api.java.function.Function;
import it.cnr.iac.CTAKESClinicalPipelineFactory;
import org.json.*;
/**
 * Spark map function that runs a paragraph of text through the cTAKES clinical
 * pipeline and returns the selected annotations serialized as a JSON string.
 *
 * <p>The {@link JCas} and the pipeline description are {@code transient}: they are
 * not shipped with the serialized function but rebuilt after deserialization
 * (see {@code readObject}) or lazily on the first {@link #call(String)}.
 *
 * @author Selina Chu, Michael Starch, and Giuseppe Totaro
 */
public class CtakesFunction implements Function<String, String> {

    /** Fully qualified names of the annotation types included in the JSON output. */
    private static final String[] REPORTED_TYPES = {
        "org.apache.ctakes.typesystem.type.textsem.SignSymptomMention",
        "org.apache.ctakes.typesystem.type.textsem.DiseaseDisorderMention",
        "org.apache.ctakes.typesystem.type.textsem.AnatomicalSiteMention",
        "org.apache.ctakes.typesystem.type.textsem.ProcedureMention",
        // The original literal carried a stray "import " prefix and therefore never matched.
        "org.apache.ctakes.typesystem.type.textsem.MedicationMention"
    };

    transient JCas jcas = null;
    transient AnalysisEngineDescription aed = null;

    /**
     * Creates the CAS and the default clinical pipeline description.
     *
     * @throws UIMAException if the CAS or the pipeline description cannot be created
     */
    private void setup() throws UIMAException {
        // NOTE(review): these calls overwrite any UMLS credentials already present in the
        // system properties with empty strings -- confirm this is intended.
        System.setProperty("ctakes.umlsuser", "");
        System.setProperty("ctakes.umlspw", "");
        this.jcas = JCasFactory.createJCas();
        this.aed = CTAKESClinicalPipelineFactory.getDefaultPipeline();
    }

    /**
     * Java serialization hook: restores the non-transient state and rebuilds the
     * transient cTAKES objects.
     *
     * @param in stream the object is being read from
     * @throws IOException if reading fails or the cTAKES pipeline cannot be initialised
     * @throws ClassNotFoundException if a serialized class cannot be resolved
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        try {
            this.setup();
        } catch (UIMAException e) {
            // Fail loudly instead of leaving jcas/aed null and hitting an NPE in call().
            throw new IOException("Failed to initialise the cTAKES pipeline after deserialization", e);
        }
    }

    /**
     * Annotates one paragraph and returns the reported annotations as JSON.
     *
     * @param paragraph text to annotate
     * @return string form of a JSON object whose {@code "Annotations"} array holds one
     *         entry per reported annotation (id, subject, type, text, polarity,
     *         confidence, snomed_codes)
     * @throws Exception if pipeline initialisation or execution fails
     */
    @Override
    public String call(String paragraph) throws Exception {
        // Covers instances that were constructed directly and never deserialized.
        if (this.jcas == null || this.aed == null) {
            this.setup();
        }
        try {
            this.jcas.setDocumentText(paragraph);
            SimplePipeline.runPipeline(this.jcas, this.aed);

            JSONArray annotationsArray = new JSONArray();
            Iterator<?> iter = this.jcas.getAnnotationIndex(IdentifiedAnnotation.type).iterator();
            while (iter.hasNext()) {
                IdentifiedAnnotation annotation = (IdentifiedAnnotation) iter.next();
                String typeName = annotation.getType().getName();
                if (isReportedType(typeName)) {
                    annotationsArray.put(toJson(annotation, typeName));
                }
            }

            JSONObject allAnnotations = new JSONObject();
            allAnnotations.put("Annotations", annotationsArray);
            return allAnnotations.toString();
        } finally {
            // Always reset the CAS, including after a failure, before the next record is processed.
            this.jcas.reset();
        }
    }

    /** Returns whether the given fully qualified type name is one of the reported types. */
    private static boolean isReportedType(String typeName) {
        return Arrays.asList(REPORTED_TYPES).contains(typeName);
    }

    /** Builds the JSON object describing a single annotation of the given type. */
    private static JSONObject toJson(IdentifiedAnnotation annotation, String typeName) {
        JSONObject json = new JSONObject();
        json.put("id", annotation.getId());
        json.put("subject", annotation.getSubject());
        // Report only the simple (unqualified) type name.
        json.put("type", typeName.substring(typeName.lastIndexOf('.') + 1));
        json.put("text", annotation.getCoveredText());
        json.put("polarity", annotation.getPolarity());
        json.put("confidence", annotation.getConfidence());

        ArrayList<String> codes = new ArrayList<String>();
        FSArray concepts = annotation.getOntologyConceptArr();
        // Guard against annotations that carry no ontology concept array.
        if (concepts != null) {
            for (int i = 0; i < concepts.size(); i++) {
                codes.add(((OntologyConcept) concepts.get(i)).getCode());
            }
        }
        json.put("snomed_codes", codes);
        return json;
    }
}
我正在嘗試修改這個存儲庫(https://github.com/selinachu/SparkStreamingCTK),讓它使用普通的Spark(以及Spark 2.0)而不是Spark Streaming,但至今仍未能解決這個問題。
你正在使用什麼版本的cTakes和Spark?你遇到了什麼其他問題? – YuGagarin
我使用的是打過補丁的cTAKES 4.0.0(補丁說明見:https://issues.apache.org/jira/browse/CTAKES-445)。Spark方面,目前使用的是2.1.0。我正在對項目進行完整的重新構建,所以大約一個小時內就可以重現我在cTAKES快速管道(fast pipeline)上遇到的錯誤。(奇怪的是,cTAKES默認管道工作正常。) – mongolol
很高興聽到。那麼你是否讓cTakes 4.0與Spark一起工作?我正在研究類似的項目btw(ICD9/ICD10代碼),並自己處理類似的問題...... – YuGagarin