博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Java多线程】写入同一文件,自定义线程池与线程回收利用
阅读量:6452 次
发布时间:2019-06-23

本文共 7299 字,大约阅读时间需要 24 分钟。

hot3.png

csv的数据格式。

一个文件好几百兆,1个文件大概200万行左右的数据,现在我要解决的问题是,将 csv的数据读出来,组合数据,生成sql文件。

  

以前单线程跑,跑了一下午才完成,大概跑了几个小时。多线程跑,大概2-3分钟左右,200万条数据,包括过滤。

 

这个场景在平常开发中也是经常要用到的。发出来,希望大家能够指导学习~

优化版地址:

package test.com.linapex.room;import java.io.BufferedReader;import java.io.BufferedWriter;import java.io.File;import java.io.FileOutputStream;import java.io.FileReader;import java.io.OutputStreamWriter;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.locks.ReentrantReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;import com.linapex.common.util.FileUtils;import com.linapex.common.util.ZhengzeValidate;public class TBuilderRoomSqlFileTool{	final static int DATACACHENUM = 10000;	static int currThreadCount = 0;	static int maxThreadCount = 10;	static File roomFilterLogFile = new File("roomFilter.log");	static File sqlFile = new File("roomSql.sql");	static File csvFile = new File("D:\\baiduyundownload\\asd\\2000W\\1-200W.csv");	final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');";	public static BufferedWriter initSQLWrite() throws Exception	{		if (sqlFile.exists())		{			sqlFile.delete();			if (!sqlFile.createNewFile())			{				System.err.println("创建文件失败:" + sqlFile.getAbsolutePath());			}		}		return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, true), "UTF-8"));	}	public static void loadCSV(CallBack2 callBack) throws Exception	{		BufferedReader reader = null;		try		{			reader = new BufferedReader(new FileReader(csvFile));			String str = null;			int num = 0;			while ((str = reader.readLine()) != null)			{				num++;				callBack.call(num, str);			}		} finally		{			reader.close();		}	}	public static void main(String[] args) throws Exception	{		final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount);		final List
> threadResultList = new ArrayList
>(); final WriteSqlHandle writeSqlFile = new WriteSqlHandle(initSQLWrite(), DATACACHENUM); long begin = System.currentTimeMillis(); loadCSV(new CallBack2() { @Override public void call(int num, String str) { String[] strs = str.split(","); if (strs.length < 8) { writeLog("此条数据不录入::0", Arrays.toString(strs)); return; } String name = strs[0].trim(); if (!ZhengzeValidate.isChina(name)) { writeLog("此条数据不录入::0", Arrays.toString(strs)); return; } try { String card = strs[4]; String gender = strs[5]; String birthday = strs[6]; String address = strs[7]; String zip = strs[8]; String mobile = strs[20]; String email = strs[22]; String version = strs[31]; //生成sql语句 final String tempSql = tm(sqlStrTemplate, name, card, gender, birthday, address, zip, mobile, email, version); //添加数据,如果超出了缓存数据,则 开始写入文件系统 if (writeSqlFile.add(tempSql)) { currThreadCount++; //如果提交的线程过多,则取回之后再提交. if (currThreadCount >= maxThreadCount) { System.out.println(String.format("当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount)); for (Future fs : threadResultList) { try { fs.get(); currThreadCount--; System.out.println("已回调线程数:" + (maxThreadCount - currThreadCount)); } catch (Exception e) { e.printStackTrace(); } } threadResultList.clear(); //清空 currThreadCount = threadResultList.size(); System.out.println(String.format("重新开始提交线程   当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount)); } Future future = threadPool.submit(new Runnable() { @Override public void run() { try { writeSqlFile.save(); } catch (Exception e) { e.printStackTrace(); } } }); threadResultList.add(future); // System.out.println(String.format("开启了%s条线程(保存了%s条数据)", curr_thread_count, num)); } } catch (Exception e) { writeLog("录入错误的数据::0", Arrays.toString(strs)); writeLog("错误的原因::0", e.getMessage()); } } }); writeSqlFile.flush(); threadPool.shutdown(); long end = System.currentTimeMillis() - begin; System.out.println(String.format("任务完成时间:%s", end)); } public static void writeLog(String str, Object... values) { FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false); } public static String tm(String strSource, Object... values) { if (strSource == null) { return null; } StringBuilder builder = new StringBuilder(strSource); final String prefix = ":"; for (int index = 0; index < values.length; index++) { String value = values[index].toString(); if (value == null) { continue; } String key = new StringBuilder(prefix).append(index).toString(); int i = -1; if ((i = builder.indexOf(key, i)) > -1) { int len = key.length(); builder.replace(i, i + len, value); } } return builder.toString(); }}class WriteSqlHandle{ ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); WriteLock writeLock = readWriteLock.writeLock(); List
 cacheList; BufferedWriter bw; int dataCacheNum; public WriteSqlHandle(BufferedWriter bw) { this.bw = bw; cacheList = new ArrayList
(); } public WriteSqlHandle(BufferedWriter bw, int dataCacheNum) { this.bw = bw; this.dataCacheNum = dataCacheNum; cacheList = new ArrayList
(dataCacheNum); } public boolean add(String sqlStr) { writeLock.lock(); cacheList.add(sqlStr); writeLock.unlock(); return cacheList.size() >= dataCacheNum; } public void save() throws Exception { writeLock.lock(); long begin = System.currentTimeMillis(); System.out.println(String.format("%s,准备消费   需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size())); for (String str : cacheList) { bw.write(str + "\r\n"); } long end = System.currentTimeMillis() - begin; System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), end, cacheList.size())); cacheList.clear(); //清空数据. writeLock.unlock(); } public void flush() throws Exception { System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size())); for (String str : cacheList) { bw.write(str + "\r\n"); } System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size())); cacheList.clear(); //清空数据 closeWrite(); } private void closeWrite() throws Exception { bw.flush(); bw.close(); }}interface CallBack2{ void call(int num, String str);}

如果需要测试代码的朋友,请修改

1、FileUtils.doWriteFile 改成 System.out 输出

2、将数据过滤去掉即可。

输出的日志,经过优化,代码效率从2-3分钟提高了1分钟多点:

pool-1-thread-8,准备消费   需要保存数据的集合长度:0pool-1-thread-8,消费完成,耗费时间:0 ms,消费数据长度:0已回调线程数:8已回调线程数:9pool-1-thread-9,准备消费   需要保存数据的集合长度:0pool-1-thread-9,消费完成,耗费时间:0 ms,消费数据长度:0已回调线程数:10重新开始提交线程   当前线程数:0 允许最大线程数:10 等待线程完成回调.pool-1-thread-10,准备消费   需要保存数据的集合长度:4pool-1-thread-10,消费完成,耗费时间:0 ms,消费数据长度:4flush线程:main, 需要保存数据的集合长度:9102flush线程:main, 消费完成,消费数据长度:9102任务完成时间:104797 ms

不允许转载~~~只能看,不能摸/偷笑

转载于:https://my.oschina.net/linapex/blog/195376

你可能感兴趣的文章
DotNet(C#)自定义运行时窗体设计器 一
查看>>
P2627 修剪草坪[dp][单调队列]
查看>>
JS详细入门教程(上)
查看>>
Android学习笔记21-ImageView获取网络图片
查看>>
线段树分治
查看>>
git代码冲突
查看>>
lnmp1.3 配置pathinfo---thinkphp3.2 亲测有效
查看>>
查看Linux 系统的配置和增减用户/增减组/增减权限
查看>>
利用android studio 生成 JNI需要的动态库so文件
查看>>
poll
查看>>
衡量优秀的卓越的前端工程师
查看>>
解析查询 queryString 请求参数的函数
查看>>
学生选课系统数据存文件
查看>>
flutter进行自动编译操作步骤
查看>>
4.6 直接插入排序法
查看>>
我的毕设总结所用的技术和只是要点 基于stm32F4的AGV嵌入式控制系统的设计
查看>>
盘点国内外那些有野心的BI公司
查看>>
JMeter—断言
查看>>
C++的新类创建:继承与组合
查看>>
m5-第9周作业
查看>>