2011-12-26 84 views
1

請幫我理解以下內容。用JPA和JMS進行春季集成測試的交易傳播

我有一個Spring集成測試,我想測試ProcessCommentsDao類的方法:)

@RunWith(SpringJUnit4ClassRunner.class) 
@ContextConfiguration(locations = {"classpath:testContext.xml"}) 
@Transactional() 
public class ParseCommentsTest { 

    @Resource 
    private ProcessCommentsDao processCommentsDao; 

    @Test 
    public void testJMS() throws Exception { 

    // Test data creation 
    ......................... 

    processCommentsDao.parseComments(); 
    } 
} 

在該方法中parseComments(,我得到實體的名單,然後每個實體處理通過Spring的JMS消息監聽實現:

@Service 
public class ProcessCommentsDaoImpl extends BaseDao implements IProcessCommentsDao { 

    private static final int PARSE_COMMENTS_COUNT_LIMIT = 100; 
    @Autowired 
    private JmsTemplate jmsTemplate; 
    @Autowired 
    private Queue parseCommentsDestination; 

    @Override 
    public void parseComments() { 

     List<Comment> comments = attributeDao.findTopUnparsedComments(PARSE_COMMENTS_COUNT_LIMIT); 

     for (Comment comment : comments) { 
     jmsTemplate.convertAndSend(parseCommentsDestination, comment); 
     } 
    } 
} 

了MessageListener的執行標準如下:

@Component 
public class QueueListener implements MessageListener { 

    @PersistenceContext 
    private EntityManager em; 

    @Transactional() 
    public void onMessage(final Message message) { 
     try { 
     if (message instanceof ObjectMessage) { 
      final ObjectMessage objMessage = (ObjectMessage) message; 
      Comment comment = (Comment) objMessage.getObject(); 

      //...Some logic ... 

      comment = em.find(Comment.class, comment.getId()); 
      comment.setPosStatus(ParsingType.PROCESSED); 
      em.merge(comment); 

       //...Some logic ... 

    } catch (final JMSException e) { 
     e.printStackTrace(); 
    } 
} 

}

其結果是,該方法em.find(Comment.class,comment.getId())返回null,因爲數據是在另一個線程創建和當前線程不知道這個交易什麼。有沒有辦法設置事務傳播,以便MessageListener方法可以在主線程中創建,其中測試方法在哪裏運行?

回答

0

其結果是,該方法em.find(Comment.class,comment.getId())返回null,因爲數據是在另一個線程創建和當前線程不知道這個交易什麼。

更準確地說,消息偵聽器在單獨的事務中運行,並且它看不到由ParseCommentsTest.testJMS創建的數據,因爲該方法沒有提交。

更重要的是,您的測試寫得不正確。它有一個競爭條件:對jmsTemplate.convertAndSend()的調用是異步的,所以QueueListener.messageListener()中的邏輯可以在測試方法完成後調用(並回滾它所做的更改)。每次運行時,此測試可能會產生不同的結果。

您的代碼也不容易測試。考慮將處理邏輯從onMessage()方法提取到POJO並單獨進行測試。

+0

謝謝爲了您的評論。我也想過以你所建議的方式進行測試。另外,我找到了我的問題的解決方案。我在下一篇文章中寫過。關於測試中的種族問題,我發佈了一個不完整的版本。更正了下面的代碼。 – Prix 2011-12-29 12:05:40

1

我找到了我的問題的下一個解決方案。測試數據在一個單獨的事務,這創造manualy產生:

@RunWith(SpringJUnit4ClassRunner.class) 
@ContextConfiguration(locations = {"classpath:testContext.xml"}) 
@Transactional() 
public class ParseCommentsTest { 

    @Resource 
    private ProcessCommentsDao processCommentsDao; 
    @Autowired 
    private PlatformTransactionManager transactionManager; 

    @Before 
    public void tearUp() { 
    createTestData(); 
    } 

    @Test 
    @Rollback(false) 
    public void testJMS() throws Exception { 
    processCommentsDao.parseComments(); 
    } 

    @After 
    public void tearDown() { 
    removeTestData(); 
} 

private void createTestData() { 
    TransactionTemplate txTemplate = new TransactionTemplate(transactionManager); 
    txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); 
    txTemplate.execute(new TransactionCallback<Object>() { 

     @Override 
     public Object doInTransaction(TransactionStatus status) { 
      try { 
      // Test data creation 
      ........................... 
     } 
    }); 
    } 
} 

在方法ProcessCommentsDaoImpl.parseComments()被實現等待所有異步JMS請求的完成。主線程完成其工作,直到所有的實體處理:

@Service 
public class ProcessCommentsDaoImpl extends BaseDao implements IProcessCommentsDao { 

    private static final int PARSE_COMMENTS_COUNT_LIMIT = 100; 
    @Autowired 
    private JmsTemplate jmsTemplate; 
    @Autowired 
    private Queue parseCommentsDestination; 

    @Override 
    public void parseComments() { 

    List<Comment> comments = attributeDao.findTopUnparsedComments(PARSE_COMMENTS_COUNT_LIMIT); 

    for (Comment comment : comments) { 
    jmsTemplate.convertAndSend(parseCommentsDestination, comment); 
    } 
    // Wait all request procesed 
    waitParseCommentsProcessed(comments); 
    } 

    @Override 
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW) 
    public void parseComment(Long commentId) { 
    ...................... 
    //Some logic 
    .................... 
    } 
} 

而且MessageLisener的一個重構如下:ParseCommentQueueListener的

public class ParseCommentQueueListener { 

    private static Logger log = Logger.getLogger(ParseCommentQueueListener.class); 

    @Resource(name = SpringContext.DAO_PROCESS_COMMENTS) 
    private IProcessCommentsDao processCommentsDao; 

    public Object receive(ObjectMessage message) { 
    try { 
     Long id = (Long) message.getObject(); 
     processCommentsDao.parseComment(id); 
    } catch (JMSException ex) { 
     log.error(ex.toString()); 
    } 
    return message; 
    } 
} 

xml配置是如下:

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> 
    <constructor-arg> 
     <bean class="com.provoxlabs.wordminer.parsing.ParseCommentQueueListener"/> 
    </constructor-arg> 
    <property name="defaultListenerMethod" value="receive"/> 
    <property name="defaultResponseDestination" ref="parseCommentsStatusDestination"/> 
    <!-- we don't want automatic message context extraction --> 
    <property name="messageConverter"> 
     <null/> 
    </property> 
</bean>