|
import csv
|
|
import os
|
|
from operator import itemgetter
|
|
|
|
from mastodon import Mastodon
|
|
from bs4 import BeautifulSoup
|
|
|
|
DOMAIN = 'bgme.me'
|
|
# Scopes: read:accounts read:statuses write:reports admin:read:accounts admin:read:domain_allows admin:read:domain_blocks admin:read:reports admin:write:accounts admin:write:domain_allows admin:write:domain_blocks admin:write:reports
|
|
ACCESS_TOKEN = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
|
|
|
|
IGNORE_ACCOUNTS = []
|
|
|
|
|
|
def get_text_from_html(data: str):
|
|
soup = BeautifulSoup(data, 'html.parser')
|
|
return soup.text
|
|
|
|
|
|
class Main:
|
|
def __init__(self, ACCOUNTS_CSV_FILE: str):
|
|
self.ACCOUNTS_CSV_FILE = ACCOUNTS_CSV_FILE
|
|
if not os.path.exists(self.ACCOUNTS_CSV_FILE):
|
|
raise IOError("Not Found CSV File.")
|
|
|
|
self.spam_accounts = set()
|
|
self.infected_domains = set()
|
|
|
|
mastodon = Mastodon(api_base_url='https://' + DOMAIN, access_token=ACCESS_TOKEN)
|
|
mastodon.account_verify_credentials()
|
|
self.mastodon = mastodon
|
|
|
|
def get_accounts_status(self, account_id: str, limit=20):
|
|
return self.mastodon.account_statuses(account_id, exclude_reblogs=True, limit=limit)
|
|
|
|
def print_latest_account_status(self, account_id: str):
|
|
status = self.get_accounts_status(account_id, limit=1)
|
|
s = status[0]
|
|
# print(s['content'])
|
|
print(get_text_from_html(s['content']))
|
|
return s
|
|
|
|
def report(self, account_id: str, status_id: str):
|
|
r = self.mastodon.report(account_id, status_ids=[status_id], comment="SPAM", forward=True, category="spam")
|
|
print('Report Account {}'.format(account_id))
|
|
return r
|
|
|
|
def start_report_accounts(self):
|
|
with open(self.ACCOUNTS_CSV_FILE, 'r') as csvfile:
|
|
reader = csv.DictReader(csvfile)
|
|
for row in reader:
|
|
account_id, username, domain, display_name, avatar_file_name = itemgetter(
|
|
"id", "username", "domain", "display_name", "avatar_file_name")(row)
|
|
|
|
if account_id in IGNORE_ACCOUNTS:
|
|
continue
|
|
|
|
print(
|
|
'\nReport @{username}@{domain} ({account_id}):'.format(username=username, domain=domain,
|
|
account_id=account_id)
|
|
)
|
|
try:
|
|
s = self.print_latest_account_status(account_id)
|
|
confirm = input(
|
|
"\nWill block account @{username}@{domain}? [Y/n] ".format(username=username, domain=domain))
|
|
if confirm == 'n':
|
|
continue
|
|
self.report(account_id, s['id'])
|
|
self.spam_accounts.add(account_id)
|
|
self.infected_domains.add(domain)
|
|
except BaseException as e:
|
|
print(e)
|
|
|
|
def start_block_accounts(self):
|
|
for account_id in self.spam_accounts:
|
|
try:
|
|
print('Block account {}'.format(account_id))
|
|
self.mastodon.admin_account_moderate(account_id, action='suspend')
|
|
except BaseException as e:
|
|
print(e)
|
|
|
|
def start_limit_domains(self):
|
|
limit_domains = []
|
|
for domain in self.infected_domains:
|
|
try:
|
|
print('Limit Domain {}'.format(domain))
|
|
self.mastodon.admin_create_domain_block(
|
|
domain, severity='silence', private_comment='SPAM', public_comment='SPAM')
|
|
limit_domains.append(domain)
|
|
except BaseException as e:
|
|
print(e)
|
|
|
|
print('\n\nLimit domains:')
|
|
for domain in limit_domains:
|
|
print(domain)
|
|
|
|
def start(self):
|
|
self.start_report_accounts()
|
|
self.start_block_accounts()
|
|
self.start_limit_domains()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description='Mastodon Anti-SPAM 管理辅助脚本')
|
|
parser.add_argument('csv_file', help='包含疑似帐户的 CSV 文件')
|
|
args = parser.parse_args()
|
|
|
|
m = Main(args.csv_file)
|
|
m.start()
|