diff --git a/server/services/Deluge/clientRequestManager.ts b/server/services/Deluge/clientRequestManager.ts index 6327894a..627e810b 100644 --- a/server/services/Deluge/clientRequestManager.ts +++ b/server/services/Deluge/clientRequestManager.ts @@ -1,4 +1,4 @@ -import {deflateSync, inflateSync} from 'zlib'; +import {deflate, inflate} from 'zlib'; import fs from 'fs'; import os from 'os'; import path from 'path'; @@ -34,11 +34,9 @@ class ClientRequestManager { private requestQueue: Record void, (err: Error) => void]> = {}; private rpc?: Promise; private rpcWithAuth?: Promise; - private rpcBuffer?: Buffer; - private rpcBufferSize = 0; private async receive(data: Buffer): Promise { - const response = decode(inflateSync(data)) as RencodableArray; + const response = decode(data) as RencodableArray; switch (response[0]) { case DelugeRpcResponseType.RESPONSE: { const [, request_id, return_value] = response; @@ -74,19 +72,26 @@ class ClientRequestManager { } const requestId = this.requestId++; - const payloadBuf = deflateSync(encode([[requestId, ...request]])); - - const {length} = payloadBuf; - if (length > 0xff_ff_ff_ff) { - throw new Error('Payload is too large.'); - } - - const lengthBuf = Buffer.alloc(4); - lengthBuf.writeUInt32BE(length, 0); return await new Promise((resolve, reject) => { - this.requestQueue[requestId] = [resolve, reject]; - rpc.write(Buffer.concat([protocolVerBuf, lengthBuf, payloadBuf])); + deflate(encode([[requestId, ...request]]), (err, payloadBuf) => { + if (err) { + reject(err); + return; + } + + const {length} = payloadBuf; + if (length > 0xff_ff_ff_ff) { + reject(new Error('Payload is too large.')); + return; + } + + const lengthBuf = Buffer.alloc(4); + lengthBuf.writeUInt32BE(length, 0); + + this.requestQueue[requestId] = [resolve, reject]; + rpc.write(Buffer.concat([protocolVerBuf, lengthBuf, payloadBuf])); + }); }); } @@ -100,8 +105,9 @@ class ClientRequestManager { this.requestId = 0; this.requestQueue = {}; - this.rpcBufferSize = 0; - this.rpcBuffer = undefined; + + let rpcBufferSize = 0; + let rpcBuffer: Buffer | null = null; const tlsSocket = tls.connect({ host: this.connectionSettings.host, @@ -110,33 +116,44 @@ class ClientRequestManager { rejectUnauthorized: false, }); - tlsSocket.on('error', (e) => { - this.rpcBuffer = undefined; - this.rpcBufferSize = 0; + const handleError = (e: Error) => { + tlsSocket.destroy(); + this.rpcWithAuth = this.rpc = Promise.reject(); reject(e); - }); + }; - tlsSocket.on('data', (chunk: Buffer) => { - if (this.rpcBuffer != null) { - this.rpcBuffer = Buffer.concat([this.rpcBuffer, chunk], this.rpcBufferSize); - } else { - if (chunk[0] !== DELUGE_RPC_PROTOCOL_VERSION) { - reject(new Error('Unexpected Deluge RPC version.')); - return; - } - - this.rpcBufferSize = chunk.slice(1, 5).readUInt32BE(0); - this.rpcBuffer = chunk.slice(5); - } - - if (this.rpcBuffer.length >= this.rpcBufferSize) { - this.receive(this.rpcBuffer); - this.rpcBufferSize = 0; - this.rpcBuffer = undefined; - } - }); + tlsSocket.once('error', handleError); + tlsSocket.once('close', handleError); tlsSocket.on('secureConnect', () => { + tlsSocket.on('data', (chunk: Buffer) => { + if (rpcBuffer != null) { + rpcBuffer = Buffer.concat([rpcBuffer, chunk], rpcBufferSize); + } else { + if (chunk[0] !== DELUGE_RPC_PROTOCOL_VERSION) { + handleError(new Error('Unexpected Deluge RPC version.')); + return; + } + + rpcBufferSize = chunk.slice(1, 5).readUInt32BE(0); + rpcBuffer = chunk.slice(5); + } + + if (rpcBuffer.length >= rpcBufferSize) { + const buf = rpcBuffer; + rpcBuffer = null; + rpcBufferSize = 0; + inflate(buf, (err, data) => { + if (err) { + handleError(err); + return; + } + + this.receive(data); + }); + } + }); + resolve(tlsSocket); }); });