聚类算法
聚类算法被广泛应用于数据搜索、样本分类中。想象你要在一个有100万本书的图书馆里,找到与你手中这本书最相似的5本书。如果一本一本地比较,你需要比较100万次。
向量数据库通过多次多层级聚类把相似的数据放在"附近",建立不同层级,搜索时就像在分区清晰的图书馆中找书,快速到达目标区域。
例如可以把图书聚类为10类,判断当前图书距离哪类的质心最近,然后去该类中搜索。该类中又可以聚类为N类,再次判断当前图书距离哪类的质心最近,然后去该类中搜索。多次迭代指数级缩小搜索范围后,最后遍历对比距离即可。
当然,聚类对于边界模糊的数据效果不佳,例如你想找《明朝哪些年》,它既是小说,又是历史。很可能在某一层级距离计算错误,导致结果不准确。虽然可以通过分层聚类解决,但计算量会大大增加。
此 外,聚类结构是基于当前数据快照构建的。当有新数据不断加入或旧数据被删除时,原有的聚类结构可能不再最优甚至失效。增量式更新聚类结构通常比全量重新聚类更复杂且效果可能打折扣。
K 均值算法
这是一种解决聚类问题的非监督式学习算法。这个方法简单地利用了一定数量的集群(假设 K 个集群)对给定数据进行分类。同一集群内的数据点是同类的,不同集群的数据点不同类。
K 均值算法如何划分集群:
-
随机从每个集群中选取 K 个数据点作为质心(centroids),因此同一组数据多次运行,分组结果可能不同。
-
将每一个数据点与距离自己最近的质心划分在同一集群,即生成 K 个新集群。
-
找出新集群的质心,这样就有了新的质心。
-
重复 2 和 3,直到结果收敛,即不再有新的质心出现。
这样的算法基于距离和质心,天然倾向于发现球状或凸形的簇。对于流形、环形、相互缠绕或具有复杂边界的不规则形状的数据分布,效果往往不佳。可以考虑使用DBSCAN算法。
距离计算公式:
一维坐标系中,设 A(x1),B(x2),则 A,B 之间的距离为
二维坐标系中,设 A(x1,y1),B(x2,y2),则 A,B 之间的距离为
三维坐标系中,设 A(x1,y1,z1),B(x2,y2,z2),则 A,B 之间的距离为
以此类推
动画演示
下面的动画使用10*10的网格模拟图片,通过修改网格颜色表示分类。
通过绿色表示样本分类1,深绿色表示其簇中心点,蓝色表示样本分类2,深蓝色表示其簇中心点。
初始簇中心点1在左上角,簇中心点2在中间。
每次迭代停顿5秒。
点击查看动画
function KMeansAnimation() { const gridSize = 10; const [dataPoints, setDataPoints] = React.useState([]); const [centroids, setCentroids] = React.useState([ { x: 0, y: 0 }, { x: 5, y: 5 } ]); const [step, setStep] = React.useState(0); const [iteration, setIteration] = React.useState(0); const [ready, setReady] = React.useState(false); React.useEffect(() => { const generateAllGridPoints = () => { const points = []; for (let i = 0; i < gridSize; i++) { for (let j = 0; j < gridSize; j++) { points.push({ x: i, y: j, cluster: null }); } } return points; }; setDataPoints(generateAllGridPoints()); // 初始化后等待5秒再开始第一次迭代 const initialTimer = setTimeout(() => { setReady(true); }, 5000); return () => clearTimeout(initialTimer); }, []); const distance = (point1, point2) => { return Math.sqrt(Math.pow(point1.x - point2.x, 2) + Math.pow(point1.y - point2.y, 2)); }; React.useEffect(() => { if (dataPoints.length === 0 || !ready) return; const timer = setTimeout(() => { if (step === 0) { const newDataPoints = dataPoints.map(point => { const dist1 = distance(point, centroids[0]); const dist2 = distance(point, centroids[1]); return { ...point, cluster: dist1 <= dist2 ? 0 : 1 }; }); setDataPoints(newDataPoints); setStep(1); } else if (step === 1) { const cluster0Points = dataPoints.filter(p => p.cluster === 0); const cluster1Points = dataPoints.filter(p => p.cluster === 1); if (cluster0Points.length > 0 && cluster1Points.length > 0) { const newX0 = Math.round(cluster0Points.reduce((sum, p) => sum + p.x, 0) / cluster0Points.length); const newY0 = Math.round(cluster0Points.reduce((sum, p) => sum + p.y, 0) / cluster0Points.length); const newX1 = Math.round(cluster1Points.reduce((sum, p) => sum + p.x, 0) / cluster1Points.length); const newY1 = Math.round(cluster1Points.reduce((sum, p) => sum + p.y, 0) / cluster1Points.length); setCentroids([ { x: newX0, y: newY0 }, { x: newX1, y: newY1 } ]); } setStep(0); setIteration(prev => prev + 1); // 每次迭代完成后暂停5秒 setReady(false); setTimeout(() => { setReady(true); }, 5000); } }, 1000); return () => clearTimeout(timer); }, [step, dataPoints, centroids, ready]); const renderGrid = () => { const grid = []; for (let y = 0; y < gridSize; y++) { for (let x = 0; x < gridSize; x++) { const pointAtPosition = dataPoints.find(p => p.x === x && p.y === y); const isCentroid0 = centroids[0].x === x && centroids[0].y === y; const isCentroid1 = centroids[1].x === x && centroids[1].y === y; let cellStyle = { width: '32px', height: '32px', border: '1px solid #cbd5e0', display: 'flex', alignItems: 'center', justifyContent: 'center' }; if (pointAtPosition) { if (pointAtPosition.cluster === 0) { cellStyle.backgroundColor = '#9ae6b4'; } else if (pointAtPosition.cluster === 1) { cellStyle.backgroundColor = '#90cdf4'; } } if (isCentroid0) { cellStyle.backgroundColor = '#276749'; } else if (isCentroid1) { cellStyle.backgroundColor = '#2b6cb0'; } grid.push( <div key={`${x}-${y}`} style={cellStyle}></div> ); } } return grid; }; return ( <div style={{display: 'flex', flexDirection: 'column', alignItems: 'center', padding: '16px'}}> <h2 style={{fontSize: '1.25rem', fontWeight: 'bold', marginBottom: '16px'}}>K-Means 聚类算法可视化</h2> <div style={{marginBottom: '16px'}}> 迭代次数: {iteration} {!ready && <span style={{marginLeft: '10px', color: '#718096'}}>等待中...</span>} </div> <div style={{ display: 'grid', gridTemplateColumns: 'repeat(10, 1fr)', gap: '4px', marginBottom: '16px' }}> {renderGrid()} </div> <div style={{marginTop: '16px', display: 'flex', gap: '24px'}}> <div style={{display: 'flex', alignItems: 'center'}}> <div style={{width: '16px', height: '16px', backgroundColor: '#9ae6b4', marginRight: '8px'}}></div> <span>簇1数据点</span> </div> <div style={{display: 'flex', alignItems: 'center'}}> <div style={{width: '16px', height: '16px', backgroundColor: '#276749', marginRight: '8px'}}></div> <span>簇1中心点</span> </div> <div style={{display: 'flex', alignItems: 'center'}}> <div style={{width: '16px', height: '16px', backgroundColor: '#90cdf4', marginRight: '8px'}}></div> <span>簇2数据点</span> </div> <div style={{display: 'flex', alignItems: 'center'}}> <div style={{width: '16px', height: '16px', backgroundColor: '#2b6cb0', marginRight: '8px'}}></div> <span>簇2中心点</span> </div> </div> </div> ); }
简单示例
- numpy
- sklearn
import numpy as np
from sklearn.datasets import load_iris
def kmeans(datas, k=3, max_iter=1000, tolerance=1e-6):
"""
K-means聚类算法
参数:
datas: 输入数据,numpy数组
k: 聚类数量,默认为2
max_iter: 最大迭代次数,默认为100
tolerance: 收敛容差 ,默认为1e-4
返回:
kernels: 最终的聚类中心
labels: 每个数据点的类别标签
"""
# 随机选择K个点作为聚类中心
kernels = datas[:k].copy().astype(float)
labels = np.zeros(len(datas), dtype=int)
for iteration in range(max_iter):
# 保存上一轮的聚类中心用于检查收敛
old_kernels = kernels.copy()
# 步骤1: 分配每个点到最近的聚类中心
for i, point in enumerate(datas):
# 计算到每个聚类中心的欧几里得距离
distances = np.array([np.sum((kernel - point) ** 2) for kernel in kernels])
# 找到距离最小的值的下标
labels[i] = np.argmin(distances)
# 步骤2: 更新聚类中心为该类别所有点的均值
for j in range(k):
# 找到属于第j个聚类的所有点
cluster_points = datas[labels == j]
if len(cluster_points) > 0:
# 更新聚类中心为该类别所有点的均值
kernels[j] = np.mean(cluster_points, axis=0)
# 检查收敛条件:聚类中心变化很小
if np.all(np.abs(kernels - old_kernels) < tolerance):
print(f"算法在第{iteration + 1}次迭代后收敛")
break
else:
print(f"达到最大迭代次数{max_iter}")
return kernels, labels
# 主程序
if __name__ == "__main__":
# 加载iris数据
iris = load_iris()
iris_X = iris.data
iris_y = iris.target
# 运行自实现的K-means算法
centers, labels = kmeans(iris_X, k=3)
# 计算准确度
'''
K-means算法产生的类别标签(0,1,2)和真实的iris标签(0,1,2)不一定是对应的。
算法可能把真实的第0类识别为第1类
把真实的第1类识别为第2类
把真实的第2类识别为第0类
所以需要将K-means算法产生的类别标签(0,1,2)和真实的iris标签(0,1,2)进行对应
'''
for i,j in zip(labels,iris_y):
print(i,j)
dicts = {0:2,1:1,2:0}
error = 0
for i in range(len(iris_y)):
if iris_y[i] != dicts[labels[i]]:
error += 1
accuracy = 1 - error / len(iris_y)
print(f"准确度: {accuracy:.2f}") # 准确度: 0.89
# 导入必要的库
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.datasets import load_iris
# 加载数据
iris = load_iris()
iris_X = iris.data
iris_y = iris.target
# 创建K均值模型
kmeans = KMeans(n_clusters=3)
# 拟合模型,注意看这是无监督学习,这里只填写了数据集,没有给标签。
kmeans.fit(iris_X)
# 获取簇中心和簇标签
centers = kmeans.cluster_centers_
labels = kmeans.labels_
print(iris_y)
print(labels)
效果评估
# 使用列表推导式将0、1、2转换成1、0、2
exchange={0:1,1:0,2:2}
exchange_labels = [exchange[i] if i in exchange else i for i in labels]
right = 0
error = 0
for i in zip(exchange_labels,iris_y):
if i[0] == i[1]:
right +=1
else:
error +=1
print('正确率:{}%'.format(right/(right+error)*100))
二维可视化结果
# 选取第1、2特征值与中心点
plt.scatter(iris_X[:, 0], iris_X[:, 1], c=labels)
plt.scatter(centers[:, 0], centers[:,1], c="red", marker="x")
plt.title("Kmeans")
plt.show()
# 选取第3、4项特征值与中心点
plt.scatter(iris_X[:, 2], iris_X[:,3], c=labels)
plt.scatter(centers[:, 2], centers[:,3], c="red", marker="x")
plt.show()
寻找最佳 K
怎样确定 K 的最佳值?
如果我们在每个集群中计算集群中所有点到质心的距离平方和,再将不同集群的距离平方和相加,我们就得到了这个集群方案的总平方和。
我们知道,随着集群数量的增加,总平方和会减少。但是如果用总平方和对 K 作图,你会发现在某个 K 值之前总平方和急速减少,但在这个 K 值之后减少的幅度大大降低,这个值就是最佳的集群数。
from sklearn.model_selection import cross_val_score
from sklearn.neighbors import KNeighborsClassifier
import matplotlib.pyplot as plt
from sklearn.datasets import load_iris
iris = load_iris()
X = iris.data
y = iris.target
k_range = range(1, 31)
k_scores = []
for k in k_range:
knn = KNeighborsClassifier(n_neighbors=k)
# loss = -cross_val_score(knn, X, y, cv=10, scoring='mean_squared_error') # for regression
# 10折交叉验证,对于分类问题,scoring参数默认为accuracy,对于回归问题,默认为r2,或mean_squared_error
# 原理是将数据分成10份,每次取其中一份作为测试集,其余9份作为训练集,进行10次训练和测试,最后取平均值
# 是一种常用的验证分类性能好坏的方法
scores = cross_val_score(knn, X, y, cv=10, scoring='accuracy') # for classification
# .mean()方法用于计算平均值
k_scores.append(scores.mean())
plt.plot(k_range, k_scores)
plt.xlabel('Value of K for KNN')
plt.ylabel('Cross-Validated Accuracy')
plt.show()
练习:颜色量化
描述
图片的颜色数量越多,图片就越难以压缩,图片的大小就越大,因此需要对图片进行颜色量化,减少图片的大小,将图像所需的颜色数量从 96615 减少到 64,同时保持整体外观质量。
图像来源sklearn.datasets.load_sample_image("china.jpg")
题解
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin
from sklearn.datasets import load_sample_image
from sklearn.utils import shuffle
from time import time
n_colors = 64
# Load the Summer Palace photo
china = load_sample_image("china.jpg")
# Convert to floats instead of the default 8 bits integer coding. Dividing by
# 255 is important so that plt.imshow behaves works well on float data (need to
# be in the range [0-1])
china = np.array(china, dtype=np.float64) / 255
# Load Image and transform to a 2D numpy array.
w, h, d = original_shape = tuple(china.shape)
assert d == 3
image_array = np.reshape(china, (w * h, d))
print("Fitting model on a small sub-sample of the data")
t0 = time()
image_array_sample = shuffle(image_array, random_state=0)[:1000]
kmeans = KMeans(n_clusters=n_colors, random_state=0).fit(image_array_sample)
print("done in %0.3fs." % (time() - t0))
# Get labels for all points
print("Predicting color indices on the full image (k-means)")
t0 = time()
labels = kmeans.predict(image_array)
print("done in %0.3fs." % (time() - t0))
codebook_random = shuffle(image_array, random_state=0)[:n_colors]
print("Predicting color indices on the full image (random)")
t0 = time()
labels_random = pairwise_distances_argmin(codebook_random,
image_array,
axis=0)
print("done in %0.3fs." % (time() - t0))
def recreate_image(codebook, labels, w, h):
"""Recreate the (compressed) image from the code book & labels"""
d = codebook.shape[1]
image = np.zeros((w, h, d))
label_idx = 0
for i in range(w):
for j in range(h):
image[i][j] = codebook[labels[label_idx]]
label_idx += 1
return image
# Display all results, alongside original image
plt.figure(1)
plt.clf()
plt.axis('off')
plt.title('Original image (96,615 colors)')
plt.imshow(china)
plt.figure(2)
plt.clf()
plt.axis('off')
plt.title('Quantized image (64 colors, K-Means)')
plt.imshow(recreate_image(kmeans.cluster_centers_, labels, w, h))
plt.figure(3)
plt.clf()
plt.axis('off')
plt.title('Quantized image (64 colors, Random)')
plt.imshow(recreate_image(codebook_random, labels_random, w, h))
DBSCAN算法
这是一种基于密度的聚类算法,用于解决聚类问题的非监督式学习算法。DBSCAN(Density-Based Spatial Clustering of Applications with Noise)能够自动发现任意形状的簇,并识别噪声点,无需预先指定簇的数量。使用之前确保数据量足够多,否则效果不佳。
DBSCAN算法相比K-means的优势:
- 无需预设簇数量:算法自动确定簇的数量
- 发现任意形状的簇:不局限于球形或凸形,能处理复杂形状的数据分布
- 噪声检测能力:能够识别和标记异 常值(噪声点)
- 密度适应性:基于数据点的密度进行聚类,适合密度不均匀的数据集
在理解DBSCAN算法之前,需要先了解几个重要概念:
ε-邻域(Epsilon Neighborhood):给定点p的ε-邻域是指与p距离不超过ε的所有点的集合,记作N_ε(p)。
核心点(Core Point):如果点p的ε-邻域内至少包含min_samples个点(包括p本身),则p是核心点。
边界点(Border Point):不是核心点,但在某个核心点的ε-邻域内的点。
噪声点(Noise Point):既不是核心点,也不是边界点的点,被标记为异常值。
密度直达(Directly Density-Reachable):如果点q在点p的ε-邻域内,且p是核心点,则称q从p密度直达。注意:密度直达关系是不对称的。
密度可达(Density-Reachable):如果存在一个点链p₁, p₂, ..., pₙ,其中p₁=p, pₙ=q,且对于每个i,pᵢ₊₁都从pᵢ密度直达,则称q从p密度可达。
密度相连(Density-Connected):如果存在一个点o,使得p和q都从o密度可达,则称p和q密度相连。密度相连关系是对称的。
算法步骤
DBSCAN算法如何划分集群:
1. 初始化参数:设定两个关键参数 - eps(邻域半径)和 min_samples(最小样本数)。所有点初始标记为-1(噪声点)。
2. 遍历所有未处理的数据点:
- 计算当前点的 eps 邻域内的点数
- 如果邻域内点数 ≥ min_samples,则该点为核心点,进入步骤3
- 如果邻域内点数 < min_samples,则跳过该点(保持为噪声点状态)
3. 从核心点开始扩展簇:
- 为当前核心点创建一个新簇(分配新的簇ID)
- 将当前核心点标记为该簇的成员
- 将其所有邻域内的点加入待处理队列,进入步骤4
4. 递归扩展簇:
- 遍历待处理队列中的每个点:
- 如果该点尚未分类(标记为-1),将其加入当前簇
- 如果该点也是核心点,将其邻域内的所有未处理点加入待处理队列
- 继续直到队列为空
5. 重复步骤2-4:直到所有点都被处理完毕
最终结果:
- 核心点和边界点:被分配到对应的簇ID
- 噪声点:保持-1标记,表示不属于任何簇
核心思想:同一个簇中的所有点都是密度相连的,不同簇之间的点不是密度相连的。
简单示例
- numpy实现
- sklearn实现
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler
import numpy as np
def dbscan(X, eps=0.5, min_samples=5):
"""
DBSCAN聚类算法
"""
n_points = len(X)
labels = np.full(n_points, -1) # 所有点初始标记为-1
cluster_id = 0
def euclidean_distance(p1, p2):
return np.sqrt(np.sum((p1 - p2) ** 2))
def get_neighbors(point_idx):
neighbors = []
for i in range(n_points):
if euclidean_distance(X[point_idx], X[i]) <= eps:
neighbors.append(i)
return neighbors
for i in range(n_points):
if labels[i] != -1:
continue
neighbors = get_neighbors(i)
if len(neighbors) < min_samples:
continue
labels[i] = cluster_id # 到这里说明是核心点
seed_set = neighbors.copy() # 邻域内的点加入待处理队列
j = 0
while j < len(seed_set):
current_point = seed_set[j]
if labels[current_point] == -1:
labels[current_point] = cluster_id
current_neighbors = get_neighbors(current_point)
if len(current_neighbors) >= min_samples:
for neighbor in current_neighbors:
if neighbor not in seed_set:
seed_set.append(neighbor)
j += 1
cluster_id += 1
return labels
# 加载鸢尾花数据集
iris = load_iris()
X, y_true = iris.data, iris.target
# 数据标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 应用DBSCAN聚类
y_pred = dbscan(X_scaled, eps=0.5, min_samples=5)
from sklearn.datasets import load_iris
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
# 加载鸢尾花数据集
iris = load_iris()
X, y_true = iris.data, iris.target
# 数据标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 创建并训练DBSCAN模型
dbscan = DBSCAN(eps=0.5, min_samples=5)
y_pred = dbscan.fit_predict(X_scaled)