How to Process Large JSON Files in Java
Since we're going to tackle large-JSON processing, we first need a suitably large JSON dataset. GitHub Archive records the public event timeline on GitHub (code pushes, issue creation, forks, and so on) and archives it hourly. That is exactly the data we need; after all, as the official site puts it, open-source developers all over the world are working on millions of projects: writing code and documentation, fixing and submitting bugs, and so on.
First download a data file from https://www.gharchive.org/ ; on Windows you don't need wget, just open the URL in a browser to download it.
wget https://data.gharchive.org/2015-01-01-15.json.gz
The downloaded data should look like this:
{"id":"2489651045","type":"CreateEvent","actor":{"id":665991,"login":"petroav","gravatar_id":"","url":"https://api.github.com/users/petroav","avatar_url":"https://avatars.githubusercontent.com/u/665991?"},"repo":{"id":28688495,"name":"petroav/6.828","url":"https://api.github.com/repos/petroav/6.828"},"payload":{"ref":"master","ref_type":"branch","master_branch":"master","description":"Solution to homework and assignments from MIT's 6.828 (Operating Systems Engineering). Done in my spare time.","pusher_type":"user"},"public":true,"created_at":"2015-01-01T15:00:00Z"}
{"id":"2489651051","type":"PushEvent","actor":{"id":3854017,"login":"rspt","gravatar_id":"","url":"https://api.github.com/users/rspt","avatar_url":"https://avatars.githubusercontent.com/u/3854017?"},"repo":{"id":28671719,"name":"rspt/rspt-theme","url":"https://api.github.com/repos/rspt/rspt-theme"},"payload":{"push_id":536863970,"size":1,"distinct_size":1,"ref":"refs/heads/master","head":"6b089eb4a43f728f0a594388092f480f2ecacfcd","before":"437c03652caa0bc4a7554b18d5c0a394c2f3d326","commits":[{"sha":"6b089eb4a43f728f0a594388092f480f2ecacfcd","author":{"email":"5c682c2d1ec4073e277f9ba9f4bdf07e5794dabe@rspt.ch","name":"rspt"},"message":"Fix main header height on mobile","distinct":true,"url":"https://api.github.com/repos/rspt/rspt-theme/commits/6b089eb4a43f728f0a594388092f480f2ecacfcd"}]},"public":true,"created_at":"2015-01-01T15:00:01Z"}
...
As you can see, the data is not a single [] wrapping a pile of {}, as you might expect; it is one JSON object per line. Big-data dumps usually look like this, and this file is in the JSON Lines (jsonl) format. I'm using Jackson on JDK 17 as my reading framework. Decompress the downloaded file, rename it to data.json, and put it in the resources directory.
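Incidentally, the archive does not strictly have to be decompressed on disk first: java.util.zip in the standard library can stream a .gz file line by line directly. A minimal stdlib-only sketch (the file and class names here are made up for the demo; the demo writes its own tiny .json.gz so it is runnable as-is):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipLineDemo {
    // Read a .json.gz file line by line without decompressing it to disk first
    public static long countLines(Path gzFile) throws Exception {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(gzFile)), StandardCharsets.UTF_8))) {
            return reader.lines().count();
        }
    }

    public static void main(String[] args) throws Exception {
        // Build a tiny two-line .gz file just for the demo
        Path tmp = Files.createTempFile("demo", ".json.gz");
        try (PrintWriter out = new PrintWriter(new OutputStreamWriter(
                new GZIPOutputStream(Files.newOutputStream(tmp)), StandardCharsets.UTF_8))) {
            out.println("{\"id\":\"1\"}");
            out.println("{\"id\":\"2\"}");
        }
        System.out.println(countLines(tmp)); // prints 2
        Files.delete(tmp);
    }
}
```

For the real archive you would pass the path of the downloaded 2015-01-01-15.json.gz to countLines instead.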
First add the required dependencies to the pom:
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version> <!-- use a recent stable version -->
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.38</version>
<scope>provided</scope>
</dependency>
</dependencies>
First create entity classes that match the JSON data.
DataEntity.java
package com.runbrick.entity;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Data;
@Data
@JsonIgnoreProperties(ignoreUnknown = true) // ignore properties in the JSON that this class does not declare
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) // required: maps snake_case JSON keys (e.g. created_at) to camelCase fields
public class DataEntity {
String id;
String type;
ActorEntity actor;
RepoEntity repo;
PayloadEntity payload;
String createdAt;
}
ActorEntity.java
package com.runbrick.entity;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Data;
@Data
@JsonIgnoreProperties(ignoreUnknown = true) // ignore properties in the JSON that this class does not declare
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class ActorEntity {
String id;
String login;
String gravatarId;
String url;
String avatarUrl;
}
PayloadEntity.java
package com.runbrick.entity;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Data;
@Data
@JsonIgnoreProperties(ignoreUnknown = true) // ignore properties in the JSON that this class does not declare
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class PayloadEntity {
String ref;
String refType;
String masterBranch;
String description;
String pusherType;
}
RepoEntity.java
package com.runbrick.entity;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Data;
@Data
@JsonIgnoreProperties(ignoreUnknown = true) // ignore properties in the JSON that this class does not declare
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class RepoEntity {
String id;
String name;
String url;
}
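A quick way to convince yourself the snake_case mapping works is to deserialize one field by hand. A minimal sketch, assuming jackson-databind is on the classpath; the Actor class here is a stripped-down, Lombok-free stand-in for ActorEntity, not the real entity:

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;

public class SnakeCaseDemo {
    @JsonIgnoreProperties(ignoreUnknown = true)
    @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
    public static class Actor {
        public String login;
        public String avatarUrl; // populated from the snake_case key "avatar_url"
    }

    public static void main(String[] args) throws Exception {
        // "extra" is silently dropped thanks to ignoreUnknown = true
        String json = "{\"login\":\"petroav\",\"avatar_url\":\"https://avatars.githubusercontent.com/u/665991?\",\"extra\":1}";
        Actor actor = new ObjectMapper().readValue(json, Actor.class);
        System.out.println(actor.login + " " + actor.avatarUrl);
    }
}
```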
Create a test class in your own test directory, e.g. \src\test\java\com\runbrick:
BigJsonTest.java
package com.runbrick;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.runbrick.entity.DataEntity;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
public class BigJsonTest {
@Test
public void test() {
ObjectMapper objectMapper = new ObjectMapper();
ClassLoader classLoader = this.getClass().getClassLoader();
InputStream resource = classLoader.getResourceAsStream("data.json"); // load the file from the classpath as a stream
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(resource, StandardCharsets.UTF_8))) {
String line;
long lineCount = 0;
while ((line = bufferedReader.readLine()) != null) {
lineCount++;
try {
// parse the current line into our entity; once it deserializes, the rest is easy
DataEntity event = objectMapper.readValue(line, DataEntity.class);
// here you could check whether this is an event type you care about
System.out.println(event);
} catch (IOException e) {
// if a line is not valid JSON, log the error and continue with the next line
System.err.println("Failed to parse JSON on line " + lineCount + ": " + e.getMessage());
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
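Splitting lines by hand is not the only option: Jackson's ObjectReader.readValues returns a MappingIterator that walks whitespace-separated JSON documents one at a time, which matches the jsonl layout exactly and never loads the whole file. A minimal sketch, assuming jackson-databind is on the classpath; it uses a simplified Event type and an in-memory string instead of the full DataEntity and file stream:

```java
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.StringReader;

public class MappingIteratorDemo {
    public static class Event {
        public String id;
        public String type;
    }

    public static void main(String[] args) throws Exception {
        // Two JSON documents separated by newlines, exactly like a jsonl file
        String jsonl = "{\"id\":\"1\",\"type\":\"CreateEvent\"}\n{\"id\":\"2\",\"type\":\"PushEvent\"}\n";
        ObjectMapper mapper = new ObjectMapper();
        // readValues streams one document at a time; pass an InputStream for a real file
        try (MappingIterator<Event> it = mapper.readerFor(Event.class).readValues(new StringReader(jsonl))) {
            while (it.hasNext()) {
                Event e = it.next();
                System.out.println(e.id + " " + e.type);
            }
        }
    }
}
```

In the test above you could hand readValues the resource InputStream directly and drop the BufferedReader loop entirely.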
If the per-record processing logic is heavier, you can use a thread pool plus the CompletableFuture pattern.
package com.runbrick;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.runbrick.entity.ActorEntity;
import com.runbrick.entity.DataEntity;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.*;
public class BigJsonThreadTest {
// ObjectMapper is thread-safe once configured, so one shared instance is enough
private static final ObjectMapper objectMapper = new ObjectMapper();
@Test
public void test() {
// 1. create our custom thread pool
ThreadPoolExecutor executor = createThreadPool();
ClassLoader classLoader = this.getClass().getClassLoader();
InputStream resource = classLoader.getResourceAsStream("data.json"); // load the file from the classpath as a stream
ArrayList<CompletableFuture<Optional<DataEntity>>> futures = new ArrayList<>();
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(resource, StandardCharsets.UTF_8))) {
String line;
long lineCount = 0;
while ((line = bufferedReader.readLine()) != null) {
final String currentLine = line;
// 2. use supplyAsync to submit the parsing task to our thread pool
CompletableFuture<Optional<DataEntity>> future = CompletableFuture.supplyAsync(() -> {
try {
DataEntity event = objectMapper.readValue(currentLine, DataEntity.class);
// heavier per-record processing would go here
return Optional.of(event);
} catch (JsonProcessingException e) {
// return empty instead of failing the whole pipeline on one bad line
return Optional.empty();
}
}, executor); // <-- run on our own executor instead of the common ForkJoinPool
futures.add(future);
}
// 3. wait for every task to finish
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// pull the parsed results out with a stream; note this keeps every result in memory,
// so for truly huge files you would process and discard results in batches instead
futures.stream().map(CompletableFuture::join)
.filter(Optional::isPresent)
.map(Optional::get)
.map(DataEntity::getActor)
.map(ActorEntity::getLogin)
.forEach(System.out::println);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
executor.shutdown();
}
}
/**
 * Create a bounded thread pool sized for CPU-bound JSON parsing.
 */
public static ThreadPoolExecutor createThreadPool() {
int corePoolSize = Runtime.getRuntime().availableProcessors(); // core threads, usually the number of CPU cores
int maxPoolSize = corePoolSize * 2; // maximum threads
long keepAliveTime = 60L; // idle-thread keep-alive time in seconds
// a bounded queue for pending tasks, to avoid running out of memory
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(1000);
return new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue, Executors.defaultThreadFactory(),
// rejection policy: when the queue is full, the submitting (main) thread runs the task itself;
// this is a simple form of backpressure that slows down file reading so tasks cannot pile up
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
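Finally, if even one object per line is too much allocation, say you only want to count event types, Jackson's low-level streaming API (jackson-core, which jackson-databind already pulls in) can scan tokens without building any objects at all. A sketch under that assumption; the depth bookkeeping stops nested "type" fields inside payload from being counted:

```java
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

public class StreamingDemo {
    // Count documents whose top-level "type" field equals `wanted`,
    // scanning tokens instead of materializing entity objects
    public static int countType(String jsonl, String wanted) throws Exception {
        int count = 0;
        try (JsonParser parser = new JsonFactory().createParser(jsonl)) {
            int depth = 0;
            JsonToken token;
            while ((token = parser.nextToken()) != null) {
                if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) {
                    depth++;
                } else if (token == JsonToken.END_OBJECT || token == JsonToken.END_ARRAY) {
                    depth--;
                } else if (depth == 1 && token == JsonToken.FIELD_NAME && "type".equals(parser.currentName())) {
                    JsonToken value = parser.nextToken(); // advance to the field value
                    if (value == JsonToken.START_OBJECT || value == JsonToken.START_ARRAY) {
                        depth++; // keep the depth counter consistent if the value is a container
                    } else if (value == JsonToken.VALUE_STRING && wanted.equals(parser.getText())) {
                        count++;
                    }
                }
            }
        }
        return count;
    }

    public static void main(String[] args) throws Exception {
        String jsonl = "{\"type\":\"PushEvent\"}\n{\"type\":\"CreateEvent\"}\n{\"type\":\"PushEvent\"}\n";
        System.out.println(countType(jsonl, "PushEvent")); // prints 2
    }
}
```

For the real file you would call JsonFactory.createParser with the InputStream; the parser happily walks newline-separated root-level documents one after another.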