最终实现目标是当数据源发生变化,切换数据库时,无需重启程序,只需通过一定方式(如调用接口或监听配置中心消息)将新数据源配置传入,程序即可自动热切换数据源。核心思路为封装一个自定义数据源,维护一个真实数据源的引用,更新数据源即为更新此引用。
动态数据源封装
首先要做的就是封装一个可动态更新的数据源,来替代默认数据源。要自定义数据源,只需实现DataSource接口即可;要实现可动态更新,只需提供一个方法,接收新的实际数据源对象并更新自身引用,定义如下:
package top.wteng.dds.dsintegration;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import javax.sql.DataSource;
/**
 * A {@link DataSource} facade whose underlying data source can be replaced at runtime.
 *
 * <p>Every {@code DataSource} call is forwarded to the data source currently held in an
 * {@link AtomicReference}; switching data sources means swapping that reference.
 */
public class DynamicDataSource implements DataSource {
    /** Holder of the data source currently in use; replaced atomically on update. */
    private final AtomicReference<DataSource> dReference;

    /**
     * Creates the facade around an initial data source.
     *
     * @param dataSource the data source to delegate to first; must not be null
     * @throws NullPointerException if {@code dataSource} is null
     */
    public DynamicDataSource(DataSource dataSource) {
        this.dReference = new AtomicReference<>(requireDataSource(dataSource));
    }

    /**
     * Installs a new underlying data source and returns the one it replaced.
     *
     * <p>Despite the name, this is a get-and-set: the return value is the PREVIOUS data source,
     * which the caller is expected to dispose of.
     *
     * @param dataSource the data source to delegate to from now on; must not be null
     * @return the data source that was in use before this call
     * @throws NullPointerException if {@code dataSource} is null
     */
    public DataSource setAndGetDataSource(DataSource dataSource) {
        return this.dReference.getAndSet(requireDataSource(dataSource));
    }

    /** Fails fast on a null delegate instead of deferring the failure to the first delegated call. */
    private static DataSource requireDataSource(DataSource dataSource) {
        if (dataSource == null) {
            throw new NullPointerException("dataSource must not be null");
        }
        return dataSource;
    }

    /* The methods below forward to whichever data source the reference currently holds. */

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return dReference.get().getParentLogger();
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        // Per the java.sql.Wrapper contract the receiver itself is returned when it implements iface.
        if (iface != null && iface.isInstance(this)) {
            return iface.cast(this);
        }
        return dReference.get().unwrap(iface);
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        // Kept consistent with unwrap: answer for this wrapper first, then ask the delegate.
        if (iface != null && iface.isInstance(this)) {
            return true;
        }
        return dReference.get().isWrapperFor(iface);
    }

    @Override
    public Connection getConnection() throws SQLException {
        return dReference.get().getConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return dReference.get().getConnection(username, password);
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return dReference.get().getLogWriter();
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        dReference.get().setLogWriter(out);
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        dReference.get().setLoginTimeout(seconds);
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return dReference.get().getLoginTimeout();
    }
}
数据源管理类封装
然后封装一个数据源管理类,提供创建与关闭数据源的操作。创建的数据源类型这里采用阿里的druid,需要引入其依赖:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.6</version>
</dependency>
根据数据源配置bean中的配置信息创建新数据源;关闭数据源则放在单独的线程池中进行,如下:
package top.wteng.dds.dsintegration;
import java.sql.Connection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.sql.DataSource;
import com.alibaba.druid.pool.DruidDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.stereotype.Component;
/**
 * Creates Druid data sources from the injected {@link DataSourceProperties} and closes
 * retired ones on a dedicated single-thread executor.
 */
@Component
public class DataSourceManager {
    /** Maximum number of attempts made while waiting for / performing a data source shutdown. */
    private static final int MAX_SHUTDOWN_RETRY_TIMES = 20;
    /** Pause between two shutdown attempts. */
    private static final long SHUTDOWN_RETRY_INTERVAL_MILLIS = 2 * 1000L;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    /** Task queue used to close data sources off the calling thread. */
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    /** Injected data source configuration (spring.datasource.*). */
    private final DataSourceProperties dataSourceProperties;

    public DataSourceManager(@Autowired DataSourceProperties dataSourceProperties) {
        this.dataSourceProperties = dataSourceProperties;
    }

    /**
     * Builds a new Druid data source from the current state of the data source properties.
     *
     * @return the newly built data source
     */
    public DruidDataSource createDataSource() {
        return dataSourceProperties.initializeDataSourceBuilder().type(DruidDataSource.class).build();
    }

    /**
     * Builds a new data source and probes it by opening and closing one connection.
     *
     * <p>A failed probe is only logged; the data source is returned to the caller either way.
     *
     * @return the newly built data source, whether or not the probe succeeded
     */
    public DruidDataSource createAndTestDataSource() {
        DruidDataSource dataSource = this.createDataSource();
        // Obtaining the connection is the test; try-with-resources guarantees it is closed again.
        try (Connection ignored = dataSource.getConnection()) {
            // nothing to do with the connection itself
        } catch (Exception e) {
            this.logger.error("data source test failed ...", e);
        }
        return dataSource;
    }

    /**
     * Schedules the given data source to be closed on the background executor.
     *
     * <p>While the data source still reports active connections the close is postponed in
     * 2-second steps, up to {@code MAX_SHUTDOWN_RETRY_TIMES} attempts, after which it is closed
     * regardless. A null argument is ignored.
     *
     * @param dataSource the data source to close; must be a {@link DruidDataSource}
     * @throws ClassCastException if {@code dataSource} is not a {@link DruidDataSource}
     */
    public void shutdownDataSource(DataSource dataSource) {
        if (dataSource == null) {
            // Without this guard the worker would hit a NullPointerException on every retry.
            logger.warn("no datasource given to shut down, nothing to do ...");
            return;
        }
        DruidDataSource oldDs = (DruidDataSource) dataSource;
        executorService.execute(() -> closeWithRetry(oldDs));
    }

    /** Closes the data source, waiting for active connections and retrying failed attempts. */
    private void closeWithRetry(DruidDataSource oldDs) {
        int retryTimes = 0;
        while (retryTimes < MAX_SHUTDOWN_RETRY_TIMES) {
            try {
                if (oldDs.getActiveCount() > 0) {
                    // Active connections remain: wait and retry so unfinished work is not cut off.
                    retryTimes++;
                    if (retryTimes < MAX_SHUTDOWN_RETRY_TIMES) {
                        logger.warn("datasource {} has actived connections, will retry after 2 seconds ...", oldDs.getName());
                        if (pauseBeforeRetry()) {
                            continue;
                        }
                        logger.warn("datasource {} will be force shutdown, because the shutdown thread was interrupted ...", oldDs.getName());
                    } else {
                        logger.warn("datasource {} will be force shutdown, because max retry times reached but there are still live connections ...", oldDs.getName());
                    }
                }
                oldDs.close();
                logger.warn("datasource {} with url {} has been closed ...", oldDs.getName(), oldDs.getUrl());
                return;
            } catch (Exception e) {
                retryTimes++;
                if (retryTimes >= MAX_SHUTDOWN_RETRY_TIMES) {
                    logger.warn("shutdown datasource {} errored, max retry times reached ...", oldDs.getName(), e);
                    return;
                }
                logger.warn("shutdown datasource {} failed, will retry after 2 seconds ...", oldDs.getName(), e);
                if (!pauseBeforeRetry()) {
                    logger.warn("shutdown datasource {} abandoned, because the shutdown thread was interrupted ...", oldDs.getName());
                    return;
                }
            }
        }
    }

    /**
     * Sleeps for the retry interval.
     *
     * @return {@code true} if the full pause elapsed, {@code false} if the thread was interrupted
     *     (the interrupt flag is restored in that case)
     */
    private static boolean pauseBeforeRetry() {
        try {
            Thread.sleep(SHUTDOWN_RETRY_INTERVAL_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
数据源更新类封装
数据源与数据源创建策略都有了,下面便是实现数据源更新的逻辑:接收新数据源的配置,首先更新数据源配置的bean(DataSourceProperties),然后创建新的数据源,并将动态数据源的引用更新到新数据源,最后关闭旧数据源。在需要更新数据源的地方,注入该类并调用updateDataSource即可实现数据源的动态更新。如下:
package top.wteng.dds.dsintegration;
import java.util.Map;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.stereotype.Component;
/**
 * Applies new connection settings at runtime: updates the data source properties bean,
 * builds a new data source, swaps it into the {@link DynamicDataSource} and hands the
 * previous data source to the {@link DataSourceManager} for shutdown.
 */
@Component
public class DataSourceUpdater {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired private DataSourceProperties dataSourceProperties;
    @Autowired private DataSourceManager dataSourceManager;
    @Autowired private DynamicDataSource dynamicDataSource;

    /**
     * Switches the application to a data source built from the given settings.
     *
     * <p>Recognised keys are {@code username}, {@code password}, {@code driver} and {@code url};
     * any other key is ignored. If the switch fails before the new data source has been
     * installed, the previous property values are restored.
     *
     * @param newProperties settings to apply on top of the current configuration; must not be null
     * @throws IllegalArgumentException if {@code newProperties} is null
     * @throws Exception any failure raised while creating or installing the new data source
     */
    public synchronized void updateDataSource(Map<String, String> newProperties) throws Exception {
        // Snapshot of the values this method may overwrite, used to roll back on failure.
        String previousUsername = dataSourceProperties.getUsername();
        String previousPassword = dataSourceProperties.getPassword();
        String previousDriver = dataSourceProperties.getDriverClassName();
        String previousUrl = dataSourceProperties.getUrl();
        boolean swapped = false;
        try {
            if (newProperties == null) {
                throw new IllegalArgumentException("newProperties must not be null");
            }
            // Update the configuration first; only four settings are handled here, add more as needed.
            newProperties.forEach(this::applyProperty);
            // Build the new data source from the updated configuration.
            DataSource newDs = dataSourceManager.createAndTestDataSource();
            // Install it, then retire the one it replaced.
            DataSource retired = dynamicDataSource.setAndGetDataSource(newDs);
            swapped = true;
            this.dataSourceManager.shutdownDataSource(retired);
        } catch (Exception e) {
            if (!swapped) {
                // The old data source is still live, so the configuration must keep describing it.
                dataSourceProperties.setUsername(previousUsername);
                dataSourceProperties.setPassword(previousPassword);
                dataSourceProperties.setDriverClassName(previousDriver);
                dataSourceProperties.setUrl(previousUrl);
            }
            logger.error("update data source failed", e);
            throw e;
        }
    }

    /** Copies one recognised setting into the data source properties; other keys are skipped. */
    private void applyProperty(String key, String value) {
        if (key == null) {
            // switch on a null String would throw; treat it like any other unrecognised key
            return;
        }
        switch (key) {
            case "username":
                dataSourceProperties.setUsername(value);
                break;
            case "password":
                dataSourceProperties.setPassword(value);
                break;
            case "driver":
                dataSourceProperties.setDriverClassName(value);
                break;
            case "url":
                dataSourceProperties.setUrl(value);
                break;
            default:
                break;
        }
    }
}
数据源配置
最后编写配置类来配置数据源,告诉Spring使用我们自定义的数据源,如下:
package top.wteng.dds.configuration;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import top.wteng.dds.dsintegration.DataSourceManager;
import top.wteng.dds.dsintegration.DynamicDataSource;
/**
 * Spring configuration that registers the {@link DynamicDataSource} as the data source bean.
 */
@Configuration
public class DataSourceConfiguration {

    /**
     * Builds the data source bean: an initial data source created by the manager, wrapped in
     * a {@link DynamicDataSource} so it can be swapped later.
     *
     * @param dManager manager used to create the initial data source
     * @return the dynamic data source wrapping the initial one
     */
    @Bean
    public DynamicDataSource dataSource(@Autowired DataSourceManager dManager) {
        // Custom data source that takes the place of the default data source bean.
        final DataSource initialDataSource = dManager.createAndTestDataSource();
        DynamicDataSource dynamicDataSource = new DynamicDataSource(initialDataSource);
        return dynamicDataSource;
    }
}
至此,数据源相关工作就都完成了。dao层的实现无需任何变化,无论使用jpa还是mybatis等都一样,这里不再赘述。
测试
首先在spring配置文件中配置初始数据源信息:
spring:
  datasource: # data source settings
    url: jdbc:mysql://localhost:3306/ds0?useSSL=false
    type: com.alibaba.druid.pool.DruidDataSource # data source implementation to use
    username: root
    password: root
    druid:
      initial-size: 2
      max-active: 10
  jpa: # JPA settings
    generate-ddl: true
    show-sql: false
    hibernate:
      ddl-auto: none
创建两个数据库ds0、ds1,并分别创建t_user表,各插入一条数据:ds0的t_user中有用户u1_ds0,ds1的t_user中有用户u2_ds1,如下:
-- Database ds0: t_user table seeded with the single user u1_ds0.
CREATE DATABASE IF NOT EXISTS `ds0` CHARACTER SET utf8;
CREATE TABLE IF NOT EXISTS `ds0`.`t_user` (
    `id` int(0) NOT NULL AUTO_INCREMENT,
    `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `ds0`.`t_user` (`name`, `password`) VALUES ('u1_ds0', 'root');

-- Database ds1: same t_user table, seeded with the single user u2_ds1.
CREATE DATABASE IF NOT EXISTS `ds1` CHARACTER SET utf8;
CREATE TABLE IF NOT EXISTS `ds1`.`t_user` (
    `id` int(0) NOT NULL AUTO_INCREMENT,
    `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
    PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `ds1`.`t_user` (`name`, `password`) VALUES ('u2_ds1', 'root');
编写一个简单的单元测试函数,测试数据源切换,如下:
package top.wteng.dds;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import top.wteng.dds.dsintegration.DataSourceUpdater;
import top.wteng.dds.entity.User;
import top.wteng.dds.service.UserService;
/**
 * Integration test that lists users, switches the data source to ds1 at runtime,
 * and lists users again.
 */
@SpringBootTest
class DynamicDataSourceIntegrationApplicationTests {
    private final Logger logger = LoggerFactory.getLogger("Test");
    @Autowired private DataSourceUpdater dataSourceUpdater;
    @Autowired private UserService userService;

    /** Prints the users before and after pointing the data source at ds1. */
    @Test
    void dynamicUpdateDataSource() throws Exception {
        // The initial data source is ds0, as configured in application.yaml.
        logger.info(" === users from ds0 === ");
        printAllUsers();

        // Switch the data source; here simply to the other database, ds1.
        Map<String, String> switchToDs1 = new HashMap<>();
        switchToDs1.put("url", "jdbc:mysql://localhost:3306/ds1?useSSL=false");
        this.dataSourceUpdater.updateDataSource(switchToDs1);
        logger.info("data source has updated to ds1 ...");

        logger.info(" === users from ds1 === ");
        printAllUsers();
    }

    /** Loads every user through the service and writes each one to standard output. */
    private void printAllUsers() {
        List<User> users = this.userService.findAll();
        for (User user : users) {
            System.out.println(user.toString());
        }
    }
}
核心部分输出如下:
INFO 19492 --- [ main] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
INFO 19492 --- [ main] micDataSourceIntegrationApplicationTests : Started DynamicDataSourceIntegrationApplicationTests in 2.334 seconds (JVM running for 2.94)
INFO 19492 --- [ main] Test : === users from ds0 ===
{ id='1', name='u1_ds0', password='root'}
INFO 19492 --- [ main] com.alibaba.druid.pool.DruidDataSource : {dataSource-2} inited
INFO 19492 --- [ main] Test : data source has updated to ds1 ...
INFO 19492 --- [ main] Test : === users from ds1 ===
INFO 19492 --- [pool-1-thread-1] com.alibaba.druid.pool.DruidDataSource : {dataSource-1} closing ...
INFO 19492 --- [pool-1-thread-1] com.alibaba.druid.pool.DruidDataSource : {dataSource-1} closed
WARN 19492 --- [pool-1-thread-1] t.w.dds.dsintegration.DataSourceManager : datasource DataSource-141154428 with url jdbc:mysql://localhost:3306/ds0?useSSL=false has been closed ...
{ id='1', name='u2_ds1', password='root'}
INFO 19492 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'