Python 手写实现决策树算法




class MyDecisionTree:

    def __init__(self, *args, **kwargs):
        # 初始化参数

    def fit(self, ...):
        # 训练模型

    def predict(self, ...):
        # 模型预测

    def evaluate(self, ...):
        # 模型评估

    def save_model(self, ...): 
        # 保存模型

    def load_model(self, ...):
        # 加载模型

    def show_tree(self, ...):
        # 绘制决策树


大体思路

1. 创建决策树 My_Decision_Tree 类,类函数 __init__() 初始化参数、fit() 进行决策树模型训练、predict() 进行预测、evaluate() 进行模型评估、save_model() 保存模型 (csv 格式)、load_model() 加载模型、show_tree() 使用 Pillow 库绘制决策树以及其他一些封装的功能函数;

2.最佳划分点的度量通常有 Gini 值、Entropy、Classification error 等,本程序采用 Gini 值,计算方法如下:



Entropy = -6/12 * log2(6/12) - 6/12 * log2(6/12) = 1.0

Gini = 6/12 * (1 - 6/12) + 6/12 * (1 - 6/12) = 0.5


Gini(N1) = 5/7 * (1 - 5/7) + 2/7 * (1 - 2/7) = 1 - (5/7)2 - (2/7)2 = 0.408
Gini(N2) = 1/5 * (1 - 1/5) + 4/5 * (1 - 4/5) = 1 - (1/5)2 - (4/5)2 = 0.32 Ginisplit = 7/12 * 0.408 + 5/12 * 0.32 = 0.37



Gini(N1) = 4/7 * (1 - 4/7) + 3/7 * (1 - 3/7) = 0.49
Gini(N2) = 2/5 * (1 - 2/5) + 3/5 * (1 - 3/5) = 0.48
Ginisplit = 7/12 * 0.49 + 5/12 * 0.48 = 0.486

 




构建决策树模型所用到的功能函数 (gini 值计算相关) 说明


  • get_col_gini(self, threshold_point, value_series, label_series) 
    • 计算一列数据在某一阈值划分点下的 gini_split 值
  • get_best_point(self, value_series, label_series)
    • 对连续型数据进行大小排序, 分别计算切分点及其对应的 gini_split 值, 找到使得 gini_split 值最小的最佳切分点
  • get_best_column(self, data, label_series)
    • 遍历数据表中的属性列, 计算每列的最佳划分点和 gini_split 值, 并对比找到最适合划分的一列及其对应的划分点


class MyDecisionTree:
    ...

    def get_col_gini(self, ...):
        # 计算一列数据在某一阈值划分点下的 gini_split 值

    def get_best_point(self, ...):
        # 对连续型数据进行大小排序, 分别计算切分点及其对应的 gini_split 值, 找到使得 gini_split 值最小的最佳切分点

    def get_best_column(self, ...):
        # 遍历数据表中的属性列, 计算每列的最佳划分点和 gini_split 值, 并对比找到最适合划分的一列及其对应的划分点



决策树构建 (训练) 的逻辑


  • 1. 程序开始以 root 为父节点, 先找到剩余属性中最适合划分的列和划分点, 并将其划分为两个子节点, 记录判断条件和此节点位置. 删除子节点中的该列属性以避免最终决策树模型倾向于某个属性, 利用剩余的属性继续构造决策树
  • 2. 判断各个子节点中的标签是否相同, 如果为同一类则移到叶子节点中, 否则将此节点更新到下个流程中的父节点汇总
  • 3. 一直循环以上过程, 当父节点为空时结束训练, 如果所有直接点都为叶子节点时, 则决策树完美将数据分类; 当然也会出现所有属性都用完但子节点中标签依旧不唯一的情况, 这时以该节点中个数较多的标签类作为分类结果, 终止模型构建




代码实现


$ cat decision_tree.py
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont

class MyDecisionTree(object):
    """决策树框架"""

    def __init__(self, arg=None):
        ## 初始化类参数
        self.arg = arg

        # 存放决策树中的各层判断条件
        self.decision_df = pd.DataFrame(
            columns=['parent_depth', 'parent_pos', 'this_depth', 'this_pos', 'column', 'point', 'label'])
        # 存放父节点和子节点的 DataFrame
        self.parent_node_list = []
        self.child_node_list = []
        # 存放划分好的叶节点
        self.leaf_list = []
        # 存放未能划分出来的节点
        self.branch_list = []

    def fit(self, x_train, y_train):
        """传入训练集和标签构造决策树"""

        '''
        程序逻辑:
        (1) 程序开始以 root 为父节点,先找到剩余属性中最适合划分的列和划分点,并将其划分为两个子节点,
            记录判断条件和此节点位置。删除子节点中的该列属性以避免最终决策树模型倾向于某个属性,利用剩余的属性继续构造决策树
        (2) 判断各个子节点中的标签是否相同,如果为同一类则移到叶节点中,否则将此节点更新到下个流程中的父节点中
        (3) 一直循环以上过程,当父节点为空时结束训练,如果所有子节点都为叶节点时则决策树完美将数据分类;当然也会出现所有属性都用完但子节点
            中标签依旧不唯一的情况,这时以该节点中个数较多的标签类作为分类结果,终止模型构建。
        '''

        x_data = x_train.copy()
        if (y_train.name not in x_data.columns):
            x_data[y_train.name] = y_train

        # 把第一层(原数据)放入父节点列表 parent_node_list
        self.parent_node_list.append(x_data)
        # 写入第一层的决策 (根节点)
        decision = {'this_depth': 1, 'this_pos': 0, 'column': 'root', 'label': '#'}
        self.decision_df = self.decision_df.append(decision, ignore_index=True)
        # 开始循环计算分类节点
        parent_count = 0  # 循环的父节点数
        child_pos = 0  # 子节点的位置
        depth = 2  # 第几层节点
        while True:
            parent_node = self.parent_node_list[parent_count]
            # 找到第一个适合划分的列和划分点
            col_1, point_1 = self.get_best_column(parent_node, parent_node[y_train.name])
            print('decision condition:', col_1, point_1)

            # 根据条件把父节点划分为两部分
            child_node1 = parent_node.loc[parent_node[col_1] <= point_1]
            child_node2 = parent_node.loc[parent_node[col_1] > point_1]
            # 每一部分的分类结果
            result = []
            for child_node in [child_node1, child_node2]:
                # 删除已使用过的属性
                del (child_node[col_1])

                # 判断子节点标签是否为一类,是则将其放入叶节点列表中
                if (len(child_node[y_train.name].unique()) == 1):
                    self.leaf_list.append(child_node)
                    print('添加一个叶节点,标签类型:', child_node[y_train.name].unique()[0], '数据大小:', child_node.shape)
                    result.append(child_node[y_train.name].unique()[0])

                # 判断子节点标签是否还有剩余属性可以作为分类依据,如果有则添加到子节点列表中用于后续分类
                elif (child_node.shape[1] != 1):
                    self.child_node_list.append(child_node)
                    print('添加一个子节点,数据大小:', child_node.shape)
                    result.append('#')
                # 都不满足说明该节点没有分完但是分不下去了,提示错误
                else:
                    self.branch_list.append(child_node)
                    print('child_node 节点已用完所有属性,仍然没分出来, 剩余数据大小:')
                    print(child_node[y_train.name].value_counts())

                    values = child_node[y_train.name].value_counts()
                    if (len(values) == 0):
                        replace = list(parent_node[y_train.name].value_counts().index)[0]
                    else:
                        replace = list(values.index)[0]
                    print('用 %s 作为该条件下的预测结果' % replace)
                    result.append(replace)

            # 找到该父节点在该层中所对应的位置
            p_pos_list = self.decision_df.loc[
                (self.decision_df['this_depth'] == depth - 1) & (self.decision_df['label'] == '#'), 'this_pos']
            p_pos_list = list(p_pos_list)
            # print('p_pos_list:', p_pos_list)

            # 判断完一个父节点之后, 把判断条件加入 decision_df 中
            decision1 = {'parent_depth': depth - 1, 'parent_pos': p_pos_list[parent_count], 'this_depth': depth,
                         'this_pos': child_pos,
                         'column': col_1, 'point': point_1, 'label': result[0]}
            decision2 = {'parent_depth': depth - 1, 'parent_pos': p_pos_list[parent_count], 'this_depth': depth,
                         'this_pos': child_pos + 1,
                         'column': col_1, 'point': point_1, 'label': result[1]}
            self.decision_df = self.decision_df.append([decision1, decision2], ignore_index=True)

            # 当遍历完父节点列表所有值后,将子节点更新为父节点
            child_pos += 2
            parent_count += 1
            if (parent_count == len(self.parent_node_list)):
                parent_count = 0
                child_pos = 0
                depth += 1
                print('该层决策结束, 进行下一层决策\n')
                self.parent_node_list = self.child_node_list.copy()
                self.child_node_list.clear()
                print('更新 parent_node_list, 大小:%d' % len(self.parent_node_list))

                # 判断父节点列表中是否还有未分类的节点,如果没有则表示已经全部分好,结束训练
                if (len(self.parent_node_list) == 0):
                    print('决策树构建完成')
                    # 显示构建好的决策树:判断条件及结果(叶节点)
                    print(self.decision_df)

                    break

    def predict(self, x_test):
        '''输入测试数据进行决策判断'''
        y_predict = list()

        if (type(x_test) == pd.core.series.Series):
            pred = self.get_ylabel(x_test)
            y_predict.append(pred)
        else:
            for index, row in x_test.iterrows():
                pred = self.get_ylabel(row)
                y_predict.append(pred)

        y_predict = np.array(y_predict, dtype=str)

        return y_predict

    def evaluate(self, x_test, y_test):
        """输入测试集和标签评估决策树准确性,返回 acc"""

        y_true = np.array(y_test, dtype=str)
        y_pred = self.predict(x_test)
        # print(y_pred)
        # print(y_true)
        label_list = list(self.decision_df['label'].unique())
        label_list.remove('#')
        label_list = np.array(label_list, dtype=str)  # 类型转换
        # 创建混淆矩阵 (index 为 true,columns 为 pred)
        confusion_matrix = pd.DataFrame(data=0, columns=label_list, index=label_list)
        for i in range(len(y_true)):
            confusion_matrix.loc[y_true[i], y_pred[i]] += 1
        print('混淆矩阵:')
        print(confusion_matrix)
        # 计算准确率
        acc = 0
        for i in range(len(label_list)):
            acc += confusion_matrix.iloc[i, i]
        acc /= len(y_true)
        print('acc: %.5f' % acc)

        return acc

    def save_model(self, path):
        """以 csv 格式保存模型"""

        self.decision_df.to_csv(path, index=False)

    def load_model(self, path):
        """以csv格式读取模型"""

        self.decision_df = pd.read_csv(path)

    def get_col_gini(self, threshold_point, value_series, label_series):
        """Gini 值计算函数"""

        # 将输入进行重组
        df_input = pd.DataFrame()
        df_input['value'] = value_series
        df_input['label'] = label_series
        # print(df_input)
        # 设计 Gini 值的计算表格
        label_cols = label_series.value_counts()
        df_gini = pd.DataFrame(columns=['node1', 'node2'], index=label_cols.index)

        for c in label_cols.index:
            df_c = df_input.loc[df_input['label'] == c]
            df_gini.loc[c, 'node1'] = df_c.loc[df_c['value'] <= threshold_point].shape[0]
            df_gini.loc[c, 'node2'] = df_c.loc[df_c['value'] > threshold_point].shape[0]

        # 计算 node1、node2 节点 gini 值中和的部分
        sum_n1 = df_gini['node1'].sum()
        sum_n2 = df_gini['node2'].sum()
        # print(df_gini)

        # 计算 node 节点 gini 值
        gini_n1 = gini_n2 = 0
        if (sum_n1 == 0):
            for c in label_cols.index:
                gini_n2 += (df_gini.loc[c, 'node2'] / sum_n2) ** 2
        elif (sum_n2 == 0):
            for c in label_cols.index:
                gini_n1 += (df_gini.loc[c, 'node1'] / sum_n1) ** 2
        else:
            for c in label_cols.index:
                gini_n1 += (df_gini.loc[c, 'node1'] / sum_n1) ** 2
                gini_n2 += (df_gini.loc[c, 'node2'] / sum_n2) ** 2
        gini_n1 = 1 - gini_n1
        gini_n2 = 1 - gini_n2
        # 计算 gini_split
        gini_split = sum_n1 / (sum_n1 + sum_n2) * gini_n1 + sum_n2 / (sum_n1 + sum_n2) * gini_n2
        # print("point: %f, gini_split: %f" %(threshold_point, gini_split))

        return gini_split

    def get_best_point(self, value_series, label_series):
        """找到一列属性中最适合划分 (gini 值最小) 的点"""

        value_array = np.array(value_series)
        value_array = np.sort(value_array)
        df_point = pd.DataFrame(columns=['point', 'gini_value'])

        # 循环属性值列,计算划分点及其 gini 值并添加至 df_point 数据表中
        for i in range(len(value_array) + 1):
            if (i == 0):
                point = value_array[i] - 1
            elif (i == len(value_array)):
                point = value_array[i - 1]
            else:
                point = 0.5 * (value_array[i] + value_array[i - 1])
            gini = self.get_col_gini(point, value_series, label_series)

            s = pd.Series(data={'point': point, 'gini_value': gini})
            df_point.loc[i] = s

        df_point.sort_values(by='gini_value', inplace=True)
        best_point = df_point.iloc[0, 0]
        best_gini = df_point.iloc[0, 1]
        # print("best point for column '%s': %f" %(value_series.name, best_point))
        # print(df_point)

        return best_point, best_gini

    def get_best_column(self, data, label_series):
        """遍历 data 中的属性列,计算其最佳划分点及 gini 值,找出最适合划分的一列和划分点"""

        x_data = data.copy()
        if (label_series.name in x_data.columns):
            del (x_data[label_series.name])

        gini_columns = pd.DataFrame(columns=['point', 'gini'], index=x_data.columns)
        for col_name in x_data.columns:
            point, gini = self.get_best_point(x_data[col_name], label_series)
            s = pd.Series({'point': point, 'gini': gini})
            gini_columns.loc[col_name] = [point, gini]
            # gini_columns = gini_columns.append(s, ignore_index=True) # append 会更改索引
        gini_columns.sort_values(by='gini', inplace=True)
        # print(gini_columns)
        best_col = gini_columns.index[0]
        best_point = gini_columns.iloc[0, 0]

        return best_col, best_point

    def get_ylabel(self, x_series):
        """计算一行 x 数据 (Series) 对应的标签 """

        model = self.decision_df

        y_pred = '#'
        x_index = 1
        parent_index = []
        child_df = pd.DataFrame()

        # for i in range(1):
        while (y_pred == '#'):
            # 判断条件
            condition = [model.loc[x_index, 'column'], model.loc[x_index, 'point']]
            if (x_series[condition[0]] > condition[1]):
                x_index += 1
            #   print('%s>%f' %(condition[0],condition[1]))
            # else:
            #   print('%s<=%f' %(condition[0],condition[1]))

            y_pred = model.loc[x_index, 'label']
            # 更新父节点索引并找到其子节点
            parent_index = [model.loc[x_index, 'this_depth'], model.loc[x_index, 'this_pos']]
            child_df = model.loc[(model['parent_depth'] == parent_index[0]) & (model['parent_pos'] == parent_index[1])]

            # 找到标签时结束
            if (child_df.shape[0] != 0):
                x_index = list(child_df.index)[0]
            #   print('跳到第%d行继续判断' %x_index)
            # else:
            #   print('预测结束')
        # print('pred:', y_pred)

        return y_pred

    def show_tree(self):
        """将决策树进行可视化"""

        def add_text(im_draw, text_str, xy, multiline=1):
            '''在绘图对象的某个位置添加文字'''
            # 设置大小
            font_h, font_w = 25, 14
            font_h *= multiline
            text_len = round(len(text_str) / multiline)

            font = ImageFont.truetype('Times.ttc', size=20)
            im_draw.text(xy=(xy[0] - font_w * 3, xy[1]), text=text_str, font=font, fill='black', align='center')
            # 绘制矩形
            # im_draw.rectangle(xy=(xy[0],xy[1],xy[0]+font_w*text_len,xy[1]+font_h),outline='black',width=2)

        interval_x, interval_y = 60, 80
        model = self.decision_df.copy()
        model['x_pos'] = model['this_pos']
        model['y_pos'] = (model['this_depth'] - 1) * interval_y
        model['text'] = 'text'
        max_depth = model.iloc[-1, 2]

        # 创建图像
        img_w, img_h = 1500, 600
        tree_img = Image.new(mode='RGB', size=(img_w, img_h), color='white')
        draw = ImageDraw.Draw(tree_img)  # 创建绘图对象

        parent_pos = []
        parent_x_pos = 0
        x_pos = 0
        for x_index in model.index:
            text = model.loc[x_index, 'column']
            if (str(model.loc[x_index, 'point']) == 'nan'):
                x_pos = img_w / 4
            else:
                # 跟新text内容和x位置
                model.loc[x_index, 'x_pos'] = x_pos
                parent_pos = [model.loc[x_index, 'parent_depth'], model.loc[x_index, 'parent_pos']]
                parent_x_pos = model.loc[
                    (model['this_depth'] == parent_pos[0]) & (model['this_pos'] == parent_pos[1]), 'x_pos']
                depth = model.loc[x_index, 'this_depth'] - 1
                if (model.loc[x_index, 'this_pos'] % 2 == 0):
                    text += '\n<=' + ('%.3f' % model.loc[x_index, 'point'])
                    x_pos = parent_x_pos - interval_x * np.sqrt(max_depth - depth)

                else:
                    text += '\n>' + ('%.3f' % model.loc[x_index, 'point'])
                    x_pos = parent_x_pos + interval_x * np.sqrt(max_depth - depth)
                x_pos = x_pos.iloc[0]
                if (model.loc[x_index, 'label'] != '#'):
                    text += '\nClass:' + str(model.loc[x_index, 'label'])

            # 将文字和位置添加到
            model.loc[x_index, 'text'] = text
            model.loc[x_index, 'x_pos'] = x_pos

        # 调整节点横坐标位置
        gap = 140
        for depth in model['this_depth'].unique():

            if (depth != 1):
                same_depth = model.loc[model['this_depth'] == depth]
                for x_index in same_depth.index[:-1]:
                    if (x_index == same_depth.index[0]):
                        if ((model.loc[same_depth.index[0], 'x_pos'] - model.loc[
                            same_depth.index[-1], 'x_pos']) < gap * len(same_depth.index)):
                            # 如果整体太挤,整层先往左移一段
                            for i in same_depth.index:
                                model.loc[i, 'x_pos'] -= gap * len(same_depth.index) / 8
                    # 如果相邻两个靠太近,右边的往右移一点
                    if ((model.loc[x_index + 1, 'x_pos'] - model.loc[x_index, 'x_pos']) < gap):
                        model.loc[x_index + 1, 'x_pos'] = model.loc[x_index, 'x_pos'] + gap
                        # model.loc[x_index,'x_pos']-=gap/2

        # 绘制文字和线
        this_img_pos = []
        parent_img_pos = []
        for x_index in model.index:
            # 绘制直线
            if (x_index != 0):
                this_img_pos = [model.loc[x_index, 'x_pos'], model.loc[x_index, 'y_pos']]
                parent_pos = [model.loc[x_index, 'parent_depth'], model.loc[x_index, 'parent_pos']]
                parent_img_pos = model.loc[
                    (model['this_depth'] == parent_pos[0]) & (model['this_pos'] == parent_pos[1]), ['x_pos', 'y_pos']]
                parent_img_pos = [parent_img_pos.iloc[0, 0], parent_img_pos.iloc[0, 1]]
                draw.line(xy=(parent_img_pos[0], parent_img_pos[1], this_img_pos[0], this_img_pos[1]), fill='gray',
                          width=1)

            # 添加文字
            this_pos = (model.loc[x_index, 'x_pos'], model.loc[x_index, 'y_pos'])
            text = model.loc[x_index, 'text']
            add_text(im_draw=draw, text_str=text, xy=this_pos)

        # 显示图片
        tree_img.show()

if __name__ == '__main__':
    # 读取文件
    data = pd.read_csv('./paras_labels.csv')
    # 数据按 8:2 进行训练集、测试集切分
    x_train, x_test, y_train, y_test = train_test_split(data, data['label'], test_size=0.2, random_state=7)

    ds_tree = MyDecisionTree()

    ds_tree.fit(x_train, y_train)
    # ds_tree.save_model('my_decision_tree%d.csv' %e)

    # ds_tree.load_model('my_decision_tree%d.csv' %e)
    # print(ds_tree.decision_df)

    print('训练集评估模型: ')
    ds_tree.evaluate(x_train, y_train)
    print('测试集评估模型: ')
    ds_tree.evaluate(x_test, y_test)

    ds_tree.show_tree()


~ cat paras_labels.csv
edu,exp,label
1,69,0
3,89,1
2,97,1
3,78,1
2,70,0
1,99,1
2,90,1
2,87,1
1,50,0
3,70,1
1,60,0
2,60,0
3,60,1
1,90,1
2,90,1
3,90,1
1,66,0
1,56,0
2,89,1
3,78,1



结果


decision condition: exp 74.0
添加一个子节点,数据大小: (8, 2)
添加一个叶节点,标签类型: 1 数据大小: (8, 2)
该层决策结束, 进行下一层决策

更新 parent_node_list, 大小:1
decision condition: edu 2.0
添加一个叶节点,标签类型: 0 数据大小: (6, 1)
添加一个叶节点,标签类型: 1 数据大小: (2, 1)
该层决策结束, 进行下一层决策

更新 parent_node_list, 大小:0
决策树构建完成
  parent_depth parent_pos this_depth this_pos column  point label
0          NaN        NaN          1        0   root    NaN     #
1            1          0          2        0    exp   74.0     #
2            1          0          2        1    exp   74.0     1
3            2          0          3        0    edu    2.0     0
4            2          0          3        1    edu    2.0     1
训练集评估模型:
混淆矩阵:
    1  0
1  10  0
0   0  6
acc:1.00000
测试集评估模型:
混淆矩阵:
   1  0
1  3  0
0  0  1
acc:1.00000






reference


https://blog.csdn.net/weixin_42444020/article/details/120959947

https://github.com/scikit-learn/scikit-learn/tree/main/sklearn/tree