最终实现目标是当数据源发生变化,切换数据库时,无需重启程序,只需通过一定方式(如调用接口或监听配置中心消息)将新数据源配置传入,程序即可自动热切换数据源。核心思路为封装一个自定义数据源,维护一个真实数据源的引用,更新数据源即为更新此引用。

动态数据源封装

首先要做的就是封装一个满足可动态更新的数据源,来替代默认数据源。要自定义数据源,只需实现DataSource接口即可,要实现可动态更新,只需提供一个函数,可以接受新的实际数据源对象,更新自身引用,定义如下:

package top.wteng.dds.dsintegration;

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;

import javax.sql.DataSource;

/**
 * A {@link DataSource} facade whose real target can be replaced at runtime.
 *
 * <p>Every interface method reads the current target from an {@link AtomicReference} and
 * delegates to it, so callers observe a swap on their next call.
 */
public class DynamicDataSource implements DataSource {
    // Atomic holder of the live target; updating the data source means updating this reference.
    private final AtomicReference<DataSource> dReference;

    /**
     * Creates the facade around an initial target.
     *
     * @param dataSource the data source to delegate to; must not be null
     * @throws IllegalArgumentException if {@code dataSource} is null
     */
    public DynamicDataSource(DataSource dataSource) {
        this.dReference = new AtomicReference<>(requireTarget(dataSource));
    }

    /**
     * Installs a new target and hands back the one it replaced.
     *
     * @param dataSource the new data source to delegate to; must not be null
     * @return the previously installed data source
     * @throws IllegalArgumentException if {@code dataSource} is null
     */
    public DataSource setAndGetDataSource(DataSource dataSource) {
        return this.dReference.getAndSet(requireTarget(dataSource));
    }

    // Fails fast on a null target instead of letting every later delegate call throw an NPE.
    private static DataSource requireTarget(DataSource dataSource) {
        if (dataSource == null) {
            throw new IllegalArgumentException("dataSource must not be null");
        }
        return dataSource;
    }

    /* The interface methods below all delegate to the currently referenced target. */

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return dReference.get().getParentLogger();
    }

    /**
     * Returns this object when it already implements {@code iface}; otherwise asks the target.
     */
    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        if (iface.isInstance(this)) {
            return iface.cast(this);
        }
        return dReference.get().unwrap(iface);
    }

    /**
     * Reports true when this object implements {@code iface} or the target wraps it.
     */
    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return iface.isInstance(this) || dReference.get().isWrapperFor(iface);
    }

    @Override
    public Connection getConnection() throws SQLException {
        return dReference.get().getConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return dReference.get().getConnection(username, password);
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return dReference.get().getLogWriter();
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        dReference.get().setLogWriter(out);
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        dReference.get().setLoginTimeout(seconds);
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return dReference.get().getLoginTimeout();
    }

}

数据源管理类封装

然后封装一个数据源管理类,提供创建与关闭数据源的操作。创建的数据源类型这里采用阿里的druid,需要引入其依赖:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.2.6</version>
</dependency>

根据数据源配置的bean中的配置信息创建新数据源,关闭数据源是放在单独的线程池中进行,如下:

package top.wteng.dds.dsintegration;

import java.sql.Connection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.sql.DataSource;

import com.alibaba.druid.pool.DruidDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.stereotype.Component;

/**
 * Builds Druid connection pools from the injected {@code spring.datasource.*} properties and
 * closes replaced pools on a background thread.
 *
 * <p>NOTE(review): the single-thread executor is never shut down by this class.
 */
@Component
public class DataSourceManager {
    // Maximum number of waits/attempts when shutting a pool down.
    private static final int MAX_SHUTDOWN_RETRY_TIMES = 20;
    // Pause between two shutdown attempts.
    private static final long SHUTDOWN_RETRY_INTERVAL_MILLIS = 2 * 1000L;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    // Task queue on which old pools are closed, so callers never block on a shutdown.
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    private final DataSourceProperties dataSourceProperties; // injected spring.datasource.* settings

    public DataSourceManager(@Autowired DataSourceProperties dataSourceProperties) {
        this.dataSourceProperties = dataSourceProperties;
    }

    /**
     * Builds a new Druid pool from the current state of the data source properties.
     *
     * @return the freshly built pool
     */
    public DruidDataSource createDataSource() {
        return dataSourceProperties.initializeDataSourceBuilder().type(DruidDataSource.class).build();
    }

    /**
     * Builds a new pool and probes it by opening and closing one connection.
     *
     * <p>A failed probe is only logged (now including its cause); the pool is returned either way.
     *
     * @return the freshly built pool, whether or not the probe succeeded
     */
    public DruidDataSource createAndTestDataSource() {
        DruidDataSource dataSource = this.createDataSource();
        try (Connection ignored = dataSource.getConnection()) {
            // Opening the connection is the whole test; try-with-resources closes it again.
        } catch (Exception e) {
            this.logger.error("data source test failed ...", e);
        }
        return dataSource;
    }

    /**
     * Schedules an asynchronous shutdown of the given pool.
     *
     * <p>While the pool still reports active connections the close is postponed, up to
     * {@code MAX_SHUTDOWN_RETRY_TIMES} times, so unfinished work is not cut off; after that the
     * pool is closed regardless.
     *
     * @param dataSource the pool to close; must be a {@link DruidDataSource}, null is ignored
     * @throws ClassCastException if {@code dataSource} is not a {@link DruidDataSource}
     */
    public void shutdownDataSource(DataSource dataSource) {
        if (dataSource == null) {
            // Nothing to close; without this guard the worker would retry an NPE 20 times.
            logger.warn("no datasource given to shut down, ignoring ...");
            return;
        }
        DruidDataSource oldDs = (DruidDataSource) dataSource;
        executorService.execute(() -> closeWithRetry(oldDs));
    }

    // Runs on the executor thread: waits for the pool to become idle, then closes it.
    private void closeWithRetry(DruidDataSource oldDs) {
        int attempts = 0;
        while (attempts < MAX_SHUTDOWN_RETRY_TIMES) {
            try {
                if (oldDs.getActiveCount() > 0) {
                    attempts++;
                    if (attempts < MAX_SHUTDOWN_RETRY_TIMES) {
                        logger.warn("datasource {} has actived connections, will retry after 2 seconds ...", oldDs.getName());
                        if (sleepBeforeRetry()) {
                            continue;
                        }
                        // Interrupted: stop waiting and close right away rather than leak the pool.
                        logger.warn("shutdown of datasource {} was interrupted, closing without further waiting ...", oldDs.getName());
                    } else {
                        logger.warn("datasource {} will be force shutdown, because max retry times reached but there are still live connections  ...", oldDs.getName());
                    }
                }
                oldDs.close();
                logger.warn("datasource {} with url {} has been closed  ...", oldDs.getName(), oldDs.getUrl());
                return;
            } catch (Exception e) {
                attempts++;
                if (attempts >= MAX_SHUTDOWN_RETRY_TIMES) {
                    logger.warn("shutdown datasource {} errored, max retry times reached ...", oldDs.getName(), e);
                    return;
                }
                logger.warn("shutdown datasource {} failed, will retry after 2 seconds ...", oldDs.getName(), e);
                if (!sleepBeforeRetry()) {
                    logger.warn("shutdown of datasource {} was interrupted, giving up ...", oldDs.getName());
                    return;
                }
            }
        }
    }

    // Sleeps for the retry interval; returns false (with the interrupt flag restored) if interrupted.
    private boolean sleepBeforeRetry() {
        try {
            Thread.sleep(SHUTDOWN_RETRY_INTERVAL_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

数据源更新类封装

数据源与数据源创建策略都有了,下面实现数据源更新的逻辑。数据源更新即接收新数据源的配置:首先更新数据源配置的bean(DataSourceProperties),然后创建新的数据源,并将动态数据源的引用更新为新数据源,最后关闭旧数据源。在需要更新数据源的地方,注入该类并调用updateDataSource即可实现数据源的动态更新。如下:

package top.wteng.dds.dsintegration;

import java.util.Map;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.stereotype.Component;

/**
 * Hot-swaps the application's data source: applies new connection settings, builds a pool from
 * them, points the {@link DynamicDataSource} at it and retires the old pool.
 */
@Component
public class DataSourceUpdater {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired private DataSourceProperties dataSourceProperties;
    @Autowired private DataSourceManager dataSourceManager;
    @Autowired private DynamicDataSource dynamicDataSource;

    /**
     * Switches to a data source built from the given settings.
     *
     * <p>Recognised keys are {@code username}, {@code password}, {@code driver} and {@code url};
     * other keys are ignored. If the new pool cannot be created or cannot hand out a connection,
     * the previous settings are restored and the current data source stays in place.
     *
     * @param newProperties settings to overwrite before the new pool is built
     * @throws Exception whatever caused the update to fail, after it has been logged
     */
    public synchronized void updateDataSource(Map<String, String> newProperties) throws Exception {
        // Snapshot the current settings so a failed switch does not leave the shared bean half-updated.
        final String previousUsername = dataSourceProperties.getUsername();
        final String previousPassword = dataSourceProperties.getPassword();
        final String previousDriver = dataSourceProperties.getDriverClassName();
        final String previousUrl = dataSourceProperties.getUrl();
        try {
            DataSource newDs;
            try {
                // Only these four settings are updated here; extend applyProperty for more.
                newProperties.forEach(this::applyProperty);
                newDs = createVerifiedDataSource();
            } catch (Exception e) {
                // Nothing has been swapped yet, so undoing the property changes is a full rollback.
                dataSourceProperties.setUsername(previousUsername);
                dataSourceProperties.setPassword(previousPassword);
                dataSourceProperties.setDriverClassName(previousDriver);
                dataSourceProperties.setUrl(previousUrl);
                throw e;
            }
            // Install the new data source and close the one it replaced.
            this.dataSourceManager.shutdownDataSource(dynamicDataSource.setAndGetDataSource(newDs));
        } catch (Exception e) {
            logger.error("update data source failed", e);
            throw e;
        }
    }

    // Copies one recognised setting onto the data source properties bean.
    private void applyProperty(String key, String value) {
        switch (key) {
            case "username": dataSourceProperties.setUsername(value); break;
            case "password": dataSourceProperties.setPassword(value); break;
            case "driver": dataSourceProperties.setDriverClassName(value); break;
            case "url": dataSourceProperties.setUrl(value); break;
            default: break;
        }
    }

    // Builds the new pool and insists it can open a connection; createAndTestDataSource only logs
    // a failed probe, so without this check a broken pool would replace the working one.
    private DataSource createVerifiedDataSource() throws java.sql.SQLException {
        DataSource candidate = dataSourceManager.createAndTestDataSource();
        try (java.sql.Connection probe = candidate.getConnection()) {
            return candidate;
        } catch (java.sql.SQLException | RuntimeException e) {
            // The candidate never went live; hand it back to the manager to be closed.
            dataSourceManager.shutdownDataSource(candidate);
            throw e;
        }
    }
}

数据源配置

最后编写配置类配置数据源,告诉Spring使用我们自定义的数据源,如下:

package top.wteng.dds.configuration;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import top.wteng.dds.dsintegration.DataSourceManager;
import top.wteng.dds.dsintegration.DynamicDataSource;

/**
 * Spring configuration that exposes the swappable {@link DynamicDataSource} as a bean.
 */
@Configuration
public class DataSourceConfiguration {

    /**
     * Registers the custom data source, replacing the default data source bean.
     *
     * @param dManager manager used to build (and probe) the initial underlying data source
     * @return a dynamic data source wrapping the initial underlying data source
     */
    @Bean
    public DynamicDataSource dataSource(@Autowired DataSourceManager dManager) {
        return new DynamicDataSource(dManager.createAndTestDataSource());
    }
}

至此,数据源相关工作就都完成了,dao层实现无任何变化,无论是使用jpa还是mybatis等,不再赘述。

测试

首先在spring配置文件中配置初始数据源信息:

# Initial data source (database ds0) plus JPA settings used when the application starts.
spring:
  datasource: # data source settings
    url: jdbc:mysql://localhost:3306/ds0?useSSL=false
    type: com.alibaba.druid.pool.DruidDataSource # data source implementation to use
    username: root
    password: root
    druid:
      # NOTE(review): DataSourceManager builds pools through initializeDataSourceBuilder();
      # presumably only url/username/password/driver are carried over, so confirm that these
      # druid.* pool settings actually reach the pools it creates.
      initial-size: 2
      max-active: 10
  jpa: # JPA settings
    generate-ddl: true
    show-sql: false
    hibernate:
      ddl-auto: none

创建两个数据库 `ds0` 和 `ds1`,并分别创建 `t_user` 表,各插入一条数据:`ds0.t_user` 中有用户 `u1_ds0`,`ds1.t_user` 中有用户 `u2_ds1`,如下:

-- Test fixture: two databases (ds0, ds1) with identical t_user tables, one distinct row each,
-- so the rows returned by a query reveal which database is currently in use.

-- Database ds0 (created only if it does not exist yet).
CREATE DATABASE IF NOT EXISTS `ds0` CHARACTER SET utf8;

-- t_user in ds0: auto-increment integer primary key plus nullable name/password columns.
CREATE TABLE IF NOT EXISTS `ds0`.`t_user`  (
  `id` int(0) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- Marker row for ds0.
insert into ds0.t_user(`name`, `password`) values('u1_ds0', 'root');

-- Database ds1 (created only if it does not exist yet).
CREATE DATABASE IF NOT EXISTS `ds1` CHARACTER SET utf8;

-- t_user in ds1: same definition as the ds0 table.
CREATE TABLE IF NOT EXISTS `ds1`.`t_user`  (
  `id` int(0) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- Marker row for ds1.
insert into ds1.t_user(`name`, `password`) values('u2_ds1', 'root');

编写一个简单的单元测试函数,测试数据源切换,如下:

package top.wteng.dds;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import top.wteng.dds.dsintegration.DataSourceUpdater;
import top.wteng.dds.entity.User;
import top.wteng.dds.service.UserService;

/**
 * Integration test that prints the users before and after switching the data source from ds0
 * to ds1 at runtime.
 */
@SpringBootTest
class DynamicDataSourceIntegrationApplicationTests {
    private final Logger logger = LoggerFactory.getLogger("Test");

    @Autowired private DataSourceUpdater dataSourceUpdater;
    @Autowired private UserService userService;

    /**
     * Lists users, switches the url to ds1 through the updater, then lists users again.
     */
    @Test
    void dynamicUpdateDataSource() throws Exception {
        // The initial data source is ds0, as configured in application.yaml.
        logger.info(" === users from ds0 === ");
        printAllUsers();

        // Switch the data source; here simply to the other database, ds1.
        Map<String, String> switchToDs1 = new HashMap<>();
        switchToDs1.put("url", "jdbc:mysql://localhost:3306/ds1?useSSL=false");
        this.dataSourceUpdater.updateDataSource(switchToDs1);
        logger.info("data source has updated to ds1 ...");

        logger.info(" === users from ds1 === ");
        printAllUsers();
    }

    // Fetches every user through the service and prints each one to standard output.
    private void printAllUsers() {
        List<User> users = this.userService.findAll();
        for (User user : users) {
            System.out.println(user.toString());
        }
    }
}

核心部分输出如下:

INFO 19492 --- [           main] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
INFO 19492 --- [           main] micDataSourceIntegrationApplicationTests : Started DynamicDataSourceIntegrationApplicationTests in 2.334 seconds (JVM running for 2.94)
INFO 19492 --- [           main] Test                                     :  === users from ds0 === 
{ id='1', name='u1_ds0', password='root'}
INFO 19492 --- [           main] com.alibaba.druid.pool.DruidDataSource   : {dataSource-2} inited
INFO 19492 --- [           main] Test                                     : data source has updated to ds1 ...
INFO 19492 --- [           main] Test                                     :  === users from ds1 === 
INFO 19492 --- [pool-1-thread-1] com.alibaba.druid.pool.DruidDataSource   : {dataSource-1} closing ...
INFO 19492 --- [pool-1-thread-1] com.alibaba.druid.pool.DruidDataSource   : {dataSource-1} closed
WARN 19492 --- [pool-1-thread-1] t.w.dds.dsintegration.DataSourceManager  : datasource DataSource-141154428 with url jdbc:mysql://localhost:3306/ds0?useSSL=false has been closed  ...
{ id='1', name='u2_ds1', password='root'}
INFO 19492 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'