文件异步Excel生成以及相关功能实现

文件异步Excel生成以及相关功能实现

以下代码都是直接在markdown上编辑的,均未通过编译,请根据逻辑自行编写代码

1. Excel文件异步生成

这个功能比较好实现,首先设计一下接口


    /**
     * 文件生成 - 根据数据查询条件导出数据
     * @param infoEntity
     * @return
     */
    @PostMapping("/fileDataExportGen")
    public Result fileDataExportGen(@RequestBody InfoEntity infoEntity){
        String fileId = this.fileService.fileDataExportGen(infoEntity);
        return Result.ok().put("body", fileId);
    }

    
    /**
     * 文件生成状态查询
     * @param fileId
     * @return
     */
    @PostMapping("/fileDataExportStatus")
    public Result fileDataExportStatus(@RequestBody @NotNull(message = "文件标识不能为空")  String fileId){
        String status = this.fileService.fileDataExportStatus(fileId);
        return Result.ok().put("body", status);
    }
    

    /**
     * 文件下载
     * @param fileId
     * @return
     */
    @GetMapping("/fileDownload")
    public void fileDownload(@RequestParam @NotNull(message = "文件标识不能为空") String fileId, HttpServletResponse response) {
        fileService.fileDownload(fileId, response);
    }

这样可以通过 fileDataExportGen 接口申请通知后端生成文件,通过 fileDataExportStatus 查看文件生成情况,并通过 fileDownload 接口下载文件

Snipaste_2025-03-04_15-09-14

接下来省略service中fileDataExportStatus方法和fileDownload方法,在service中只需要定义一个Async方法,然后通过fileDataExportGen方法内调用即可(需要在Application类中添加@EnableAsync)


    @Value("${demo.export.basePath:/home/demo/exportFile}")
    private String basePath;

    /**
     * 生成文件
     */
    public void fileDataExportGen(InfoEntity infoEntity){
            InfoService bean = context.getBean(InfoService.class);
            bean.startFileGenerate(infoEntity);
    }

    /**
     * 异步方法 生成文件
     */
    @Override
    @Async
    public void startFileGenerate(InfoEntity infoEntity) throws Exception {
        List<JSONObject> list = dao.select(infoEntity);
        List<HeadEntity> headers = dao.selectHeader();

        String sheetName = "data";

        try(Workbook workbook = new XSSFWorkbook()){

            // 写入标题和数据
            PoiUtil.createSheetByHeaderAndData(workbook, sheetName, headers, list);

            // file存在本地或者对象存储
            File file = PoiUtil.writeFile(workbook, basePath);
            // TODO 对象存储

        } catch(Exception e){
            // throw
        }


        // TODO 状态更新
    }

简单说明一下createSheetByHeaderAndData方法

        public static void createSheetByHeaderAndData(Workbook workbook, String sheetName, List<HeadEntity> headers, List<JSONObject> list){

            // 为空则默认赋值
            if(Obejcts.isNull(workbook)){
                workbook = new XSSFWorkbook();
            }

            Sheet sheet = workbook.getSheet(sheetName);
            if(!Obejcts.isNull(workbook)){
                // throw 指定名称的sheet页已经存在
            }
            sheet = workbook.createSheet(sheetName);

            int lastRow = writeHeader(workbook, sheet, header);
            writeData(workbook, sheet, header, list, lastRow);

        }

这样一个比较简单的异步生成文件和下载的功能就实现了

2. 进度条实现(减少请求开销)

用户多次请求状态导致开销过大,于是考虑使用sse对生成过程进行实时反馈。

简单说明:sse类似于WebSocket,都是建立浏览器与服务器之间的通信渠道。WebSocket开销大,但是可以全双工通信,sse开销较小,只能服务器向浏览器推送信息。本质上就是以流信息的方式,完成一次用时很长的下载。

Controller:



    /**
     * 文件生成 - 根据数据查询条件导出数据
     * @param infoEntity
     * @return
     */
    @PostMapping("/fileDataExportGen")
    public SseEmitter fileDataExportGen(@RequestBody InfoEntity infoEntity){
        // 返回SseEmitter实体
        SseEmitter sseEmitter = this.fileService.fileDataExportGen(infoEntity);
        return sseEmitter;
    }

Service:


    private final static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * 生成文件
     */
    public SseEmitter fileDataExportGen(InfoEntity infoEntity){

        // 生成实体,并创建唯一标识
        String sseCode = UUID.randomUUID();
        SseEmitter sseEmitter = new SseEmitter(60000L);

        // 注册回调
        sseEmitter.onCompletion(completionCallBack(sseCode));
        sseEmitter.onError(errorCallBack(sseCode));
        sseEmitter.onTimeout(timeoutCallBack(sseCode));
        sseEmitterMap.put(sseCode, sseEmitter);

        // 异步调用 startFileGenerate 方法
        InfoService bean = context.getBean(InfoService.class);
        bean.startFileGenerate(infoEntity, sseCode);

        // 返回SseEmitter实体
        return sseEmitter;
    }

    
    /**
     * 异步方法 生成文件
     */
    @Override
    @Async
    public void startFileGenerate(InfoEntity infoEntity, String sseCode) throws Exception {

        SseEmitter sseEmitter = sseEmitterMap.get(sseCode);

        String sheetName = "data";
        try(Workbook workbook = new XSSFWorkbook()){

            // 发送给前端 如有需要可以修改传送的结构
            sseEmitter.send(sseCode);
            sseEmitter.send("开始");

            // 查询数据信息和标题信息
            List<JSONObject> list = dao.select(infoEntity);
            List<HeadEntity> headers = dao.selectHeader();
            
            // 发送数据已经查询成功
            sseEmitter.send("查询成功");
            sseEmitter.send("共查询到" + list.size() + "条数据");

            // 创建sheet
            sheet = workbook.createSheet(sheetName);

            // 写入表头
            int lastRow = PoiUtil.writeHeader(workbook, sheet, header);

            // 每写入100个,返回一次进度值
            for (int i = 0; i < size; i += batchSize) {
                List<JSONObject> batch = list.subList(i, endIndex);
                PoiUtil.writeData(workbook, sheet, header, batch, i + lastRow);
                sseEmitter.send("已写入" + i + "行数据");
            }

            // file存在本地或者对象存储
            File file = PoiUtil.writeFile(workbook, basePath);
            sseEmitter.send("写入完成");

            // TODO 对象存储

        } catch(Exception e){
            // throw
        } finally{
            sseEmitter.complete();
            sseEmitterMap.remove(sseCode);
        }


        // TODO 状态更新
    }

这样可以直接通过sse返回进度,并让前端页面在获取信息后打印信息或者制作进度条以通知用户。当进度结束后前端可以直接提供按钮来访问 fileDownload 接口下载文件

从下一章开始不在详细阐述sse使用细节

3. SSE断联后重连机制

当因为用户误操作或者产品需求,需要在文件下载中拥有断联后重连机制。这个逻辑比较好实现,因为SSE是默认支持重连操作的。

但是还有个两个问题所在

  1. 按照第二章的代码分析,当用户断联的这段时间内,服务端发送的信息在客户端是无法捕获的,也就是说如果断联后发送的信息会丢失。

  2. 当用户断联后,服务端的SSE实例在完成,超时,报错时都会被注销,那么在注销后重连会获取不到任何信息

针对上述两个问题,生成对应方案:

版本号定义防止信息丢失

在每次发送信息时定义一个版本号,并临时储存版本号和发送的信息。版本号定义为:第一次版本号为0,此后每次信息发送时版本号原子性加一。

这样前端在获取到服务端发送的信息后将主要的sseCode和version存到localStorage中,当刷新页面后如果获取到了相关信息,则访问重连接口,重连接口会根据version重新发送之后的所有信息。

Controller:

    
    /**
     * 文件生成 - 根据数据查询条件导出数据
     * @param sseCode
     */
    @GetMapping("/reConnect")
    public SseEmitter reConnect(@RequestParam String sseCode){
        SseEmitter sseEmitter = this.fileService.reConnect(sseCode);
        return sseEmitter;
    }

Service:

    
    // 模拟存储每个客户端的最新版本号
    private Map<String, Integer> clientVersions = new HashMap<>();
    // 模拟存储所有信息
    private List<String> allMessages = new ArrayList<>();

    /**
     * 文件生成 - 根据数据查询条件导出数据
     */
    public SseEmitter reConnect(String sseCode){
        SseEmitter sseEmitter = SseEmitterUtil.get(sseCode);

        // 获取客户端的最后版本号,如果不存在则默认为 0
        int lastVersion = clientVersions.getOrDefault(sseCode, 0);

        // 获取断联后丢失的信息
        List<String> lostMessages = getLostMessages(lastVersion);

        // 发送丢失的信息
        sendMessages(sseEmitter, lostMessages);

        // 更新客户端的版本号
        clientVersions.put(sseCode, allMessages.size());

        // 处理 SseEmitter 的完成、超时和错误事件
        sseEmitter.onCompletion(completionCallBack(sseCode));
        sseEmitter.onError(errorCallBack(sseCode));
        sseEmitter.onTimeout(timeoutCallBack(sseCode));
        sseEmitterMap.put(sseCode, sseEmitter);

        return sseEmitter;
    }

可以将数据存入redis中并设置超时时间,在超时时间内都可以获取信息。

4. 数据量过大的处理方式(流式查询 + excel刷盘写入)

如果遇到数据量过大的情况(假设百万条以上的数据),那要考虑的问题就比较多

  1. 不能全量读取所有数据到内存中,这样会导致内存溢出。

  2. 不能通过数据库分页功能实现部分读取,因为这种操作在插入/更新/删除数据操作时会影响到结果集的查询

  3. 写入excel时需要做临时文件处理,这样也是为了防止内存溢出

  4. 数据量大导致导出时间过长,需要多线程处理

于是通过特殊的流式查询加刷盘已达到资源占用量小,还可以保证数据输出的完整性

流式查询

通过ResultHandler ,将查询结果以迭代器形式逐行返回,而非一次性加载到内存

实现步骤如下:

Mapper:

    // 具体sql在xml中实现,fetchSize代表从数据库请求一次返回多少条数据,数据量越大,占用内存越多,但是请求的开销越少
    @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = 1000)
    void selectByResultHandler(ResultHandler<InfoEntity> handler);

自定义 ResultHandler:

public class DataHandler implements ResultHandler<InfoEntity> {
    
    
    private static final int BATCH_SIZE = 100;

    private List<InfoEntity> buffer = new ArrayList<>();

    private final Workbook workbook;
    private int lastRow;
    private String basePath;
    private String sseCode;

    
    // 构造函数接收 Workbook 对象
    public ExcelResultHandler(Workbook workbook, int lastRow, String basePath, String sseCode) {
        this.workbook = workbook;
        this.lastRow = lastRow;
        this.basePath = basePath;
        this.sseCode = sseCode;

    }

    @Override
    public void handleResult(ResultContext<? extends InfoEntity> resultContext) {
        InfoEntity infoEntity = resultContext.getResultObject();

        buffer.add(infoEntity);

        if (buffer.size() >= BATCH_SIZE) {
            processBatch(buffer); // 执行批量操作
            buffer.clear();
            SseEmitter.send(sseCode, lastRow);
        }
    }

    // 处理剩余数据(最后一次可能不足 BATCH_SIZE)
    public void finalize() {
        if (!buffer.isEmpty()) {
            processBatch(buffer);
        }
        
        File file = PoiUtil.writeFile(workbook, basePath);
    }

    private void processBatch(List<InfoEntity> infoEntity) {
        PoiUtil.writeData(workbook, sheet, header, batch, lastRow);
        lastRow += infoEntity.size();
    }
}

最后在Service上调用


    @Value("${demo.export.basePath:/home/demo/exportFile}")
    private String basePath;

    public void exportToExcel(String sseCode, HttpServletResponse response) {
        response.setHeader("Content-Disposition", "attachment;filename=data.xlsx");
        try (SXSSFWorkbook workbook = new SXSSFWorkbook(100)) {

            // 创建sheet
            sheet = workbook.createSheet(sheetName);

            // 写入表头
            int lastRow = PoiUtil.writeHeader(workbook, sheet, header);

            // 创建handler
            ResultHandler<InfoEntity> handler = new DataHandler<>(workbook, lastRow, basePath, sseCode);

            // 开始查询
            infoMapper.selectByResultHandler(handler);
        }
    }

如此完成流式查询。

异步刷盘分块写入

Apache POI 的 SXSSFWorkbook,实现 Excel 流式写入,其底层机制是 ​内存缓存+磁盘临时文件。

例如设置 rowAccessWindowSize=100 表示仅保留 100 行数据在内存中,超出部分自动写入临时文件,显著降低内存占用。

其实在上面的代码中,已经使用了 SXSSFWorkbook ,并设定了rowAccessWindowSize,这样 SXSSFWorkbook 就可以实现自动刷盘。

如果想做到手动刷盘可以这么做


    SXSSFWorkbook workbook = new SXSSFWorkbook(100);
    Sheet sheet = workbook.createSheet("Data");

    // 逐行读取并写入
    int rowNum = 0;
    Iterator<Data> iterator = cursor.iterator();
    while (iterator.hasNext()) {
        Data data = iterator.next();
        Row row = sheet.createRow(rowNum++);
        row.createCell(0).setCellValue(data.getId());
        row.createCell(1).setCellValue(data.getName());

        // 每写入 1000 条刷盘一次
        if (rowNum % 1000 == 0) {
            ((SXSSFSheet) sheet).flushRows(1000);  // 强制刷盘
            workbook.write(tempOutputStream);      // 写入临时文件
            if(rowNum % 200000){
                sheet = workbook.createSheet("Data_" + (rowNum / 200000)); // 分 Sheet 写入(可选)
            }
        }
    }

    // 最终刷盘并保存文件
    workbook.write(finalOutputStream);
    workbook.dispose();  // 清理临时文件

此方法可以同时做到分sheet页,保证一个sheet页存储20万条数据

多线程写入文件

实际上 SXSSFWorkbook 本身并不是线程安全的类,ResultHandler默认也是在单线程中被调用。异步生成文件的目的就是让用户对长时间的数据处理减少反感,所以在需求不强烈的情况下还是建议单线程处理。如果想要做到多线程可以用以下两个办法:

  1. 在ResultHandler内启用线程将数据插入SXSSFWorkbook,同时对SXSSFWorkbook做好同步机制,这样在操作SXSSFWorkbook时可以继续对数据进行读取。

  2. 使用阻塞队列解耦数据读取与写入,ResultHandler将数据批次放入队列,独立线程从队列取出批次并写入Excel。

5. 文件定期删除(redis超时过期 + 定时清理)

如果是将文件存入对象存储中,那只需要设置过期时间即可,但是如果是存在本地,那需要方法来定期删除。

比较保险的方法就是通过 redis超时过期 + Scheduled定时清理 ,已达到文件的清理逻辑完整性

redis超时过期

实现 redis超时过期 ,在文件生成结束后将文件属性等信息存入redis中并设置过期时间,到期后会自动调用 onMessage 方法来清理文件。


    @Slf4j
    @Component
    public class FileGenExpirationListener extends KeyExpirationEventMessageListener {

        @Value("${demo.export.basePath:/home/demo/exportFile")
        private String basePath ;

        @Value("${demo.export.redisPrefix:file}")
        private String redisPrefix;

        @Value("${demo.export.redisExpireSeconds:300}")
        private int redisExpireSeconds;

        @Autowired
        public FileGenExpirationListener(RedisMessageListenerContainer listenerContainer) {
            super(listenerContainer);
        }

        /**
         * 生成文件过期删除
         * @param message
         * @param pattern
         */
        @Override
        public void onMessage(Message message, byte[] pattern) {
            String expiredKey = message.toString();
            // 假设 key 的格式为 "file:文件路径"
            if (expiredKey.startsWith(redisPrefix)) {
                String filePath =  basePath + "/" + expiredKey.replace(redisPrefix + ":", "") + ".xlsx";
                deleteFile(filePath);
            }
        }

        /**
         * 删除文件
         * @param path
         */
        private void deleteFile(String path) {
            deleteFile(new File(path));
        }


        /**
         * 删除文件
         * @param file
         */
        private void deleteFile(File file) {
            if (file.exists() && file.isFile()) {
                if (file.delete()) {
                    log.info("生成文件过期删除成功:{}", file.getAbsolutePath());
                } else {
                    log.error("生成文件过期删除失败:{}", file.getAbsolutePath());
                }
            }
        }
    }

需要注意的是redis需要支持键空间通知功能

    notify-keyspace-events Ex

定时清理

使用Scheduled注解, 每30分钟扫描一次指定basePath路径下所有超过指定时间的.xlsx文件,这样可以删除当服务因某些原因停止运行导致无法通过redis到期机制删除的文件。


    /**
     * 定时查看文件生成路径下的文件是否超时失效,并对失效文件删除
     */
    @Scheduled(fixedRate = 30 * 60 * 1000)
    public void checkExpiredFiles() {
        try {
            Path dirPath = Paths.get(basePath);
            if (!Files.exists(dirPath) || !Files.isDirectory(dirPath)) {
                System.err.println("目录不存在或不是有效文件夹: " + basePath);
                return;
            }

            Files.walk(dirPath)
                    .filter(Files::isRegularFile)
                    .forEach(file -> {
                        try {
                            BasicFileAttributes attrs = Files.readAttributes(file, BasicFileAttributes.class);
                            LocalDateTime lastModifiedTime = LocalDateTime.ofInstant(
                                    attrs.lastModifiedTime().toInstant(),
                                    ZoneId.systemDefault()
                            );

                            LocalDateTime cutoffTime = LocalDateTime.now().minusSeconds(redisExpireSeconds);
                            String name = file.getFileName().toString();
                            String extension = name.substring(name.lastIndexOf("."));
                            if (lastModifiedTime.isBefore(cutoffTime) && BusConstant.EXCEL_EXT_XLSX.equalsIgnoreCase(extension)) {
                                deleteFile(file.toFile());
                            }
                        } catch (IOException e) {
                            // throw
                        }
                    });
        } catch (IOException e) {
            // throw
        }
    }
# java 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×