mirror of
https://github.com/zoriya/flood.git
synced 2025-12-06 07:16:18 +00:00
server: remove unused history snapshots and make it in-mem only
This commit is contained in:
@@ -10,18 +10,12 @@ import TransferDataStore from '@client/stores/TransferDataStore';
|
||||
import UIStore from '@client/stores/UIStore';
|
||||
|
||||
import type {DirectoryListResponse} from '@shared/types/api';
|
||||
import type {HistorySnapshot} from '@shared/constants/historySnapshotTypes';
|
||||
import type {NotificationFetchOptions, NotificationState} from '@shared/types/Notification';
|
||||
import type {ServerEvents} from '@shared/types/ServerEvents';
|
||||
|
||||
interface ActivityStreamOptions {
|
||||
historySnapshot: HistorySnapshot;
|
||||
}
|
||||
|
||||
const {baseURI} = ConfigStore;
|
||||
|
||||
let activityStreamEventSource: EventSource | null = null;
|
||||
let lastActivityStreamOptions: ActivityStreamOptions;
|
||||
let visibilityChangeTimeout: NodeJS.Timeout;
|
||||
|
||||
// TODO: Use standard Event interfaces
|
||||
@@ -120,26 +114,14 @@ const FloodActions = {
|
||||
|
||||
restartActivityStream() {
|
||||
this.closeActivityStream();
|
||||
this.startActivityStream(lastActivityStreamOptions);
|
||||
this.startActivityStream();
|
||||
},
|
||||
|
||||
startActivityStream(options: ActivityStreamOptions = {historySnapshot: 'FIVE_MINUTE'}) {
|
||||
const {historySnapshot} = options;
|
||||
const didHistorySnapshotChange =
|
||||
lastActivityStreamOptions && lastActivityStreamOptions.historySnapshot !== historySnapshot;
|
||||
|
||||
lastActivityStreamOptions = options;
|
||||
|
||||
// When the user requests a new history snapshot during an open session,
|
||||
// we need to close and re-open the event stream.
|
||||
if (didHistorySnapshotChange && activityStreamEventSource != null) {
|
||||
this.closeActivityStream();
|
||||
}
|
||||
|
||||
startActivityStream() {
|
||||
// If the user requested a new history snapshot, or the event source has not
|
||||
// alraedy been created, we open the event stream.
|
||||
if (didHistorySnapshotChange || activityStreamEventSource == null) {
|
||||
activityStreamEventSource = new EventSource(`${baseURI}api/activity-stream?historySnapshot=${historySnapshot}`);
|
||||
if (activityStreamEventSource == null) {
|
||||
activityStreamEventSource = new EventSource(`${baseURI}api/activity-stream`);
|
||||
|
||||
Object.entries(ServerEventHandlers).forEach(([event, handler]) => {
|
||||
if (activityStreamEventSource != null) {
|
||||
@@ -162,7 +144,7 @@ const handleWindowVisibilityChange = () => {
|
||||
global.clearTimeout(visibilityChangeTimeout);
|
||||
|
||||
if (activityStreamEventSource == null) {
|
||||
FloodActions.startActivityStream(lastActivityStreamOptions);
|
||||
FloodActions.startActivityStream();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
17
server/bin/migrations/02-HistoryEra.ts
Normal file
17
server/bin/migrations/02-HistoryEra.ts
Normal file
@@ -0,0 +1,17 @@
|
||||
import fs from 'fs-extra';
|
||||
import path from 'path';
|
||||
|
||||
import config from '../../../config';
|
||||
import Users from '../../models/Users';
|
||||
|
||||
const migration = () => {
|
||||
return Users.listUsers().then((users) => {
|
||||
return Promise.all(
|
||||
users.map((user) => fs.rm(path.join(config.dbPath, user._id, 'history'), {recursive: true})),
|
||||
).catch(() => {
|
||||
// do nothing.
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
export default migration;
|
||||
@@ -1,8 +1,13 @@
|
||||
import UserInDatabase2 from './UserInDatabase2';
|
||||
import UserInDatabase3 from './UserInDatabase3';
|
||||
import UserInDatabase2 from './00-UserInDatabase2';
|
||||
import UserInDatabase3 from './01-UserInDatabase3';
|
||||
import HistoryEra from './02-HistoryEra';
|
||||
|
||||
const migrations = [UserInDatabase2, UserInDatabase3];
|
||||
const migrations = [UserInDatabase2, UserInDatabase3, HistoryEra];
|
||||
|
||||
const migrate = () => Promise.all(migrations.map((migration) => migration()));
|
||||
const migrate = async () => {
|
||||
for await (const migrate of migrations) {
|
||||
await migrate();
|
||||
}
|
||||
};
|
||||
|
||||
export default migrate;
|
||||
|
||||
@@ -2,8 +2,6 @@ import type {Operation} from 'fast-json-patch';
|
||||
import type {Request, Response} from 'express';
|
||||
import type TypedEmitter from 'typed-emitter';
|
||||
|
||||
import type {HistorySnapshot} from '@shared/constants/historySnapshotTypes';
|
||||
|
||||
import DiskUsage from '../models/DiskUsage';
|
||||
import {getAllServices} from '../services';
|
||||
import ServerEvent from '../models/ServerEvent';
|
||||
@@ -11,11 +9,8 @@ import ServerEvent from '../models/ServerEvent';
|
||||
import type {DiskUsageSummary} from '../models/DiskUsage';
|
||||
import type {TransferHistory} from '../../shared/types/TransferData';
|
||||
|
||||
export default async (req: Request<unknown, unknown, unknown, {historySnapshot: HistorySnapshot}>, res: Response) => {
|
||||
const {
|
||||
query: {historySnapshot = 'FIVE_MINUTE'},
|
||||
user,
|
||||
} = req;
|
||||
export default async (req: Request, res: Response) => {
|
||||
const {user} = req;
|
||||
|
||||
if (user == null) {
|
||||
return;
|
||||
@@ -60,7 +55,7 @@ export default async (req: Request<unknown, unknown, unknown, {historySnapshot:
|
||||
}
|
||||
|
||||
// Transfer history
|
||||
await serviceInstances.historyService.getHistory({snapshot: historySnapshot}).then(
|
||||
await serviceInstances.historyService.getHistory().then(
|
||||
(snapshot) => {
|
||||
const {timestamps: lastTimestamps} = snapshot || {timestamps: []};
|
||||
const lastTimestamp = lastTimestamps[lastTimestamps.length - 1];
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import type {UserInDatabase} from '@shared/schema/Auth';
|
||||
import type {TransferData, TransferSnapshot} from '@shared/types/TransferData';
|
||||
|
||||
import Datastore from 'nedb-promises';
|
||||
import path from 'path';
|
||||
import {setInterval} from 'timers';
|
||||
|
||||
import config from '../../config';
|
||||
|
||||
@@ -10,49 +9,18 @@ interface HistoryEraOpts {
|
||||
interval: number;
|
||||
maxTime: number;
|
||||
name: string;
|
||||
nextEraUpdateInterval?: number;
|
||||
nextEra?: HistoryEra;
|
||||
}
|
||||
|
||||
const MAX_NEXT_ERA_UPDATE_INTERVAL = 1000 * 60 * 60 * 12; // 12 hours
|
||||
const CUMULATIVE_DATA_BUFFER_DIFF = 500; // 500 milliseconds
|
||||
|
||||
class HistoryEra {
|
||||
data = [];
|
||||
ready: Promise<void>;
|
||||
lastUpdate = 0;
|
||||
startedAt = Date.now();
|
||||
opts: HistoryEraOpts;
|
||||
db: Datastore;
|
||||
autoCleanupInterval?: NodeJS.Timeout;
|
||||
nextEraUpdateInterval?: NodeJS.Timeout;
|
||||
private lastUpdate = 0;
|
||||
private opts: HistoryEraOpts;
|
||||
private db: Datastore;
|
||||
|
||||
constructor(user: UserInDatabase, opts: HistoryEraOpts) {
|
||||
constructor(opts: HistoryEraOpts) {
|
||||
this.opts = opts;
|
||||
this.db = Datastore.create({
|
||||
autoload: true,
|
||||
filename: path.join(config.dbPath, user._id, 'history', `${opts.name}.db`),
|
||||
});
|
||||
this.ready = this.prepareDatabase();
|
||||
}
|
||||
|
||||
private async prepareDatabase(): Promise<void> {
|
||||
let lastUpdate = 0;
|
||||
|
||||
await this.db.find<TransferSnapshot>({}).then(
|
||||
(snapshots) => {
|
||||
snapshots.forEach((snapshot) => {
|
||||
if (snapshot.timestamp > lastUpdate) {
|
||||
lastUpdate = snapshot.timestamp;
|
||||
}
|
||||
});
|
||||
|
||||
this.lastUpdate = lastUpdate;
|
||||
},
|
||||
() => undefined,
|
||||
);
|
||||
|
||||
await this.removeOutdatedData();
|
||||
this.db = Datastore.create();
|
||||
|
||||
let cleanupInterval = this.opts.maxTime;
|
||||
|
||||
@@ -60,49 +28,18 @@ class HistoryEra {
|
||||
cleanupInterval = config.dbCleanInterval;
|
||||
}
|
||||
|
||||
this.autoCleanupInterval = setInterval(this.removeOutdatedData, cleanupInterval);
|
||||
setInterval(this.removeOutdatedData, cleanupInterval);
|
||||
}
|
||||
|
||||
private removeOutdatedData = async (): Promise<void> => {
|
||||
if (this.opts.maxTime > 0) {
|
||||
private removeOutdatedData = (): Promise<void> => {
|
||||
const minTimestamp = Date.now() - this.opts.maxTime;
|
||||
return this.db.remove({timestamp: {$lt: minTimestamp}}, {multi: true}).then(
|
||||
() => undefined,
|
||||
() => undefined,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
private updateNextEra = async (): Promise<void> => {
|
||||
if (this.opts.nextEraUpdateInterval == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
const minTimestamp = Date.now() - this.opts.nextEraUpdateInterval;
|
||||
|
||||
return this.db.find<TransferSnapshot>({timestamp: {$gte: minTimestamp}}).then((snapshots) => {
|
||||
if (this.opts.nextEra == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
let downTotal = 0;
|
||||
let upTotal = 0;
|
||||
|
||||
snapshots.forEach((snapshot) => {
|
||||
downTotal += Number(snapshot.download);
|
||||
upTotal += Number(snapshot.upload);
|
||||
});
|
||||
|
||||
this.opts.nextEra.addData({
|
||||
download: Number(Number(downTotal / snapshots.length).toFixed(1)),
|
||||
upload: Number(Number(upTotal / snapshots.length).toFixed(1)),
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
async addData(data: TransferData): Promise<void> {
|
||||
await this.ready;
|
||||
|
||||
const currentTime = Date.now();
|
||||
|
||||
if (currentTime - this.lastUpdate >= this.opts.interval - CUMULATIVE_DATA_BUFFER_DIFF) {
|
||||
@@ -137,8 +74,6 @@ class HistoryEra {
|
||||
}
|
||||
|
||||
async getData(): Promise<TransferSnapshot[]> {
|
||||
await this.ready;
|
||||
|
||||
const minTimestamp = Date.now() - this.opts.maxTime;
|
||||
|
||||
return this.db
|
||||
@@ -146,22 +81,6 @@ class HistoryEra {
|
||||
.sort({timestamp: 1})
|
||||
.then((snapshots) => snapshots.slice(snapshots.length - config.maxHistoryStates));
|
||||
}
|
||||
|
||||
async setNextEra(nextEra: HistoryEra): Promise<void> {
|
||||
await this.ready;
|
||||
|
||||
this.opts.nextEra = nextEra;
|
||||
|
||||
let {nextEraUpdateInterval} = this.opts;
|
||||
|
||||
if (nextEraUpdateInterval && nextEraUpdateInterval > MAX_NEXT_ERA_UPDATE_INTERVAL) {
|
||||
nextEraUpdateInterval = MAX_NEXT_ERA_UPDATE_INTERVAL;
|
||||
}
|
||||
|
||||
if (nextEraUpdateInterval) {
|
||||
this.nextEraUpdateInterval = setInterval(this.updateNextEra, nextEraUpdateInterval);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default HistoryEra;
|
||||
|
||||
@@ -7,7 +7,6 @@ import rateLimit from 'express-rate-limit';
|
||||
import {contentTokenSchema} from '@shared/schema/api/torrents';
|
||||
|
||||
import type {FloodSettings} from '@shared/types/FloodSettings';
|
||||
import type {HistorySnapshot} from '@shared/constants/historySnapshotTypes';
|
||||
import type {NotificationFetchOptions, NotificationState} from '@shared/types/Notification';
|
||||
import type {DirectoryListQuery, DirectoryListResponse, SetFloodSettingsOptions} from '@shared/types/api/index';
|
||||
|
||||
@@ -163,12 +162,11 @@ router.get<unknown, unknown, unknown, DirectoryListQuery>(
|
||||
* @summary Gets transfer history in the given interval
|
||||
* @tags Flood
|
||||
* @security User
|
||||
* @param {HistorySnapshot} snapshot.query - interval
|
||||
* @return {TransferHistory} 200 - success response - application/json
|
||||
* @return {Error} 500 - failure response - application/json
|
||||
*/
|
||||
router.get<unknown, unknown, unknown, {snapshot: HistorySnapshot}>('/history', (req, res) => {
|
||||
req.services.historyService.getHistory(req.query).then(
|
||||
router.get('/history', (req, res) => {
|
||||
req.services.historyService.getHistory().then(
|
||||
(snapshot) => {
|
||||
res.json(snapshot);
|
||||
},
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import type {HistorySnapshot} from '@shared/constants/historySnapshotTypes';
|
||||
import type {TransferHistory, TransferSummary} from '@shared/types/TransferData';
|
||||
|
||||
import BaseService from './BaseService';
|
||||
@@ -12,80 +11,25 @@ interface HistoryServiceEvents {
|
||||
}
|
||||
|
||||
class HistoryService extends BaseService<HistoryServiceEvents> {
|
||||
errorCount = 0;
|
||||
pollTimeout?: NodeJS.Timeout;
|
||||
lastSnapshots: Partial<Record<HistorySnapshot, TransferHistory>> = {};
|
||||
private errorCount = 0;
|
||||
private pollTimeout?: NodeJS.Timeout;
|
||||
|
||||
transferSummary: TransferSummary = {
|
||||
private transferSummary: TransferSummary = {
|
||||
downRate: 0,
|
||||
downTotal: 0,
|
||||
upRate: 0,
|
||||
upTotal: 0,
|
||||
};
|
||||
|
||||
snapshots: Record<HistorySnapshot, HistoryEra> = {
|
||||
YEAR: new HistoryEra(this.user, {
|
||||
interval: 1000 * 60 * 60 * 24 * 7, // 7 days
|
||||
maxTime: 0, // infinite
|
||||
name: 'yearSnapshot',
|
||||
}),
|
||||
|
||||
MONTH: new HistoryEra(this.user, {
|
||||
interval: 1000 * 60 * 60 * 12, // 12 hours
|
||||
maxTime: 1000 * 60 * 60 * 24 * 365, // 365 days
|
||||
name: 'monthSnapshot',
|
||||
nextEraUpdateInterval: 1000 * 60 * 60 * 24 * 7, // 7 days
|
||||
}),
|
||||
|
||||
WEEK: new HistoryEra(this.user, {
|
||||
interval: 1000 * 60 * 60 * 4, // 4 hours
|
||||
maxTime: 1000 * 60 * 60 * 24 * 7 * 24, // 24 weeks
|
||||
name: 'weekSnapshot',
|
||||
nextEraUpdateInterval: 1000 * 60 * 60 * 12, // 12 hours
|
||||
}),
|
||||
|
||||
DAY: new HistoryEra(this.user, {
|
||||
interval: 1000 * 60 * 60, // 60 minutes
|
||||
maxTime: 1000 * 60 * 60 * 24 * 30, // 30 days
|
||||
name: 'daySnapshot',
|
||||
nextEraUpdateInterval: 1000 * 60 * 60 * 4, // 4 hours
|
||||
}),
|
||||
|
||||
HOUR: new HistoryEra(this.user, {
|
||||
interval: 1000 * 60 * 15, // 15 minutes
|
||||
maxTime: 1000 * 60 * 60 * 24, // 24 hours
|
||||
name: 'hourSnapshot',
|
||||
nextEraUpdateInterval: 1000 * 60 * 60, // 60 minutes
|
||||
}),
|
||||
|
||||
THIRTY_MINUTE: new HistoryEra(this.user, {
|
||||
interval: 1000 * 20, // 20 seconds
|
||||
maxTime: 1000 * 60 * 30, // 30 minutes
|
||||
name: 'thirtyMinSnapshot',
|
||||
nextEraUpdateInterval: 1000 * 60 * 15, // 15 minutes
|
||||
}),
|
||||
|
||||
FIVE_MINUTE: new HistoryEra(this.user, {
|
||||
private snapshot = new HistoryEra({
|
||||
interval: 1000 * 5, // 5 seconds
|
||||
maxTime: 1000 * 60 * 5, // 5 minutes
|
||||
name: 'fiveMinSnapshot',
|
||||
nextEraUpdateInterval: 1000 * 20, // 20 seconds
|
||||
}),
|
||||
} as const;
|
||||
});
|
||||
|
||||
constructor(...args: ConstructorParameters<typeof BaseService>) {
|
||||
super(...args);
|
||||
|
||||
let nextEra: HistoryEra;
|
||||
Object.values(this.snapshots).forEach((snapshot, index) => {
|
||||
if (index === 0) {
|
||||
nextEra = snapshot;
|
||||
return;
|
||||
}
|
||||
snapshot.setNextEra(nextEra);
|
||||
nextEra = snapshot;
|
||||
});
|
||||
|
||||
this.onServicesUpdated = () => {
|
||||
this.fetchCurrentTransferSummary();
|
||||
};
|
||||
@@ -115,7 +59,7 @@ class HistoryService extends BaseService<HistoryServiceEvents> {
|
||||
this.errorCount = 0;
|
||||
this.transferSummary = nextTransferSummary;
|
||||
|
||||
await this.snapshots.FIVE_MINUTE.addData({
|
||||
await this.snapshot.addData({
|
||||
upload: nextTransferSummary.upRate,
|
||||
download: nextTransferSummary.downRate,
|
||||
});
|
||||
@@ -154,8 +98,8 @@ class HistoryService extends BaseService<HistoryServiceEvents> {
|
||||
} as const;
|
||||
}
|
||||
|
||||
async getHistory({snapshot}: {snapshot: HistorySnapshot}): Promise<TransferHistory> {
|
||||
return this.snapshots[snapshot]?.getData().then((transferSnapshots) =>
|
||||
async getHistory(): Promise<TransferHistory> {
|
||||
return this.snapshot.getData().then((transferSnapshots) =>
|
||||
transferSnapshots.reduce(
|
||||
(history, transferSnapshot) => {
|
||||
history.download.push(transferSnapshot.download);
|
||||
|
||||
@@ -1,4 +0,0 @@
|
||||
const historySnapshotTypes = ['FIVE_MINUTE', 'THIRTY_MINUTE', 'HOUR', 'DAY', 'WEEK', 'MONTH', 'YEAR'] as const;
|
||||
|
||||
export default historySnapshotTypes;
|
||||
export type HistorySnapshot = typeof historySnapshotTypes[number];
|
||||
Reference in New Issue
Block a user