Java事务处理全解析(八)——分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)
在本系列先前的文章中,我们主要讲解了JDBC对本地事务的处理,本篇文章将讲到一个分布式事务的例子。
请通过以下方式下载github源代码:
git clone https://github.com/davenkin/jta-atomikos-hibernate-activemq.git
本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源事务(比如单个数据库),分布式事务可以处理多种异构的数据源,比如某个业务操作中同时包含了JDBC和JMS或者某个操作需要访问多个不同的数据库。
Java通过JTA完成分布式事务,JTA本身只是一种规范,不同的应用服务器都包含有自己的实现(比如JbossJTA),同时还存在独立于应用服务器的单独JTA实现,比如本篇中要讲到的Atomikos。对于JTA的原理,这里不细讲,读者可以通过这篇文章了解相关知识。
在本篇文章中,我们将实现以下一个应用场景:你在网上购物,下了订单之后,订单数据将保存在系统的数据库中,同时为了安排物流,订单信息将以消息(Message)的方式发送到物流部门以便送货。
以上操作同时设计到数据库操作和JMS消息发送,为了使整个操作成为一个原子操作,我们只能选择分布式事务。我们首先设计一个service层,定义OrderService接口:
package davenkin;
public interface OrderService {
public void makeOrder(Order order);
}
为了简单起见,我们设计一个非常简单的领域对象Order:
@XmlRootElement(name = "Order")
@XmlAccessorType(XmlAccessType.FIELD)
public class Order {
@XmlElement(name = "Id",required = true)
private long id;
@XmlElement(name = "ItemName",required = true)
private String itemName;
@XmlElement(name = "Price",required = true)
private double price;
@XmlElement(name = "BuyerName",required = true)
private String buyerName;
@XmlElement(name = "MailAddress",required = true)
private String mailAddress;
public Order() {
}
为了采用JAXB对Order对象进行Marshal和Unmarshal,我们在Order类中加入了JAXB相关的Annotation。 我们将使用Hibernate来完成数据持久化,然后使用Spring提供的JmsTemplate将Order转成xml后以TextMessage的形式发送到物流部门的ORDER.QUEUE中。
(一)准备数据库
为了方便,我们将采用Spring提供的embedded数据库,默认情况下Spring采用HSQL作为后台数据库,虽然在本例中我们将采用HSQL的非XA的DataSource,但是通过Atomikos包装之后依然可以参与分布式事务。
SQL脚本包含在createDB.sql文件中:
CREATE TABLE USER_ORDER(
ID INT NOT NULL,
ITEM_NAME VARCHAR (100) NOT NULL UNIQUE,
PRICE DOUBLE NOT NULL,
BUYER_NAME CHAR (32) NOT NULL,
MAIL_ADDRESS VARCHAR(500) NOT NULL,
PRIMARY KEY(ID)
);
在Spring中配置DataSource如下:
<jdbc:embedded-database id="dataSource">
<jdbc:script location="classpath:createDB.sql"/>
</jdbc:embedded-database>
(二)启动ActiveMQ
我们将采用embedded的ActiveMQ,在测试之前启动ActiveMQ提供的BrokerService,在测试执行完之后关闭BrokerService。
@BeforeClass
public static void startEmbeddedActiveMq() throws Exception {
broker = new BrokerService();
broker.addConnector("tcp://localhost:61616");
broker.start();
}
@AfterClass
public static void stopEmbeddedActiveMq() throws Exception {
broker.stop();
}
(三)实现OrderService
创建一个DefaultOrderService,该类实现了OrderService接口,并维护一个JmsTemplate和一个Hibernate的SessionFactory实例变量,分别用于Message的发送和数据库处理。
package davenkin;
import org.hibernate.SessionFactory;
import org.hibernate.classic.Session;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.annotation.Transactional;
public class DefaultOrderService implements OrderService{
private JmsTemplate jmsTemplate;
private SessionFactory sessionFactory;
@Override
@Transactional
public void makeOrder(Order order) {
Session session = sessionFactory.getCurrentSession();
session.save(order);
jmsTemplate.convertAndSend(order);
}
@Required
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
@Required
public void setSessionFactory(SessionFactory sessionFactory) {
this.sessionFactory = sessionFactory;
}
}
(四)创建Order的Mapping配置文件
<?xml version="1.0"?>
<!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN"
"http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd">
<hibernate-mapping>
<class name="davenkin.Order" table="USER_ORDER">
<id name="id" type="long">
<column name="ID" />
<generator class="increment" />
</id>
<property name="itemName" type="string">
<column name="ITEM_NAME" />
</property>
<property name="price" type="double">
<column name="PRICE"/>
</property>
<property name="buyerName" type="string">
<column name="BUYER_NAME"/>
</property>
<property name="mailAddress" type="string">
<column name="MAIL_ADDRESS"/>
</property>
</class>
</hibernate-mapping>
(五)配置Atomikos事务
在Spring的IoC容器中,我们需要配置由Atomikos提供的UserTransaction和TransactionManager,然后再配置Spring的JtaTransactionManager:
<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init" destroy-method="shutdownForce">
<constructor-arg>
<props>
<prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop>
</props>
</constructor-arg>
</bean>
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransactionService">
<property name="forceShutdown" value="false" />
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="userTransactionService">
<property name="transactionTimeout" value="300" />
</bean>
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="userTransactionService">
<property name="transactionManager" ref="atomikosTransactionManager" />
<property name="userTransaction" ref="atomikosUserTransaction" />
</bean>
<tx:annotation-driven transaction-manager="jtaTransactionManager" />
(六)配置JMS
对于JMS,为了能使ActiveMQ加入到分布式事务中,我们需要配置ActiveMQXAConnectionFactory,而不是ActiveMQConnectionFactory,然后再配置JmsTemplate,此外还需要配置MessageConvertor在Order对象和XML之间互转。
<bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="amqConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">
<property name="uniqueResourceName" value="XAactiveMQ" />
<property name="xaConnectionFactory" ref="jmsXaConnectionFactory" />
<property name="poolSize" value="5"/>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="amqConnectionFactory"/>
<property name="receiveTimeout" value="2000" />
<property name="defaultDestination" ref="orderQueue"/>
<property name="sessionTransacted" value="true" />
<property name="messageConverter" ref="oxmMessageConverter"/>
</bean>
<bean id="orderQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="ORDER.QUEUE"/>
</bean>
<bean id="oxmMessageConverter"
class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<property name="marshaller" ref="marshaller"/>
<property name="unmarshaller" ref="marshaller"/>
</bean>
<oxm:jaxb2-marshaller id="marshaller">
<oxm:class-to-be-bound name="davenkin.Order"/>
</oxm:jaxb2-marshaller>
(七)测试
在测试中,我们首先通过(二)中的方法启动ActiveMQ,再调用DefaultOrderService,最后对数据库和QUEUE进行验证:
@Test
public void makeOrder(){
orderService.makeOrder(createOrder());
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
assertEquals(1, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));
String dbItemName = jdbcTemplate.queryForObject("SELECT ITEM_NAME FROM USER_ORDER", String.class);
String messageItemName = ((Order) jmsTemplate.receiveAndConvert()).getItemName();
assertEquals(dbItemName, messageItemName);
}
@Test(expected = IllegalArgumentException.class)
public void failToMakeOrder()
{
orderService.makeOrder(null);
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
assertEquals(0, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));
assertNull(jmsTemplate.receiveAndConvert());
}
- 浏览: 32192 次
- 性别:
- 来自: 北京
文章分类
最新评论
发表评论
-
String:字符串常量池
2018-12-12 10:56 299作为最基础的引用数据类型,Java 设计者为 String ... -
Java开发中的23种设计模式详解(转)
2016-03-03 17:08 612设计模式(Design Patterns) ... -
数据库分库分表(sharding)系列(四) 多数据源的事务处理
2015-06-09 00:10 519系统经sharding改造之后,原来单一的数据库会演变成多个 ... -
数据库分库分表(sharding)系列(三) 关于使用框架还是自主开发以及sharding实现层面的考量
2015-04-14 11:05 442当团队对系统业务和数据库进行了细致的梳理,确定了切分方案后 ... -
【转】反应器(Reactor)模式
2015-01-03 22:03 359Java NIO非堵塞技术实际是采取反应器模式,或者说是观 ... -
Java NIO使用及原理分析 (四) (转)
2015-01-03 14:39 718转载自:李会军•宁静致远 在上一篇文章中介绍了关于缓 ... -
Java NIO使用及原理分析(三) (转)
2015-01-03 14:37 570转载自:李会军•宁静致远 在上一篇文章中介绍了缓冲区 ... -
Java NIO使用及原理分析(二)(转)
2015-01-03 14:33 510转载自:李会军• ... -
Java NIO使用及原理分析 (一) (转)
2015-01-03 14:31 500转载自:李会军•宁静致远 最近由于工作关系要做 ... -
JTA 深度历险 - 原理与实现(非原创)
2014-11-30 00:03 481转自:http://www.ibm.com/develop ... -
JavaBeans、EJB和POJO详解
2014-11-03 12:26 597转自:http://developer.51cto.com/ ...
相关推荐
java分布式事务demo
另一方面提供了进一步学习这些知识点的参考资料,希望能给想掌握编写分布式Java应用知识点的开发人员提供一定的帮助以及指引,同时也希望书中分享的经验对于目前正在从事分布式Java应用编写的开发人员提供帮助。
《分布式Java应用基础与实践》pdf电子版,
另一方面提供了进一步学习这些知识点的参考资料,希望能给想掌握编写分布式Java应用知识点的开发人员提供一定的帮助以及指引,同时也希望书中分享的经验对于目前正在从事分布式Java应用编写的开发人员提供帮助。
分布式Java应用基础与实践pdf高清扫描版,解压即可使用,欢迎下载...........................................
本书介绍了编写分布式Java应用涉及的众多知识点,分为了基于Java实现网络通信、RPC;基于SOA实现大型分布式Java应用;编写高性能Java应用;构建高可用、可伸缩的系统四个部分
分布式java应用完整版分布式java应用分布式java应用
Java分布式应用学习笔记01分布式Java应用和SOA
Java分布式学习笔记01分布式Java应用
《分布式Java》高清版+书签 《分布式Java》高清版+书签
以上是百度百科的解释,简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。本质上来说,分布式事务...
自己封装redisson方法,同时通过注解的方式加入redis分布式事务锁,可靠。
分布式Java应用:基础与实践
分布式java应用_林昊_电子版的
分布式事务介绍
先向tb1插一条数据,然后在向tb2插一条数据,当没有设置事务时,如果tb2出现异常,tb1能正常插入数据,当设置了分布式事务后,如果tb2出现异常,tb1会自动回滚,没有数据插入。 分布式事务是针对不同数据库的(当然...
(开发入门)分布式java基础与实践 (开发入门)分布式java基础与实践 (开发入门)分布式java基础与实践 (开发入门)分布式java基础与实践
分布式事务是一个绕不过去的挑战!微服务架构本质上就是分布式服务化架构,微服务架构的流行,让分布式事务问题日益突出!尤其是在订单业务、资金业务等系统核心业务流程中,一定要有可靠的分布式事务解决方案来保证...