更新了多个文件以支持数据同步功能,包括配置文件和数据库操作类的修改。

This commit is contained in:
YuTian 2025-05-11 19:08:28 +08:00
parent 0eb25bcbb0
commit 8327a0c06f
15 changed files with 1306 additions and 50 deletions

View File

@ -1,14 +1,24 @@
package com.io.yutian.elementoriginlib; package com.io.yutian.elementoriginlib;
import com.io.yutian.elementoriginlib.config.MongodbConfig;
import com.io.yutian.elementoriginlib.config.OriginLibConfig; import com.io.yutian.elementoriginlib.config.OriginLibConfig;
import com.io.yutian.elementoriginlib.config.RedisConfig;
import com.io.yutian.elementoriginlib.lang.Lang; import com.io.yutian.elementoriginlib.lang.Lang;
import com.io.yutian.elementoriginlib.listener.GuiHandlerListener; import com.io.yutian.elementoriginlib.listener.GuiHandlerListener;
import com.io.yutian.elementoriginlib.listener.PlayerChatInputListener; import com.io.yutian.elementoriginlib.listener.PlayerChatInputListener;
import com.io.yutian.elementoriginlib.logger.Logger; import com.io.yutian.elementoriginlib.logger.Logger;
import com.io.yutian.elementoriginlib.manager.CommandManager; import com.io.yutian.elementoriginlib.manager.CommandManager;
import com.io.yutian.elementoriginlib.redis.RedisIO; import com.io.yutian.elementoriginlib.redis.RedisIO;
import com.io.yutian.elementoriginlib.util.FileUtil;
import com.io.yutian.elementoriginlib.ztsd.ZstdDictionaryManager;
import org.bukkit.configuration.file.FileConfiguration;
import org.bukkit.configuration.file.YamlConfiguration;
import org.bukkit.plugin.java.JavaPlugin; import org.bukkit.plugin.java.JavaPlugin;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
public final class ElementOriginLib extends JavaPlugin { public final class ElementOriginLib extends JavaPlugin {
public static final Logger LOGGER = Logger.getLogger(ElementOriginLib.class); public static final Logger LOGGER = Logger.getLogger(ElementOriginLib.class);
@ -18,6 +28,9 @@ public final class ElementOriginLib extends JavaPlugin {
private RedisIO redisIO; private RedisIO redisIO;
private OriginLibConfig config; private OriginLibConfig config;
private RedisConfig redisConfig;
private MongodbConfig mongodbConfig;
private ZstdDictionaryManager zstdDictionaryManager;
@Override @Override
public void onEnable() { public void onEnable() {
@ -31,19 +44,51 @@ public final class ElementOriginLib extends JavaPlugin {
reload(); reload();
redisIO = new RedisIO(); redisIO = new RedisIO();
redisIO.init(this); redisIO.init(redisConfig);
ZstdDictionaryManager.DictionaryConfig config = new ZstdDictionaryManager.DictionaryConfig.Builder()
.minSampleSize(1024) // 最小样本大小1KB
.maxSampleSize(16 * 1024 * 1024) // 最大样本大小16MB
.minQualityThreshold(0.5) // 最小样本质量阈值
.compressionLevel(3) // 压缩级别1-22,3是平衡值
.maxDictionarySize(128 * 1024) // 字典大小128KB
.trainingInterval(Duration.ofHours(1)) // 训练间隔1小时
.maxSamples(1024) // 触发训练的样本数量
.maxSampleFileSize(512 * 1024 * 1024) // 样本文件大小限制512MB
.build();
try {
zstdDictionaryManager = new ZstdDictionaryManager("plugins/pixeldatasync/zstd_dictionary", config);
} catch (IOException e) {
e.printStackTrace();
}
} }
@Override @Override
public void onDisable() { public void onDisable() {
redisIO.close(); redisIO.close();
zstdDictionaryManager.shutdown();
} }
public void reload() { public void reload() {
config = new OriginLibConfig(); config = new OriginLibConfig();
saveDefaultConfig(); saveDefaultConfig();
reloadConfig(); reloadConfig();
redisConfig = new RedisConfig();
File redisFile = new File(getDataFolder(), "redis.yml");
if (!redisFile.exists()) {
saveResource("redis.yml", false);
}
FileConfiguration redisFileConfig = YamlConfiguration.loadConfiguration(redisFile);
redisConfig.load(redisFileConfig);
mongodbConfig = new MongodbConfig();
File mongodbFile = new File(getDataFolder(), "mongodb.yml");
if (!mongodbFile.exists()) {
saveResource("mongodb.yml", false);
}
FileConfiguration mongodbFileConfig = YamlConfiguration.loadConfiguration(mongodbFile);
mongodbConfig.load(mongodbFileConfig);
config.load(getConfig()); config.load(getConfig());
Lang.registerLangFile(this); Lang.registerLangFile(this);
Lang.reload(); Lang.reload();
@ -53,6 +98,14 @@ public final class ElementOriginLib extends JavaPlugin {
return config; return config;
} }
public RedisConfig getRedisConfig() {
return redisConfig;
}
public MongodbConfig getMongodbConfig() {
return mongodbConfig;
}
public RedisIO getRedisIO() { public RedisIO getRedisIO() {
return redisIO; return redisIO;
} }

View File

@ -0,0 +1,40 @@
package com.io.yutian.elementoriginlib.config;
import org.bukkit.configuration.file.FileConfiguration;
/**
 * Holds the MongoDB connection settings read from {@code mongodb.yml}.
 * Call {@link #load(FileConfiguration)} before using any getter.
 */
public class MongodbConfig {

    private String host;
    private int port;
    private String database;
    private String username;
    private String password;

    /**
     * Reads the connection settings from the given configuration,
     * falling back to local-development defaults for missing keys.
     *
     * NOTE(review): the fallback password {@code "123456"} is insecure;
     * production deployments must override it in mongodb.yml.
     *
     * @param config loaded contents of mongodb.yml
     */
    public void load(FileConfiguration config) {
        this.host = config.getString("host", "127.0.0.1");
        this.port = config.getInt("port", 27017);
        this.database = config.getString("database", "admin");
        this.username = config.getString("username", "root");
        this.password = config.getString("password", "123456");
    }

    /** @return MongoDB server host name or IP */
    public String getHost() {
        return host;
    }

    /** @return MongoDB server port */
    public int getPort() {
        return port;
    }

    /** @return default database name */
    public String getDatabase() {
        return database;
    }

    /** @return authentication user name */
    public String getUsername() {
        return username;
    }

    /** @return authentication password */
    public String getPassword() {
        return password;
    }
}

View File

@ -7,13 +7,6 @@ public class OriginLibConfig {
private String redisBungeeNetworkId; private String redisBungeeNetworkId;
private String redisBungeeProxyId; private String redisBungeeProxyId;
private String mongoDbHost;
private int mongoDbPort;
private String mongoDbDatabase;
private String mongoDbUsername;
private String mongoDbPassword;
public void load(FileConfiguration config) { public void load(FileConfiguration config) {
redisBungeeNetworkId = config.getString("redisBungeeNetworkId"); redisBungeeNetworkId = config.getString("redisBungeeNetworkId");
redisBungeeProxyId = config.getString("redisBungeeProxyId"); redisBungeeProxyId = config.getString("redisBungeeProxyId");

View File

@ -0,0 +1,31 @@
package com.io.yutian.elementoriginlib.config;
import org.bukkit.configuration.file.FileConfiguration;
/**
 * Holds the Redis connection settings read from {@code redis.yml}.
 * Call {@link #load(FileConfiguration)} before using any getter.
 */
public class RedisConfig {

    private String server;
    private int port;
    private String password;

    /**
     * Reads the connection settings from the given configuration.
     * An empty password or the literal {@code "none"} is normalized to
     * {@code null}, meaning "connect without authentication".
     *
     * @param config loaded contents of redis.yml
     */
    public void load(FileConfiguration config) {
        this.server = config.getString("server", "localhost");
        this.port = config.getInt("port", 6379);
        String raw = config.getString("password");
        this.password = (raw == null || raw.isEmpty() || raw.equals("none")) ? null : raw;
    }

    /** @return Redis server host name or IP */
    public String getServer() {
        return server;
    }

    /** @return Redis server port */
    public int getPort() {
        return port;
    }

    /** @return password, or {@code null} when no authentication is configured */
    public String getPassword() {
        return password;
    }
}

View File

@ -1,29 +1,11 @@
package com.io.yutian.elementoriginlib.datasync; package com.io.yutian.elementoriginlib.datasync;
import com.io.yutian.elementoriginlib.ElementOriginLib; import java.util.UUID;
import com.io.yutian.elementoriginlib.serialize.SerializeHelper;
import redis.clients.jedis.Jedis;
public class DataSyncHelper { public class DataSyncHelper {
public static void saveData(String key, Object value) { public static void sync(UUID uuid, Object obj) {
String strValue = SerializeHelper.serialize(value);
try (Jedis jedis = ElementOriginLib.inst().getRedisIO().getJedisPool().getResource()) {
jedis.set(key, strValue);
}
}
public static <T> T getData(String key, Class<T> clazz) {
try (Jedis jedis = ElementOriginLib.inst().getRedisIO().getJedisPool().getResource()) {
if (!jedis.exists(key)) {
return null;
}
String strValue = jedis.get(key);
if (strValue == null) {
return null;
}
return SerializeHelper.deserialize(strValue, clazz);
}
} }
} }

View File

@ -0,0 +1,6 @@
package com.io.yutian.elementoriginlib.datasync;
/**
 * Direction of a player-data synchronization pass between the Redis
 * cache and the MongoDB persistent store.
 */
public enum SyncDirection {
    /** Push the cached Redis value into MongoDB. */
    REDIS_TO_MONGO,
    /** Load the MongoDB document into the Redis cache. */
    MONGO_TO_REDIS
}

View File

@ -0,0 +1,155 @@
package com.io.yutian.elementoriginlib.datasync;
import com.io.yutian.elementoriginlib.mongodb.MongoDBIO;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.ReplaceOptions;
import org.bson.Document;
import org.bukkit.entity.Player;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisException;
/**
 * Synchronizes one collection's worth of player data between the Redis
 * cache and a MongoDB collection, guarding each pass with a per-player
 * Redis lock key ({@code <key>:lock}).
 */
public class SyncModule {

    /** Redis key namespace for cached player data. */
    public static final String KEY_PREFIX = "playerdata";

    /** Lock TTL in seconds, so a crashed holder cannot block syncs forever. */
    private static final int DEFAULT_LOCK_EXPIRE = 15;

    private static final String _ID = "_id";

    private final String moduleName;
    private final String databaseName;
    private final String collectionName;

    /**
     * @param moduleName     logical name of this sync module
     * @param databaseName   MongoDB database name (informational)
     * @param collectionName MongoDB collection holding the player documents
     */
    public SyncModule(String moduleName, String databaseName, String collectionName) {
        this.moduleName = moduleName;
        this.databaseName = databaseName;
        this.collectionName = collectionName;
    }

    /**
     * Copies the player's MongoDB document into Redis as JSON.
     *
     * @return {@link SyncResult#SUCCESS} on success, otherwise a result
     *         describing the failure (missing data, lock contention, I/O error)
     */
    public SyncResult syncToRedis(Player player, JedisPool jedisPool, MongoDBIO mongoDBIO) {
        if (player == null || jedisPool == null || mongoDBIO == null) {
            return SyncResult.INVALID_PARAMETERS;
        }
        String playerId = player.getUniqueId().toString();
        String redisKey = buildRedisKey(playerId);
        String lockKey = redisKey + ":lock";
        // BUGFIX: track whether WE acquired the lock. The original released
        // the lock unconditionally in finally, deleting a competing holder's
        // lock whenever acquireLock() returned false.
        boolean locked = false;
        try (Jedis jedis = jedisPool.getResource()) {
            locked = acquireLock(jedis, lockKey);
            if (!locked) {
                return SyncResult.LOCK_FAILED;
            }
            MongoCollection<Document> collection = mongoDBIO.getMongoDatabase()
                    .getCollection(collectionName);
            Document query = new Document(_ID, player.getUniqueId());
            Document mongoDoc = collection.find(query).first();
            if (mongoDoc == null) {
                return SyncResult.MONGO_DATA_NOT_FOUND;
            }
            jedis.set(redisKey, mongoDoc.toJson());
            return SyncResult.SUCCESS;
        } catch (JedisException e) {
            return SyncResult.REDIS_ERROR;
        } catch (Exception e) {
            return SyncResult.MONGO_ERROR;
        } finally {
            if (locked) {
                releaseLock(jedisPool, lockKey);
            }
        }
    }

    /**
     * Writes the cached Redis JSON back into MongoDB (upsert) and deletes
     * the Redis key on success.
     *
     * @return {@link SyncResult#SUCCESS} on success, otherwise a result
     *         describing the failure (missing/empty data, lock contention, I/O error)
     */
    public SyncResult syncToMongoDB(Player player, JedisPool jedisPool, MongoDBIO mongoDBIO) {
        if (player == null || jedisPool == null || mongoDBIO == null) {
            return SyncResult.INVALID_PARAMETERS;
        }
        String playerId = player.getUniqueId().toString();
        String redisKey = buildRedisKey(playerId);
        String lockKey = redisKey + ":lock";
        // BUGFIX: same as syncToRedis — only release a lock we actually own.
        boolean locked = false;
        try (Jedis jedis = jedisPool.getResource()) {
            locked = acquireLock(jedis, lockKey);
            if (!locked) {
                return SyncResult.LOCK_FAILED;
            }
            if (!jedis.exists(redisKey)) {
                return SyncResult.NOT_FOUND_KEY;
            }
            String redisValue = jedis.get(redisKey);
            if (redisValue == null || redisValue.isEmpty()) {
                return SyncResult.EMPTY_DATA;
            }
            MongoCollection<Document> collection = mongoDBIO.getMongoDatabase()
                    .getCollection(collectionName);
            Document doc = Document.parse(redisValue);
            collection.replaceOne(
                    new Document(_ID, player.getUniqueId()),
                    doc,
                    new ReplaceOptions().upsert(true)
            );
            jedis.del(redisKey);
            return SyncResult.SUCCESS;
        } catch (JedisException e) {
            return SyncResult.REDIS_ERROR;
        } catch (Exception e) {
            return SyncResult.MONGO_ERROR;
        } finally {
            if (locked) {
                releaseLock(jedisPool, lockKey);
            }
        }
    }

    /** @return {@code true} when a MongoDB {@code ping} command succeeds */
    public boolean checkMongoDBConnection(MongoDBIO mongoDBIO) {
        try {
            mongoDBIO.getMongoDatabase().runCommand(new Document("ping", 1));
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /** @return {@code true} when Redis answers PING with PONG */
    public boolean checkRedisConnection(JedisPool jedisPool) {
        try (Jedis jedis = jedisPool.getResource()) {
            return "PONG".equals(jedis.ping());
        } catch (Exception e) {
            return false;
        }
    }

    /** Builds the Redis data key: {@code playerdata:<collection>:<uuid>}. */
    private String buildRedisKey(String playerId) {
        return String.format("%s:%s:%s", KEY_PREFIX, collectionName, playerId);
    }

    /**
     * Attempts to take the lock with SETNX + EXPIRE.
     * NOTE(review): the two commands are not atomic — a crash between them
     * leaves a lock with no TTL; consider {@code SET key 1 NX EX ttl}.
     */
    private boolean acquireLock(Jedis jedis, String lockKey) {
        long result = jedis.setnx(lockKey, "1");
        if (result == 1) {
            jedis.expire(lockKey, DEFAULT_LOCK_EXPIRE);
            return true;
        }
        return false;
    }

    /** Best-effort lock release; Redis errors are deliberately ignored. */
    private void releaseLock(JedisPool jedisPool, String lockKey) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.del(lockKey);
        } catch (JedisException ignored) {
            // best-effort: the TTL set in acquireLock reclaims the lock anyway
        }
    }

    public String getModuleName() {
        return moduleName;
    }

    public String getDatabaseName() {
        return databaseName;
    }

    public String getCollectionName() {
        return collectionName;
    }
}

View File

@ -0,0 +1,33 @@
package com.io.yutian.elementoriginlib.datasync;
/**
 * Outcome of a {@code SyncModule} synchronization pass. Each constant
 * carries a human-readable message and a flag distinguishing hard errors
 * from expected "nothing to do" outcomes.
 */
public enum SyncResult {
    SUCCESS("同步成功", false),
    REDIS_CONNECTION_FAILED("Redis连接失败", true),
    MONGO_CONNECTION_FAILED("MongoDB连接失败", true),
    LOCK_FAILED("获取锁失败", true),
    NOT_FOUND_KEY("Redis键不存在", false),
    MONGO_DATA_NOT_FOUND("MongoDB数据不存在", false),
    EMPTY_DATA("Redis数据为空", false),
    REDIS_ERROR("Redis操作异常", true),
    MONGO_ERROR("MongoDB操作异常", true),
    INVALID_PARAMETERS("参数无效", true),
    UNKNOWN_ERROR("未知错误", true);

    private final String message;
    private final boolean error;

    SyncResult(String message, boolean error) {
        this.message = message;
        this.error = error;
    }

    /** @return human-readable description of this outcome */
    public String getMessage() {
        return message;
    }

    /** @return {@code true} when this outcome represents a hard error */
    public boolean isError() {
        return error;
    }
}

View File

@ -0,0 +1,26 @@
package com.io.yutian.elementoriginlib.mongodb;
import com.io.yutian.elementoriginlib.ElementOriginLib;
import com.io.yutian.elementoriginlib.config.MongodbConfig;
import java.util.HashMap;
import java.util.Map;
/**
 * Caches one {@link MongoDBIO} per database name so the whole plugin shares
 * a single client per database.
 */
public class MongoDBFactory {

    // BUGFIX: made final; access is confined to the synchronized factory
    // method below — the original's unsynchronized get/put on a plain
    // HashMap raced under concurrent callers and could create duplicate
    // clients or corrupt the map.
    private static final Map<String, MongoDBIO> mongoDBIOs = new HashMap<>();

    /**
     * Returns the cached {@link MongoDBIO} for {@code dbName}, creating it
     * from the plugin's global MongoDB configuration on first use.
     */
    public static MongoDBIO getMongoDBIO(String dbName) {
        return getMongoDBIO(ElementOriginLib.inst().getMongodbConfig(), dbName);
    }

    /**
     * Returns the cached {@link MongoDBIO} for {@code dbName}, creating it
     * from the supplied configuration on first use. Synchronized so that
     * concurrent first calls for the same name build exactly one instance.
     *
     * @param config connection settings used only when the instance is created
     * @param dbName MongoDB database name, also the cache key
     */
    public static synchronized MongoDBIO getMongoDBIO(MongodbConfig config, String dbName) {
        MongoDBIO mongoDBIO = mongoDBIOs.get(dbName);
        if (mongoDBIO == null) {
            mongoDBIO = new MongoDBIO(config.getHost(), config.getPort(), config.getUsername(), config.getPassword(), dbName);
            mongoDBIOs.put(dbName, mongoDBIO);
        }
        return mongoDBIO;
    }
}

View File

@ -0,0 +1,59 @@
package com.io.yutian.elementoriginlib.mongodb;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import org.bson.UuidRepresentation;
import org.bson.codecs.UuidCodec;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
/**
 * Thin wrapper around a MongoDB client bound to one database, with
 * STANDARD UUID codec registration so player UUIDs round-trip correctly.
 */
public class MongoDBIO {

    private final static Logger LOG = LoggerFactory.getLogger(MongoDBIO.class);

    // Cached once: codec registries are immutable and thread-safe.
    private static final CodecRegistry CODEC_REGISTRY = CodecRegistries.fromRegistries(
            CodecRegistries.fromCodecs(new UuidCodec(UuidRepresentation.STANDARD)),
            MongoClientSettings.getDefaultCodecRegistry()
    );

    // BUGFIX: the original created a MongoClient in the constructor and
    // dropped the reference — the client (and its connection pool) could
    // never be closed. It is now retained and released in close().
    private final MongoClient mongoClient;
    private final MongoDatabase mongoDatabase;

    /**
     * Connects to {@code host:port} authenticating against the {@code admin}
     * database and binds to {@code dbname}. Credentials are URL-encoded so
     * special characters in username/password are safe in the URI.
     *
     * Note: the driver's {@code getDatabase} never returns null (it throws
     * {@code IllegalArgumentException} for invalid names), so the original
     * null checks were dead code and have been removed.
     */
    public MongoDBIO(String host, int port, String username, String password, String dbname) {
        String connectionString = String.format(
                "mongodb://%s:%s@%s:%d/?authSource=admin",
                URLEncoder.encode(username, StandardCharsets.UTF_8),
                URLEncoder.encode(password, StandardCharsets.UTF_8),
                host, port
        );
        MongoClientSettings settings = MongoClientSettings.builder()
                .applyConnectionString(new ConnectionString(connectionString))
                .codecRegistry(CODEC_REGISTRY)
                .uuidRepresentation(UuidRepresentation.STANDARD)
                .build();
        this.mongoClient = MongoClients.create(settings);
        this.mongoDatabase = mongoClient.getDatabase(dbname);
    }

    /**
     * Wraps a caller-supplied client. NOTE: {@link #close()} will close this
     * client too — do not share it with other owners.
     */
    public MongoDBIO(MongoClient mongo, String dbname) {
        this.mongoClient = mongo;
        this.mongoDatabase = mongo.getDatabase(dbname);
    }

    /** @return the bound database handle (thread-safe per the driver docs) */
    public MongoDatabase getMongoDatabase() {
        return mongoDatabase;
    }

    /** Releases the underlying client and its connection pool. */
    public void close() {
        mongoClient.close();
    }
}

View File

@ -1,34 +1,19 @@
package com.io.yutian.elementoriginlib.redis; package com.io.yutian.elementoriginlib.redis;
import com.io.yutian.elementoriginlib.util.FileUtil; import com.io.yutian.elementoriginlib.config.RedisConfig;
import org.bukkit.configuration.file.FileConfiguration;
import org.bukkit.configuration.file.YamlConfiguration;
import org.bukkit.plugin.Plugin;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.JedisPoolConfig;
import java.io.File;
import java.util.Set; import java.util.Set;
public class RedisIO { public class RedisIO {
private JedisPool jedisPool; private JedisPool jedisPool;
public void init(Plugin plugin) { public void init(RedisConfig redisConfig) {
File file = FileUtil.getFile(plugin, "", "redis.yml");
if (!file.exists()) {
plugin.saveResource("redis.yml", false);
}
FileConfiguration configuration = YamlConfiguration.loadConfiguration(file);
String redisServer = configuration.getString("server", "localhost");
int redisPort = configuration.getInt("port", 6379);
String redisPassword = configuration.getString("password");
if (redisPassword != null && (redisPassword.isEmpty() || redisPassword.equals("none"))) {
redisPassword = null;
}
try { try {
String finalRedisPassword = redisPassword; String finalRedisPassword = redisConfig.getPassword();
JedisPoolConfig config = new JedisPoolConfig(); JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(256); config.setMaxTotal(256);
config.setMaxIdle(64); config.setMaxIdle(64);
@ -39,7 +24,7 @@ public class RedisIO {
config.setMinEvictableIdleTimeMillis(600_000); config.setMinEvictableIdleTimeMillis(600_000);
config.setBlockWhenExhausted(true); config.setBlockWhenExhausted(true);
config.setMaxWaitMillis(2000); config.setMaxWaitMillis(2000);
jedisPool = new JedisPool(config, redisServer, redisPort, 0, finalRedisPassword); jedisPool = new JedisPool(config, redisConfig.getServer(), redisConfig.getPort(), 0, finalRedisPassword);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }

View File

@ -0,0 +1,888 @@
package com.io.yutian.elementoriginlib.ztsd;
import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdDictTrainer;
import com.github.luben.zstd.ZstdInputStream;
import com.io.yutian.elementoriginlib.ElementOriginLib;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class ZstdDictionaryManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ZstdDictionaryManager.class);
private final DictionaryConfig config;
private final BlockingQueue<Sample> sampleQueue;
private final Map<String, DictionaryVersion> dictionaryVersions;
private final CompressionStats stats;
private final AtomicBoolean isTraining;
private final ScheduledExecutorService scheduler;
private final Path baseDir;
private final Path samplesDir;
private final Path statsDir;
private final AtomicInteger fileCounter;
public ZstdDictionaryManager(String storageDir, DictionaryConfig config) throws IOException {
this.config = config;
this.sampleQueue = new LinkedBlockingQueue<>();
this.dictionaryVersions = new ConcurrentHashMap<>();
this.stats = new CompressionStats();
this.isTraining = new AtomicBoolean(false);
this.scheduler = Executors.newScheduledThreadPool(1);
this.baseDir = Paths.get(storageDir).toAbsolutePath();
this.samplesDir = baseDir.resolve("samples");
this.statsDir = baseDir.resolve("stats");
this.fileCounter = new AtomicInteger(0);
Files.createDirectories(this.samplesDir);
Files.createDirectories(this.statsDir);
loadLatestSamples();
loadStats();
if (config.scheduledTraining) {
startScheduledTraining();
}
}
private void startScheduledTraining() {
scheduler.scheduleAtFixedRate(
this::trainIfNeeded,
config.getTrainingInterval().toMillis(),
config.getTrainingInterval().toMillis(),
TimeUnit.MILLISECONDS
);
LOGGER.info("已启动定时训练任务,间隔: {} 毫秒", config.getTrainingInterval().toMillis());
}
public void addSample(String text, String source) throws IOException {
try {
Sample sample = new Sample(text.getBytes("UTF-8"), source);
evaluateSampleQuality(sample);
if (sample.getQuality() >= config.getMinQualityThreshold()) {
sampleQueue.offer(sample);
if (sampleQueue.size() >= config.getMaxSamples()) {
trainIfNeeded();
}
}
} catch (Exception e) {
stats.recordError("ADD_SAMPLE_ERROR");
throw new IOException("添加样本失败: " + e.getMessage(), e);
}
}
private void evaluateSampleQuality(Sample sample) {
double quality = 1.0;
byte[] data = sample.getData();
LOGGER.debug("评估样本质量 - 来源: {}, 大小: {} 字节",
sample.getSource(), data.length);
if (data.length == 0) {
sample.setQuality(0.0);
LOGGER.debug("样本为空质量设为0");
return;
}
if (data.length < config.getMinSampleSize()) {
quality *= 0.5;
LOGGER.debug("样本过小,质量降低 - 当前大小: {}, 最小要求: {}",
data.length, config.getMinSampleSize());
} else if (data.length > config.getMaxSampleSize()) {
quality *= 0.3;
LOGGER.debug("样本过大,质量降低 - 当前大小: {}, 最大限制: {}",
data.length, config.getMaxSampleSize());
}
SampleMetrics metrics = calculateSampleMetrics(data);
if (metrics.entropy < 0.5) {
quality *= 0.7;
LOGGER.debug("样本熵值过低,质量降低 - 熵值: {}", metrics.entropy);
}
if (metrics.uniqueRatio < 0.3) {
quality *= 0.8;
LOGGER.debug("样本多样性不足,质量降低 - 唯一字节比例: {}", metrics.uniqueRatio);
}
if (metrics.repetitionRatio > 0.7) {
quality *= 0.6;
LOGGER.debug("样本重复性过高,质量降低 - 重复比例: {}", metrics.repetitionRatio);
}
if (metrics.distributionScore < 0.4) {
quality *= 0.9;
LOGGER.debug("样本字节分布不均匀,质量降低 - 分布得分: {}", metrics.distributionScore);
}
if (metrics.compressionPotential < 0.3) {
quality *= 0.85;
LOGGER.debug("样本压缩潜力较低,质量降低 - 压缩潜力: {}", metrics.compressionPotential);
}
sample.setQuality(quality);
LOGGER.debug("样本质量评估完成 - 最终质量: {}, 熵值: {}, 唯一字节比例: {}, 重复比例: {}, 分布得分: {}, 压缩潜力: {}",
quality, metrics.entropy, metrics.uniqueRatio, metrics.repetitionRatio, metrics.distributionScore, metrics.compressionPotential);
}
private SampleMetrics calculateSampleMetrics(byte[] data) {
int[] frequencies = new int[256];
int[] runLengths = new int[256];
int currentRun = 1;
byte lastByte = data[0];
for (int i = 1; i < data.length; i++) {
byte currentByte = data[i];
frequencies[currentByte & 0xFF]++;
if (currentByte == lastByte) {
currentRun++;
} else {
if (currentRun > runLengths[lastByte & 0xFF]) {
runLengths[lastByte & 0xFF] = currentRun;
}
currentRun = 1;
lastByte = currentByte;
}
}
if (currentRun > runLengths[lastByte & 0xFF]) {
runLengths[lastByte & 0xFF] = currentRun;
}
int uniqueBytes = 0;
for (int freq : frequencies) {
if (freq > 0) uniqueBytes++;
}
double entropy = 0.0;
for (int freq : frequencies) {
if (freq > 0) {
double probability = (double) freq / data.length;
entropy -= probability * (Math.log(probability) / Math.log(2));
}
}
entropy /= 8.0;
double repetitionRatio = 0.0;
for (int runLength : runLengths) {
repetitionRatio += (double) runLength / data.length;
}
double distributionScore = 0.0;
double expectedFreq = 1.0 / 256;
for (int freq : frequencies) {
if (freq > 0) {
double actualFreq = (double) freq / data.length;
distributionScore += 1.0 - Math.abs(actualFreq - expectedFreq);
}
}
distributionScore /= 256;
double compressionPotential = 1.0 - (entropy * 0.4 + repetitionRatio * 0.3 + (1.0 - distributionScore) * 0.3);
return new SampleMetrics(
entropy,
(double) uniqueBytes / 256,
repetitionRatio,
distributionScore,
compressionPotential
);
}
private void trainIfNeeded() {
if (isTraining.compareAndSet(false, true)) {
CompletableFuture.runAsync(this::trainAndRotate)
.whenComplete((v, e) -> isTraining.set(false));
}
}
private void trainAndRotate() {
try {
List<Sample> samples = new ArrayList<>();
sampleQueue.drainTo(samples, config.getMaxSamples());
if (samples.isEmpty()) {
LOGGER.info("没有样本需要训练");
return;
}
LOGGER.info("开始训练字典,样本数量: {}", samples.size());
ZstdDictTrainer trainer = new ZstdDictTrainer(config.getMaxDictionarySize(), config.getMaxDictionarySize());
for (Sample sample : samples) {
trainer.addSample(sample.getData());
}
byte[] newDictionary = trainer.trainSamples();
LOGGER.info("字典训练完成,大小: {} 字节", newDictionary.length);
String versionId = Instant.now().toString().replace(":", "-");
DictionaryVersion version = new DictionaryVersion(versionId, newDictionary);
dictionaryVersions.put(versionId, version);
LOGGER.info("创建新字典版本: {}", versionId);
saveSamples(samples);
cleanOldVersions();
} catch (Exception e) {
LOGGER.error("训练字典失败: {}", e.getMessage());
stats.recordError("TRAINING_ERROR");
}
}
public byte[] compress(String text) throws IOException {
if (dictionaryVersions.isEmpty()) {
throw new IllegalStateException("未初始化字典,请先添加样本");
}
try {
long startTime = System.nanoTime();
byte[] original = text.getBytes("UTF-8");
byte[] compressed = Zstd.compressUsingDict(
original,
getLatestDictionary().getDictionary(),
config.getCompressionLevel()
);
long endTime = System.nanoTime();
stats.recordCompression(original.length, compressed.length, endTime - startTime);
DictionaryVersion latestDict = getLatestDictionary();
latestDict.incrementUseCount();
latestDict.updatePerformanceMetrics(
(double) compressed.length / original.length,
endTime - startTime,
0
);
return compressed;
} catch (Exception e) {
stats.recordError("COMPRESSION_ERROR");
throw new IOException("压缩失败: " + e.getMessage(), e);
}
}
public String decompress(byte[] compressed) throws IOException {
if (dictionaryVersions.isEmpty()) {
throw new IllegalStateException("未初始化字典,请先添加样本");
}
try {
long startTime = System.nanoTime();
try (ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
ZstdInputStream zis = new ZstdInputStream(bis)) {
zis.setDict(getLatestDictionary().getDictionary());
byte[] result = zis.readAllBytes();
long endTime = System.nanoTime();
stats.recordDecompression(endTime - startTime);
return new String(result, "UTF-8");
}
} catch (Exception e) {
stats.recordError("DECOMPRESSION_ERROR");
throw new IOException("解压失败: " + e.getMessage(), e);
}
}
private DictionaryVersion getLatestDictionary() {
return dictionaryVersions.values().stream()
.max(Comparator.comparing(DictionaryVersion::getCreateTime))
.orElseThrow(() -> new IllegalStateException("没有可用的字典"));
}
private void saveSamples(List<Sample> samples) throws IOException {
if (samples.isEmpty()) {
LOGGER.info("没有样本需要保存");
return;
}
String timestamp = Instant.now().toString().replace(":", "-");
String sampleFileName = String.format("samples_%s_%03d.bin",
timestamp, fileCounter.incrementAndGet());
Path sampleFile = samplesDir.resolve(sampleFileName);
LOGGER.info("开始保存样本文件: {}, 样本数量: {}", sampleFileName, samples.size());
try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(sampleFile))) {
out.writeObject(samples);
LOGGER.info("样本文件保存成功: {}", sampleFile);
if (Files.exists(sampleFile)) {
long fileSize = Files.size(sampleFile);
LOGGER.info("样本文件大小: {} 字节", fileSize);
} else {
LOGGER.error("样本文件保存失败: 文件不存在");
}
} catch (IOException e) {
LOGGER.error("保存样本文件失败: {}, {}", sampleFile, e.getMessage());
throw e;
}
}
private void cleanOldVersions() {
if (dictionaryVersions.size() > 3) {
LOGGER.info("开始清理旧版本字典,当前版本数: {}", dictionaryVersions.size());
dictionaryVersions.entrySet().stream()
.sorted(Comparator.comparing(e -> e.getValue().getCreateTime()))
.limit(dictionaryVersions.size() - 3)
.forEach(e -> {
dictionaryVersions.remove(e.getKey());
LOGGER.info("已删除旧版本字典: {}", e.getKey());
});
}
}
public void exportDictionary(String versionId, Path targetPath) throws IOException {
DictionaryVersion version = dictionaryVersions.get(versionId);
if (version == null) {
throw new IllegalArgumentException("字典版本不存在: " + versionId);
}
Files.write(targetPath, version.getDictionary());
}
public void importDictionary(Path sourcePath, String versionId) throws IOException {
byte[] dictionary = Files.readAllBytes(sourcePath);
DictionaryVersion version = new DictionaryVersion(versionId, dictionary);
dictionaryVersions.put(versionId, version);
}
public CompressionStats getStats() {
return stats;
}
public Map<String, DictionaryVersion> getDictionaryVersions() {
return Collections.unmodifiableMap(dictionaryVersions);
}
public void shutdown() {
LOGGER.info("开始关闭字典管理器");
saveStats();
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
LOGGER.warn("调度器未能在60秒内正常关闭强制关闭");
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
LOGGER.error("关闭调度器时被中断");
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
LOGGER.info("字典管理器已关闭");
}
private void loadLatestSamples() throws IOException {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(samplesDir, "samples_*.bin")) {
List<Path> files = new ArrayList<>();
stream.forEach(files::add);
LOGGER.info("样本文件数量: {}", files.size());
Optional<Path> latestFile = files.stream()
.max(Comparator.comparingLong(p -> {
try {
return Files.getLastModifiedTime(p).toMillis();
} catch (IOException e) {
LOGGER.error("获取文件修改时间失败: {}, {}", p, e.getMessage());
return 0L;
}
}));
if (latestFile.isPresent()) {
try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(latestFile.get()))) {
List<Sample> loadedSamples = (List<Sample>) in.readObject();
LOGGER.info("加载样本文件: {}, 样本数量: {}", latestFile.get().getFileName(), loadedSamples.size());
for (Sample sample : loadedSamples) {
if (sample.getQuality() >= config.getMinQualityThreshold()) {
sampleQueue.offer(sample);
}
}
if (sampleQueue.size() >= config.getMaxSamples()) {
LOGGER.info("样本数量达到阈值,开始训练字典");
trainAndRotate();
} else {
LOGGER.info("样本数量不足,当前样本数: {}, 需要样本数: {}",
sampleQueue.size(), config.getMaxSamples());
}
} catch (ClassNotFoundException e) {
throw new IOException("加载样本失败: " + e.getMessage(), e);
}
} else {
LOGGER.info("未找到历史样本文件");
}
} catch (IOException e) {
LOGGER.error("加载样本文件失败: {}", e.getMessage());
throw e;
}
}
private void saveStats() {
try {
String timestamp = Instant.now().toString().replace(":", "-");
String statsFileName = String.format("compression_stats_%s_%03d.json",
timestamp, fileCounter.incrementAndGet());
Path timestampedStatsFile = statsDir.resolve(statsFileName);
Map<String, Object> statsData = new HashMap<>();
statsData.put("totalCompressedSize", stats.getTotalCompressedSize());
statsData.put("totalOriginalSize", stats.getTotalOriginalSize());
statsData.put("totalCompressionTime", stats.getTotalCompressionTime());
statsData.put("totalDecompressionTime", stats.getTotalDecompressionTime());
statsData.put("errorCounts", stats.getErrorCounts());
statsData.put("averageCompressionRatio", stats.getAverageCompressionRatio());
statsData.put("averageCompressionTime", stats.getAverageCompressionTime());
statsData.put("averageDecompressionTime", stats.getAverageDecompressionTime());
statsData.put("timestamp", timestamp);
Map<String, Object> timeStats = new HashMap<>();
timeStats.put("lastHour", calculateTimeStats(Duration.ofHours(1)));
timeStats.put("lastDay", calculateTimeStats(Duration.ofDays(1)));
timeStats.put("lastWeek", calculateTimeStats(Duration.ofDays(7)));
statsData.put("timeStats", timeStats);
Map<String, Object> versionInfo = new HashMap<>();
dictionaryVersions.forEach((id, version) -> {
Map<String, Object> vInfo = new HashMap<>();
vInfo.put("createTime", version.getCreateTime().toString());
vInfo.put("useCount", version.getUseCount());
vInfo.put("compressionRatio", version.getCompressionRatio());
vInfo.put("compressionTime", version.getCompressionTime());
vInfo.put("decompressionTime", version.getDecompressionTime());
versionInfo.put(id, vInfo);
});
statsData.put("dictionaryVersions", versionInfo);
Files.writeString(timestampedStatsFile, new com.google.gson.Gson().toJson(statsData));
LOGGER.info("已保存统计文件: {}", timestampedStatsFile.getFileName());
} catch (IOException e) {
LOGGER.error("保存统计数据失败: {}", e.getMessage());
}
}
private Map<String, Object> calculateTimeStats(Duration duration) {
Map<String, Object> timeStats = new HashMap<>();
Instant cutoff = Instant.now().minus(duration);
List<DictionaryVersion> recentVersions = dictionaryVersions.values().stream()
.filter(v -> v.getCreateTime().isAfter(cutoff))
.collect(Collectors.toList());
if (!recentVersions.isEmpty()) {
double avgRatio = recentVersions.stream()
.mapToDouble(DictionaryVersion::getCompressionRatio)
.average()
.orElse(0.0);
double avgCompTime = recentVersions.stream()
.mapToLong(DictionaryVersion::getCompressionTime)
.average()
.orElse(0.0);
double avgDecompTime = recentVersions.stream()
.mapToLong(DictionaryVersion::getDecompressionTime)
.average()
.orElse(0.0);
timeStats.put("averageCompressionRatio", avgRatio);
timeStats.put("averageCompressionTime", avgCompTime);
timeStats.put("averageDecompressionTime", avgDecompTime);
timeStats.put("versionCount", recentVersions.size());
timeStats.put("totalUseCount", recentVersions.stream()
.mapToLong(DictionaryVersion::getUseCount)
.sum());
}
return timeStats;
}
private void loadStats() {
try {
List<Path> statsFiles = Files.list(statsDir)
.filter(p -> p.getFileName().toString().startsWith("compression_stats_"))
.collect(Collectors.toList());
LOGGER.info("统计文件数量: {}", statsFiles.size());
Optional<Path> latestStatsFile = statsFiles.stream()
.max(Comparator.comparing(p -> {
try {
return Files.getLastModifiedTime(p).toMillis();
} catch (IOException e) {
LOGGER.error("获取文件修改时间失败: {}, {}", p, e.getMessage());
return 0L;
}
}));
if (latestStatsFile.isPresent()) {
Path statsPath = latestStatsFile.get();
LOGGER.info("加载统计文件: {}", statsPath.getFileName());
try {
String json = Files.readString(statsPath);
Map<String, Object> statsData = new com.google.gson.Gson().fromJson(json, Map.class);
// 更新统计数据
if (statsData.containsKey("totalCompressedSize")) {
long originalSize = ((Number) statsData.get("totalOriginalSize")).longValue();
long compressedSize = ((Number) statsData.get("totalCompressedSize")).longValue();
long compressionTime = ((Number) statsData.get("totalCompressionTime")).longValue();
if (originalSize > 0 && compressedSize > 0) {
stats.recordCompression(originalSize, compressedSize, compressionTime);
LOGGER.info("已加载压缩统计: 原始大小={}, 压缩后大小={}, 压缩时间={}ns",
originalSize, compressedSize, compressionTime);
}
}
if (statsData.containsKey("totalDecompressionTime")) {
long decompressionTime = ((Number) statsData.get("totalDecompressionTime")).longValue();
if (decompressionTime > 0) {
stats.recordDecompression(decompressionTime);
LOGGER.info("已加载解压统计: 解压时间={}ns", decompressionTime);
}
}
if (statsData.containsKey("errorCounts")) {
Map<String, Number> errorCounts = (Map<String, Number>) statsData.get("errorCounts");
errorCounts.forEach((error, count) -> {
long errorCount = count.longValue();
if (errorCount > 0) {
for (int i = 0; i < errorCount; i++) {
stats.recordError(error.toString());
}
LOGGER.info("已加载错误统计: {}={}", error, errorCount);
}
});
}
} catch (Exception e) {
LOGGER.error("解析统计文件失败: {}, {}", statsPath, e.getMessage());
}
} else {
LOGGER.info("未找到历史统计文件,将使用新的统计记录");
}
} catch (IOException e) {
LOGGER.error("加载统计数据失败: {}", e.getMessage());
}
}
private static class SampleMetrics {
final double entropy;
final double uniqueRatio;
final double repetitionRatio;
final double distributionScore;
final double compressionPotential;
SampleMetrics(double entropy, double uniqueRatio, double repetitionRatio,
double distributionScore, double compressionPotential) {
this.entropy = entropy;
this.uniqueRatio = uniqueRatio;
this.repetitionRatio = repetitionRatio;
this.distributionScore = distributionScore;
this.compressionPotential = compressionPotential;
}
}
public static class DictionaryConfig {
private final int minSampleSize;
private final int maxSampleSize;
private final double minQualityThreshold;
private final int compressionLevel;
private final int maxDictionarySize;
private final Duration trainingInterval;
private final int maxSamples;
private final int maxSampleFileSize;
private final boolean scheduledTraining;
private DictionaryConfig(Builder builder) {
this.minSampleSize = builder.minSampleSize;
this.maxSampleSize = builder.maxSampleSize;
this.minQualityThreshold = builder.minQualityThreshold;
this.compressionLevel = builder.compressionLevel;
this.maxDictionarySize = builder.maxDictionarySize;
this.trainingInterval = builder.trainingInterval;
this.maxSamples = builder.maxSamples;
this.maxSampleFileSize = builder.maxSampleFileSize;
this.scheduledTraining = builder.scheduledTraining;
}
public int getMinSampleSize() {
return minSampleSize;
}
public int getMaxSampleSize() {
return maxSampleSize;
}
public double getMinQualityThreshold() {
return minQualityThreshold;
}
public int getCompressionLevel() {
return compressionLevel;
}
public int getMaxDictionarySize() {
return maxDictionarySize;
}
public Duration getTrainingInterval() {
return trainingInterval;
}
public int getMaxSamples() {
return maxSamples;
}
public int getMaxSampleFileSize() {
return maxSampleFileSize;
}
public boolean isScheduledTraining() {
return scheduledTraining;
}
public static class Builder {
private int minSampleSize = 1024;
private int maxSampleSize = 10 * 1024 * 1024;
private double minQualityThreshold = 0.5;
private int compressionLevel = 3;
private int maxDictionarySize = 10 * 1024;
private Duration trainingInterval = Duration.ofMinutes(30);
private int maxSamples = 100;
private int maxSampleFileSize = 10 * 1024 * 1024;
private boolean scheduledTraining = false;
public Builder minSampleSize(int size) {
this.minSampleSize = size;
return this;
}
public Builder maxSampleSize(int size) {
this.maxSampleSize = size;
return this;
}
public Builder minQualityThreshold(double threshold) {
this.minQualityThreshold = threshold;
return this;
}
public Builder compressionLevel(int level) {
this.compressionLevel = level;
return this;
}
public Builder maxDictionarySize(int size) {
this.maxDictionarySize = size;
return this;
}
public Builder trainingInterval(Duration interval) {
this.trainingInterval = interval;
return this;
}
public Builder maxSamples(int count) {
this.maxSamples = count;
return this;
}
public Builder maxSampleFileSize(int size) {
this.maxSampleFileSize = size;
return this;
}
public Builder scheduledTraining(boolean scheduledTraining) {
this.scheduledTraining = scheduledTraining;
return this;
}
public DictionaryConfig build() {
return new DictionaryConfig(this);
}
}
}
public static class Sample implements Serializable {
private static final long serialVersionUID = 1L;
private final byte[] data;
private final Instant timestamp;
private final String source;
private final Map<String, Object> metadata;
private double quality;
public Sample(byte[] data, String source) {
this.data = data;
this.timestamp = Instant.now();
this.source = source;
this.quality = 1.0;
this.metadata = new ConcurrentHashMap<>();
}
public byte[] getData() {
return data;
}
public Instant getTimestamp() {
return timestamp;
}
public String getSource() {
return source;
}
public double getQuality() {
return quality;
}
public void setQuality(double quality) {
this.quality = Math.max(0.0, Math.min(1.0, quality));
}
public Map<String, Object> getMetadata() {
return metadata;
}
}
public static class CompressionStats {
private final AtomicLong totalCompressedSize;
private final AtomicLong totalOriginalSize;
private final AtomicLong totalCompressionTime;
private final AtomicLong totalDecompressionTime;
private final Map<String, AtomicLong> errorCounts;
public CompressionStats() {
this.totalCompressedSize = new AtomicLong(0);
this.totalOriginalSize = new AtomicLong(0);
this.totalCompressionTime = new AtomicLong(0);
this.totalDecompressionTime = new AtomicLong(0);
this.errorCounts = new ConcurrentHashMap<>();
}
public void recordCompression(long originalSize, long compressedSize, long time) {
totalOriginalSize.addAndGet(originalSize);
totalCompressedSize.addAndGet(compressedSize);
totalCompressionTime.addAndGet(time);
}
public void recordDecompression(long time) {
totalDecompressionTime.addAndGet(time);
}
public void recordError(String errorType) {
errorCounts.computeIfAbsent(errorType, k -> new AtomicLong(0)).incrementAndGet();
}
public double getAverageCompressionRatio() {
long original = totalOriginalSize.get();
return original > 0 ? (double) totalCompressedSize.get() / original : 0.0;
}
public double getAverageCompressionTime() {
long count = totalOriginalSize.get();
return count > 0 ? (double) totalCompressionTime.get() / count : 0.0;
}
public double getAverageDecompressionTime() {
long count = totalOriginalSize.get();
return count > 0 ? (double) totalDecompressionTime.get() / count : 0.0;
}
public long getTotalCompressedSize() {
return totalCompressedSize.get();
}
public long getTotalOriginalSize() {
return totalOriginalSize.get();
}
public long getTotalCompressionTime() {
return totalCompressionTime.get();
}
public long getTotalDecompressionTime() {
return totalDecompressionTime.get();
}
public Map<String, Long> getErrorCounts() {
Map<String, Long> result = new ConcurrentHashMap<>();
errorCounts.forEach((k, v) -> result.put(k, v.get()));
return result;
}
}
public static class DictionaryVersion implements Serializable {
private static final long serialVersionUID = 1L;
private final String versionId;
private final Instant createTime;
private final AtomicLong useCount;
private final byte[] dictionary;
private final Map<String, Object> metadata;
private double compressionRatio;
private long compressionTime;
private long decompressionTime;
public DictionaryVersion(String versionId, byte[] dictionary) {
this.versionId = versionId;
this.createTime = Instant.now();
this.useCount = new AtomicLong(0);
this.dictionary = dictionary;
this.metadata = new ConcurrentHashMap<>();
}
public void incrementUseCount() {
useCount.incrementAndGet();
}
public void updatePerformanceMetrics(double ratio, long compTime, long decompTime) {
this.compressionRatio = ratio;
this.compressionTime = compTime;
this.decompressionTime = decompTime;
}
public String getVersionId() {
return versionId;
}
public Instant getCreateTime() {
return createTime;
}
public long getUseCount() {
return useCount.get();
}
public byte[] getDictionary() {
return dictionary;
}
public Map<String, Object> getMetadata() {
return metadata;
}
public double getCompressionRatio() {
return compressionRatio;
}
public long getCompressionTime() {
return compressionTime;
}
public long getDecompressionTime() {
return decompressionTime;
}
}
}

View File

@ -0,0 +1,5 @@
host: "localhost"
port: 27017
database: "admin"
username: "root"
password: "123456"

View File

@ -1,3 +1,3 @@
server: 127.0.0.1
port: 6379
password: "none"