class MyDecisionTree: def __init__(self, *args, **kwargs): # 初始化参数 def fit(self, ...): # 训练模型 def predict(self, ...): # 模型预测 def evaluate(self, ...): # 模型评估 def save_model(self, ...): # 保存模型 def load_model(self, ...): # 加载模型 def show_tree(self, ...): # 绘制决策树
大体思路
1. 创建决策树 My_Decision_Tree 类,类函数 __init__() 初始化参数、fit() 进行决策树模型训练、predict() 进行预测、evaluate() 进行模型评估、save_model() 保存模型 (csv 格式)、load_model() 加载模型、show_tree() 使用 Pillow 库绘制决策树以及其他一些封装的功能函数;
2.最佳划分点的度量通常有 Gini 值、Entropy、Classification error 等,本程序采用 Gini 值,计算方法如下:
Entropy = -6/12 * log2(6/12) - 6/12 * log2(6/12) = 1.0
Gini = 6/12 * (1 - 6/12) + 6/12 * (1 - 6/12) = 0.5
左
Gini(N1) = 5/7 * (1 - 5/7) + 2/7 * (1 - 2/7) = 1 - (5/7)2 - (2/7)2 = 0.408
Gini(N2) = 1/5 * (1 - 1/5) + 4/5 * (1 - 4/5) = 1 - (1/5)2 - (4/5)2 = 0.32 Ginisplit = 7/12 * 0.408 + 5/12 * 0.32 = 0.37
右
Gini(N1) = 4/7 * (1 - 4/7) + 3/7 * (1 - 3/7) = 0.49 Gini(N2) = 2/5 * (1 - 2/5) + 3/5 * (1 - 3/5) = 0.48 Ginisplit = 7/12 * 0.49 + 5/12 * 0.48 = 0.486
构建决策树模型所用到的功能函数 (gini 值计算相关) 说明
- get_col_gini(self, threshold_point, value_series, label_series)
- 计算一列数据在某一阈值划分点下的 gini_split 值
- get_best_point(self, value_series, label_series)
- 对连续型数据进行大小排序, 分别计算切分点及其对应的 gini_split 值, 找到使得 gini_split 值最小的最佳切分点
- get_best_column(self, data, label_series)
- 遍历数据表中的属性列, 计算每列的最佳划分点和 gini_split 值, 并对比找到最适合划分的一列及其对应的划分点
class MyDecisionTree: ... def get_col_gini(self, ...): # 计算一列数据在某一阈值划分点下的 gini_split 值 def get_best_point(self, ...): # 对连续型数据进行大小排序, 分别计算切分点及其对应的 gini_split 值, 找到使得 gini_split 值最小的最佳切分点 def get_best_column(self, ...): # 遍历数据表中的属性列, 计算每列的最佳划分点和 gini_split 值, 并对比找到最适合划分的一列及其对应的划分点
决策树构建 (训练) 的逻辑
- 1. 程序开始以 root 为父节点, 先找到剩余属性中最适合划分的列和划分点, 并将其划分为两个子节点, 记录判断条件和此节点位置. 删除子节点中的该列属性以避免最终决策树模型倾向于某个属性, 利用剩余的属性继续构造决策树
- 2. 判断各个子节点中的标签是否相同, 如果为同一类则移到叶子节点中, 否则将此节点更新到下个流程中的父节点汇总
- 3. 一直循环以上过程, 当父节点为空时结束训练, 如果所有直接点都为叶子节点时, 则决策树完美将数据分类; 当然也会出现所有属性都用完但子节点中标签依旧不唯一的情况, 这时以该节点中个数较多的标签类作为分类结果, 终止模型构建
代码实现
$ cat decision_tree.py
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
class MyDecisionTree(object):
"""决策树框架"""
def __init__(self, arg=None):
## 初始化类参数
self.arg = arg
# 存放决策树中的各层判断条件
self.decision_df = pd.DataFrame(
columns=['parent_depth', 'parent_pos', 'this_depth', 'this_pos', 'column', 'point', 'label'])
# 存放父节点和子节点的 DataFrame
self.parent_node_list = []
self.child_node_list = []
# 存放划分好的叶节点
self.leaf_list = []
# 存放未能划分出来的节点
self.branch_list = []
def fit(self, x_train, y_train):
"""传入训练集和标签构造决策树"""
'''
程序逻辑:
(1) 程序开始以 root 为父节点,先找到剩余属性中最适合划分的列和划分点,并将其划分为两个子节点,
记录判断条件和此节点位置。删除子节点中的该列属性以避免最终决策树模型倾向于某个属性,利用剩余的属性继续构造决策树
(2) 判断各个子节点中的标签是否相同,如果为同一类则移到叶节点中,否则将此节点更新到下个流程中的父节点中
(3) 一直循环以上过程,当父节点为空时结束训练,如果所有子节点都为叶节点时则决策树完美将数据分类;当然也会出现所有属性都用完但子节点
中标签依旧不唯一的情况,这时以该节点中个数较多的标签类作为分类结果,终止模型构建。
'''
x_data = x_train.copy()
if (y_train.name not in x_data.columns):
x_data[y_train.name] = y_train
# 把第一层(原数据)放入父节点列表 parent_node_list
self.parent_node_list.append(x_data)
# 写入第一层的决策 (根节点)
decision = {'this_depth': 1, 'this_pos': 0, 'column': 'root', 'label': '#'}
self.decision_df = self.decision_df.append(decision, ignore_index=True)
# 开始循环计算分类节点
parent_count = 0 # 循环的父节点数
child_pos = 0 # 子节点的位置
depth = 2 # 第几层节点
while True:
parent_node = self.parent_node_list[parent_count]
# 找到第一个适合划分的列和划分点
col_1, point_1 = self.get_best_column(parent_node, parent_node[y_train.name])
print('decision condition:', col_1, point_1)
# 根据条件把父节点划分为两部分
child_node1 = parent_node.loc[parent_node[col_1] <= point_1]
child_node2 = parent_node.loc[parent_node[col_1] > point_1]
# 每一部分的分类结果
result = []
for child_node in [child_node1, child_node2]:
# 删除已使用过的属性
del (child_node[col_1])
# 判断子节点标签是否为一类,是则将其放入叶节点列表中
if (len(child_node[y_train.name].unique()) == 1):
self.leaf_list.append(child_node)
print('添加一个叶节点,标签类型:', child_node[y_train.name].unique()[0], '数据大小:', child_node.shape)
result.append(child_node[y_train.name].unique()[0])
# 判断子节点标签是否还有剩余属性可以作为分类依据,如果有则添加到子节点列表中用于后续分类
elif (child_node.shape[1] != 1):
self.child_node_list.append(child_node)
print('添加一个子节点,数据大小:', child_node.shape)
result.append('#')
# 都不满足说明该节点没有分完但是分不下去了,提示错误
else:
self.branch_list.append(child_node)
print('child_node 节点已用完所有属性,仍然没分出来, 剩余数据大小:')
print(child_node[y_train.name].value_counts())
values = child_node[y_train.name].value_counts()
if (len(values) == 0):
replace = list(parent_node[y_train.name].value_counts().index)[0]
else:
replace = list(values.index)[0]
print('用 %s 作为该条件下的预测结果' % replace)
result.append(replace)
# 找到该父节点在该层中所对应的位置
p_pos_list = self.decision_df.loc[
(self.decision_df['this_depth'] == depth - 1) & (self.decision_df['label'] == '#'), 'this_pos']
p_pos_list = list(p_pos_list)
# print('p_pos_list:', p_pos_list)
# 判断完一个父节点之后, 把判断条件加入 decision_df 中
decision1 = {'parent_depth': depth - 1, 'parent_pos': p_pos_list[parent_count], 'this_depth': depth,
'this_pos': child_pos,
'column': col_1, 'point': point_1, 'label': result[0]}
decision2 = {'parent_depth': depth - 1, 'parent_pos': p_pos_list[parent_count], 'this_depth': depth,
'this_pos': child_pos + 1,
'column': col_1, 'point': point_1, 'label': result[1]}
self.decision_df = self.decision_df.append([decision1, decision2], ignore_index=True)
# 当遍历完父节点列表所有值后,将子节点更新为父节点
child_pos += 2
parent_count += 1
if (parent_count == len(self.parent_node_list)):
parent_count = 0
child_pos = 0
depth += 1
print('该层决策结束, 进行下一层决策\n')
self.parent_node_list = self.child_node_list.copy()
self.child_node_list.clear()
print('更新 parent_node_list, 大小:%d' % len(self.parent_node_list))
# 判断父节点列表中是否还有未分类的节点,如果没有则表示已经全部分好,结束训练
if (len(self.parent_node_list) == 0):
print('决策树构建完成')
# 显示构建好的决策树:判断条件及结果(叶节点)
print(self.decision_df)
break
def predict(self, x_test):
'''输入测试数据进行决策判断'''
y_predict = list()
if (type(x_test) == pd.core.series.Series):
pred = self.get_ylabel(x_test)
y_predict.append(pred)
else:
for index, row in x_test.iterrows():
pred = self.get_ylabel(row)
y_predict.append(pred)
y_predict = np.array(y_predict, dtype=str)
return y_predict
def evaluate(self, x_test, y_test):
"""输入测试集和标签评估决策树准确性,返回 acc"""
y_true = np.array(y_test, dtype=str)
y_pred = self.predict(x_test)
# print(y_pred)
# print(y_true)
label_list = list(self.decision_df['label'].unique())
label_list.remove('#')
label_list = np.array(label_list, dtype=str) # 类型转换
# 创建混淆矩阵 (index 为 true,columns 为 pred)
confusion_matrix = pd.DataFrame(data=0, columns=label_list, index=label_list)
for i in range(len(y_true)):
confusion_matrix.loc[y_true[i], y_pred[i]] += 1
print('混淆矩阵:')
print(confusion_matrix)
# 计算准确率
acc = 0
for i in range(len(label_list)):
acc += confusion_matrix.iloc[i, i]
acc /= len(y_true)
print('acc: %.5f' % acc)
return acc
def save_model(self, path):
"""以 csv 格式保存模型"""
self.decision_df.to_csv(path, index=False)
def load_model(self, path):
"""以csv格式读取模型"""
self.decision_df = pd.read_csv(path)
def get_col_gini(self, threshold_point, value_series, label_series):
"""Gini 值计算函数"""
# 将输入进行重组
df_input = pd.DataFrame()
df_input['value'] = value_series
df_input['label'] = label_series
# print(df_input)
# 设计 Gini 值的计算表格
label_cols = label_series.value_counts()
df_gini = pd.DataFrame(columns=['node1', 'node2'], index=label_cols.index)
for c in label_cols.index:
df_c = df_input.loc[df_input['label'] == c]
df_gini.loc[c, 'node1'] = df_c.loc[df_c['value'] <= threshold_point].shape[0]
df_gini.loc[c, 'node2'] = df_c.loc[df_c['value'] > threshold_point].shape[0]
# 计算 node1、node2 节点 gini 值中和的部分
sum_n1 = df_gini['node1'].sum()
sum_n2 = df_gini['node2'].sum()
# print(df_gini)
# 计算 node 节点 gini 值
gini_n1 = gini_n2 = 0
if (sum_n1 == 0):
for c in label_cols.index:
gini_n2 += (df_gini.loc[c, 'node2'] / sum_n2) ** 2
elif (sum_n2 == 0):
for c in label_cols.index:
gini_n1 += (df_gini.loc[c, 'node1'] / sum_n1) ** 2
else:
for c in label_cols.index:
gini_n1 += (df_gini.loc[c, 'node1'] / sum_n1) ** 2
gini_n2 += (df_gini.loc[c, 'node2'] / sum_n2) ** 2
gini_n1 = 1 - gini_n1
gini_n2 = 1 - gini_n2
# 计算 gini_split
gini_split = sum_n1 / (sum_n1 + sum_n2) * gini_n1 + sum_n2 / (sum_n1 + sum_n2) * gini_n2
# print("point: %f, gini_split: %f" %(threshold_point, gini_split))
return gini_split
def get_best_point(self, value_series, label_series):
"""找到一列属性中最适合划分 (gini 值最小) 的点"""
value_array = np.array(value_series)
value_array = np.sort(value_array)
df_point = pd.DataFrame(columns=['point', 'gini_value'])
# 循环属性值列,计算划分点及其 gini 值并添加至 df_point 数据表中
for i in range(len(value_array) + 1):
if (i == 0):
point = value_array[i] - 1
elif (i == len(value_array)):
point = value_array[i - 1]
else:
point = 0.5 * (value_array[i] + value_array[i - 1])
gini = self.get_col_gini(point, value_series, label_series)
s = pd.Series(data={'point': point, 'gini_value': gini})
df_point.loc[i] = s
df_point.sort_values(by='gini_value', inplace=True)
best_point = df_point.iloc[0, 0]
best_gini = df_point.iloc[0, 1]
# print("best point for column '%s': %f" %(value_series.name, best_point))
# print(df_point)
return best_point, best_gini
def get_best_column(self, data, label_series):
"""遍历 data 中的属性列,计算其最佳划分点及 gini 值,找出最适合划分的一列和划分点"""
x_data = data.copy()
if (label_series.name in x_data.columns):
del (x_data[label_series.name])
gini_columns = pd.DataFrame(columns=['point', 'gini'], index=x_data.columns)
for col_name in x_data.columns:
point, gini = self.get_best_point(x_data[col_name], label_series)
s = pd.Series({'point': point, 'gini': gini})
gini_columns.loc[col_name] = [point, gini]
# gini_columns = gini_columns.append(s, ignore_index=True) # append 会更改索引
gini_columns.sort_values(by='gini', inplace=True)
# print(gini_columns)
best_col = gini_columns.index[0]
best_point = gini_columns.iloc[0, 0]
return best_col, best_point
def get_ylabel(self, x_series):
"""计算一行 x 数据 (Series) 对应的标签 """
model = self.decision_df
y_pred = '#'
x_index = 1
parent_index = []
child_df = pd.DataFrame()
# for i in range(1):
while (y_pred == '#'):
# 判断条件
condition = [model.loc[x_index, 'column'], model.loc[x_index, 'point']]
if (x_series[condition[0]] > condition[1]):
x_index += 1
# print('%s>%f' %(condition[0],condition[1]))
# else:
# print('%s<=%f' %(condition[0],condition[1]))
y_pred = model.loc[x_index, 'label']
# 更新父节点索引并找到其子节点
parent_index = [model.loc[x_index, 'this_depth'], model.loc[x_index, 'this_pos']]
child_df = model.loc[(model['parent_depth'] == parent_index[0]) & (model['parent_pos'] == parent_index[1])]
# 找到标签时结束
if (child_df.shape[0] != 0):
x_index = list(child_df.index)[0]
# print('跳到第%d行继续判断' %x_index)
# else:
# print('预测结束')
# print('pred:', y_pred)
return y_pred
def show_tree(self):
"""将决策树进行可视化"""
def add_text(im_draw, text_str, xy, multiline=1):
'''在绘图对象的某个位置添加文字'''
# 设置大小
font_h, font_w = 25, 14
font_h *= multiline
text_len = round(len(text_str) / multiline)
font = ImageFont.truetype('Times.ttc', size=20)
im_draw.text(xy=(xy[0] - font_w * 3, xy[1]), text=text_str, font=font, fill='black', align='center')
# 绘制矩形
# im_draw.rectangle(xy=(xy[0],xy[1],xy[0]+font_w*text_len,xy[1]+font_h),outline='black',width=2)
interval_x, interval_y = 60, 80
model = self.decision_df.copy()
model['x_pos'] = model['this_pos']
model['y_pos'] = (model['this_depth'] - 1) * interval_y
model['text'] = 'text'
max_depth = model.iloc[-1, 2]
# 创建图像
img_w, img_h = 1500, 600
tree_img = Image.new(mode='RGB', size=(img_w, img_h), color='white')
draw = ImageDraw.Draw(tree_img) # 创建绘图对象
parent_pos = []
parent_x_pos = 0
x_pos = 0
for x_index in model.index:
text = model.loc[x_index, 'column']
if (str(model.loc[x_index, 'point']) == 'nan'):
x_pos = img_w / 4
else:
# 跟新text内容和x位置
model.loc[x_index, 'x_pos'] = x_pos
parent_pos = [model.loc[x_index, 'parent_depth'], model.loc[x_index, 'parent_pos']]
parent_x_pos = model.loc[
(model['this_depth'] == parent_pos[0]) & (model['this_pos'] == parent_pos[1]), 'x_pos']
depth = model.loc[x_index, 'this_depth'] - 1
if (model.loc[x_index, 'this_pos'] % 2 == 0):
text += '\n<=' + ('%.3f' % model.loc[x_index, 'point'])
x_pos = parent_x_pos - interval_x * np.sqrt(max_depth - depth)
else:
text += '\n>' + ('%.3f' % model.loc[x_index, 'point'])
x_pos = parent_x_pos + interval_x * np.sqrt(max_depth - depth)
x_pos = x_pos.iloc[0]
if (model.loc[x_index, 'label'] != '#'):
text += '\nClass:' + str(model.loc[x_index, 'label'])
# 将文字和位置添加到
model.loc[x_index, 'text'] = text
model.loc[x_index, 'x_pos'] = x_pos
# 调整节点横坐标位置
gap = 140
for depth in model['this_depth'].unique():
if (depth != 1):
same_depth = model.loc[model['this_depth'] == depth]
for x_index in same_depth.index[:-1]:
if (x_index == same_depth.index[0]):
if ((model.loc[same_depth.index[0], 'x_pos'] - model.loc[
same_depth.index[-1], 'x_pos']) < gap * len(same_depth.index)):
# 如果整体太挤,整层先往左移一段
for i in same_depth.index:
model.loc[i, 'x_pos'] -= gap * len(same_depth.index) / 8
# 如果相邻两个靠太近,右边的往右移一点
if ((model.loc[x_index + 1, 'x_pos'] - model.loc[x_index, 'x_pos']) < gap):
model.loc[x_index + 1, 'x_pos'] = model.loc[x_index, 'x_pos'] + gap
# model.loc[x_index,'x_pos']-=gap/2
# 绘制文字和线
this_img_pos = []
parent_img_pos = []
for x_index in model.index:
# 绘制直线
if (x_index != 0):
this_img_pos = [model.loc[x_index, 'x_pos'], model.loc[x_index, 'y_pos']]
parent_pos = [model.loc[x_index, 'parent_depth'], model.loc[x_index, 'parent_pos']]
parent_img_pos = model.loc[
(model['this_depth'] == parent_pos[0]) & (model['this_pos'] == parent_pos[1]), ['x_pos', 'y_pos']]
parent_img_pos = [parent_img_pos.iloc[0, 0], parent_img_pos.iloc[0, 1]]
draw.line(xy=(parent_img_pos[0], parent_img_pos[1], this_img_pos[0], this_img_pos[1]), fill='gray',
width=1)
# 添加文字
this_pos = (model.loc[x_index, 'x_pos'], model.loc[x_index, 'y_pos'])
text = model.loc[x_index, 'text']
add_text(im_draw=draw, text_str=text, xy=this_pos)
# 显示图片
tree_img.show()
if __name__ == '__main__':
# 读取文件
data = pd.read_csv('./paras_labels.csv')
# 数据按 8:2 进行训练集、测试集切分
x_train, x_test, y_train, y_test = train_test_split(data, data['label'], test_size=0.2, random_state=7)
ds_tree = MyDecisionTree()
ds_tree.fit(x_train, y_train)
# ds_tree.save_model('my_decision_tree%d.csv' %e)
# ds_tree.load_model('my_decision_tree%d.csv' %e)
# print(ds_tree.decision_df)
print('训练集评估模型: ')
ds_tree.evaluate(x_train, y_train)
print('测试集评估模型: ')
ds_tree.evaluate(x_test, y_test)
ds_tree.show_tree()
~ cat paras_labels.csv edu,exp,label 1,69,0 3,89,1 2,97,1 3,78,1 2,70,0 1,99,1 2,90,1 2,87,1 1,50,0 3,70,1 1,60,0 2,60,0 3,60,1 1,90,1 2,90,1 3,90,1 1,66,0 1,56,0 2,89,1 3,78,1
结果
decision condition: exp 74.0 添加一个子节点,数据大小: (8, 2) 添加一个叶节点,标签类型: 1 数据大小: (8, 2) 该层决策结束, 进行下一层决策 更新 parent_node_list, 大小:1 decision condition: edu 2.0 添加一个叶节点,标签类型: 0 数据大小: (6, 1) 添加一个叶节点,标签类型: 1 数据大小: (2, 1) 该层决策结束, 进行下一层决策 更新 parent_node_list, 大小:0 决策树构建完成 parent_depth parent_pos this_depth this_pos column point label 0 NaN NaN 1 0 root NaN # 1 1 0 2 0 exp 74.0 # 2 1 0 2 1 exp 74.0 1 3 2 0 3 0 edu 2.0 0 4 2 0 3 1 edu 2.0 1 训练集评估模型: 混淆矩阵: 1 0 1 10 0 0 0 6 acc:1.00000 测试集评估模型: 混淆矩阵: 1 0 1 3 0 0 0 1 acc:1.00000
reference
https://blog.csdn.net/weixin_42444020/article/details/120959947
https://github.com/scikit-learn/scikit-learn/tree/main/sklearn/tree