diff --git a/api/src/Api/Worker.hs b/api/src/Api/Worker.hs index 486522f..d701889 100644 --- a/api/src/Api/Worker.hs +++ b/api/src/Api/Worker.hs @@ -64,7 +64,7 @@ data WorkerAPI mode = WorkerAPI , trigger :: mode :- "trigger" :> Capture "id" PipelineId :> QueryParam "WORKER_API_KEY" String :> Get '[JSON] NoContent , error :: mode :- "error" :> Capture "id" PipelineId :> - QueryParam "WORKER_API_KEY" String :> ReqBody '[JSON] ErrorBody :> Post '[JSON] NoContent + QueryParam "WORKER_API_KEY" String :> QueryParam "disable" Bool :> ReqBody '[JSON] ErrorBody :> Post '[JSON] NoContent , refresh :: mode :- "auth" :> Capture "service" Service :> "refresh" :> Capture "id" UserId :> QueryParam "WORKER_API_KEY" String :> ReqBody '[JSON] RefreshBody :> Post '[JSON] NoContent } @@ -101,14 +101,14 @@ triggerHandler pId (Just key) = do triggerHandler _ _ = throwError err403 -errorHandler :: PipelineId -> Maybe String -> ErrorBody -> AppM NoContent -errorHandler pId (Just key) (ErrorBody msg) = do +errorHandler :: PipelineId -> Maybe String -> Maybe Bool -> ErrorBody -> AppM NoContent +errorHandler pId (Just key) maybeDisable (ErrorBody msg) = do k <- liftIO $ envAsString "WORKER_API_KEY" "" if k == key then do - errorPipeline' pId msg + errorPipeline' pId msg maybeDisable return NoContent else throwError err403 -errorHandler _ _ _ = throwError err403 +errorHandler _ _ _ _ = throwError err403 refreshHandler :: Service -> UserId -> Maybe String -> RefreshBody -> AppM NoContent refreshHandler service uid (Just key) (RefreshBody at rt ex) = do diff --git a/api/src/Db/Pipeline.hs b/api/src/Db/Pipeline.hs index c2c4553..73a6827 100644 --- a/api/src/Db/Pipeline.hs +++ b/api/src/Db/Pipeline.hs @@ -151,8 +151,8 @@ triggerPipeline pId currTime = , pipelineError = lit Nothing } -errorPipeline :: PipelineId -> Text -> Update Int64 -errorPipeline pId msg = +errorPipeline :: PipelineId -> Text -> Maybe Bool -> Update Int64 +errorPipeline pId msg Nothing = Update { target = pipelineSchema , from = pure () @@ -160,7 +160,20 @@ errorPipeline pId msg = , set = setter , returning = NumberOfRowsAffected } - where - setter = \from row -> row { - pipelineError = lit $ Just msg - } + where + setter = \from row -> row { + pipelineError = lit $ Just msg + } +errorPipeline pId msg (Just enabled) = + Update + { target = pipelineSchema + , from = pure () + , updateWhere = \_ o -> pipelineId o ==. lit pId + , set = setter + , returning = NumberOfRowsAffected + } + where + setter = \from row -> row { + pipelineError = lit $ Just msg + , pipelineEnabled = lit enabled + } diff --git a/api/src/Repository/Pipeline.hs b/api/src/Repository/Pipeline.hs index 6d54ea0..3670a73 100644 --- a/api/src/Repository/Pipeline.hs +++ b/api/src/Repository/Pipeline.hs @@ -31,5 +31,5 @@ triggerPipeline' pId = do currTime <- liftIO getCurrentTime runQuery $ update $ triggerPipeline pId currTime -errorPipeline' :: PipelineId -> Text -> AppM Int64 -errorPipeline' pId msg = runQuery $ update $ errorPipeline pId msg \ No newline at end of file +errorPipeline' :: PipelineId -> Text -> Maybe Bool ->AppM Int64 +errorPipeline' pId msg mDisable = runQuery $ update $ errorPipeline pId msg (fmap not mDisable) \ No newline at end of file diff --git a/worker/src/actions.ts b/worker/src/actions.ts index 3dd25db..1d9a5f8 100644 --- a/worker/src/actions.ts +++ b/worker/src/actions.ts @@ -21,7 +21,7 @@ export class Manager { map((x: Pipeline) => this.createPipeline(x).pipe( map((env: PipelineEnv) => [x, env]), - catchError(err => this.handlePipelineError(x, err)), + catchError(err => this.handlePipelineError(x, err, true)), ) ), switchAll(), @@ -33,7 +33,7 @@ export class Manager { await new Runner(x).run(env); fetch(`${process.env["WORKER_API_URL"]}/trigger/${x.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`); } catch (err) { - this.handlePipelineError(x, err); + this.handlePipelineError(x, err, false); } console.log(`Pipeline finished ${x.name}`) }), @@ -51,13 +51,13 @@ export class Manager { console.log(`Creating an observable for the pipeline ${pipeline.name} - ${pipeline.type} (${pipeline.service})`); return service.getAction(pipeline.type)(pipeline.params) } catch (err) { - return this.handlePipelineError(pipeline, err); + return this.handlePipelineError(pipeline, err, true); } } - handlePipelineError(pipeline: Pipeline, error: Error): Observable { + handlePipelineError(pipeline: Pipeline, error: Error, shouldDisable: boolean): Observable { console.error(`Unhandled exception while trying to listen for the pipeline ${pipeline.name} (type: ${pipeline.type?.toString()}).`, error) - fetch(`${process.env["WORKER_API_URL"]}/error/${pipeline.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`, { + fetch(`${process.env["WORKER_API_URL"]}/error/${pipeline.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}&disable=${shouldDisable}`, { method: "POST", headers: { "Content-Type": "application/json", diff --git a/worker/src/services/spotify.ts b/worker/src/services/spotify.ts index bbe618b..5d18581 100644 --- a/worker/src/services/spotify.ts +++ b/worker/src/services/spotify.ts @@ -26,17 +26,22 @@ export class Spotify extends BaseService { private async _refreshIfNeeded(): Promise { if (Date.parse(this._pipeline.userData["Spotify"].expiresAt) >= Date.now() + 100_000) return; + console.log("refreshing spotify") + console.table(this._pipeline.userData['Spotify']) const ret = await this._spotify.refreshAccessToken(); + const data = this._pipeline.userData["Spotify"]; + data.accessToken = ret.body.access_token; + if (ret.body.refresh_token) + data.refreshToken = ret.body.refresh_token; + data.expiresAt = new Date(Date.now() + ret.body.expires_in * 1000).toISOString(); + this._spotify.setRefreshToken(data.refreshToken); + this._spotify.setAccessToken(data.accessToken); fetch(`${process.env["WORKER_API_URL"]}/spotify/${this._pipeline.userId}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`, { method: "POST", headers: { "Content-Type": "application/json", }, - body: JSON.stringify({ - accessToken: ret.body.access_token, - refreshToken: ret.body.refresh_token, - expiresAt: new Date(Date.now() + ret.body.expires_in), - }), + body: JSON.stringify(data), }); } @@ -44,6 +49,7 @@ export class Spotify extends BaseService { listenAddToPlaylist(params: any): Observable { return Utils.longPulling(async since => { await this._refreshIfNeeded(); + console.log("pulling spotify") let ret = await this._spotify.getPlaylistTracks(params.playlistId); return ret.body.items .filter(x => new Date(x.added_at) >= since)