海量日志中统计次数最多的100个IP

由于标题长度限制,原题是这样:某系统QPS100万,每十分钟统计一下请求次数最多的100个IP。ip请求写到日志的话,其实就是超大文件中统计top k问题。10分钟6亿条记录,大约是10G级别,所以对于一般单机处理来讲不能一次性加载到内存计算。所以分治算法是处理这类问题的基本思想。

思路

前面说了分治思想。那么具体如何分解问题呢。

思路就是把大文件分割成多个可以内存处理的小文件,对每个小文件统计top k问题,最后再对所有统计结果合并得到最终的top k。

注意,这里的分割并不是随意分割的,那样最终结果显然是不对的,必须保证相同的ip记录都分割到同一个文件。那么hash算法最合适不过了,可以把相同的ip哈希到同一文件。

关于top k问题,效率高的解法是使用构造最小堆或者借助快速排序的思想,复杂度为O(nlogk)。这里更适合用最小堆,具体来说,就是先利用前k个数据构建一个固定大小k的最小堆,对之后的数据,小于堆顶不做处理,大于则替换堆顶并调整。这样,对每个文件顺序处理完之后就得到最终结果,而不需要保留每个文件的top k再归并。

实现

博主偷懒,借助TreeSet代替最小堆来维护top k数据,TreeSet的话底层是借助红黑树排序,比最小堆复杂些,实际上对每个小文件用红黑树全排序再截取前k个。复杂度O(nlogm),这里m是每个小文件中的数量, m>>k。再有时间的话再用最小堆优化一下,复杂度应为O(nlogk)。

ps:已实现最小堆版本,见实现2,并做了对比实验

定时任务使用quartz实现。

下面是代码。

IP类,封装ip计数,使用TreeSet存放须实现comparable接口。注意这里重写compare方法不要return 0,否则会被TreeSet视为相同对象而放不进去。这个可以看一下TreeSet的实现,它实际上内部还是一个TreeMap,只是把对象作为key,而value没有使用。add添加元素时,会调用TreeMap的put方法,put内部又会调用compare方法,如果compare返回结果为0,只是重新setValue,对TreeSet相当于什么也没做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.hellolvs;

import org.apache.commons.lang3.builder.ToStringBuilder;

/**
* IP计数POJO
*
* @author lvs
* @date 2017/12/08.
*/
public class IP implements Comparable<IP> {

private String ip;
private int count;

public IP() {
}

public IP(String ip, int count) {
this.ip = ip;
this.count = count;
}

public String getIp() {
return ip;
}

public void setIp(String ip) {
this.ip = ip;
}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}

@Override
public int compareTo(IP o) {
return o.count < this.count ? -1 : 1;
}

@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
}

IPCountJob类,定时统计日志文件中top k个ip。

注意其中的分割文件,这里的分割需要对文件边读边写,不能一次性读入内存再分割。guava io的readLines是直接装入内存的,所以不能用。可以使用java原生的io类,或使用commons io的LineIterator更优雅一些。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package com.hellolvs;

import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.io.LineProcessor;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 定时Job,每十分钟统计请求次数前k的ip
*
* @author lvs
* @date 2017/12/08.
*/
public class IPCountJob implements Job {

private static final Logger LOG = LoggerFactory.getLogger(IPCountJob.class);

private static final String LINE_SEPARATOR = StandardSystemProperty.LINE_SEPARATOR.value();
private static final Charset UTF_8 = Charsets.UTF_8;

private static final String INPUT_PATH = "/home/lvs/logs/ip.log";
private static final String OUTPUT_PATH = "/home/lvs/logs/split/";

private static final int SPLIT_NUM = 1024;
private static final int TOP_K = 100;

/**
* 利用TreeSet存储请求次数前k的IP
*/
private TreeSet<IP> resultSet = Sets.newTreeSet();

/**
* 分割文件用,保存每个文件的写入流对象
*/
private final Map<Integer, BufferedWriter> bufferMap = Maps.newHashMapWithExpectedSize(SPLIT_NUM);

/**
* 定时任务,每十分钟统计请求次数前k的IP
*/
@Override
public void execute(JobExecutionContext jobExecutionContext) {
// 捕获异常,防止定时任务中断
try {
execute();
} catch (Exception e) {
LOG.error("定时任务出错:{}", e.getMessage(), e);
}
}

/**
* 统计大文件中请求次数前k的IP
*
* @throws IOException I/O error
*/
public void execute() throws IOException {
// 这里应该每10分钟获取当前轮替日志文件路径,此处用常量路径模拟
File ipLogFile = new File(INPUT_PATH);

splitLog(ipLogFile, SPLIT_NUM);

File logSplits = new File(OUTPUT_PATH);
for (File logSplit : logSplits.listFiles()) {
countTopK(logSplit, TOP_K);
}

LOG.info("结果集:{}", resultSet.size());
for (IP ip : resultSet) {
LOG.info("{}", ip);
}
}

/**
* 生成模拟日志文件
*
* @param logNum 生成日志条数
* @throws IOException I/O error
*/
public static void generateLog(long logNum) throws IOException {

/* 创建文件 */
File log = new File(INPUT_PATH);
File parentDir = log.getParentFile();
if (!parentDir.exists()) {
parentDir.mkdirs();
}
log.createNewFile();

/* 生成随机ip写入文件 */
SecureRandom random = new SecureRandom();
try (BufferedWriter bw = new BufferedWriter(new FileWriter(log))) {
for (int i = 0; i < logNum; i++) {
StringBuilder sb = new StringBuilder();
sb.append("192.").append(random.nextInt(255)).append(".").append(random.nextInt(255)).append(".")
.append(random.nextInt(255)).append(LINE_SEPARATOR);
bw.write(sb.toString());
}
bw.flush();
}
}

/**
* 分割日志文件
*
* @param logFile 待分割文件
* @param fileNum 分割文件数量
* @throws IOException I/O error
*/
private void splitLog(File logFile, int fileNum) throws IOException {

/* 为每个分割文件创建写入流对象 */
for (int i = 0; i < fileNum; i++) {
File file = new File(OUTPUT_PATH + i);
File parentDir = file.getParentFile();
if (!parentDir.exists()) {
parentDir.mkdirs();
}
bufferMap.put(i, new BufferedWriter(new FileWriter(file)));
}

/* 根据ip的hashcode将数据分割到不同文件中 */
LineIterator it = null;
try {
it = FileUtils.lineIterator(logFile, "UTF-8");
while (it.hasNext()) {
String ip = it.nextLine();
int hashCode = Objects.hashCode(ip);
hashCode = hashCode < 0 ? -hashCode : hashCode;
BufferedWriter writer = bufferMap.get(hashCode % fileNum);
writer.write(ip + LINE_SEPARATOR);
}
} finally {
/* 释放资源 */
LineIterator.closeQuietly(it);
for (Map.Entry<Integer, BufferedWriter> buffer : bufferMap.entrySet()) {
BufferedWriter writer = buffer.getValue();
writer.flush();
writer.close();
}
bufferMap.clear();
}
}

/**
* 统计请求次数前k的IP
*
* @param logSplit 当前分割文件
* @param k top k
* @throws IOException I/O error
*/
private void countTopK(File logSplit, int k) throws IOException {

/* 读取文件对ip计数 */
HashMap<String, AtomicInteger> ipCountMap = Files.readLines(logSplit, UTF_8,
new LineProcessor<HashMap<String, AtomicInteger>>() {
private HashMap<String, AtomicInteger> ipCountMap = Maps.newHashMap();

@Override
public boolean processLine(String line) throws IOException {
AtomicInteger ipCount = ipCountMap.get(line.trim());
if (ipCount != null) {
ipCount.getAndIncrement();
} else {
ipCountMap.put(line.trim(), new AtomicInteger(1));
}
return true;
}

@Override
public HashMap<String, AtomicInteger> getResult() {
return ipCountMap;
}
});

/* 统计结果添加到TreeSet */
for (Map.Entry<String, AtomicInteger> entry : ipCountMap.entrySet()) {
resultSet.add(new IP(entry.getKey(), entry.getValue().get()));
}

/* TreeSet只保留前k个ip */
TreeSet<IP> temp = Sets.newTreeSet();
int i = 0;
for (IP o : resultSet) {
temp.add(o);
i++;
if (i >= k) {
break;
}
}
resultSet = temp;
}

/**
* 返回统计结果
*
* @return 结果集合
*/
public TreeSet<IP> getResult() {
return resultSet;
}
}

Main,定时任务启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.hellolvs;

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

/**
* 定时任务启动器
*
* @author lvs
* @date 2017/12/11.
*/
public class Main {
public static void main(String[] args) throws Exception {
// 生成模拟日志文件
IPCountJob.generateLog(600000000);

JobDetail job = JobBuilder.newJob(IPCountJob.class)
.withIdentity("ipCountJob", "group1").build();

Trigger trigger = TriggerBuilder
.newTrigger()
.withIdentity("ipCountTrigger", "group1")
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInMinutes(10).repeatForever())
.build();

Scheduler scheduler = new StdSchedulerFactory().getScheduler();
scheduler.start();
scheduler.scheduleJob(job, trigger);
}
}

实现2

IP类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.hellolvs;

import org.apache.commons.lang3.builder.ToStringBuilder;

/**
* IP计数POJO
*
* @author lvs
* @date 2017/12/08.
*/
public class IP implements Comparable<IP> {

private String ip;
private int count;

public IP() {
}

public IP(String ip, int count) {
this.ip = ip;
this.count = count;
}

public String getIp() {
return ip;
}

public void setIp(String ip) {
this.ip = ip;
}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}

@Override
public int compareTo(IP o) {
return Integer.compare(this.count, o.count);
}

@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
}

IPCountJob类,最小堆版本统计top k

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package com.hellolvs;

import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.io.LineProcessor;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 定时Job,每十分钟统计请求次数前k的ip
*
* @author lvs
* @date 2017/12/08.
*/
public class IPCountJob implements Job {

private static final Logger LOG = LoggerFactory.getLogger(IPCountJob.class);

private static final String LINE_SEPARATOR = StandardSystemProperty.LINE_SEPARATOR.value();
private static final Charset UTF_8 = Charsets.UTF_8;

private static final String INPUT_PATH = "/home/lvs/logs/ip.log";
private static final String OUTPUT_PATH = "/home/lvs/logs/split/";

private static final int SPLIT_NUM = 1024;
private static final int TOP_K = 100;

/**
* 利用最小堆结构存储请求次数前k的IP
*/
private List<IP> result = Lists.newArrayListWithExpectedSize(TOP_K);

/**
* 分割文件用,保存每个文件的写入流对象
*/
private final Map<Integer, BufferedWriter> bufferMap = Maps.newHashMapWithExpectedSize(SPLIT_NUM);

/**
* 定时任务,每十分钟统计请求次数前k的IP
*/
@Override
public void execute(JobExecutionContext jobExecutionContext) {
// 捕获异常,防止定时任务中断
try {
execute();
} catch (Exception e) {
LOG.error("定时任务出错:{}", e.getMessage(), e);
}
}

/**
* 统计大文件中请求次数前k的IP
*
* @throws IOException I/O error
*/
public void execute() throws IOException {
// 这里应该每10分钟获取当前轮替日志文件路径,此处用常量路径模拟
File ipLogFile = new File(INPUT_PATH);

splitLog(ipLogFile, SPLIT_NUM);
File logSplits = new File(OUTPUT_PATH);
for (File logSplit : logSplits.listFiles()) {
countTopK(logSplit, TOP_K);
}

MinHeap.sort(result);
LOG.info("结果集:{}", result.size());
for (int i = result.size() - 1; i >= 0; i--) {
LOG.info("{}", result.get(i));
}
}

/**
* 生成模拟日志文件
*
* @param logNum 生成日志条数
* @throws IOException I/O error
*/
public static void generateLog(long logNum) throws IOException {

/* 创建文件 */
File log = new File(INPUT_PATH);
File parentDir = log.getParentFile();
if (!parentDir.exists()) {
parentDir.mkdirs();
}
log.createNewFile();

/* 生成随机ip写入文件 */
SecureRandom random = new SecureRandom();
try (BufferedWriter bw = new BufferedWriter(new FileWriter(log))) {
for (int i = 0; i < logNum; i++) {
StringBuilder sb = new StringBuilder();
sb.append("192.").append(random.nextInt(255)).append(".").append(random.nextInt(255)).append(".")
.append(random.nextInt(255)).append(LINE_SEPARATOR);
bw.write(sb.toString());
}
bw.flush();
}
}

/**
* 分割日志文件
*
* @param logFile 待分割文件
* @param fileNum 分割文件数量
* @throws IOException I/O error
*/
private void splitLog(File logFile, int fileNum) throws IOException {

/* 为每个分割文件创建写入流对象 */
for (int i = 0; i < fileNum; i++) {
File file = new File(OUTPUT_PATH + i);
File parentDir = file.getParentFile();
if (!parentDir.exists()) {
parentDir.mkdirs();
}
bufferMap.put(i, new BufferedWriter(new FileWriter(file)));
}

/* 根据ip的hashcode将数据分割到不同文件中 */
LineIterator it = null;
try {
it = FileUtils.lineIterator(logFile, "UTF-8");
while (it.hasNext()) {
String ip = it.nextLine();
int hashCode = Objects.hashCode(ip);
hashCode = hashCode < 0 ? -hashCode : hashCode;
BufferedWriter writer = bufferMap.get(hashCode % fileNum);
writer.write(ip + LINE_SEPARATOR);
}
} finally {
/* 释放资源 */
LineIterator.closeQuietly(it);
for (Map.Entry<Integer, BufferedWriter> buffer : bufferMap.entrySet()) {
BufferedWriter writer = buffer.getValue();
writer.flush();
writer.close();
}
bufferMap.clear();
}
}

/**
* 统计请求次数前k的IP
*
* @param logSplit 当前分割文件
* @param k top k
* @throws IOException I/O error
*/
private void countTopK(File logSplit, int k) throws IOException {

/* 读取文件对ip计数 */
HashMap<String, AtomicInteger> ipCountMap = Files.readLines(logSplit, UTF_8,
new LineProcessor<HashMap<String, AtomicInteger>>() {
private HashMap<String, AtomicInteger> ipCountMap = Maps.newHashMap();

@Override
public boolean processLine(String line) throws IOException {
AtomicInteger ipCount = ipCountMap.get(line.trim());
if (ipCount != null) {
ipCount.getAndIncrement();
} else {
ipCountMap.put(line.trim(), new AtomicInteger(1));
}
return true;
}

@Override
public HashMap<String, AtomicInteger> getResult() {
return ipCountMap;
}
});

/* 前k条数据用来构建初始最小堆,之后的数据比堆顶大则替换堆顶并调堆 */
for (Map.Entry<String, AtomicInteger> entry : ipCountMap.entrySet()) {
IP ip = new IP(entry.getKey(), entry.getValue().get());
if (result.size() != k) {
result.add(ip);
if (result.size() == k) {
MinHeap.initMinHeap(result);
}
} else {
if (ip.compareTo(result.get(0)) > 0) {
result.set(0, ip);
MinHeap.adjust(result, 0, k);
}
}
}
}

/**
* 返回统计结果
*
* @return 结果集合
*/
public List<IP> getResult() {
return result;
}
}

MinHeap类,最小堆工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.hellolvs;

import java.util.List;

/**
* 最小堆
*
* @author lvs
* @date 2017-12-12
*/
public class MinHeap {

/**
* 对最小堆排序
*
* @param list 已经为最小堆结构的列表
* @param <T> 元素须实现Comparable接口
*/
public static <T extends Comparable<? super T>> void sort(List<T> list) {
for (int i = list.size() - 1; i > 0; i--) {
swap(list, 0, i);
adjust(list, 0, i);
}
}

/**
* 初始化最小堆
*
* @param list 待初始化为最小堆的列表
* @param <T> 元素须实现Comparable接口
*/
public static <T extends Comparable<? super T>> void initMinHeap(List<T> list) {
/* 从最后一个非叶节点开始至根节点依次调整 */
for (int i = list.size() / 2 - 1; i >= 0; i--) {
adjust(list, i, list.size());
}
}

/**
* 调堆
*
* @param list 当前堆
* @param <T> 元素须实现Comparable接口
* @param cur 待调整位置
* @param length 当前堆大小
*/
public static <T extends Comparable<? super T>> void adjust(List<T> list, int cur, int length) {
T tmp = list.get(cur);
for (int i = 2 * cur + 1; i < length; i = 2 * i + 1) {
if (i + 1 < length && list.get(i).compareTo(list.get(i + 1)) > 0) {
i++; // i指向孩子节点中最小的节点
}
if (tmp.compareTo(list.get(i)) > 0) {
list.set(cur, list.get(i)); // 最小孩子节点调整到其父节点
cur = i; // 当前节点置为最小孩子节点,继续调整
} else {
break; // 没有调整时退出循环
}
}
list.set(cur, tmp); // 被调整节点最终存放位置
}

/**
* 交换List中的元素
*
* @param list 待交换列表
* @param i 第一个元素位置
* @param j 第二个元素位置
*/
private static <T extends Comparable<? super T>> void swap(List<T> list, int i, int j) {
T tmp = list.get(i);
list.set(i, list.get(j));
list.set(j, tmp);
}
}

Main类,无改动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.hellolvs;

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

/**
* 定时任务启动器
*
* @author lvs
* @date 2017/12/11.
*/
public class Main {
public static void main(String[] args) throws Exception {
// 生成模拟日志文件
IPCountJob.generateLog(600000000);

JobDetail job = JobBuilder.newJob(IPCountJob.class)
.withIdentity("ipCountJob", "group1").build();

Trigger trigger = TriggerBuilder
.newTrigger()
.withIdentity("ipCountTrigger", "group1")
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInMinutes(10).repeatForever())
.build();

Scheduler scheduler = new StdSchedulerFactory().getScheduler();
scheduler.start();
scheduler.scheduleJob(job, trigger);
}
}

附一下pom文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.hellolvs</groupId>
<artifactId>ipCount</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>

<properties>
<guava.version>20.0</guava.version>
<commons-lang3.version>3.1</commons-lang3.version>
<commons-io.version>2.4</commons-io.version>
<joda-time.version>2.6</joda-time.version>
<org.quartz-scheduler.version>2.1.7</org.quartz-scheduler.version>
<org.slf4j.version>1.7.5</org.slf4j.version>
<logback.version>1.0.13</logback.version>
<junit.version>4.10</junit.version>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencyManagement>
<dependencies>
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<!-- commons lang3-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>

<!-- commons io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>

<!-- joda-time -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda-time.version}</version>
</dependency>

<!-- quartz -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>${org.quartz-scheduler.version}</version>
</dependency>

<!-- slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${org.slf4j.version}</version>
</dependency>

<!-- logback -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
<scope>runtime</scope>
</dependency>

<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit-dep</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<!-- commons lang3-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<!-- commons io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>

<!-- joda-time -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

<!-- quartz -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>

<!-- slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<!-- logback -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>

<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit-dep</artifactId>
</dependency>
</dependencies>

<build>
<finalName>ROOT</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>

</project>

对比实验

生成了6亿条数据的日志。

TreeSet版本:

1
2
3
4
生成6亿条日志时间:521582
分割文件时间:173219
分割后统计top k时间:195037
定时任务执行时间:368294

注:定时任务执行时间指的是对大文件的总统计时间,主要是分割文件+分割后统计top k。

cpu和堆使用情况:

可以看到堆变化明显分为三阶段:对应了生成日志、分割日志、分割后统计top k。

dump1

最小堆版本:

1
2
3
4
生成6亿条日志时间:513840
分割文件时间:148861
分割后统计top k时间:190966
定时任务执行时间:339870

cpu和堆使用情况:

dump2

总结:

生成日志和分割文件是没有改动的,运行时间不一样,可能有一定误差。

倒是两个版本统计top k时间没有明显的变化,按上面分析O(nlogm)和O(nlogk)应该有比较明显的差距才对,这里n=600000000,m约600000,k=100,各位可以帮忙分析一下效率差距不大的原因。

不过可以看到堆内存使用明显降低了约100MB,因为TreeSet需要添加m个元素再截取k个,而MinHeap只需要添加k个元素。

坚持原创技术分享,您的支持将鼓励我继续创作!
分享到: