Merge pull request #105 from AnonymusRaccoon/fix/refresh

Fix/refresh
This commit is contained in:
Zoe Roux
2022-03-05 21:37:36 +01:00
committed by GitHub
5 changed files with 42 additions and 23 deletions
+5 -5
View File
@@ -64,7 +64,7 @@ data WorkerAPI mode = WorkerAPI
, trigger :: mode :- "trigger" :> Capture "id" PipelineId :>
QueryParam "WORKER_API_KEY" String :> Get '[JSON] NoContent
, error :: mode :- "error" :> Capture "id" PipelineId :>
QueryParam "WORKER_API_KEY" String :> ReqBody '[JSON] ErrorBody :> Post '[JSON] NoContent
QueryParam "WORKER_API_KEY" String :> QueryParam "disable" Bool :> ReqBody '[JSON] ErrorBody :> Post '[JSON] NoContent
, refresh :: mode :- "auth" :> Capture "service" Service :> "refresh" :> Capture "id" UserId :>
QueryParam "WORKER_API_KEY" String :> ReqBody '[JSON] RefreshBody :> Post '[JSON] NoContent
}
@@ -101,14 +101,14 @@ triggerHandler pId (Just key) = do
triggerHandler _ _ = throwError err403
errorHandler :: PipelineId -> Maybe String -> ErrorBody -> AppM NoContent
errorHandler pId (Just key) (ErrorBody msg) = do
errorHandler :: PipelineId -> Maybe String -> Maybe Bool -> ErrorBody -> AppM NoContent
errorHandler pId (Just key) maybeDisable (ErrorBody msg) = do
k <- liftIO $ envAsString "WORKER_API_KEY" ""
if k == key then do
errorPipeline' pId msg
errorPipeline' pId msg maybeDisable
return NoContent
else throwError err403
errorHandler _ _ _ = throwError err403
errorHandler _ _ _ _ = throwError err403
refreshHandler :: Service -> UserId -> Maybe String -> RefreshBody -> AppM NoContent
refreshHandler service uid (Just key) (RefreshBody at rt ex) = do
+19 -6
View File
@@ -151,8 +151,8 @@ triggerPipeline pId currTime =
, pipelineError = lit Nothing
}
errorPipeline :: PipelineId -> Text -> Update Int64
errorPipeline pId msg =
errorPipeline :: PipelineId -> Text -> Maybe Bool -> Update Int64
errorPipeline pId msg Nothing =
Update
{ target = pipelineSchema
, from = pure ()
@@ -160,7 +160,20 @@ errorPipeline pId msg =
, set = setter
, returning = NumberOfRowsAffected
}
where
setter = \from row -> row {
pipelineError = lit $ Just msg
}
where
setter = \from row -> row {
pipelineError = lit $ Just msg
}
errorPipeline pId msg (Just enabled) =
Update
{ target = pipelineSchema
, from = pure ()
, updateWhere = \_ o -> pipelineId o ==. lit pId
, set = setter
, returning = NumberOfRowsAffected
}
where
setter = \from row -> row {
pipelineError = lit $ Just msg
, pipelineEnabled = lit enabled
}
+2 -2
View File
@@ -31,5 +31,5 @@ triggerPipeline' pId = do
currTime <- liftIO getCurrentTime
runQuery $ update $ triggerPipeline pId currTime
errorPipeline' :: PipelineId -> Text -> AppM Int64
errorPipeline' pId msg = runQuery $ update $ errorPipeline pId msg
errorPipeline' :: PipelineId -> Text -> Maybe Bool ->AppM Int64
errorPipeline' pId msg mDisable = runQuery $ update $ errorPipeline pId msg (fmap not mDisable)
+5 -5
View File
@@ -21,7 +21,7 @@ export class Manager {
map((x: Pipeline) =>
this.createPipeline(x).pipe(
map((env: PipelineEnv) => [x, env]),
catchError(err => this.handlePipelineError(x, err)),
catchError(err => this.handlePipelineError(x, err, true)),
)
),
switchAll(),
@@ -33,7 +33,7 @@ export class Manager {
await new Runner(x).run(env);
fetch(`${process.env["WORKER_API_URL"]}/trigger/${x.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`);
} catch (err) {
this.handlePipelineError(x, err);
this.handlePipelineError(x, err, false);
}
console.log(`Pipeline finished ${x.name}`)
}),
@@ -51,13 +51,13 @@ export class Manager {
console.log(`Creating an observable for the pipeline ${pipeline.name} - ${pipeline.type} (${pipeline.service})`);
return service.getAction(pipeline.type)(pipeline.params)
} catch (err) {
return this.handlePipelineError(pipeline, err);
return this.handlePipelineError(pipeline, err, true);
}
}
handlePipelineError(pipeline: Pipeline, error: Error): Observable<never> {
handlePipelineError(pipeline: Pipeline, error: Error, shouldDisable: boolean): Observable<never> {
console.error(`Unhandled exception while trying to listen for the pipeline ${pipeline.name} (type: ${pipeline.type?.toString()}).`, error)
fetch(`${process.env["WORKER_API_URL"]}/error/${pipeline.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`, {
fetch(`${process.env["WORKER_API_URL"]}/error/${pipeline.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}&disable=${shouldDisable}`, {
method: "POST",
headers: {
"Content-Type": "application/json",
+11 -5
View File
@@ -26,17 +26,22 @@ export class Spotify extends BaseService {
private async _refreshIfNeeded(): Promise<void> {
if (Date.parse(this._pipeline.userData["Spotify"].expiresAt) >= Date.now() + 100_000)
return;
console.log("refreshing spotify")
console.table(this._pipeline.userData['Spotify'])
const ret = await this._spotify.refreshAccessToken();
const data = this._pipeline.userData["Spotify"];
data.accessToken = ret.body.access_token;
if (ret.body.refresh_token)
data.refreshToken = ret.body.refresh_token;
data.expiresAt = new Date(Date.now() + ret.body.expires_in * 1000).toISOString();
this._spotify.setRefreshToken(data.refreshToken);
this._spotify.setAccessToken(data.accessToken);
fetch(`${process.env["WORKER_API_URL"]}/spotify/${this._pipeline.userId}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
accessToken: ret.body.access_token,
refreshToken: ret.body.refresh_token,
expiresAt: new Date(Date.now() + ret.body.expires_in),
}),
body: JSON.stringify(data),
});
}
@@ -44,6 +49,7 @@ export class Spotify extends BaseService {
listenAddToPlaylist(params: any): Observable<PipelineEnv> {
return Utils.longPulling(async since => {
await this._refreshIfNeeded();
console.log("pulling spotify")
let ret = await this._spotify.getPlaylistTracks(params.playlistId);
return ret.body.items
.filter(x => new Date(x.added_at) >= since)