2017-07-18 59 views
0

我正在編寫一個Web應用程序,用戶可以在其中發送卡夫卡用戶收到的消息請求。我試圖通過在會話中放置Map對象,然後在收到消息時將消息放入Map對象中。同時在請求處理方法中,使用循環來檢查Map是否填充了消息。現在的問題是,我無法設法獲取消費者對象中的會話對象,因此它不會以這種方式工作。如果還有其他方法可以做到這一點,請告訴我。如何在Web API調用中從Kafka使用者獲取特定消息?

這裏的請求應對方法:

@Override 
public Map<String, Object> getKeyValueMap(String pileCode) { 
    String uniqueId = "GetConfiguration" + new Date().getTime(); 
    session.setAttribute("uniqueId", uniqueId); 

    GetConfigurationReq obj = new GetConfigurationReq(); 
    obj.setKey("getConfiguration"); 
    GetConfigurationReq.Data data = new GetConfigurationReq.Data(); 
    data.setConnector(pileCode); 
    data.setUniqueId(uniqueId); 
    List<KeyValue> keyValue = new ArrayList<>(); 
    for (int i = 0; i < KeyConstant.PARAMS.length; i++) { 
     KeyValue temp = new KeyValue(); 
     temp.setKey(KeyConstant.PARAMS[i]); 
     temp.setValue("0"); 
     keyValue.add(temp); 
    } 
    data.setKeyValue(keyValue); 
    obj.setData(data); 
    String msg = new Gson().toJson(obj); 
    ConnectorChargingOperationProducer producer = new ConnectorChargingOperationProducer(); 
    producer.responseOperation(KeyConstant.KAFKA_PUB, "getConfiguration", msg); 

    Map params = (Map) session.getAttribute("params"); 
    try { 
     int n = 0; 
     while (params.size() == 0) { 
      Thread.sleep(1000); 
      n++; 
      if (n == 20) { 
       System.out.printf("After %s seconds waiting timeout%n", n); 
       break; 
      } 
     } 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    return params; 
} 

消費類:

public class ConnectorChargingOperationConsumer implements MessageListener<String,String> { 
private static Logger logger = LogManager.getLogger(ConnectorChargingOperationConsumer.class); 

@Autowired 
HttpSession session; 

@Override 
public void onMessage(ConsumerRecord<String, String> consumerRecord) { 
    String message = consumerRecord.value(); 
    System.out.println("xxx: " + message); 
    logger.info("yyy: " + message); 
    Gson gson = new Gson(); 
    JsonObject respObj = gson.fromJson(message, JsonObject.class); 
    JsonElement uniqueId = respObj.get("uniqueId"); 
    if (uniqueId != null) { 
     String uniqueId2 = (String) session.getAttribute("uniqueId"); 
     if (uniqueId.getAsString().equals(uniqueId2)) { 
      GetConfigurationResp resp = gson.fromJson(respObj, GetConfigurationResp.class); 
      List<KeyValue> keyValue = resp.getKeyValue(); 
      Map params = (Map) session.getAttribute("params"); 
      for (int i = 0; i < keyValue.size(); i++) { 
       KeyValue temp = keyValue.get(i); 
       params.put(temp.getKey(), temp.getValue()); 
      } 
     } 
    } 
} 
} 

異常調用時拋出:

org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'messageListernerConsumerService': Unsatisfied dependency expressed through field 'session': No qualifying bean of type [javax.servlet.http.HttpSession] found for dependency [javax.servlet.http.HttpSession]: expected at least 1 bean which qualifies as autowire candidate for this dependency. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)}; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type [javax.servlet.http.HttpSession] found for dependency [javax.servlet.http.HttpSession]: expected at least 1 bean which qualifies as autowire candidate for this dependency. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)} 
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:569) 
at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:88) 
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:349) 
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1214) 
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:543) 
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482) 
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) 
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) 
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) 
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) 
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:776) 
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:861) 
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:541) 
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139) 
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83) 
at com.yunshan.openCharge.kafka.ConnectorChargingOperationProducer.responseOperation(ConnectorChargingOperationProducer.java:30) 
at com.yunshan.openCharge.service.impl.PileServiceImpl.getKeyValueMap(PileServiceImpl.java:260) 
at com.yunshan.openCharge.controller.PileController.getParameters(PileController.java:158) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) 
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) 
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:114) 
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827) 
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738) 
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85) 
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963) 
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897) 
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970) 
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861) 
at javax.servlet.http.HttpServlet.service(HttpServlet.java:622) 
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846) 
at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) 
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:292) 
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) 
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) 
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) 
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) 
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197) 
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) 
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) 
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) 
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212) 
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) 
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) 
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141) 
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) 
at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:616) 
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) 
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:528) 
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1099) 
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:670) 
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1520) 
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1476) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type [javax.servlet.http.HttpSession] found for dependency [javax.servlet.http.HttpSession]: expected at least 1 bean which qualifies as autowire candidate for this dependency. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)} 
at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1406) 
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1057) 
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1019) 
at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:566) 
... 59 more 

回答

0

我解決了這個問題,最終通過使用靜態在消費者類中映射字段,而不是試圖將其放入會話中,即:

取代:

@Autowired 
HttpSession session; 

public static final Map<String, Object> PARAMS = new HashMap<>(); 
相關問題