/* * SPDX-FileCopyrightText: syuilo and other misskey contributors * SPDX-License-Identifier: AGPL-3.0-only */ import { Inject, Injectable } from '@nestjs/common'; import * as Bull from 'bullmq'; import { DI } from '@/di-symbols.js'; import type { InstancesRepository } from '@/models/_.js'; import type Logger from '@/logger.js'; import { MetaService } from '@/core/MetaService.js'; import { ApRequestService } from '@/core/activitypub/ApRequestService.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; import { MemorySingleCache } from '@/misc/cache.js'; import type { MiInstance } from '@/models/Instance.js'; import InstanceChart from '@/core/chart/charts/instance.js'; import ApRequestChart from '@/core/chart/charts/ap-request.js'; import FederationChart from '@/core/chart/charts/federation.js'; import { StatusError } from '@/misc/status-error.js'; import { UtilityService } from '@/core/UtilityService.js'; import { bindThis } from '@/decorators.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { DeliverJobData } from '../types.js'; @Injectable() export class DeliverProcessorService { private logger: Logger; private suspendedHostsCache: MemorySingleCache; private latest: string | null; constructor( @Inject(DI.instancesRepository) private instancesRepository: InstancesRepository, private metaService: MetaService, private utilityService: UtilityService, private federatedInstanceService: FederatedInstanceService, private fetchInstanceMetadataService: FetchInstanceMetadataService, private apRequestService: ApRequestService, private instanceChart: InstanceChart, private apRequestChart: ApRequestChart, private federationChart: FederationChart, private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('deliver'); this.suspendedHostsCache = new MemorySingleCache(1000 * 60 * 60); } @bindThis public async process(job: Bull.Job): Promise { const { host } = new URL(job.data.to); // ブロックしてたら中断 const meta = await this.metaService.fetch(); if (this.utilityService.isBlockedHost(meta.blockedHosts, this.utilityService.toPuny(host))) { return 'skip (blocked)'; } // isSuspendedなら中断 let suspendedHosts = this.suspendedHostsCache.get(); if (suspendedHosts == null) { suspendedHosts = await this.instancesRepository.find({ where: { isSuspended: true, }, }); this.suspendedHostsCache.set(suspendedHosts); } if (suspendedHosts.map(x => x.host).includes(this.utilityService.toPuny(host))) { return 'skip (suspended)'; } try { await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content, job.data.digest); // Update stats this.federatedInstanceService.fetch(host).then(i => { if (i.isNotResponding) { this.federatedInstanceService.update(i.id, { isNotResponding: false, }); } this.fetchInstanceMetadataService.fetchInstanceMetadata(i); this.apRequestChart.deliverSucc(); this.federationChart.deliverd(i.host, true); if (meta.enableChartsForFederatedInstances) { this.instanceChart.requestSent(i.host, true); } }); return 'Success'; } catch (res) { // Update stats this.federatedInstanceService.fetch(host).then(i => { if (!i.isNotResponding) { this.federatedInstanceService.update(i.id, { isNotResponding: true, }); } this.apRequestChart.deliverFail(); this.federationChart.deliverd(i.host, false); if (meta.enableChartsForFederatedInstances) { this.instanceChart.requestSent(i.host, false); } }); if (res instanceof StatusError) { // 4xx if (!res.isRetryable) { // 相手が閉鎖していることを明示しているため、配送停止する if (job.data.isSharedInbox && res.statusCode === 410) { this.federatedInstanceService.fetch(host).then(i => { this.federatedInstanceService.update(i.id, { isSuspended: true, }); }); throw new Bull.UnrecoverableError(`${host} is gone`); } throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`); } // 5xx etc. throw new Error(`${res.statusCode} ${res.statusMessage}`); } else { // DNS error, socket error, timeout ... throw res; } } } }