二分K-means及其python代码实现

it2023-05-03  73

1. 二分K-means算法

     算法描述

     该算法首先将所有点作为一个簇,然后把这个簇一分为二。再选择其中一个簇继续进行划分,选择哪一个簇继续进行划分取决于对其划分是否可以最大程度降低SSE的值。该划分过程一直重复,直至划分的簇的数目与用户指定的簇数目k相等。

     伪代码描述

2. python实现二分K-means算法

加载数据集 首先,需要准备一个数据集。这里我是从文件加载数据集,此处附上该文件的网盘下载地址:testSet2数据集            提取码:tmac def load_data_set(fileName): """加载数据集""" dataSet = [] # 初始化一个空列表 fr = open(fileName) for line in fr.readlines(): # 按tab分割字段,将每行元素分割为list的元素 curLine = line.strip().split('\t') # 用list函数把map函数返回的迭代器遍历展开成一个列表 # 其中map(float, curLine)表示把列表的每个值用float函数转成float型,并返回迭代器 fltLine = list(map(float, curLine)) dataSet.append(fltLine) return dataSet 计算距离函数 此处采用的距离计算公式是欧氏距离 def distance_euclidean(vector1, vector2): """计算欧氏距离""" return sqrt(sum(power(vector1-vector2, 2))) # 返回两个向量的距离 构建簇质心 为给定的数据集构建一个包含k个随机质心的集合。随机质心要在整个数据集的边界内,可以通过找到数据集每一维的最小和最大值来完成。然后生成0到1.0之间的随机数并通过取值范围和最小值,确保随机点在数据的边界之内。 def rand_center(dataSet, k): """构建一个包含K个随机质心的集合""" n = shape(dataSet)[1] # 获取样本特征值 # 初始化质心,创建(k,n)个以0填充的矩阵 centroids = mat(zeros((k, n))) # 每个质心有n个坐标值,总共要k个质心 # 遍历特征值 for j in range(n): # 计算每一列的最小值 minJ = min(dataSet[:, j]) # 计算每一列的范围值 rangeJ = float(max(dataSet[:, j]) - minJ) # 计算每一列的质心,并将其赋给centroids centroids[:, j] = minJ + rangeJ * random.rand(k, 1) return centroids # 返回质心 k-means函数 def k_means(dataSet, k, distMeas = distance_euclidean, creatCent = rand_center): """K-means聚类算法""" m = shape(dataSet)[0] # 行数 # 建立簇分配结果矩阵,第一列存放该数据所属中心点,第二列是该数据到中心点的距离 clusterAssment = mat(zeros((m, 2))) centroids = creatCent(dataSet, k) # 质心,即聚类点 # 用来判定聚类是否收敛 clusterChanged = True while clusterChanged: clusterChanged = False for i in range(m): # 把每一个数据划分到离他最近的中心点 minDist = inf # 无穷大 minIndex = -1 #初始化 for j in range(k): # 计算各点与新的聚类中心的距离 distJI = distMeas(centroids[j,:],dataSet[i,:]) if distJI < minDist: # 如果第i个数据点到第j中心点更近,则将i归属为j minDist = distJI minIndex = j # 如果分配发生变化,则需要继续迭代 if clusterAssment[i, 0] != minIndex: clusterChanged = True # 并将第i个数据点的分配情况存入字典 clusterAssment[i, :] = minIndex, minDist**2 # print(centroids) for cent in range(k): # 重新计算中心点 # 去第一列等于cent的所有列 ptsInClust = dataSet[nonzero(clusterAssment[:, 0].A == cent)[0]] # 算出这些数据的中心点 centroids[cent, :] = mean(ptsInClust, axis=0) return centroids, clusterAssment 二分k-means函数 def biKmeans(dataMat, k, distMeas=distance_euclidean): """二分k-means算法""" m = shape(dataMat)[0] # 创建一个矩阵来存储数据集中每个点的簇分配结果及平方误差 clusterAssment = mat(zeros((m, 2))) # 根据数据集均值获取第一个质心 centroid0 = mean(dataMat, axis=0).tolist()[0] # 用一个列表来保留所有的质心 centList = [centroid0] # 遍历数据集中所有点来计算每个点到质心的距离 for j in range(m): clusterAssment[j, 1] = distMeas(mat(centroid0), dataMat[j, :]) ** 2 # 对簇不停的进行划分,直到得到想要的簇数目为止 while (len(centList) < k): # 初始化最小SSE为无穷大,用于比较划分前后的SSE lowestSSE = inf # 无穷大 # 通过考察簇列表中的值来获得当前簇的数目,遍历所有的簇来决定最佳的簇进行划分 for i in range(len(centList)): # 对每一个簇,将该簇中的所有点看成一个小的数据集 ptsInCurrCluster = dataMat[nonzero(clusterAssment[:, 0].A == i)[0], :] # 将ptsInCurrCluster输入到函数kMeans中进行处理,k=2, # kMeans会生成两个质心(簇),同时给出每个簇的误差值 centroidMat, splitClustAss = k_means(ptsInCurrCluster, 2, distMeas) # 划分数据的SSE与未划分的之和作为本次划分的总误差 sseSplit = sum(splitClustAss[:, 1]) # 划分数据集的SSE sseNotSplit = sum(clusterAssment[nonzero(clusterAssment[:, 0].A != i)[0], 1]) # 未划分数据集的SSE print('划分数据集的SSE, and 未划分的SSE: ', sseSplit, sseNotSplit) # 将划分与未划分的SSE求和与最小SSE相比较 确定是否划分 if (sseSplit + sseNotSplit) < lowestSSE: bestCentToSplit = i # 当前最适合做划分的中心点 bestNewCents = centroidMat # 划分后的两个新中心点 bestClustAss = splitClustAss.copy() # 划分点的聚类信息 lowestSSE = sseSplit + sseNotSplit # 找出最好的簇分配结果 # 调用kmeans函数并且指定簇数为2时,会得到两个编号分别为0和1的结果簇 bestClustAss[nonzero(bestClustAss[:, 0].A == 1)[0], 0] = len(centList) # 更新为最佳质心 bestClustAss[nonzero(bestClustAss[:, 0].A == 0)[0], 0] = bestCentToSplit print('本次最适合划分的质心: ', bestCentToSplit) print('被划分数据集样本数量: ', len(bestClustAss)) # 更新质心列表 # 更新原质心list中的第i个质心为使用二分kMeans后bestNewCents的第一个质心 centList[bestCentToSplit] = bestNewCents[0, :].tolist()[0] # 添加bestNewCents的第二个质心 centList.append(bestNewCents[1, :].tolist()[0]) # 重新分配最好簇下的数据(质心)以及SSE clusterAssment[nonzero(clusterAssment[:, 0].A == bestCentToSplit)[0], :] = bestClustAss return mat(centList), clusterAssment 测试 datMat = mat(load_data_set('testSet2.txt')) centList, clusterAssment = biKmeans(datMat, 3) print("质心结果:", centList) print("聚类结果:", clusterAssment)

运行结果:

画图 datMat = mat(load_data_set('testSet2.txt')) centList, clusterAssment = biKmeans(datMat, 3) # 可视化 plt.scatter(array(datMat)[:, 0], array(datMat)[:, 1], c=array(clusterAssment)[:, 0].T) plt.scatter(centList[:, 0].tolist(), centList[:, 1].tolist(), c="r") plt.show()

效果图:

完整代码 from numpy import * from matplotlib import pyplot as plt def load_data_set(fileName): """加载数据集""" dataSet = [] # 初始化一个空列表 fr = open(fileName) for line in fr.readlines(): # 按tab分割字段,将每行元素分割为list的元素 curLine = line.strip().split('\t') # 用list函数把map函数返回的迭代器遍历展开成一个列表 # 其中map(float, curLine)表示把列表的每个值用float函数转成float型,并返回迭代器 fltLine = list(map(float, curLine)) dataSet.append(fltLine) return dataSet def distance_euclidean(vector1, vector2): """计算欧氏距离""" return sqrt(sum(power(vector1-vector2, 2))) # 返回两个向量的距离 def rand_center(dataSet, k): """构建一个包含K个随机质心的集合""" n = shape(dataSet)[1] # 获取样本特征值 # 初始化质心,创建(k,n)个以0填充的矩阵 centroids = mat(zeros((k, n))) # 每个质心有n个坐标值,总共要k个质心 # 遍历特征值 for j in range(n): # 计算每一列的最小值 minJ = min(dataSet[:, j]) # 计算每一列的范围值 rangeJ = float(max(dataSet[:, j]) - minJ) # 计算每一列的质心,并将其赋给centroids centroids[:, j] = minJ + rangeJ * random.rand(k, 1) return centroids # 返回质心 def k_means(dataSet, k, distMeas = distance_euclidean, creatCent = rand_center): """K-means聚类算法""" m = shape(dataSet)[0] # 行数 # 建立簇分配结果矩阵,第一列存放该数据所属中心点,第二列是该数据到中心点的距离 clusterAssment = mat(zeros((m, 2))) centroids = creatCent(dataSet, k) # 质心,即聚类点 # 用来判定聚类是否收敛 clusterChanged = True while clusterChanged: clusterChanged = False for i in range(m): # 把每一个数据划分到离他最近的中心点 minDist = inf # 无穷大 minIndex = -1 #初始化 for j in range(k): # 计算各点与新的聚类中心的距离 distJI = distMeas(centroids[j,:],dataSet[i,:]) if distJI < minDist: # 如果第i个数据点到第j中心点更近,则将i归属为j minDist = distJI minIndex = j # 如果分配发生变化,则需要继续迭代 if clusterAssment[i, 0] != minIndex: clusterChanged = True # 并将第i个数据点的分配情况存入字典 clusterAssment[i, :] = minIndex, minDist**2 # print(centroids) for cent in range(k): # 重新计算中心点 # 去第一列等于cent的所有列 ptsInClust = dataSet[nonzero(clusterAssment[:, 0].A == cent)[0]] # 算出这些数据的中心点 centroids[cent, :] = mean(ptsInClust, axis=0) return centroids, clusterAssment def biKmeans(dataMat, k, distMeas=distance_euclidean): """二分k-means算法""" m = shape(dataMat)[0] # 创建一个矩阵来存储数据集中每个点的簇分配结果及平方误差 clusterAssment = mat(zeros((m, 2))) # 根据数据集均值获取第一个质心 centroid0 = mean(dataMat, axis=0).tolist()[0] # 用一个列表来保留所有的质心 centList = [centroid0] # 遍历数据集中所有点来计算每个点到质心的距离 for j in range(m): clusterAssment[j, 1] = distMeas(mat(centroid0), dataMat[j, :]) ** 2 # 对簇不停的进行划分,直到得到想要的簇数目为止 while (len(centList) < k): # 初始化最小SSE为无穷大,用于比较划分前后的SSE lowestSSE = inf # 无穷大 # 通过考察簇列表中的值来获得当前簇的数目,遍历所有的簇来决定最佳的簇进行划分 for i in range(len(centList)): # 对每一个簇,将该簇中的所有点看成一个小的数据集 ptsInCurrCluster = dataMat[nonzero(clusterAssment[:, 0].A == i)[0], :] # 将ptsInCurrCluster输入到函数kMeans中进行处理,k=2, # kMeans会生成两个质心(簇),同时给出每个簇的误差值 centroidMat, splitClustAss = k_means(ptsInCurrCluster, 2, distMeas) # 划分数据的SSE与未划分的之和作为本次划分的总误差 sseSplit = sum(splitClustAss[:, 1]) # 划分数据集的SSE sseNotSplit = sum(clusterAssment[nonzero(clusterAssment[:, 0].A != i)[0], 1]) # 未划分数据集的SSE print('划分数据集的SSE, and 未划分的SSE: ', sseSplit, sseNotSplit) # 将划分与未划分的SSE求和与最小SSE相比较 确定是否划分 if (sseSplit + sseNotSplit) < lowestSSE: bestCentToSplit = i # 当前最适合做划分的中心点 bestNewCents = centroidMat # 划分后的两个新中心点 bestClustAss = splitClustAss.copy() # 划分点的聚类信息 lowestSSE = sseSplit + sseNotSplit # 找出最好的簇分配结果 # 调用kmeans函数并且指定簇数为2时,会得到两个编号分别为0和1的结果簇 bestClustAss[nonzero(bestClustAss[:, 0].A == 1)[0], 0] = len(centList) # 更新为最佳质心 bestClustAss[nonzero(bestClustAss[:, 0].A == 0)[0], 0] = bestCentToSplit print('本次最适合划分的质心: ', bestCentToSplit) print('被划分数据集样本数量: ', len(bestClustAss)) # 更新质心列表 # 更新原质心list中的第i个质心为使用二分kMeans后bestNewCents的第一个质心 centList[bestCentToSplit] = bestNewCents[0, :].tolist()[0] # 添加bestNewCents的第二个质心 centList.append(bestNewCents[1, :].tolist()[0]) # 重新分配最好簇下的数据(质心)以及SSE clusterAssment[nonzero(clusterAssment[:, 0].A == bestCentToSplit)[0], :] = bestClustAss return mat(centList), clusterAssment # 测试 datMat = mat(load_data_set('testSet2.txt')) centList, clusterAssment = biKmeans(datMat, 3) print("质心结果:", centList) print("聚类结果:", clusterAssment) # 可视化 plt.scatter(array(datMat)[:, 0], array(datMat)[:, 1], c=array(clusterAssment)[:, 0].T) plt.scatter(centList[:, 0].tolist(), centList[:, 1].tolist(), c="r") plt.show()

3. 小结

      由上面的运行结果可以看到,该算法的聚类会收敛到全局最小值,相较于单纯的k-means算法有更好的聚类效果。

最新回复(0)