Skip to content

Commit

Permalink
✨ feat: 1. Add database migration function. 2. Add download speed sta…
Browse files Browse the repository at this point in the history
…tistics persistence and area chart display.
  • Loading branch information
jarvis2f committed Jan 6, 2025
1 parent 15c8363 commit 2de0e63
Show file tree
Hide file tree
Showing 33 changed files with 1,932 additions and 103 deletions.
34 changes: 25 additions & 9 deletions api/src/main/java/telegram/files/DataVerticle.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package telegram.files;

import cn.hutool.core.lang.Version;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
Expand All @@ -9,11 +10,12 @@
import io.vertx.jdbcclient.JDBCConnectOptions;
import io.vertx.jdbcclient.JDBCPool;
import io.vertx.sqlclient.PoolOptions;
import telegram.files.repository.FileRepository;
import telegram.files.repository.SettingRepository;
import telegram.files.repository.TelegramRepository;
import io.vertx.sqlclient.SqlConnection;
import org.jooq.lambda.tuple.Tuple;
import telegram.files.repository.*;
import telegram.files.repository.impl.FileRepositoryImpl;
import telegram.files.repository.impl.SettingRepositoryImpl;
import telegram.files.repository.impl.StatisticRepositoryImpl;
import telegram.files.repository.impl.TelegramRepositoryImpl;

import java.io.File;
Expand All @@ -31,21 +33,35 @@ public class DataVerticle extends AbstractVerticle {

public static SettingRepository settingRepository;

public static StatisticRepository statisticRepository;

public void start(Promise<Void> stopPromise) {
pool = JDBCPool.pool(vertx,
new JDBCConnectOptions()
.setJdbcUrl("jdbc:sqlite:%s".formatted(getDataPath()))
,
new PoolOptions().setMaxSize(16).setName("pool-tf")
);
settingRepository = new SettingRepositoryImpl(pool);
telegramRepository = new TelegramRepositoryImpl(pool);
fileRepository = new FileRepositoryImpl(pool);
settingRepository = new SettingRepositoryImpl(pool);
Future.all(List.of(
telegramRepository.init(),
fileRepository.init(),
settingRepository.init()
))
statisticRepository = new StatisticRepositoryImpl(pool);
List<Definition> definitions = List.of(
new SettingRecord.SettingRecordDefinition(),
new TelegramRecord.TelegramRecordDefinition(),
new FileRecord.FileRecordDefinition(),
new StatisticRecord.StatisticRecordDefinition()
);
pool.getConnection()
.compose(conn -> Future.all(definitions.stream().map(d -> d.createTable(conn)).toList()).map(conn))
.compose(conn -> settingRepository.<Version>getByKey(SettingKey.version).map(v -> Tuple.tuple(conn, v)))
.compose(tuple -> {
SqlConnection conn = tuple.v1;
Version version = tuple.v2 == null ? new Version("0.0.0") : tuple.v2;
return Future.all(definitions.stream().map(d -> d.migrate(conn, version, new Version(Start.VERSION))).toList());
})
.compose(r ->
settingRepository.createOrUpdate(SettingKey.version.name(), Start.VERSION))
.onSuccess(r -> {
log.info("Database initialized");
stopPromise.complete();
Expand Down
19 changes: 13 additions & 6 deletions api/src/main/java/telegram/files/HttpVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -398,12 +398,19 @@ private void handleTelegramDownloadStatistics(RoutingContext ctx) {
ctx.fail(400);
return;
}
getTelegramVerticle(telegramId)
.ifPresentOrElse(telegramVerticle ->
telegramVerticle.getDownloadStatistics()
.onSuccess(ctx::json)
.onFailure(ctx::fail),
() -> ctx.fail(404));
Optional<TelegramVerticle> telegramVerticleOptional = getTelegramVerticle(telegramId);
if (telegramVerticleOptional.isEmpty()) {
ctx.fail(404);
return;
}
TelegramVerticle telegramVerticle = telegramVerticleOptional.get();

String type = ctx.request().getParam("type");
String timeRange = ctx.request().getParam("timeRange");
(Objects.equals(type, "phase") ? telegramVerticle. getDownloadStatisticsByPhase(Convert.toInt(timeRange, 1)) :
telegramVerticle.getDownloadStatistics())
.onSuccess(ctx::json)
.onFailure(ctx::fail);
}

private void handleTelegramChange(RoutingContext ctx) {
Expand Down
13 changes: 13 additions & 0 deletions api/src/main/java/telegram/files/MessyUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package telegram.files;

import java.time.LocalDateTime;

public class MessyUtils {

public static LocalDateTime withGrouping5Minutes(LocalDateTime time) {
int minute = time.getMinute();
int minuteGroup = minute / 5;
int newMinute = minuteGroup * 5;
return time.withMinute(newMinute).withSecond(0).withNano(0);
}
}
2 changes: 1 addition & 1 deletion api/src/main/java/telegram/files/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
public class Start {
private static final Log log = LogFactory.get();

public static final String VERSION = "0.1.6";
public static final String VERSION = "0.1.7";

private static final CountDownLatch shutdownLatch = new CountDownLatch(1);

Expand Down
16 changes: 12 additions & 4 deletions api/src/main/java/telegram/files/TdApiHelp.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ public FileRecord convertFileRecord(long telegramId) {
Base64.encode((byte[]) BeanUtil.getProperty(content, "photo.minithumbnail.data")),
content.caption.text,
null,
"idle"
"idle",
System.currentTimeMillis(),
null
);
}
}
Expand Down Expand Up @@ -329,7 +331,9 @@ public FileRecord convertFileRecord(long telegramId) {
Base64.encode((byte[]) BeanUtil.getProperty(content, "video.minithumbnail.data")),
content.caption.text,
null,
"idle"
"idle",
System.currentTimeMillis(),
null
);
}
}
Expand Down Expand Up @@ -369,7 +373,9 @@ public FileRecord convertFileRecord(long telegramId) {
Base64.encode((byte[]) BeanUtil.getProperty(content, "audio.albumCoverMinithumbnail.data")),
content.caption.text,
null,
"idle"
"idle",
System.currentTimeMillis(),
null
);
}
}
Expand Down Expand Up @@ -409,7 +415,9 @@ public FileRecord convertFileRecord(long telegramId) {
Base64.encode((byte[]) BeanUtil.getProperty(content, "document.minithumbnail.data")),
content.caption.text,
null,
"idle"
"idle",
System.currentTimeMillis(),
null
);
}
}
Expand Down
93 changes: 92 additions & 1 deletion api/src/main/java/telegram/files/TelegramVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.codec.Base64;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateField;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.ArrayUtil;
Expand Down Expand Up @@ -100,6 +103,7 @@ public void start(Promise<Void> startPromise) {
telegramUpdateHandler.setOnFileDownloadsUpdated(this::onFileDownloadsUpdated);
telegramUpdateHandler.setOnMessageReceived(this::onMessageReceived);
client = Client.create(telegramUpdateHandler, this::handleException, this::handleException);
vertx.setPeriodic(60 * 1000, id -> handleSaveAvgSpeed());
this.enableProxy(this.proxyName)
.onSuccess(r -> startPromise.complete())
.onFailure(startPromise::fail);
Expand Down Expand Up @@ -474,6 +478,72 @@ public Future<JsonObject> getDownloadStatistics() {
});
}

public Future<JsonObject> getDownloadStatisticsByPhase(Integer timeRange) {
// 1: 1 hour, 2: 1 day, 3: 1 week, 4: 1 month
long endTime = System.currentTimeMillis();
long startTime = switch (timeRange) {
case 1 -> DateUtil.offsetHour(DateUtil.date(), -1).getTime();
case 2 -> DateUtil.offsetDay(DateUtil.date(), -1).getTime();
case 3 -> DateUtil.offsetWeek(DateUtil.date(), -1).getTime();
case 4 -> DateUtil.offsetMonth(DateUtil.date(), -1).getTime();
default -> throw new IllegalStateException("Unexpected value: " + timeRange);
};

return Future.all(
DataVerticle.statisticRepository.getRangeStatistics(StatisticRecord.Type.speed, this.telegramRecord.id(), startTime, endTime)
.map(statisticRecords -> {
TreeMap<String, List<JsonObject>> groupedSpeedStats = new TreeMap<>(Comparator.comparing(
switch (timeRange) {
case 1, 2 -> (Function<? super String, ? extends DateTime>) time ->
DateUtil.parse(time, DatePattern.NORM_DATETIME_MINUTE_FORMAT);
case 3, 4 -> DateUtil::parseDate;
default -> throw new IllegalStateException("Unexpected value: " + timeRange);
}
));
for (StatisticRecord record : statisticRecords) {
JsonObject data = new JsonObject(record.data());
long timestamp = record.timestamp();
String time = switch (timeRange) {
case 1 ->
MessyUtils.withGrouping5Minutes(DateUtil.toLocalDateTime(DateUtil.date(timestamp))).format(DatePattern.NORM_DATETIME_MINUTE_FORMATTER);
case 2 ->
DateUtil.date(timestamp).setField(DateField.MINUTE, 0).toString(DatePattern.NORM_DATETIME_MINUTE_FORMAT);
case 3, 4 ->
DateUtil.date(timestamp).setField(DateField.MINUTE, 0).toString(DatePattern.NORM_DATE_FORMAT);
default -> throw new IllegalStateException("Unexpected value: " + timeRange);
};
groupedSpeedStats.computeIfAbsent(time, k -> new ArrayList<>()).add(data);
}
return groupedSpeedStats.entrySet().stream()
.map(entry -> {
JsonObject speedStat = entry.getValue().stream().reduce(new JsonObject()
.put("avgSpeed", 0)
.put("medianSpeed", 0)
.put("maxSpeed", 0)
.put("minSpeed", Long.MAX_VALUE),
(a, b) -> new JsonObject()
.put("avgSpeed", a.getLong("avgSpeed") + b.getLong("avgSpeed"))
.put("medianSpeed", a.getLong("medianSpeed") + b.getLong("medianSpeed"))
.put("maxSpeed", Math.max(a.getLong("maxSpeed"), b.getLong("maxSpeed")))
.put("minSpeed", Math.min(a.getLong("minSpeed"), b.getLong("minSpeed")))
);
int size = entry.getValue().size();
speedStat.put("avgSpeed", speedStat.getLong("avgSpeed") / size)
.put("medianSpeed", speedStat.getLong("medianSpeed") / size);
return new JsonObject()
.put("time", entry.getKey())
.put("data", speedStat);
})
.toList();
}),
DataVerticle.fileRepository.getCompletedRangeStatistics(this.telegramRecord.id(), startTime, endTime, timeRange)
)
.map(r -> new JsonObject()
.put("speedStats", r.resultAt(0))
.put("completedStats", r.resultAt(1))
);
}

public Future<TdApi.Proxy> enableProxy(String proxyName) {
if (StrUtil.isBlank(proxyName)) return Future.succeededFuture();
return DataVerticle.settingRepository.<SettingProxyRecords>getByKey(SettingKey.proxys)
Expand Down Expand Up @@ -625,6 +695,23 @@ private void handleException(Throwable e) {
log.error(e);
}

private void handleSaveAvgSpeed() {
if (!authorized || telegramRecord == null) return;
AvgSpeed.SpeedStats speedStats = avgSpeed.getSpeedStats();
if (speedStats.avgSpeed() == 0
&& speedStats.minSpeed() == 0
&& speedStats.medianSpeed() == 0
&& speedStats.maxSpeed() == 0) {
return;
}
JsonObject data = JsonObject.mapFrom(speedStats);
data.remove("interval");
DataVerticle.statisticRepository.create(new StatisticRecord(Convert.toStr(telegramRecord.id()),
StatisticRecord.Type.speed,
System.currentTimeMillis(),
data.encode()));
}

private void onAuthorizationStateUpdated(TdApi.AuthorizationState authorizationState) {
log.debug("[%s] Receive authorization state update: %s".formatted(getRootId(), authorizationState));
this.lastAuthorizationState = authorizationState;
Expand Down Expand Up @@ -695,20 +782,24 @@ private void onFileUpdated(TdApi.UpdateFile updateFile) {
TdApi.File file = updateFile.file;
if (file != null) {
String localPath = null;
Long completionDate = null;
if (file.local != null && file.local.isDownloadingCompleted) {
localPath = file.local.path;
completionDate = System.currentTimeMillis();
}
FileRecord.DownloadStatus downloadStatus = TdApiHelp.getDownloadStatus(file);
DataVerticle.fileRepository.updateStatus(file.id,
file.remote.uniqueId,
localPath,
downloadStatus)
downloadStatus,
completionDate)
.onSuccess(r -> {
if (r == null || r.isEmpty()) return;
sendHttpEvent(EventPayload.build(EventPayload.TYPE_FILE_STATUS, new JsonObject()
.put("fileId", file.id)
.put("downloadStatus", r.getString("downloadStatus"))
.put("localPath", r.getString("localPath"))
.put("completionDate", r.getLong("completionDate"))
));
});
}
Expand Down
47 changes: 47 additions & 0 deletions api/src/main/java/telegram/files/repository/Definition.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package telegram.files.repository;

import cn.hutool.core.lang.Version;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import io.vertx.core.Future;
import io.vertx.sqlclient.SqlConnection;

import java.util.TreeMap;
import java.util.stream.Stream;

public interface Definition {

Log log = LogFactory.get();

String getScheme();

default TreeMap<Version, String[]> getMigrations() {
return new TreeMap<>();
}

default Future<Void> createTable(SqlConnection conn) {
return conn
.query(getScheme())
.execute()
.onFailure(err -> log.error("Failed to create table: %s".formatted(err.getMessage())))
.mapEmpty();
}

default Future<Void> migrate(SqlConnection conn, Version lastVersion, Version currentVersion) {
TreeMap<Version, String[]> migrations = getMigrations();
if (migrations.isEmpty()) {
return Future.succeededFuture();
}
return Future.all(migrations.subMap(lastVersion, false, currentVersion, true).values()
.stream()
.flatMap(arr -> Stream.of(arr)
.map(sql -> conn.query(sql)
.execute()
.onFailure(e -> log.error("Failed to apply migration: %s".formatted(sql), e)))
)
.toList()
)
.onFailure(err -> log.error("Failed to migrate table: %s".formatted(err.getMessage())))
.mapEmpty();
}
}
Loading

0 comments on commit 2de0e63

Please sign in to comment.