该项目适合作为小型的项目原型,适合教学和练手。最初这个项目的灵感源于我的个人需求,我需要一个工具来查看主流话题,同时又不想下载一堆 APP 来接收推送。项目最终的数据使用github pages来展示。
可以结合我的另一个项目pocwatchdog
实现定时执行加发送邮件。
项目开源地址:https://github.com/jiangyangcreate/SocialMood
项目查看地址:https://jiangyangcreate.github.io/SocialMood/
使用流程
# 安装依赖
cd python
python setup.py
# 运行数据抓取,生成静态网页
python crawler.py
依赖安装
所需依赖可以通过 setup.py
下载安装。因为有些模块不是pip就算安装好的。
import subprocess
import sys
import nltk
def install_requirements():
print("正在安装依赖...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"])
def install_browser():
print("正在安装浏览器...")
subprocess.check_call([sys.executable, "-m", "playwright", "install"])
def download_nltk_data():
print("正在下载NLTK词典文件...")
nltk.download('vader_lexicon')
def download_qwen2_model():
try:
print("正在下载Qwen2大模型(7B)...")
from transformers import AutoModelForCausalLM, AutoTokenizer
model_name = "Qwen/Qwen2.5-7B-Instruct"
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype="auto",
device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
prompt = "请回复:你好,我是Qwen2大模型,有什么可以帮您的吗?"
messages = [
{"role": "system", "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant."},
{"role": "user", "content": prompt}
]
text = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
generated_ids = model.generate(
**model_inputs,
max_new_tokens=512
)
generated_ids = [
output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]
response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
print('Qwen2大模型(7B)响应:', response)
except Exception as e:
print(f"跳过下载Qwen2大模型(7B),出现错误:{e}")
def main():
try:
install_requirements()
install_browser()
download_nltk_data()
download_qwen2_model()
print("所有安装步骤已完成!")
except subprocess.CalledProcessError as e:
print(f"安装过程中出现错误:{e}")
sys.exit(1)
if __name__ == "__main__":
main()
主要功能
该系统的主要功能包括:
- 抓取热搜数据:从微博、抖音、B站等平台抓取热搜数据。
这里也可以通过API获取,爬取注意不要变成DDOS攻击。
- 数据处理:使用 Pandas 进行数 据清洗与处理。
使用pandas主要是处理一些文本型的数据,譬如10万要换算为100000。
使用jieba分词用于后续词云图生成,需要剔除一些单字与标点符号。当然,最近b站很喜欢在标题中加空格,所以要先去空格再分词。
有些数据的热度值还没计算出来,可以使用幂律分布的线性回归填补热度缺失值。这里使用指数回归、普通线性回归效果都不好。
幂律分布常见的样子:个人UP主粉丝排名前几名粉丝差距有百万,像指数分布。但是后续排名的up粉丝差距就很小,接近线性分布。如果我随机扣掉一个排名的up主的粉丝数,让你预测,你可以预测多准。我面对的大概就是这样的一个问题。
下面是我的解决方案:
def estimate_missing_heat(self):
"""估算缺失的热度值"""
features = ["排名"]
target = "处理后热度"
def impute_group(group):
X = group[features]
y = group[target]
# 如果没有缺失值,直接返回原始组
if not y.isnull().any():
return group
# 幂律分布拟合(通过对数变换实现)
X_train = X[y.notnull()]
y_train = y[y.notnull()]
# 将 X_train 转换为浮点数类型
X_train = X_train.astype(float)
# 对X和y进行对数变换,处理可能的零值
log_X_train = np.log(X_train + 1)
log_y_train = np.log(y_train + 1)
lr = LinearRegression()
lr.fit(log_X_train, log_y_train)
# 获取拟合的系数
slope = lr.coef_[0]
intercept = lr.intercept_
# print(f"拟合的幂律方程为: y = {np.exp(intercept):.2f} * x^{slope:.2f}")
# 预测缺失值
X_missing = X[y.isnull()]
if not X_missing.empty:
# 将 X_missing 转换为浮点数
X_missing = X_missing.astype(float)
log_X_missing = np.log(X_missing + 1)
log_predictions = lr.predict(log_X_missing)
predictions = np.exp(log_predictions) - 1 # 反向变换
# 确保预测值不小于0
predictions = np.maximum(predictions, 0)
# 确保预测值符合排名顺序
for i, pred in enumerate(predictions):
rank = X_missing.iloc[i]["排名"]
higher_ranks = y[(X["排名"] < rank) & y.notnull()]
lower_ranks = y[(X["排名"] > rank) & y.notnull()]
if len(higher_ranks) > 0:
pred = min(pred, higher_ranks.min())
if len(lower_ranks) > 0:
pred = max(pred, lower_ranks.max())
predictions[i] = pred
group.loc[y.isnull(), target] = predictions
return group
# 对每个信息来源分别进行缺失值填充
self.df = self.df.groupby("信息来源").apply(impute_group)
# 删除仍然为None的行(如果有的话)
self.df = self.df.dropna(subset=["处理后热度"])
self.df["处理后热度"] = self.df["处理后热度"].astype(int)
- 情绪分析:监测和分析公众情绪。算出单条标题的情绪数值之后,标准化到
(-1,1)
这个区间之中。最后通过热度与排名计算出对社会的情感影响力。正数数则是积极影响,负数则是负面影响。
如果你的电脑性能还不错,可以使用 Qwen2.5 本地模型作为情绪分析的核心,根据自己的设备选择模型大小。
class Qwen2Model:
_model = None
_tokenizer = None
_device = None
model_name = "Qwen/Qwen2.5-7B-Instruct"
@classmethod
def _initialize(cls):
if cls._model is None or cls._tokenizer is None:
print("初始化Qwen2模型")
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
torch.cuda.empty_cache() # 清理缓存
cls._device = "cuda"
cls._model = AutoModelForCausalLM.from_pretrained(
cls.model_name,
torch_dtype=torch.float16,
device_map="auto" if torch.cuda.is_available() else None
).to(cls._device)
cls._tokenizer = AutoTokenizer.from_pretrained(cls.model_name)
@classmethod
def get_sentiment_score(cls,text):
prompt = "请仅对用户提供的句子进行情感评分,并返回介于-1到1之间的分数。-1表示非常负面,1表示非常正面,0表示中立。无需提供其他信息或上下文。"
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": text}
]
text = cls._tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
model_inputs = cls._tokenizer([text], return_tensors="pt").to(cls._model.device)
generated_ids = cls._model.generate(
**model_inputs,
max_new_tokens=512
)
generated_ids = [
output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]
response = cls._tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
return float(response)
以上代码均在 crawler.py
。
数据存储
系统使用 sqlite3
保存中间数据,相关代码在 database.py
中,数据文件为 news.db
,包含两张表:
- news:清洗后的可用数据。
- raw_html:原始网页数据。
这样可以保证速度的同时,可以保持文件夹的整洁,如果数据量大可以直接平滑的迁移到正式数据库中。
# encoding: utf-8
'''
将数据存储到数据库
'''
import sqlite3
import pandas as pd
from collections import Counter
import json
class NewsDatabase:
def __init__(self, db_name='news.db'):
self.db_name = db_name
self.create_tables()
def create_tables(self):
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS news
(id INTEGER PRIMARY KEY AUTOINCREMENT,
日期 TEXT,
时间 TEXT,
信息来源 TEXT,
标题 TEXT,
排名 INTEGER,
热度 TEXT,
处理后热度 TEXT,
链接 TEXT,
情感得分 REAL,
加权情感得分 REAL,
分词 TEXT)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS raw_html
(id INTEGER PRIMARY KEY AUTOINCREMENT,
日期 TEXT,
时间 TEXT,
内容 TEXT)''')
conn.commit()
conn.close()
def save_data(self, df: pd.DataFrame, table_name='news'):
conn = sqlite3.connect(self.db_name)
if table_name == 'news':
df['分词'] = df['分词'].apply(json.dumps)
df.to_sql(table_name, conn, if_exists='append', index=False)
conn.close()
def save_raw_html(self, html_content: str):
df = pd.DataFrame({
'日期': [pd.Timestamp.now().strftime('%Y-%m-%d')],
'时间': [pd.Timestamp.now().strftime('%H:%M:%S')],
'内容': [html_content]
})
self.save_data(df, 'raw_html')
def read_data(self, query="SELECT * FROM news"):
conn = sqlite3.connect(self.db_name)
df = pd.read_sql_query(query, conn)
if '分词' in df.columns:
df['分词'] = df['分词'].apply(json.loads)
conn.close()
return df
def execute_query(self, query, params=None):
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
conn.commit()
conn.close()
def fetch_data(self, query, params=None):
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
rows = cursor.fetchall()
conn.close()
return rows
def delete_table(self,table_name:str):
"""删除news子表"""
conn = sqlite3.connect(self.db_name)
try:
with conn:
conn.execute(f"DROP TABLE IF EXISTS {table_name}")
print("news表已成功删除")
except sqlite3.Error as e:
print(f"删除news表时发生错误:{e}")
def clear_table(self, table_name):
"""清空指定表的所有数据"""
conn = sqlite3.connect(self.db_name)
try:
with conn:
conn.execute(f"DELETE FROM {table_name}")
print(f"{table_name}表已成功清空")
except sqlite3.Error as e:
print(f"清空{table_name}表时发生错误:{e}")
def export_word_cloud_data(self, query="SELECT 分词 FROM news WHERE 日期 = (SELECT MAX(日期) FROM news)"):
"""导出词云数据"""
rows = self.fetch_data(query)
words = []
for row in rows:
words.extend(eval(row[0])) # 使用 eval 来解析字符串列表
word_counts = Counter(words)
word_cloud_data = [(word,count) for word, count in word_counts.items()]
return word_cloud_data
def export_pie_chart_data(self, query="SELECT 信息来源, SUM(CAST(处理后热度 AS INTEGER)) FROM news WHERE 日期 = (SELECT MAX(日期) FROM news) GROUP BY 信息来源"):
"""导出饼图数据"""
rows = self.fetch_data(query)
pie_chart_data = [(row[0],row[1]) for row in rows]
return pie_chart_data
def export_line_chart_data(self, query = """
SELECT 日期, SUM(加权情感得分) as total_sentiment
FROM news
GROUP BY 日期
ORDER BY 日期
"""):
"""导出折线图数据"""
rows = self.fetch_data(query)
line_chart_data = [(row[0], int(row[1])) for row in rows]
return line_chart_data
def export_bar_chart_data(self, query="""
SELECT 信息来源, 加权情感得分
FROM news
WHERE 日期 = (SELECT MAX(日期) FROM news)
"""):
"""导出柱状图数据"""
rows = self.fetch_data(query)
bar_chart_data = {}
for row in rows:
source = row[0]
sentiment_score = row[1]
if source not in bar_chart_data:
bar_chart_data[source] = {"positive": 0, "negative": 0}
if sentiment_score > 0:
bar_chart_data[source]["positive"] += sentiment_score
elif sentiment_score < 0:
bar_chart_data[source]["negative"] += sentiment_score
formatted_data = [ (source, int(data["positive"]), abs(int(data["negative"])),abs(int(data["positive"])+int(data["negative"])))
for source, data in bar_chart_data.items()]
return formatted_data
数据可视化
接下来绘制一些图表,包括:
- 各个平台的情绪红绿图。
- 公众情绪的涨跌折线图。
- 每日全网词云图。
通过pyecharts的脚手架,导出为静态网页。这里为了代码直观,封装为了类。
class ChartGenerator:
def create_bar_chart(self, data , y_label="Sentiment",title="Sentiment Scores by Source") -> Bar:
"""Generate a bar chart with positive, negative, and absolute sentiment scores."""
x_data, y_data_positive, y_data_negative, y_data_absolute = zip(*data)
bar = (
Bar()
.add_xaxis(x_data)
.add_yaxis("Positive Sentiment", y_data_positive, stack="stack1", category_gap="50%")
.add_yaxis("Negative Sentiment", y_data_negative, stack="stack2", category_gap="50%")
.add_yaxis("Absolute Sentiment", y_data_absolute, stack="stack3", category_gap="50%")
.set_global_opts(
title_opts=opts.TitleOpts(title=title, pos_left="center"),
xaxis_opts=opts.AxisOpts(name="Source"),
yaxis_opts=opts.AxisOpts(name="Sentiment Score"),
legend_opts=opts.LegendOpts(pos_top="10%", pos_right="10%"),
)
)
return bar
def create_line_chart(self,data,y_label="Weighted Sentiment Score Sum",title="Sum of Weighted Sentiment Scores Over Time") -> Line:
"""Generate a line chart."""
x_data, y_data = zip(*data)
line = (
Line()
.add_xaxis(x_data)
.add_yaxis(y_label, y_data, is_smooth=True)
.set_global_opts(
title_opts=opts.TitleOpts(title=title, pos_left="center"),
xaxis_opts=opts.AxisOpts(name="Date", axislabel_opts=opts.LabelOpts(rotate=45)),
yaxis_opts=opts.AxisOpts(name=y_label),
legend_opts=opts.LegendOpts(pos_bottom="0%")
)
)
return line
def create_pie_chart(self, data_pairs, title="Information Source Heat") -> Pie:
"""Generate a pie chart."""
pie = (
Pie()
.add("", data_pairs)
.set_global_opts(
title_opts=opts.TitleOpts(title=title, pos_left="center"),
legend_opts=opts.LegendOpts(pos_bottom="0%") # Move legend to the bottom
)
.set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {d}%"))
)
return pie
def create_wordcloud(self, words, title="Word Cloud") -> WordCloud:
"""Generate a word cloud."""
wordcloud = (
WordCloud()
.add("", words, word_size_range=[20, 100],shape="circle")
.set_global_opts(title_opts=opts.TitleOpts(title=title, pos_left="center"))
)
return wordcloud
def render_charts(self, charts, output_file=None):
"""Render multiple charts to a single HTML file."""
if output_file is None:
output_file = os.path.join("docs", "index.html")
page = Page(layout=Page.SimplePageLayout)
page.add(*charts)
page.render(output_file)
代码设置
随着时间的推移,爬虫部分的代码可能需要自己修改。你也可以在main函数中,将debug设为True,这样不会真的爬取,而是调用本地的已爬取的网页。生成后的内容在docs/index.html
。