0
我正在編寫一個Web應用程序,用戶可以在其中發送卡夫卡用戶收到的消息請求。我試圖通過在會話中放置Map對象,然後在收到消息時將消息放入Map對象中。同時在請求處理方法中,使用循環來檢查Map是否填充了消息。現在的問題是,我無法設法獲取消費者對象中的會話對象,因此它不會以這種方式工作。如果還有其他方法可以做到這一點,請告訴我。如何在Web API調用中從Kafka使用者獲取特定消息?
這裏的請求應對方法:
@Override
public Map<String, Object> getKeyValueMap(String pileCode) {
String uniqueId = "GetConfiguration" + new Date().getTime();
session.setAttribute("uniqueId", uniqueId);
GetConfigurationReq obj = new GetConfigurationReq();
obj.setKey("getConfiguration");
GetConfigurationReq.Data data = new GetConfigurationReq.Data();
data.setConnector(pileCode);
data.setUniqueId(uniqueId);
List<KeyValue> keyValue = new ArrayList<>();
for (int i = 0; i < KeyConstant.PARAMS.length; i++) {
KeyValue temp = new KeyValue();
temp.setKey(KeyConstant.PARAMS[i]);
temp.setValue("0");
keyValue.add(temp);
}
data.setKeyValue(keyValue);
obj.setData(data);
String msg = new Gson().toJson(obj);
ConnectorChargingOperationProducer producer = new ConnectorChargingOperationProducer();
producer.responseOperation(KeyConstant.KAFKA_PUB, "getConfiguration", msg);
Map params = (Map) session.getAttribute("params");
try {
int n = 0;
while (params.size() == 0) {
Thread.sleep(1000);
n++;
if (n == 20) {
System.out.printf("After %s seconds waiting timeout%n", n);
break;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return params;
}
消費類:
public class ConnectorChargingOperationConsumer implements MessageListener<String,String> {
private static Logger logger = LogManager.getLogger(ConnectorChargingOperationConsumer.class);
@Autowired
HttpSession session;
@Override
public void onMessage(ConsumerRecord<String, String> consumerRecord) {
String message = consumerRecord.value();
System.out.println("xxx: " + message);
logger.info("yyy: " + message);
Gson gson = new Gson();
JsonObject respObj = gson.fromJson(message, JsonObject.class);
JsonElement uniqueId = respObj.get("uniqueId");
if (uniqueId != null) {
String uniqueId2 = (String) session.getAttribute("uniqueId");
if (uniqueId.getAsString().equals(uniqueId2)) {
GetConfigurationResp resp = gson.fromJson(respObj, GetConfigurationResp.class);
List<KeyValue> keyValue = resp.getKeyValue();
Map params = (Map) session.getAttribute("params");
for (int i = 0; i < keyValue.size(); i++) {
KeyValue temp = keyValue.get(i);
params.put(temp.getKey(), temp.getValue());
}
}
}
}
}
異常調用時拋出:
org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'messageListernerConsumerService': Unsatisfied dependency expressed through field 'session': No qualifying bean of type [javax.servlet.http.HttpSession] found for dependency [javax.servlet.http.HttpSession]: expected at least 1 bean which qualifies as autowire candidate for this dependency. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)}; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type [javax.servlet.http.HttpSession] found for dependency [javax.servlet.http.HttpSession]: expected at least 1 bean which qualifies as autowire candidate for this dependency. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)}
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:569)
at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:88)
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:349)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1214)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:543)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482)
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:776)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:861)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:541)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
at com.yunshan.openCharge.kafka.ConnectorChargingOperationProducer.responseOperation(ConnectorChargingOperationProducer.java:30)
at com.yunshan.openCharge.service.impl.PileServiceImpl.getKeyValueMap(PileServiceImpl.java:260)
at com.yunshan.openCharge.controller.PileController.getParameters(PileController.java:158)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:114)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:622)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:729)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:292)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)
at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:616)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:528)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1099)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:670)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1520)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1476)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type [javax.servlet.http.HttpSession] found for dependency [javax.servlet.http.HttpSession]: expected at least 1 bean which qualifies as autowire candidate for this dependency. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)}
at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1406)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1057)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1019)
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:566)
... 59 more