|
import psycopg2
|
|
from mastodon import Mastodon
|
|
from bs4 import BeautifulSoup
|
|
|
|
# Host name of the Mastodon instance; the API base URL is built as
# 'https://' + DOMAIN.
DOMAIN = 'bgme.me'

# Access token for the moderation client.
# Scopes: read:accounts read:statuses write:reports admin:read:accounts admin:read:domain_allows admin:read:domain_blocks admin:read:reports admin:write:accounts admin:write:domain_allows admin:write:domain_blocks admin:write:reports
ACCESS_TOKEN = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'

# Connection settings handed to psycopg2 when querying the instance database.
DB_PARMS = dict(
    dbname="mastodon_production",
    user="mastodon",
    password="xxxxxxxxxxxxxxxxxxxxxxxxx",
    host="127.0.0.1",
    port="5432",
)

# Query selecting suspected spam accounts. The leading SQL comment (Chinese)
# reads: "Find all suspected SPAM accounts located on instances on which no
# moderation action has been taken yet".
# A row matches when the username is exactly 10 characters of [0-9a-z], the
# display name is empty or equal to the username, the account was created
# after 2024-02-15, it is not suspended, it has no avatar file, it has a
# non-null domain, and that domain has no row in domain_blocks.
SQL = '''-- 查找所有位于未进行管理操作实例的疑似 SPAM 帐户
select
id,
username,
domain,
display_name,
avatar_file_name
from
accounts
where
username ~ '^[0-9a-z]{10}$'
and (username = display_name or display_name = '')
and created_at > '2024-02-15 00:00:00'
and suspended_at is null
and avatar_file_name is null
and "domain" is not null
and "domain" not in (
select
db."domain"
from
domain_blocks db
);'''

# Account ids (the `id` column returned by SQL) that must never be reported.
IGNORE_ACCOUNTS: list[int] = []

# Access token for the client that edits the public notice status.
# Scopes: read:accounts read:statuses write:statuses
NOTICE_ACCESS_TOKEN = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'

# Id of the status whose text holds the list of limited domains, one per line.
NOTICE_STATUS_ID = 111952338188940608
|
|
|
|
|
|
def get_text_from_html(data: str) -> str:
    """Return the text content of an HTML fragment with the markup removed.

    The fragment is parsed with BeautifulSoup using the ``html.parser``
    backend, and the text of the parsed document is returned.
    """
    return BeautifulSoup(data, 'html.parser').get_text()
|
|
|
|
|
|
class Main:
    """Interactive moderation run against suspected spam accounts.

    ``start()`` drives the whole workflow:

    1. query the database for suspect accounts and, after the operator
       confirms each one, file a forwarded spam report;
    2. suspend every account that was reported;
    3. create a ``silence`` domain block for every domain those accounts
       came from;
    4. append the newly limited domains to the public notice status.

    Attributes:
        spam_accounts: ids of accounts reported during this run.
        infected_domains: domains of the accounts reported during this run.
        mastodon: API client authenticated with ``ACCESS_TOKEN``.
        notice_mastodon: API client authenticated with ``NOTICE_ACCESS_TOKEN``.
    """

    def __init__(self):
        """Create both API clients and verify their credentials.

        ``account_verify_credentials()`` is called on each client right after
        it is built, so an unusable token raises here rather than later in
        the run.
        """
        self.spam_accounts = set()
        self.infected_domains = set()

        base_url = 'https://' + DOMAIN

        mastodon = Mastodon(api_base_url=base_url, access_token=ACCESS_TOKEN)
        mastodon.account_verify_credentials()
        self.mastodon = mastodon

        notice_mastodon = Mastodon(api_base_url=base_url, access_token=NOTICE_ACCESS_TOKEN)
        notice_mastodon.account_verify_credentials()
        self.notice_mastodon = notice_mastodon

    @staticmethod
    def _is_declined(answer: str) -> bool:
        """Return True when a ``[Y/n]`` prompt answer means "no".

        Surrounding whitespace and letter case are ignored, so ``n``, ``N``,
        ``no`` and ``" No "`` all decline. Anything else, including an empty
        answer, counts as "yes".
        """
        return answer.strip().lower() in ('n', 'no')

    @staticmethod
    def _merge_domain_lines(text: str, new_domains) -> str:
        """Merge ``new_domains`` into a newline-separated domain list.

        Every line of ``text`` and every entry of ``new_domains`` is stripped;
        empty entries are dropped, duplicates are removed, and the result is
        returned sorted and joined with ``'\\n'``.
        """
        candidates = text.split('\n') + list(new_domains)
        unique = {entry.strip() for entry in candidates if entry.strip()}
        return '\n'.join(sorted(unique))

    def get_suspect_accounts_from_db(self):
        """Run ``SQL`` against the database and return all resulting rows.

        Returns:
            The list produced by ``cursor.fetchall()``; ``start_report_accounts``
            unpacks each row as ``(id, username, domain, display_name,
            avatar_file_name)``.
        """
        conn = psycopg2.connect(**DB_PARMS)
        try:
            # The cursor context manager closes the cursor even if the query fails.
            with conn.cursor() as cur:
                cur.execute(SQL)
                results = cur.fetchall()
            conn.commit()
        finally:
            # Always release the connection, including on query errors.
            conn.close()
        return results

    def get_accounts_status(self, account_id: str, limit=20):
        """Return up to ``limit`` statuses of ``account_id``, excluding reblogs.

        NOTE(review): ``account_id`` is annotated ``str``, but
        ``start_report_accounts`` passes the database ``id`` column through
        unchanged - confirm the intended type.
        """
        return self.mastodon.account_statuses(account_id, exclude_reblogs=True, limit=limit)

    def print_latest_account_status(self, account_id: str):
        """Print the text of the first status returned for the account and return it.

        Requests a single status (``limit=1``), prints its ``content`` with the
        HTML markup removed, and returns the status.

        Raises:
            IndexError: if no status is returned for the account.
        """
        statuses = self.get_accounts_status(account_id, limit=1)
        if not statuses:
            # Same exception type as the bare ``statuses[0]`` would raise, but
            # with a message the operator can act on.
            raise IndexError('Account {} has no statuses to inspect'.format(account_id))
        latest = statuses[0]
        print(get_text_from_html(latest['content']))
        return latest

    def report(self, account_id: str, status_id: str):
        """File a forwarded report in the ``spam`` category and return the API result.

        The report carries the comment ``"SPAM"`` and references ``status_id``.
        """
        result = self.mastodon.report(
            account_id, status_ids=[status_id], comment="SPAM", forward=True, category="spam")
        print('Report Account {}'.format(account_id))
        return result

    def start_report_accounts(self):
        """Show each suspect account to the operator and report the confirmed ones.

        Accounts listed in ``IGNORE_ACCOUNTS`` are skipped. For every other
        suspect the first returned status is printed and the operator is
        prompted; unless the answer declines (see ``_is_declined``), the account
        is reported and recorded in ``spam_accounts`` / ``infected_domains``.

        Errors for a single account are printed and the loop moves on.
        ``KeyboardInterrupt`` and ``SystemExit`` are deliberately not caught so
        the operator can abort the run.
        """
        ignored = set(IGNORE_ACCOUNTS)
        for account_id, username, domain, _display_name, _avatar in self.get_suspect_accounts_from_db():
            if account_id in ignored:
                continue

            print(
                '\nReport @{username}@{domain} ({account_id}):'.format(
                    username=username, domain=domain, account_id=account_id)
            )
            try:
                latest = self.print_latest_account_status(account_id)
                confirm = input(
                    "\nWill block account @{username}@{domain}? [Y/n] ".format(
                        username=username, domain=domain))
                if self._is_declined(confirm):
                    continue
                self.report(account_id, latest['id'])
                self.spam_accounts.add(account_id)
                self.infected_domains.add(domain)
            except Exception as e:
                # Best effort: one failing account must not stop the run.
                print(e)

    def start_block_accounts(self):
        """Suspend every account collected in ``spam_accounts``.

        A failure for one account is printed and does not stop the loop.
        """
        for account_id in self.spam_accounts:
            try:
                print('Block account {}'.format(account_id))
                self.mastodon.admin_account_moderate(account_id, action='suspend')
            except Exception as e:
                print(e)

    def start_limit_domains(self) -> list[str]:
        """Create a ``silence`` domain block for every domain in ``infected_domains``.

        Returns:
            The domains whose block was created without an error. Domains whose
            API call failed are printed with the error and left out.
        """
        limit_domains = []
        for domain in self.infected_domains:
            try:
                print('Limit Domain {}'.format(domain))
                self.mastodon.admin_create_domain_block(
                    domain, severity='silence', private_comment='SPAM', public_comment='SPAM')
            except Exception as e:
                print(e)
            else:
                limit_domains.append(domain)

        print('\n\nLimit domains:')
        for domain in limit_domains:
            print(domain)

        return limit_domains

    def modify_notice_status(self, new_limit_domains: list[str]):
        """Add ``new_limit_domains`` to the domain list kept in the notice status.

        The source text of status ``NOTICE_STATUS_ID`` is read as one domain
        per line, merged with the new domains (see ``_merge_domain_lines``),
        and written back with the existing spoiler text.
        """
        print('\nUpdate notice status...')
        source = self.notice_mastodon.status_source(NOTICE_STATUS_ID)
        new_text = self._merge_domain_lines(source['text'], new_limit_domains)
        self.notice_mastodon.status_update(
            source['id'], status=new_text, spoiler_text=source['spoiler_text'])

    def start(self):
        """Run the full workflow: report, suspend, limit domains, update the notice."""
        self.start_report_accounts()
        self.start_block_accounts()
        limit_domains = self.start_limit_domains()

        # Only edit the notice status when at least one domain was limited.
        if limit_domains:
            self.modify_notice_status(limit_domains)
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: build the runner and execute the whole workflow.
    Main().start()
|