Updating the worker for the api formta

This commit is contained in:
Zoe Roux
2022-03-02 16:59:32 +01:00
parent 428b9fbe71
commit 07b38036a3
3 changed files with 51 additions and 45 deletions
+7 -3
View File
@@ -14,6 +14,7 @@ export class Manager {
async run(): Promise<void> {
await lastValueFrom(this._pipelines
.pipe(
tap(x => console.log(`Found pipeline ${JSON.stringify(x)}`)),
filter(x => x.enabled),
groupBy((x: Pipeline) => x.id),
switchAll(),
@@ -27,7 +28,7 @@ export class Manager {
console.log(`Running pipeline ${x.name}`)
try {
await new Runner(x).run(env);
fetch(`${process.env["API_URL"]}/trigger/${x.id}?API_KEY=${process.env["API_KEY"]}`);
fetch(`${process.env["WORKER_API_URL"]}/trigger/${x.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`);
} catch (err) {
this.handlePipelineError(x, err);
}
@@ -36,11 +37,14 @@ export class Manager {
}
createPipeline(pipeline: Pipeline): Observable<PipelineEnv> {
if (pipeline.type === PipelineType.Never)
if (pipeline.type === PipelineType.Never) {
console.log(`Deleting the pipeline ${pipeline.name}`);
return NEVER;
}
try {
const service = BaseService.createService(pipeline.service, pipeline);
console.log(`Creating an observable for the pipeline ${pipeline.name} - ${pipeline.type} (${pipeline.service})`);
return service.getAction(pipeline.type)(pipeline.params)
} catch (err) {
return this.handlePipelineError(pipeline, err);
@@ -49,7 +53,7 @@ export class Manager {
handlePipelineError(pipeline: Pipeline, error: Error): Observable<never> {
console.error(`Unhandled exception while trying to listen for the pipeline ${pipeline.name} (type: ${pipeline.type.toString()}).`, error)
fetch(`${process.env["API_URL"]}/error/${pipeline.id}?API_KEY=${process.env["API_KEY"]}`, {
fetch(`${process.env["WORKER_API_URL"]}/error/${pipeline.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`, {
method: "POST",
body: JSON.stringify({error}),
});
+8 -6
View File
@@ -1,10 +1,10 @@
import { fromEvent, mergeAll, mergeWith, Observable } from "rxjs";
import { from, fromEvent, mergeAll, mergeWith, Observable } from "rxjs";
import { fromFetch } from 'rxjs/fetch';
import { Manager } from "./actions";
import "./services";
import fetch from 'node-fetch';
import AbortController from 'abort-controller';
import { Pipeline, PipelineType } from "./models/pipeline";
import { Pipeline, pipelineFromApi, PipelineType } from "./models/pipeline";
import { EventEmitter } from "events"
import express from "express";
@@ -15,21 +15,23 @@ global.AbortController = AbortController;
const app = express()
const pipelineEvent = new EventEmitter();
app.put("/workflow/:id", req => {
console.log(`edit pipeline ${req.params.id}`);
fetch(`${process.env["WORKER_API_URL"]}/workflow/${req.params.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`)
.then(res => {
pipelineEvent.emit("event", res.json());
pipelineEvent.emit("event", pipelineFromApi(res.json()));
});
});
app.post("/workflow/:id", req => {
console.log(`new pipeline ${req.params.id}`);
fetch(`${process.env["WORKER_API_URL"]}/workflow/${req.params.id}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`)
.then(res => {
pipelineEvent.emit("event", res.json());
pipelineEvent.emit("event", pipelineFromApi(res.json()));
});
});
app.delete("/workflow/:id", req => {
console.log(`delete pipeline ${req.params.id}`);
pipelineEvent.emit("event", {
id: req.params.id,
type: PipelineType.Never,
@@ -39,7 +41,7 @@ app.delete("/workflow/:id", req => {
app.listen(5000);
const pipelines = fromFetch<Pipeline[]>(`${process.env["WORKER_API_URL"]}/workflows?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`, {selector: x => x.json()})
const pipelines = fromFetch<Pipeline[]>(`${process.env["WORKER_API_URL"]}/workflows?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`, {selector: async x => (await x.json()).map((y: any) => pipelineFromApi(y))})
.pipe(
mergeAll(),
mergeWith(fromEvent(pipelineEvent, "event")),
+36 -36
View File
@@ -37,37 +37,6 @@ export enum PipelineType {
OnDiscordGuildLeave,
};
export enum ReactionType {
Tweet,
// Youtube reactions
@@ -113,16 +82,19 @@ export class Pipeline {
type: PipelineType;
name: string;
params: {[key: string]: string};
userData: {[key: string]: any};
lastTrigger: Date;
triggerCount: number;
enabled: boolean;
userId: number;
userData: {[key: string]: Token};
reactions: [Reaction];
};
export class Token {
accessToken: string;
refreshToken: string;
expiresIn: string;
};
export class Reaction {
id: number;
service: ServiceType;
type: ReactionType;
params: {[key: string]: any};
@@ -131,3 +103,31 @@ export class Reaction {
export class PipelineEnv {
[key: string]: any;
};
export const pipelineFromApi = (data: any): Pipeline => {
const type: string = data.res.action.pType;
return {
id: data.res.action.id,
name: data.res.action.name,
service: ServiceType[type.substring(0, type.indexOf('_')) as keyof typeof ServiceType],
type: PipelineType[type.substring(type.indexOf('_') + 1) as keyof typeof PipelineType],
params: data.res.action.pParams,
enabled: data.res.action.enabled,
reactions: data.res.reactions.map((x: any) => ({
params: x.rParams,
service: ServiceType[x.rType.substring(0, x.rType.indexOf('_')) as keyof typeof ServiceType],
type: ReactionType[x.rType.substring(x.rType.indexOf('_') + 1) as keyof typeof ReactionType],
})),
userId: data.userData.userId,
userData: Object.fromEntries(data.userData.tokens.map((x: any) => [
x.service,
{
accessToken: x.accessToken,
refreshToken: x.refreshToken,
expiresIn: x.expiresIn
} as Token
])),
};
}