1
0
forked from mirror/misskey

ローカルタイムラインとグローバルタイムラインを実装

This commit is contained in:
syuilo 2018-04-17 14:52:28 +09:00
parent 06535a37b5
commit a0e640b118
19 changed files with 599 additions and 183 deletions

View File

@ -9,11 +9,12 @@ import Connection from './scripts/streaming/stream';
import { HomeStreamManager } from './scripts/streaming/home';
import { DriveStreamManager } from './scripts/streaming/drive';
import { ServerStreamManager } from './scripts/streaming/server';
import { RequestsStreamManager } from './scripts/streaming/requests';
import { MessagingIndexStreamManager } from './scripts/streaming/messaging-index';
import { OthelloStreamManager } from './scripts/streaming/othello';
import Err from '../common/views/components/connect-failed.vue';
import { LocalTimelineStreamManager } from './scripts/streaming/local-timeline';
import { GlobalTimelineStreamManager } from './scripts/streaming/global-timeline';
//#region api requests
let spinner = null;
@ -116,15 +117,17 @@ export default class MiOS extends EventEmitter {
* Connection managers
*/
public streams: {
localTimelineStream: LocalTimelineStreamManager;
globalTimelineStream: GlobalTimelineStreamManager;
driveStream: DriveStreamManager;
serverStream: ServerStreamManager;
requestsStream: RequestsStreamManager;
messagingIndexStream: MessagingIndexStreamManager;
othelloStream: OthelloStreamManager;
} = {
localTimelineStream: null,
globalTimelineStream: null,
driveStream: null,
serverStream: null,
requestsStream: null,
messagingIndexStream: null,
othelloStream: null
};
@ -231,13 +234,14 @@ export default class MiOS extends EventEmitter {
public async init(callback) {
//#region Init stream managers
this.streams.serverStream = new ServerStreamManager(this);
this.streams.requestsStream = new RequestsStreamManager(this);
this.once('signedin', () => {
// Init home stream manager
this.stream = new HomeStreamManager(this, this.i);
// Init other stream manager
this.streams.localTimelineStream = new LocalTimelineStreamManager(this, this.i);
this.streams.globalTimelineStream = new GlobalTimelineStreamManager(this, this.i);
this.streams.driveStream = new DriveStreamManager(this, this.i);
this.streams.messagingIndexStream = new MessagingIndexStreamManager(this, this.i);
this.streams.othelloStream = new OthelloStreamManager(this, this.i);

View File

@ -0,0 +1,34 @@
import Stream from './stream';
import StreamManager from './stream-manager';
import MiOS from '../../mios';
/**
* Global timeline stream connection
*/
export class GlobalTimelineStream extends Stream {
constructor(os: MiOS, me) {
super(os, 'global-timeline', {
i: me.token
});
}
}
export class GlobalTimelineStreamManager extends StreamManager<GlobalTimelineStream> {
private me;
private os: MiOS;
constructor(os: MiOS, me) {
super();
this.me = me;
this.os = os;
}
public getConnection() {
if (this.connection == null) {
this.connection = new GlobalTimelineStream(this.os, this.me);
}
return this.connection;
}
}

View File

@ -0,0 +1,34 @@
import Stream from './stream';
import StreamManager from './stream-manager';
import MiOS from '../../mios';
/**
* Local timeline stream connection
*/
export class LocalTimelineStream extends Stream {
constructor(os: MiOS, me) {
super(os, 'local-timeline', {
i: me.token
});
}
}
export class LocalTimelineStreamManager extends StreamManager<LocalTimelineStream> {
private me;
private os: MiOS;
constructor(os: MiOS, me) {
super();
this.me = me;
this.os = os;
}
public getConnection() {
if (this.connection == null) {
this.connection = new LocalTimelineStream(this.os, this.me);
}
return this.connection;
}
}

View File

@ -1,30 +0,0 @@
import Stream from './stream';
import StreamManager from './stream-manager';
import MiOS from '../../mios';
/**
* Requests stream connection
*/
export class RequestsStream extends Stream {
constructor(os: MiOS) {
super(os, 'requests');
}
}
export class RequestsStreamManager extends StreamManager<RequestsStream> {
private os: MiOS;
constructor(os: MiOS) {
super();
this.os = os;
}
public getConnection() {
if (this.connection == null) {
this.connection = new RequestsStream(this.os);
}
return this.connection;
}
}

View File

@ -0,0 +1,175 @@
<template>
<div class="mk-home-timeline">
<mk-friends-maker v-if="src == 'home' && alone"/>
<div class="fetching" v-if="fetching">
<mk-ellipsis-icon/>
</div>
<p class="empty" v-if="notes.length == 0 && !fetching">
%fa:R comments%%i18n:@empty%
</p>
<mk-notes :notes="notes" ref="timeline">
<button slot="footer" @click="more" :disabled="moreFetching" :style="{ cursor: moreFetching ? 'wait' : 'pointer' }">
<template v-if="!moreFetching">%i18n:@load-more%</template>
<template v-if="moreFetching">%fa:spinner .pulse .fw%</template>
</button>
</mk-notes>
</div>
</template>
<script lang="ts">
import Vue from 'vue';
import { url } from '../../../config';
export default Vue.extend({
props: {
src: {
type: String,
required: true
}
},
data() {
return {
fetching: true,
moreFetching: false,
existMore: false,
notes: [],
connection: null,
connectionId: null,
date: null
};
},
computed: {
alone(): boolean {
return (this as any).os.i.followingCount == 0;
},
stream(): any {
return this.src == 'home'
? (this as any).os.stream
: this.src == 'local'
? (this as any).os.streams.localTimelineStream
: (this as any).os.streams.globalTimelineStream;
},
endpoint(): string {
return this.src == 'home'
? 'notes/timeline'
: this.src == 'local'
? 'notes/local-timeline'
: 'notes/global-timeline';
}
},
mounted() {
this.connection = this.stream.getConnection();
this.connectionId = this.stream.use();
this.connection.on('note', this.onNote);
if (this.src == 'home') {
this.connection.on('follow', this.onChangeFollowing);
this.connection.on('unfollow', this.onChangeFollowing);
}
this.fetch();
},
beforeDestroy() {
this.connection.off('note', this.onNote);
if (this.src == 'home') {
this.connection.off('follow', this.onChangeFollowing);
this.connection.off('unfollow', this.onChangeFollowing);
}
this.stream.dispose(this.connectionId);
},
methods: {
fetch(cb?) {
this.fetching = true;
(this as any).api('notes/timeline', {
limit: 11,
untilDate: this.date ? this.date.getTime() : undefined
}).then(notes => {
if (notes.length == 11) {
notes.pop();
this.existMore = true;
}
this.notes = notes;
this.fetching = false;
this.$emit('loaded');
if (cb) cb();
});
},
more() {
if (this.moreFetching || this.fetching || this.notes.length == 0 || !this.existMore) return;
this.moreFetching = true;
(this as any).api('notes/timeline', {
limit: 11,
untilId: this.notes[this.notes.length - 1].id
}).then(notes => {
if (notes.length == 11) {
notes.pop();
} else {
this.existMore = false;
}
this.notes = this.notes.concat(notes);
this.moreFetching = false;
});
},
onNote(note) {
//
if ((this as any).os.isEnableSounds) {
const sound = new Audio(`${url}/assets/post.mp3`);
sound.volume = localStorage.getItem('soundVolume') ? parseInt(localStorage.getItem('soundVolume'), 10) / 100 : 0.5;
sound.play();
}
this.notes.unshift(note);
const isTop = window.scrollY > 8;
if (isTop) this.notes.pop();
},
onChangeFollowing() {
this.fetch();
},
focus() {
(this.$refs.timeline as any).focus();
},
warp(date) {
this.date = date;
this.fetch();
}
}
});
</script>
<style lang="stylus" scoped>
.mk-home-timeline
> .mk-friends-maker
border-bottom solid 1px #eee
> .fetching
padding 64px 0
> .empty
display block
margin 0 auto
padding 32px
max-width 400px
text-align center
color #999
> [data-fa]
display block
margin-bottom 16px
font-size 3em
color #ccc
</style>

View File

@ -1,169 +1,93 @@
<template>
<div class="mk-timeline">
<mk-friends-maker v-if="alone"/>
<div class="fetching" v-if="fetching">
<mk-ellipsis-icon/>
</div>
<p class="empty" v-if="notes.length == 0 && !fetching">
%fa:R comments%%i18n:@empty%
</p>
<mk-notes :notes="notes" ref="timeline">
<button slot="footer" @click="more" :disabled="moreFetching" :style="{ cursor: moreFetching ? 'wait' : 'pointer' }">
<template v-if="!moreFetching">%i18n:@load-more%</template>
<template v-if="moreFetching">%fa:spinner .pulse .fw%</template>
</button>
</mk-notes>
<header>
<span :data-is-active="src == 'home'" @click="src = 'home'">%fa:home% ホーム</span>
<span :data-is-active="src == 'local'" @click="src = 'local'">%fa:R comments% ローカル</span>
<span :data-is-active="src == 'global'" @click="src = 'global'">%fa:globe% グローバル</span>
</header>
<x-core v-if="src == 'home'" ref="tl" key="home" src="home"/>
<x-core v-if="src == 'local'" ref="tl" key="local" src="local"/>
<x-core v-if="src == 'global'" ref="tl" key="global" src="global"/>
</div>
</template>
<script lang="ts">
import Vue from 'vue';
import { url } from '../../../config';
import XCore from './timeline.core.vue';
export default Vue.extend({
components: {
XCore
},
data() {
return {
fetching: true,
moreFetching: false,
existMore: false,
notes: [],
connection: null,
connectionId: null,
date: null
src: 'home'
};
},
computed: {
alone(): boolean {
return (this as any).os.i.followingCount == 0;
}
},
mounted() {
this.connection = (this as any).os.stream.getConnection();
this.connectionId = (this as any).os.stream.use();
this.connection.on('note', this.onNote);
this.connection.on('follow', this.onChangeFollowing);
this.connection.on('unfollow', this.onChangeFollowing);
document.addEventListener('keydown', this.onKeydown);
window.addEventListener('scroll', this.onScroll);
this.fetch();
console.log(this.$refs.tl);
(this.$refs.tl as any).$once('loaded', () => {
this.$emit('loaded');
});
},
beforeDestroy() {
this.connection.off('note', this.onNote);
this.connection.off('follow', this.onChangeFollowing);
this.connection.off('unfollow', this.onChangeFollowing);
(this as any).os.stream.dispose(this.connectionId);
document.removeEventListener('keydown', this.onKeydown);
window.removeEventListener('scroll', this.onScroll);
},
methods: {
fetch(cb?) {
this.fetching = true;
(this as any).api('notes/timeline', {
limit: 11,
untilDate: this.date ? this.date.getTime() : undefined
}).then(notes => {
if (notes.length == 11) {
notes.pop();
this.existMore = true;
}
this.notes = notes;
this.fetching = false;
this.$emit('loaded');
if (cb) cb();
});
},
more() {
if (this.moreFetching || this.fetching || this.notes.length == 0 || !this.existMore) return;
this.moreFetching = true;
(this as any).api('notes/timeline', {
limit: 11,
untilId: this.notes[this.notes.length - 1].id
}).then(notes => {
if (notes.length == 11) {
notes.pop();
} else {
this.existMore = false;
}
this.notes = this.notes.concat(notes);
this.moreFetching = false;
});
},
onNote(note) {
//
if ((this as any).os.isEnableSounds) {
const sound = new Audio(`${url}/assets/post.mp3`);
sound.volume = localStorage.getItem('soundVolume') ? parseInt(localStorage.getItem('soundVolume'), 10) / 100 : 0.5;
sound.play();
}
this.notes.unshift(note);
const isTop = window.scrollY > 8;
if (isTop) this.notes.pop();
},
onChangeFollowing() {
this.fetch();
},
onScroll() {
if ((this as any).os.i.clientSettings.fetchOnScroll !== false) {
const current = window.scrollY + window.innerHeight;
if (current > document.body.offsetHeight - 8) this.more();
if (current > document.body.offsetHeight - 8) (this.$refs.tl as any).more();
}
},
onKeydown(e) {
if (e.target.tagName != 'INPUT' && e.target.tagName != 'TEXTAREA') {
if (e.which == 84) { // t
(this.$refs.timeline as any).focus();
(this.$refs.tl as any).focus();
}
}
},
warp(date) {
this.date = date;
this.fetch();
(this.$refs.tl as any).warp(date);
}
}
});
</script>
<style lang="stylus" scoped>
@import '~const.styl'
.mk-timeline
background #fff
border solid 1px rgba(0, 0, 0, 0.075)
border-radius 6px
> .mk-friends-maker
> header
padding 8px 16px
border-bottom solid 1px #eee
> .fetching
padding 64px 0
> span
margin-right 16px
line-height 27px
font-size 14px
color #555
> .empty
display block
margin 0 auto
padding 32px
max-width 400px
text-align center
color #999
&:not([data-is-active])
color $theme-color
cursor pointer
> [data-fa]
display block
margin-bottom 16px
font-size 3em
color #ccc
&:hover
text-decoration underline
</style>

View File

@ -2,12 +2,12 @@ import * as mongo from 'mongodb';
import db from '../db/mongodb';
const Mute = db.get<IMute>('mute');
Mute.createIndex(['muterId', 'muteeId'], { unique: true });
export default Mute;
export interface IMute {
_id: mongo.ObjectID;
createdAt: Date;
deletedAt: Date;
muterId: mongo.ObjectID;
muteeId: mongo.ObjectID;
}

View File

@ -45,6 +45,14 @@ class MisskeyEvent {
this.publish(`channel-stream:${channelId}`, type, typeof value === 'undefined' ? null : value);
}
public publishLocalTimelineStream(note: any): void {
this.redisClient.publish('misskey:local-timeline', JSON.stringify(note));
}
public publishGlobalTimelineStream(note: any): void {
this.redisClient.publish('misskey:global-timeline', JSON.stringify(note));
}
private publish(channel: string, type: string, value?: any): void {
const message = value == null ?
{ type: type } :
@ -58,16 +66,12 @@ const ev = new MisskeyEvent();
export default ev.publishUserStream.bind(ev);
export const publishLocalTimelineStream = ev.publishLocalTimelineStream.bind(ev);
export const publishGlobalTimelineStream = ev.publishGlobalTimelineStream.bind(ev);
export const publishDriveStream = ev.publishDriveStream.bind(ev);
export const publishNoteStream = ev.publishNoteStream.bind(ev);
export const publishMessagingStream = ev.publishMessagingStream.bind(ev);
export const publishMessagingIndexStream = ev.publishMessagingIndexStream.bind(ev);
export const publishOthelloStream = ev.publishOthelloStream.bind(ev);
export const publishOthelloGameStream = ev.publishOthelloGameStream.bind(ev);
export const publishChannelStream = ev.publishChannelStream.bind(ev);

View File

@ -463,6 +463,22 @@ const endpoints: Endpoint[] = [
max: 100
}
},
{
name: 'notes/local-timeline',
withCredential: true,
limit: {
duration: ms('10minutes'),
max: 100
}
},
{
name: 'notes/global-timeline',
withCredential: true,
limit: {
duration: ms('10minutes'),
max: 100
}
},
{
name: 'notes/mentions',
withCredential: true,

View File

@ -30,7 +30,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
}, {
fields: {
data: false,
'profile': false
profile: false
}
});
@ -41,8 +41,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
// Check if already muting
const exist = await Mute.findOne({
muterId: muter._id,
muteeId: mutee._id,
deletedAt: { $exists: false }
muteeId: mutee._id
});
if (exist !== null) {

View File

@ -7,10 +7,6 @@ import Mute from '../../../../models/mute';
/**
* Unmute a user
*
* @param {any} params
* @param {any} user
* @return {Promise<any>}
*/
module.exports = (params, user) => new Promise(async (res, rej) => {
const muter = user;
@ -30,7 +26,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
}, {
fields: {
data: false,
'profile': false
profile: false
}
});
@ -41,8 +37,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
// Check not muting
const exist = await Mute.findOne({
muterId: muter._id,
muteeId: mutee._id,
deletedAt: { $exists: false }
muteeId: mutee._id
});
if (exist === null) {
@ -50,12 +45,8 @@ module.exports = (params, user) => new Promise(async (res, rej) => {
}
// Delete mute
await Mute.update({
await Mute.remove({
_id: exist._id
}, {
$set: {
deletedAt: new Date()
}
});
// Send response

View File

@ -0,0 +1,91 @@
/**
* Module dependencies
*/
import $ from 'cafy';
import Note from '../../../../models/note';
import Mute from '../../../../models/mute';
import { pack } from '../../../../models/note';
/**
* Get timeline of global
*/
module.exports = async (params, user, app) => {
// Get 'limit' parameter
const [limit = 10, limitErr] = $(params.limit).optional.number().range(1, 100).$;
if (limitErr) throw 'invalid limit param';
// Get 'sinceId' parameter
const [sinceId, sinceIdErr] = $(params.sinceId).optional.id().$;
if (sinceIdErr) throw 'invalid sinceId param';
// Get 'untilId' parameter
const [untilId, untilIdErr] = $(params.untilId).optional.id().$;
if (untilIdErr) throw 'invalid untilId param';
// Get 'sinceDate' parameter
const [sinceDate, sinceDateErr] = $(params.sinceDate).optional.number().$;
if (sinceDateErr) throw 'invalid sinceDate param';
// Get 'untilDate' parameter
const [untilDate, untilDateErr] = $(params.untilDate).optional.number().$;
if (untilDateErr) throw 'invalid untilDate param';
// Check if only one of sinceId, untilId, sinceDate, untilDate specified
if ([sinceId, untilId, sinceDate, untilDate].filter(x => x != null).length > 1) {
throw 'only one of sinceId, untilId, sinceDate, untilDate can be specified';
}
// ミュートしているユーザーを取得
const mutedUserIds = (await Mute.find({
muterId: user._id
})).map(m => m.muteeId);
//#region Construct query
const sort = {
_id: -1
};
const query = {
// mute
userId: {
$nin: mutedUserIds
},
'_reply.userId': {
$nin: mutedUserIds
},
'_renote.userId': {
$nin: mutedUserIds
}
} as any;
if (sinceId) {
sort._id = 1;
query._id = {
$gt: sinceId
};
} else if (untilId) {
query._id = {
$lt: untilId
};
} else if (sinceDate) {
sort._id = 1;
query.createdAt = {
$gt: new Date(sinceDate)
};
} else if (untilDate) {
query.createdAt = {
$lt: new Date(untilDate)
};
}
//#endregion
// Issue query
const timeline = await Note
.find(query, {
limit: limit,
sort: sort
});
// Serialize
return await Promise.all(timeline.map(note => pack(note, user)));
};

View File

@ -0,0 +1,94 @@
/**
* Module dependencies
*/
import $ from 'cafy';
import Note from '../../../../models/note';
import Mute from '../../../../models/mute';
import { pack } from '../../../../models/note';
/**
* Get timeline of local
*/
module.exports = async (params, user, app) => {
// Get 'limit' parameter
const [limit = 10, limitErr] = $(params.limit).optional.number().range(1, 100).$;
if (limitErr) throw 'invalid limit param';
// Get 'sinceId' parameter
const [sinceId, sinceIdErr] = $(params.sinceId).optional.id().$;
if (sinceIdErr) throw 'invalid sinceId param';
// Get 'untilId' parameter
const [untilId, untilIdErr] = $(params.untilId).optional.id().$;
if (untilIdErr) throw 'invalid untilId param';
// Get 'sinceDate' parameter
const [sinceDate, sinceDateErr] = $(params.sinceDate).optional.number().$;
if (sinceDateErr) throw 'invalid sinceDate param';
// Get 'untilDate' parameter
const [untilDate, untilDateErr] = $(params.untilDate).optional.number().$;
if (untilDateErr) throw 'invalid untilDate param';
// Check if only one of sinceId, untilId, sinceDate, untilDate specified
if ([sinceId, untilId, sinceDate, untilDate].filter(x => x != null).length > 1) {
throw 'only one of sinceId, untilId, sinceDate, untilDate can be specified';
}
// ミュートしているユーザーを取得
const mutedUserIds = (await Mute.find({
muterId: user._id
})).map(m => m.muteeId);
//#region Construct query
const sort = {
_id: -1
};
const query = {
// mute
userId: {
$nin: mutedUserIds
},
'_reply.userId': {
$nin: mutedUserIds
},
'_renote.userId': {
$nin: mutedUserIds
},
// local
'_user.host': null
} as any;
if (sinceId) {
sort._id = 1;
query._id = {
$gt: sinceId
};
} else if (untilId) {
query._id = {
$lt: untilId
};
} else if (sinceDate) {
sort._id = 1;
query.createdAt = {
$gt: new Date(sinceDate)
};
} else if (untilDate) {
query.createdAt = {
$lt: new Date(untilDate)
};
}
//#endregion
// Issue query
const timeline = await Note
.find(query, {
limit: limit,
sort: sort
});
// Serialize
return await Promise.all(timeline.map(note => pack(note, user)));
};

View File

@ -11,11 +11,6 @@ import { pack } from '../../../../models/note';
/**
* Get timeline of myself
*
* @param {any} params
* @param {any} user
* @param {any} app
* @return {Promise<any>}
*/
module.exports = async (params, user, app) => {
// Get 'limit' parameter
@ -56,9 +51,7 @@ module.exports = async (params, user, app) => {
// ミュートしているユーザーを取得
mutedUserIds: Mute.find({
muterId: user._id,
// 削除されたドキュメントは除く
deletedAt: { $exists: false }
muterId: user._id
}).then(ms => ms.map(m => m.muteeId))
});

View File

@ -0,0 +1,39 @@
import * as websocket from 'websocket';
import * as redis from 'redis';
import { IUser } from '../../../models/user';
import Mute from '../../../models/mute';
export default async function(
request: websocket.request,
connection: websocket.connection,
subscriber: redis.RedisClient,
user: IUser
) {
// Subscribe stream
subscriber.subscribe(`misskey:global-timeline`);
const mute = await Mute.find({ muterId: user._id });
const mutedUserIds = mute.map(m => m.muteeId.toString());
subscriber.on('message', async (_, data) => {
const note = JSON.parse(data);
//#region 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
if (mutedUserIds.indexOf(note.userId) != -1) {
return;
}
if (note.reply != null && mutedUserIds.indexOf(note.reply.userId) != -1) {
return;
}
if (note.renote != null && mutedUserIds.indexOf(note.renote.userId) != -1) {
return;
}
//#endregion
connection.send(JSON.stringify({
type: 'note',
body: note
}));
});
}

View File

@ -21,10 +21,7 @@ export default async function(
// Subscribe Home stream channel
subscriber.subscribe(`misskey:user-stream:${user._id}`);
const mute = await Mute.find({
muterId: user._id,
deletedAt: { $exists: false }
});
const mute = await Mute.find({ muterId: user._id });
const mutedUserIds = mute.map(m => m.muteeId.toString());
subscriber.on('message', async (channel, data) => {
@ -33,6 +30,7 @@ export default async function(
try {
const x = JSON.parse(data);
//#region 流れてきたメッセージがミュートしているユーザーが関わるものだったら無視する
if (x.type == 'note') {
if (mutedUserIds.indexOf(x.body.userId) != -1) {
return;
@ -48,6 +46,7 @@ export default async function(
return;
}
}
//#endregion
connection.send(data);
} catch (e) {

View File

@ -0,0 +1,39 @@
import * as websocket from 'websocket';
import * as redis from 'redis';
import { IUser } from '../../../models/user';
import Mute from '../../../models/mute';
export default async function(
request: websocket.request,
connection: websocket.connection,
subscriber: redis.RedisClient,
user: IUser
) {
// Subscribe stream
subscriber.subscribe(`misskey:local-timeline`);
const mute = await Mute.find({ muterId: user._id });
const mutedUserIds = mute.map(m => m.muteeId.toString());
subscriber.on('message', async (_, data) => {
const note = JSON.parse(data);
//#region 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
if (mutedUserIds.indexOf(note.userId) != -1) {
return;
}
if (note.reply != null && mutedUserIds.indexOf(note.reply.userId) != -1) {
return;
}
if (note.renote != null && mutedUserIds.indexOf(note.renote.userId) != -1) {
return;
}
//#endregion
connection.send(JSON.stringify({
type: 'note',
body: note
}));
});
}

View File

@ -4,6 +4,8 @@ import * as redis from 'redis';
import config from '../../config';
import homeStream from './stream/home';
import localTimelineStream from './stream/local-timeline';
import globalTimelineStream from './stream/global-timeline';
import driveStream from './stream/drive';
import messagingStream from './stream/messaging';
import messagingIndexStream from './stream/messaging-index';
@ -64,8 +66,10 @@ module.exports = (server: http.Server) => {
return;
}
const channel =
const channel: any =
request.resourceURL.pathname === '/' ? homeStream :
request.resourceURL.pathname === '/local-timeline' ? localTimelineStream :
request.resourceURL.pathname === '/global-timeline' ? globalTimelineStream :
request.resourceURL.pathname === '/drive' ? driveStream :
request.resourceURL.pathname === '/messaging' ? messagingStream :
request.resourceURL.pathname === '/messaging-index' ? messagingIndexStream :

View File

@ -1,6 +1,6 @@
import Note, { pack, INote } from '../../models/note';
import User, { isLocalUser, IUser, isRemoteUser } from '../../models/user';
import stream from '../../publishers/stream';
import stream, { publishLocalTimelineStream, publishGlobalTimelineStream } from '../../publishers/stream';
import Following from '../../models/following';
import { deliver } from '../../queue';
import renderNote from '../../remote/activitypub/renderer/note';
@ -105,11 +105,17 @@ export default async (user: IUser, data: {
// タイムラインへの投稿
if (note.channelId == null) {
// Publish event to myself's stream
if (isLocalUser(user)) {
// Publish event to myself's stream
stream(note.userId, 'note', noteObj);
// Publish note to local timeline stream
publishLocalTimelineStream(noteObj);
}
// Publish note to global timeline stream
publishGlobalTimelineStream(noteObj);
// Fetch all followers
const followers = await Following.aggregate([{
$lookup: {