diff --git a/package-lock.json b/package-lock.json index 00fb68d7..4c8c8553 100644 --- a/package-lock.json +++ b/package-lock.json @@ -118,6 +118,7 @@ "mobx-react": "^7.0.5", "morgan": "^1.10.0", "nedb": "^1.8.0", + "nedb-promises": "^4.1.0", "node-sass": "^5.0.0", "overlayscrollbars": "^1.13.0", "overlayscrollbars-react": "^0.2.2", @@ -13749,6 +13750,15 @@ "underscore": "~1.4.4" } }, + "node_modules/nedb-promises": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/nedb-promises/-/nedb-promises-4.1.0.tgz", + "integrity": "sha512-nTdx7jX/Vu24L05Cy0ee7CL3L4SEHCb1jlLlegPl0VlE8jsUXgnSyNOjq3FEc3cdUSDK05X7hSzb3+a07PigmQ==", + "dev": true, + "dependencies": { + "nedb": "^1.8.0" + } + }, "node_modules/nedb/node_modules/async": { "version": "0.2.10", "resolved": "https://registry.npmjs.org/async/-/async-0.2.10.tgz", @@ -36051,6 +36061,15 @@ } } }, + "nedb-promises": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/nedb-promises/-/nedb-promises-4.1.0.tgz", + "integrity": "sha512-nTdx7jX/Vu24L05Cy0ee7CL3L4SEHCb1jlLlegPl0VlE8jsUXgnSyNOjq3FEc3cdUSDK05X7hSzb3+a07PigmQ==", + "dev": true, + "requires": { + "nedb": "^1.8.0" + } + }, "negotiator": { "version": "0.6.2", "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.2.tgz", diff --git a/package.json b/package.json index dcde244e..71bf861f 100644 --- a/package.json +++ b/package.json @@ -167,6 +167,7 @@ "mobx-react": "^7.0.5", "morgan": "^1.10.0", "nedb": "^1.8.0", + "nedb-promises": "^4.1.0", "node-sass": "^5.0.0", "overlayscrollbars": "^1.13.0", "overlayscrollbars-react": "^0.2.2", diff --git a/server/middleware/clientActivityStream.ts b/server/middleware/clientActivityStream.ts index da3e856e..9b139b76 100644 --- a/server/middleware/clientActivityStream.ts +++ b/server/middleware/clientActivityStream.ts @@ -25,10 +25,6 @@ export default async (req: Request>>( emitter: T, @@ -43,64 +39,56 @@ export default async (req: Request { - serverEvent.emit(diskUsageChange.id, 'DISK_USAGE_CHANGE', diskUsageChange.disks); - }); - - // Trigger an immediate update - DiskUsage.updateDisks().catch((e) => console.error(e)); - - const torrentList = (await fetchTorrentList) || serviceInstances.torrentService.getTorrentListSummary(); - const taxonomy = serviceInstances.taxonomyService.getTaxonomy(); - const transferSummary = serviceInstances.historyService.getTransferSummary(); - - serverEvent.emit(torrentList.id, 'TORRENT_LIST_FULL_UPDATE', torrentList.torrents); - serverEvent.emit(taxonomy.id, 'TAXONOMY_FULL_UPDATE', taxonomy.taxonomy); - serverEvent.emit(transferSummary.id, 'TRANSFER_SUMMARY_FULL_UPDATE', transferSummary.transferSummary); - serverEvent.emit( - Date.now(), - 'NOTIFICATION_COUNT_CHANGE', - serviceInstances.notificationService.getNotificationCount(), - ); + // Don't proceed if client connection setting is unusable + if (serviceInstances.clientGatewayService == null) { + return; + } + // Client connection status change event handleEvents(serviceInstances.clientGatewayService, 'CLIENT_CONNECTION_STATE_CHANGE', (isConnected: boolean) => { serverEvent.emit(Date.now(), 'CLIENT_CONNECTIVITY_STATUS_CHANGE', { isConnected, }); }); + // Trigger a retry if client connection failed if (serviceInstances.clientGatewayService.errorCount !== 0) { serviceInstances.clientGatewayService.testGateway().catch(console.error); } - // Get user's specified history snapshot current history. - serviceInstances.historyService.getHistory({snapshot: historySnapshot}, (snapshot, error) => { - const {timestamps: lastTimestamps} = snapshot || {timestamps: []}; - const lastTimestamp = lastTimestamps[lastTimestamps.length - 1]; + // Transfer history + await serviceInstances.historyService.getHistory({snapshot: historySnapshot}).then( + (snapshot) => { + const {timestamps: lastTimestamps} = snapshot || {timestamps: []}; + const lastTimestamp = lastTimestamps[lastTimestamps.length - 1]; - if (error == null && snapshot != null && lastTimestamp != null) { - serverEvent.emit(lastTimestamp, 'TRANSFER_HISTORY_FULL_UPDATE', snapshot); - } else { - const fallbackHistory: TransferHistory = { - download: [0], - upload: [0], - timestamps: [Date.now()], - }; - serverEvent.emit(Date.now(), 'TRANSFER_HISTORY_FULL_UPDATE', fallbackHistory); - } + if (snapshot != null && lastTimestamp != null) { + serverEvent.emit(lastTimestamp, 'TRANSFER_HISTORY_FULL_UPDATE', snapshot); + } else { + const fallbackHistory: TransferHistory = { + download: [0], + upload: [0], + timestamps: [Date.now()], + }; + serverEvent.emit(Date.now(), 'TRANSFER_HISTORY_FULL_UPDATE', fallbackHistory); + } + }, + () => undefined, + ); + + // Disk usage + const disks = DiskUsage.getDiskUsage(); + serverEvent.emit(disks.id, 'DISK_USAGE_CHANGE', disks.disks); + handleEvents(DiskUsage, 'DISK_USAGE_CHANGE', (diskUsageChange: DiskUsageSummary) => { + serverEvent.emit(diskUsageChange.id, 'DISK_USAGE_CHANGE', diskUsageChange.disks); }); - // Add user's specified history snapshot change event listener. - handleEvents(serviceInstances.notificationService, 'NOTIFICATION_COUNT_CHANGE', (payload) => { - const {data, id} = payload; - serverEvent.emit(id, 'NOTIFICATION_COUNT_CHANGE', data); - }); - - // Add diff event listeners. + // Torrent list + const torrentList = (await fetchTorrentList) || serviceInstances.torrentService.getTorrentListSummary(); + serverEvent.emit(torrentList.id, 'TORRENT_LIST_FULL_UPDATE', torrentList.torrents); handleEvents( serviceInstances.torrentService, 'TORRENT_LIST_DIFF_CHANGE', @@ -109,11 +97,17 @@ export default async (req: Request { const {diff, id} = payload; serverEvent.emit(id, 'TAXONOMY_DIFF_CHANGE', diff); }); + // Transfer summary + const transferSummary = serviceInstances.historyService.getTransferSummary(); + serverEvent.emit(transferSummary.id, 'TRANSFER_SUMMARY_FULL_UPDATE', transferSummary.transferSummary); handleEvents( serviceInstances.historyService, 'TRANSFER_SUMMARY_DIFF_CHANGE', @@ -121,4 +115,15 @@ export default async (req: Request { + const {data, id} = payload; + serverEvent.emit(id, 'NOTIFICATION_COUNT_CHANGE', data); + }); }; diff --git a/server/models/HistoryEra.ts b/server/models/HistoryEra.ts index 58734a02..a74ba7b7 100644 --- a/server/models/HistoryEra.ts +++ b/server/models/HistoryEra.ts @@ -1,7 +1,7 @@ import type {UserInDatabase} from '@shared/schema/Auth'; import type {TransferData, TransferSnapshot} from '@shared/types/TransferData'; -import Datastore from 'nedb'; +import Datastore from 'nedb-promises'; import path from 'path'; import config from '../../config'; @@ -19,20 +19,40 @@ const CUMULATIVE_DATA_BUFFER_DIFF = 500; // 500 milliseconds class HistoryEra { data = []; - ready = false; + ready: Promise; lastUpdate = 0; startedAt = Date.now(); opts: HistoryEraOpts; - db: Datastore; + db: Datastore; autoCleanupInterval?: NodeJS.Timeout; nextEraUpdateInterval?: NodeJS.Timeout; constructor(user: UserInDatabase, opts: HistoryEraOpts) { this.opts = opts; - this.db = this.loadDatabase(user._id, opts.name); + this.db = Datastore.create({ + autoload: true, + filename: path.join(config.dbPath, user._id, 'history', `${opts.name}.db`), + }); + this.ready = this.prepareDatabase(); + } - this.setLastUpdate(this.db); - this.removeOutdatedData(this.db); + private async prepareDatabase(): Promise { + let lastUpdate = 0; + + await this.db.find({}).then( + (snapshots) => { + snapshots.forEach((snapshot) => { + if (snapshot.timestamp > lastUpdate) { + lastUpdate = snapshot.timestamp; + } + }); + + this.lastUpdate = lastUpdate; + }, + () => undefined, + ); + + await this.removeOutdatedData(); let cleanupInterval = this.opts.maxTime; @@ -40,104 +60,100 @@ class HistoryEra { cleanupInterval = config.dbCleanInterval; } - this.autoCleanupInterval = setInterval(this.cleanup, cleanupInterval); + this.autoCleanupInterval = setInterval(this.removeOutdatedData, cleanupInterval); } - loadDatabase(userId: UserInDatabase['_id'], dbName: string) { - const db = new Datastore({ - autoload: true, - filename: path.join(config.dbPath, userId, 'history', `${dbName}.db`), - }); + private removeOutdatedData = async (): Promise => { + if (this.opts.maxTime > 0) { + const minTimestamp = Date.now() - this.opts.maxTime; + return this.db.remove({timestamp: {$lt: minTimestamp}}, {multi: true}).then( + () => undefined, + () => undefined, + ); + } + }; - this.ready = true; - return db; - } - - addData(data: TransferData) { - if (!this.ready) { - console.error('database is not ready'); + private updateNextEra = async (): Promise => { + if (this.opts.nextEraUpdateInterval == null) { return; } + const minTimestamp = Date.now() - this.opts.nextEraUpdateInterval; + + return this.db + .find({timestamp: {$gte: minTimestamp}}) + .then((snapshots) => { + if (this.opts.nextEra == null) { + return; + } + + let downTotal = 0; + let upTotal = 0; + + snapshots.forEach((snapshot) => { + downTotal += Number(snapshot.download); + upTotal += Number(snapshot.upload); + }); + + this.opts.nextEra.addData({ + download: Number(Number(downTotal / snapshots.length).toFixed(1)), + upload: Number(Number(upTotal / snapshots.length).toFixed(1)), + }); + }); + }; + + async addData(data: TransferData): Promise { + await this.ready; + const currentTime = Date.now(); if (currentTime - this.lastUpdate >= this.opts.interval - CUMULATIVE_DATA_BUFFER_DIFF) { this.lastUpdate = currentTime; - this.db.insert({ - timestamp: currentTime, - ...data, - }); + await this.db + .insert({ + timestamp: currentTime, + ...data, + }) + .catch(() => undefined); } else { - this.db.find({timestamp: this.lastUpdate}, (err: Error, snapshots: Array) => { - if (err) { - return; - } + await this.db + .find({timestamp: this.lastUpdate}) + .then( + async (snapshots) => { + if (snapshots.length !== 0) { + const snapshot = snapshots[0]; + const numUpdates = snapshot.numUpdates || 1; - if (snapshots.length !== 0) { - const snapshot = snapshots[0]; - const numUpdates = snapshot.numUpdates || 1; + // calculate average and update + const updatedSnapshot: TransferSnapshot = { + timestamp: this.lastUpdate, + upload: Number(((snapshot.upload * numUpdates + data.upload) / (numUpdates + 1)).toFixed(1)), + download: Number(((snapshot.download * numUpdates + data.download) / (numUpdates + 1)).toFixed(1)), + numUpdates: numUpdates + 1, + }; - // calculate average and update - const updatedSnapshot: TransferSnapshot = { - timestamp: this.lastUpdate, - upload: Number(((snapshot.upload * numUpdates + data.upload) / (numUpdates + 1)).toFixed(1)), - download: Number(((snapshot.download * numUpdates + data.download) / (numUpdates + 1)).toFixed(1)), - numUpdates: numUpdates + 1, - }; - - this.db.update({timestamp: this.lastUpdate}, updatedSnapshot); - } - }); + await this.db.update({timestamp: this.lastUpdate}, updatedSnapshot).catch(() => undefined); + } + }, + () => undefined, + ); } } - cleanup = () => { - this.removeOutdatedData(this.db); - this.db.persistence.compactDatafile(); - }; + async getData(): Promise { + await this.ready; - getData(callback: (snapshots: Array | null, error?: Error) => void) { const minTimestamp = Date.now() - this.opts.maxTime; - this.db - .find({timestamp: {$gte: minTimestamp}}) + return this.db + .find({timestamp: {$gte: minTimestamp}}) .sort({timestamp: 1}) - .exec((err, snapshots: Array) => { - if (err) { - callback(null, err); - return; - } - - callback(snapshots.slice(snapshots.length - config.maxHistoryStates)); - }); + .then((snapshots) => snapshots.slice(snapshots.length - config.maxHistoryStates)); } - removeOutdatedData(db: this['db']) { - if (this.opts.maxTime > 0) { - const minTimestamp = Date.now() - this.opts.maxTime; - db.remove({timestamp: {$lt: minTimestamp}}, {multi: true}); - } - } + async setNextEra(nextEra: HistoryEra): Promise { + await this.ready; - setLastUpdate(db: this['db']): void { - let lastUpdate = 0; - - db.find({}, (err: Error, snapshots: Array) => { - if (err) { - return; - } - - snapshots.forEach((snapshot) => { - if (snapshot.timestamp > lastUpdate) { - lastUpdate = snapshot.timestamp; - } - }); - - this.lastUpdate = lastUpdate; - }); - } - - setNextEra(nextEra: HistoryEra): void { this.opts.nextEra = nextEra; let {nextEraUpdateInterval} = this.opts; @@ -150,33 +166,6 @@ class HistoryEra { this.nextEraUpdateInterval = setInterval(this.updateNextEra, nextEraUpdateInterval); } } - - updateNextEra = (): void => { - if (this.opts.nextEraUpdateInterval == null) { - return; - } - - const minTimestamp = Date.now() - this.opts.nextEraUpdateInterval; - - this.db.find({timestamp: {$gte: minTimestamp}}, (err: Error, snapshots: Array) => { - if (err || this.opts.nextEra == null) { - return; - } - - let downTotal = 0; - let upTotal = 0; - - snapshots.forEach((snapshot) => { - downTotal += Number(snapshot.download); - upTotal += Number(snapshot.upload); - }); - - this.opts.nextEra.addData({ - download: Number(Number(downTotal / snapshots.length).toFixed(1)), - upload: Number(Number(upTotal / snapshots.length).toFixed(1)), - }); - }); - }; } export default HistoryEra; diff --git a/server/routes/api/index.ts b/server/routes/api/index.ts index 7285279e..8aeece39 100644 --- a/server/routes/api/index.ts +++ b/server/routes/api/index.ts @@ -50,8 +50,24 @@ router.get('/directory-list', (req, r }); }); +/** + * GET /api/history + * @summary Gets transfer history in the given interval + * @tags Flood + * @security User + * @param {HistorySnapshot} snapshot.query - interval + * @return {TransferHistory} 200 - success response - application/json + * @return {Error} 500 - failure response - application/json + */ router.get('/history', (req, res) => { - req.services?.historyService.getHistory(req.query, getResponseFn(res)); + req.services?.historyService.getHistory(req.query).then( + (snapshot) => { + res.json(snapshot); + }, + (err) => { + res.status(500).json(err); + }, + ); }); router.get('/notifications', (req, res) => { diff --git a/server/services/historyService.ts b/server/services/historyService.ts index 1c72be92..062218d7 100644 --- a/server/services/historyService.ts +++ b/server/services/historyService.ts @@ -93,19 +93,7 @@ class HistoryService extends BaseService { }; } - deferFetchTransferSummary(interval = config.torrentClientPollInterval) { - this.pollTimeout = setTimeout(this.fetchCurrentTransferSummary, interval); - } - - destroy() { - if (this.pollTimeout != null) { - clearTimeout(this.pollTimeout); - } - - super.destroy(); - } - - fetchCurrentTransferSummary = () => { + private fetchCurrentTransferSummary = () => { if (this.pollTimeout != null) { clearTimeout(this.pollTimeout); } @@ -116,36 +104,11 @@ class HistoryService extends BaseService { .catch(this.handleFetchTransferSummaryError); }; - getTransferSummary() { - return { - id: Date.now(), - transferSummary: this.transferSummary, - } as const; + private deferFetchTransferSummary(interval = config.torrentClientPollInterval) { + this.pollTimeout = setTimeout(this.fetchCurrentTransferSummary, interval); } - getHistory({snapshot}: {snapshot: HistorySnapshot}, callback: (data: TransferHistory | null, error?: Error) => void) { - this.snapshots[snapshot]?.getData((transferSnapshots, error) => { - if (error || transferSnapshots == null) { - callback(null, error); - return; - } - - callback( - transferSnapshots.reduce( - (history, transferSnapshot) => { - history.download.push(transferSnapshot.download); - history.upload.push(transferSnapshot.upload); - history.timestamps.push(transferSnapshot.timestamp); - - return history; - }, - {upload: [], download: [], timestamps: []} as TransferHistory, - ), - ); - }); - } - - handleFetchTransferSummarySuccess = (nextTransferSummary: TransferSummary) => { + private handleFetchTransferSummarySuccess = async (nextTransferSummary: TransferSummary): Promise => { const summaryDiff = jsonpatch.compare(this.transferSummary, nextTransferSummary); this.emit('TRANSFER_SUMMARY_DIFF_CHANGE', { @@ -155,7 +118,8 @@ class HistoryService extends BaseService { this.errorCount = 0; this.transferSummary = nextTransferSummary; - this.snapshots.FIVE_MINUTE.addData({ + + await this.snapshots.FIVE_MINUTE.addData({ upload: nextTransferSummary.upRate, download: nextTransferSummary.downRate, }); @@ -165,7 +129,7 @@ class HistoryService extends BaseService { this.emit('FETCH_TRANSFER_SUMMARY_SUCCESS'); }; - handleFetchTransferSummaryError = () => { + private handleFetchTransferSummaryError = () => { let nextInterval = config.torrentClientPollInterval; // If more than 2 consecutive errors have occurred, then we delay the next request. @@ -178,6 +142,36 @@ class HistoryService extends BaseService { this.emit('FETCH_TRANSFER_SUMMARY_ERROR'); }; + + destroy() { + if (this.pollTimeout != null) { + clearTimeout(this.pollTimeout); + } + + super.destroy(); + } + + getTransferSummary() { + return { + id: Date.now(), + transferSummary: this.transferSummary, + } as const; + } + + async getHistory({snapshot}: {snapshot: HistorySnapshot}): Promise { + return this.snapshots[snapshot]?.getData().then((transferSnapshots) => + transferSnapshots.reduce( + (history, transferSnapshot) => { + history.download.push(transferSnapshot.download); + history.upload.push(transferSnapshot.upload); + history.timestamps.push(transferSnapshot.timestamp); + + return history; + }, + {upload: [], download: [], timestamps: []} as TransferHistory, + ), + ); + } } export default HistoryService;