Merge branch 'master' of github.com:AnonymusRaccoon/Aeris into github_login

This commit is contained in:
Clément Le Bihan
2022-03-06 15:08:22 +01:00
14 changed files with 131 additions and 74 deletions
+1 -1
View File
@@ -38,7 +38,7 @@ app.post("/workflow/:id", (req, res) => {
app.delete("/workflow/:id", (req, res) => {
console.log(`delete pipeline ${req.params.id}`);
pipelineEvent.emit("event", {
id: req.params.id,
id: parseInt(req.params.id),
type: PipelineType.Never,
});
res.send()
+1 -1
View File
@@ -54,7 +54,7 @@ export class BaseService {
getReaction(reaction: ReactionType): (params: any) => Promise<PipelineEnv> {
const metadata: ActionMetadata = BaseService._reactions[this.constructor.name][ReactionType[reaction]];
if (!metadata)
throw new TypeError(`Invalid reaction: ${action}`);
throw new TypeError(`Invalid reaction: ${reaction}`);
return this._runWithParamsCheck(metadata);
}
+8 -6
View File
@@ -73,11 +73,11 @@ export enum ReactionType {
ToggleFavourite,
UpdateAbout,
// Twitter
followUser,
postTweet,
replyToTweet,
likeTweet,
retweet
FollowUser,
PostTweet,
ReplyToTweet,
LikeTweet,
Retweet
};
export class Pipeline {
@@ -96,6 +96,7 @@ export class Token {
accessToken: string;
refreshToken: string;
expiresAt: string;
providerId: string;
};
export class Reaction {
@@ -131,7 +132,8 @@ export const pipelineFromApi = (data: any): Pipeline => {
{
accessToken: x.accessToken,
refreshToken: x.refreshToken,
expiresAt: x.expiresAt
expiresAt: x.expiresAt,
providerId: x.providerId,
} as Token
])),
};
-3
View File
@@ -26,8 +26,6 @@ export class Spotify extends BaseService {
private async _refreshIfNeeded(): Promise<void> {
if (Date.parse(this._pipeline.userData["Spotify"].expiresAt) >= Date.now() + 100_000)
return;
console.log("refreshing spotify")
console.table(this._pipeline.userData['Spotify'])
const ret = await this._spotify.refreshAccessToken();
const data = this._pipeline.userData["Spotify"];
data.accessToken = ret.body.access_token;
@@ -49,7 +47,6 @@ export class Spotify extends BaseService {
listenAddToPlaylist(params: any): Observable<PipelineEnv> {
return Utils.longPulling(async since => {
await this._refreshIfNeeded();
console.log("pulling spotify")
let ret = await this._spotify.getPlaylistTracks(params.playlistId);
return ret.body.items
.filter(x => new Date(x.added_at) >= since)
+53 -51
View File
@@ -1,46 +1,46 @@
import { exhaustMap, from, fromEventPattern, map, Observable } from "rxjs";
import { Pipeline, PipelineEnv, PipelineType, ReactionType, ServiceType } from "../models/pipeline";
import { ETwitterStreamEvent, TweetStream, TwitterApi } from "twitter-api-v2";
import { action, BaseService, reaction, service } from "../models/base-service";
import { Pipeline, PipelineEnv, ReactionType, ServiceType } from "../models/pipeline";
import { TwitterApi } from "twitter-api-v2";
import { BaseService, reaction, service } from "../models/base-service";
@service(ServiceType.Twitter)
export class Twitter extends BaseService {
constructor(_: Pipeline) {
private _twitter: TwitterApi;
private _pipeline: Pipeline;
constructor(pipeline: Pipeline) {
super();
this._pipeline = pipeline;
this._twitter = new TwitterApi(pipeline.userData["Twitter"].accessToken);
}
private static _createTwitter() {
return new TwitterApi(); ///TODO Get API KEY
private async _refreshIfNeeded(): Promise<void> {
if (Date.parse(this._pipeline.userData["Twitter"].expiresAt) >= Date.now() + 100_000)
return;
const ret = await (new TwitterApi({
clientId: process.env["TWITTER_CLIENT_ID"],
clientSecret: process.env["TWITTER_SECRET"],
})).refreshOAuth2Token(this._pipeline.userData["Twitter"].refreshToken);
const data = this._pipeline.userData["Twitter"];
this._twitter = ret.client;
data.accessToken = ret.accessToken;
if (ret.refreshToken)
data.refreshToken = ret.refreshToken;
data.expiresAt = new Date(Date.now() + ret.expiresIn * 1000).toISOString();
fetch(`${process.env["WORKER_API_URL"]}/twitter/${this._pipeline.userId}?WORKER_API_KEY=${process.env["WORKER_API_KEY"]}`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(data),
});
}
private static async _createStream(): Promise<TweetStream> {
const client: TwitterApi = this._createTwitter();
const stream = await client.v2.sampleStream();
stream.on(ETwitterStreamEvent.Connected, () => console.log('Stream is started.'));
stream.on(ETwitterStreamEvent.ConnectionError, err => console.log('Connection error!', err));
stream.on(ETwitterStreamEvent.ConnectionClosed, () => console.log('Connection has been closed.'));
return stream;
}
@action(PipelineType.OnTweet, [])
static listenTweet(params: any): Observable<PipelineEnv> {
return from(Twitter._createStream())
.pipe(
exhaustMap((stream: TweetStream) =>
fromEventPattern(
handler => stream.on(ETwitterStreamEvent.Data, handler),
() => stream.close()
)
)
);
}
@reaction(ReactionType.followUser, ['user_name'])
static async followUser(params: any): Promise<PipelineEnv> {
let client: TwitterApi = this._createTwitter();
let user = await client.v2.userByUsername(params['user_name']);
client.v2.follow((await client.currentUser()).id_str, user.data.id);
@reaction(ReactionType.FollowUser, ['user_name'])
async followUser(params: any): Promise<PipelineEnv> {
await this._refreshIfNeeded();
let user = await this._twitter.v2.userByUsername(params['user_name']);
const me = (await this._twitter.v2.me()).data.id;
this._twitter.v2.follow(me, user.data.id);
return {
FOLLOWED_ID: user.data.id,
FOLLOWED_NAME: user.data.name,
@@ -49,20 +49,20 @@ export class Twitter extends BaseService {
}
}
@reaction(ReactionType.postTweet, ['tweet_content'])
static async postTweet(params: any): Promise<PipelineEnv> {
let client: TwitterApi = this._createTwitter();
let tweet = await client.v2.tweet(params['tweet_content']);
@reaction(ReactionType.PostTweet, ['tweet_content'])
async postTweet(params: any): Promise<PipelineEnv> {
await this._refreshIfNeeded();
let tweet = await this._twitter.v2.tweet(params['tweet_content']);
return {
TWEET_ID: tweet.data.id,
TWEET_CONTENT: tweet.data.text,
}
}
@reaction(ReactionType.replyToTweet, ['tweet_id', 'reply_body'])
static async replyToTweet(params: any): Promise<PipelineEnv> {
let client: TwitterApi = this._createTwitter();
let reply = await client.v2.reply(
@reaction(ReactionType.ReplyToTweet, ['tweet_id', 'reply_body'])
async replyToTweet(params: any): Promise<PipelineEnv> {
await this._refreshIfNeeded();
let reply = await this._twitter.v2.reply(
params['reply_body'],
params['tweet_id'],
);
@@ -73,10 +73,12 @@ export class Twitter extends BaseService {
}
}
@reaction(ReactionType.likeTweet, ['tweet_id'])
static async likeTweet(params: any): Promise<PipelineEnv> {
let client: TwitterApi = this._createTwitter();
let tweet = (await client.v2.tweets([params['tweet_id']])).data[0];
@reaction(ReactionType.LikeTweet, ['tweet_id'])
async likeTweet(params: any): Promise<PipelineEnv> {
await this._refreshIfNeeded();
const me = (await this._twitter.v2.me()).data.id;
await this._twitter.v2.like(me, params['tweet_id']);
let tweet = (await this._twitter.v2.tweets([params['tweet_id']])).data[0];
return {
TWEET_ID: tweet.id,
TWEET_CONTENT: tweet.text,
@@ -84,10 +86,10 @@ export class Twitter extends BaseService {
}
}
@reaction(ReactionType.retweet, ['tweet_id'])
static async retweet(params: any): Promise<PipelineEnv> {
let client: TwitterApi = this._createTwitter();
let tweet = await client.v2.retweet((await client.currentUser()).id_str, params['tweet_id']);
@reaction(ReactionType.Retweet, ['tweet_id'])
async retweet(params: any): Promise<PipelineEnv> {
await this._refreshIfNeeded();
let tweet = await this._twitter.v2.retweet((await this._twitter.v2.me()).data.id, params['tweet_id']);
return {
TWEET_ID: params['tweet_id']
}
+5 -3
View File
@@ -40,12 +40,12 @@ export class Youtube extends BaseService {
});
}
@action(PipelineType.OnYtUpload, ["channel"])
@action(PipelineType.OnYtUpload, ["channel_id"])
listenChannel(params: any): Observable<PipelineEnv> {
return Utils.longPulling(async (since) => {
const ret = await this._youtube.activities.list({
part: ["snippet"],
channelId: params.channel,
part: ["snippet", "contentDetails"],
channelId: params.channel_id,
maxResults: 25,
publishedAfter: since.toISOString(),
}, {});
@@ -134,10 +134,12 @@ export class Youtube extends BaseService {
@reaction(ReactionType.YtAddToPlaylist, ["videoId", "playlistId"])
async reactPlaylist(params: any): Promise<PipelineEnv> {
await this._youtube.playlistItems.insert({
part: ["snippet"],
requestBody: {
snippet: {
resourceId: {
videoId: params.videoId,
kind: "youtube#video"
},
playlistId: params.playlistId,
},