diff --git a/worker/src/actions.ts b/worker/src/actions.ts index 3dd25db..1d9a5f8 100644 --- a/worker/src/actions.ts +++ b/worker/src/actions.ts @@ -21,7 +21,7 @@ export class Manager { map((x: Pipeline) => this.createPipeline(x).pipe( map((env: PipelineEnv) => [x, env]), - catchError(err => this.handlePipelineError(x, err)), + catchError(err => this.handlePipelineError(x, err, true)), ) ), switchAll(), @@ -33,7 +33,7 @@ export class Manager { await new Runner(x).run(env); fetch(`${process.env["WORKER_API_URL"]}/trigger/${x.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`); } catch (err) { - this.handlePipelineError(x, err); + this.handlePipelineError(x, err, false); } console.log(`Pipeline finished ${x.name}`) }), @@ -51,13 +51,13 @@ export class Manager { console.log(`Creating an observable for the pipeline ${pipeline.name} - ${pipeline.type} (${pipeline.service})`); return service.getAction(pipeline.type)(pipeline.params) } catch (err) { - return this.handlePipelineError(pipeline, err); + return this.handlePipelineError(pipeline, err, true); } } - handlePipelineError(pipeline: Pipeline, error: Error): Observable { + handlePipelineError(pipeline: Pipeline, error: Error, shouldDisable: boolean): Observable { console.error(`Unhandled exception while trying to listen for the pipeline ${pipeline.name} (type: ${pipeline.type?.toString()}).`, error) - fetch(`${process.env["WORKER_API_URL"]}/error/${pipeline.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`, { + fetch(`${process.env["WORKER_API_URL"]}/error/${pipeline.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}&disable=${shouldDisable}`, { method: "POST", headers: { "Content-Type": "application/json", diff --git a/worker/src/services/spotify.ts b/worker/src/services/spotify.ts index bbe618b..5d18581 100644 --- a/worker/src/services/spotify.ts +++ b/worker/src/services/spotify.ts @@ -26,17 +26,22 @@ export class Spotify extends BaseService { private async _refreshIfNeeded(): Promise { if (Date.parse(this._pipeline.userData["Spotify"].expiresAt) >= Date.now() + 100_000) return; + console.log("refreshing spotify") + console.table(this._pipeline.userData['Spotify']) const ret = await this._spotify.refreshAccessToken(); + const data = this._pipeline.userData["Spotify"]; + data.accessToken = ret.body.access_token; + if (ret.body.refresh_token) + data.refreshToken = ret.body.refresh_token; + data.expiresAt = new Date(Date.now() + ret.body.expires_in * 1000).toISOString(); + this._spotify.setRefreshToken(data.refreshToken); + this._spotify.setAccessToken(data.accessToken); fetch(`${process.env["WORKER_API_URL"]}/spotify/${this._pipeline.userId}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`, { method: "POST", headers: { "Content-Type": "application/json", }, - body: JSON.stringify({ - accessToken: ret.body.access_token, - refreshToken: ret.body.refresh_token, - expiresAt: new Date(Date.now() + ret.body.expires_in), - }), + body: JSON.stringify(data), }); } @@ -44,6 +49,7 @@ export class Spotify extends BaseService { listenAddToPlaylist(params: any): Observable { return Utils.longPulling(async since => { await this._refreshIfNeeded(); + console.log("pulling spotify") let ret = await this._spotify.getPlaylistTracks(params.playlistId); return ret.body.items .filter(x => new Date(x.added_at) >= since)