In the previous article we implemented the construction of the tree. Here we focus on the following topics:
1. Pruning
2. Handling continuous values
3. A data loader: DataLoader
4. Model evaluation
1. Post-pruning
I chose post-pruning: first grow the complete tree, then walk back over it and cut away some of its nodes.
Post-pruning strategies:
1. Prune nodes that cover too few samples. This keeps noise in tiny subsets from steering the model, lowers the risk of overfitting, and improves generalization.
2. Limit the depth of the tree.
3. Prune nodes that predict worse than the prior probability: if a split discriminates no better than always guessing the majority class, it has no reason to exist (see the sketch after this list).
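For rule 3, here is a minimal sketch of the comparison it describes. This is illustrative only: the post_pruning code below implements rule 1 and a unanimous-children merge, so this check is shown separately, with hypothetical labels and predictions.

import numpy as np

y = np.array([0, 0, 0, 1, 1, 0, 0, 1])         # hypothetical labels at a node
prior = max(np.mean(y == 1), np.mean(y == 0))   # accuracy of always guessing the majority class

node_pred = np.array([0, 0, 1, 1, 0, 0, 1, 1])  # hypothetical predictions of the split
node_acc = np.mean(node_pred == y)

if node_acc <= prior:
    # The split predicts no better than the prior, so it adds nothing
    print("prune: the split is no better than the prior")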
Code implementation
# Post-pruning: recursively walk the nested tree dict, merging leaves bottom-up
def post_pruning(self, tree_dict, data, key=None):
    def processing_data(data, col_name, value):
        # Keep only the rows that took this branch
        inx = data[col_name] == value
        return data[inx]
    for k in tree_dict:
        # Internal node (sub-dict), not a leaf label
        if not isinstance(tree_dict[k], str):
            if key in self.data.columns:
                # Recurse, filtering the data down this branch
                flag, count = self.post_pruning(tree_dict[k], data=processing_data(data, key, k), key=k)
            else:
                flag, count = self.post_pruning(tree_dict[k], data=data, key=k)
            # If the child leaves can be merged, return twice to reach the
            # grandparent and replace the parent node with "save"/"not save"
            if count == 1:
                return flag, count + 1
            elif count == 2:
                tree_dict[k] = flag
        # Leaf: too few samples under this node, collapse to the majority class
        elif data.shape[0] < self.num_threshold:
            if len(data[data[self.target] == 0]) >= len(data[self.target]) / 2:
                return "not save", 1
            else:
                return "save", 1
        # Leaf: all children agree, collapse to that label
        elif np.array([v == "save" for v in tree_dict.values()]).all():
            return "save", 1
        elif np.array([v == "not save" for v in tree_dict.values()]).all():
            return "not save", 1
    return tree_dict, 0
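To make the "return twice to reach the grandparent" mechanics concrete, here is a standalone toy sketch of the unanimous-children merge rule. toy_tree and merge_unanimous are illustrative names, not part of the project code; the dict has the same {feature_name: {feature_value: subtree_or_label}} shape that get_tree_dict produces.

# Toy tree: all Pclass children under Sex==0 carry the same label
toy_tree = {"Sex": {0: {"Pclass": {0: "save", 1: "save", 2: "save"}},
                    1: "not save"}}

def merge_unanimous(branches):
    """Collapse any subtree whose children all carry the same leaf label."""
    for value, subtree in branches.items():
        if isinstance(subtree, dict):
            feature, sub_branches = next(iter(subtree.items()))
            merge_unanimous(sub_branches)
            labels = set(sub_branches.values())
            if len(labels) == 1:                # all children agree
                branches[value] = labels.pop()  # replace the whole subtree

root_feature, root_branches = next(iter(toy_tree.items()))
merge_unanimous(root_branches)
print(toy_tree)  # {'Sex': {0: 'save', 1: 'not save'}}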
2. Handling continuous values
Approach: form candidate thresholds from the midpoints between adjacent unique values, pick the one that maximizes information gain, and binarize the Age column at that threshold:
def process_continue_value(x, y, total_entropy):
    '''
    :param x: data: pd.DataFrame
    :param y: data["Survived"]: pd.Series
    :param total_entropy: float
    :return: the processed data
    '''
    # Keep the original row order so the indices below stay aligned with y
    ages = np.array(x["Age"])
    total_data = len(ages)
    mean_list = []
    gain_list = []
    unique_data = np.unique(ages)  # np.unique also sorts
    # Candidate thresholds: midpoints between adjacent unique values
    for i in range(len(unique_data) - 1):
        mean_list.append((unique_data[i] + unique_data[i + 1]) / 2)
    for v in mean_list:
        x1_index = np.where(ages >= v)[0]
        x2_index = np.where(ages < v)[0]
        kind_y1 = y[x1_index]
        kind_y2 = y[x2_index]
        len_kind1 = len(kind_y1)
        len_kind2 = len(kind_y2)
        # Information gain of splitting at this threshold
        part_gain = total_entropy - (len_kind1 / total_data) * entropy(kind_y1) \
                    - (len_kind2 / total_data) * entropy(kind_y2)
        gain_list.append(part_gain)
    # Binarize Age at the threshold with the largest gain
    x["Age"] = [0 if i < mean_list[np.argmax(gain_list)] else 1 for i in x["Age"]]
    return x
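As a quick toy illustration (hypothetical numbers, independent of the Titanic data) of how the candidate thresholds are formed:

import numpy as np

ages = np.array([22.0, 38.0, 26.0, 26.0, 35.0])
unique_ages = np.unique(ages)                  # [22. 26. 35. 38.]
midpoints = (unique_ages[:-1] + unique_ages[1:]) / 2
print(midpoints)                               # [24.  30.5 36.5]
# Each midpoint is tried as a >= / < split; the one with the
# highest information gain becomes the binarization threshold.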
3. Data loader: DataLoader
We build a DataLoader class: given the data, it automatically splits it into train and test sets, with parameters for the train/test ratio and the random seed.
Code implementation
class DataLoader(object):
    def __init__(self, data: "pd.DataFrame", random_seed=None, test_size=0.33):
        self.data = data
        self.test_size = test_size
        self.random_seed = random_seed

    def __getitem__(self, index):
        return self.data.iloc[index, :]

    def __len__(self):
        return len(self.data)

    def split_data(self):
        if self.random_seed is not None:
            np.random.seed(self.random_seed)
        # Shuffle the row indices, then cut once, so train and test do not
        # overlap (np.random.randint would sample with replacement and leak
        # rows between the two sets)
        shuffled_idx = np.random.permutation(len(self.data))
        test_len = int(len(self.data) * self.test_size)
        test_data = self.data.iloc[shuffled_idx[:test_len], :]
        train_data = self.data.iloc[shuffled_idx[test_len:], :]
        return train_data, test_data
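A minimal usage sketch, assuming data is the preprocessed Titanic DataFrame from earlier:

dataloader = DataLoader(data, random_seed=1, test_size=0.33)
train_data, test_data = dataloader.split_data()
print(len(train_data), len(test_data))  # roughly a 67/33 split with no shared rows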
4. Model evaluation
The quality of the model cannot be judged by eye; it has to be measured on the test set: follow the constructed decision tree to classify each row, compare the prediction with the true label, and report the accuracy.
Code implementation
def evaluator(self, tree_dict, test_data: "pd.DataFrame", target_name):
    # Reorder the columns by information gain, largest first, so they match
    # the order the tree was built in (assumes the feature columns precede
    # the target column, in the order used to compute self.gain)
    columns = test_data.columns
    new_columns = [columns[i] for i in self.gain.argsort()[::-1]]
    new_columns.append(target_name)
    test_data = test_data.reindex(columns=new_columns)
    right = 0
    # Walk the tree once per row of test_data
    for index, row in test_data.iterrows():
        temp_tree = tree_dict
        value = None
        # Choose a branch per feature; the target column is excluded
        for name in new_columns[:-1]:
            choice = row[name]
            # No branch for this value: the row cannot be classified
            if choice not in temp_tree[name].keys():
                value = None
                break
            value = temp_tree[name][choice]
            temp_tree = value
            if value in ["save", "not save"]:
                # Map the leaf label to 0/1 so y and pred_y are comparable
                value = 0 if value == "not save" else 1
                break
        if value == row[target_name]:
            right += 1
    accuracy = right / len(test_data)
    return accuracy
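Putting pruning and evaluation together (this mirrors the main block in the full listing below):

tree_dict = root.get_tree_dict(root, tree.data)
for _ in range(3):  # prune repeatedly so merges can propagate upward
    tree_dict = tree.post_pruning(tree_dict, tree.data)[0]
accuracy = tree.evaluator(tree_dict, test_data, "Survived")
print(accuracy)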
5. Summary
First, the results of running the code:
1. Accuracy without pruning: 0.3299319727891156
2. Accuracy after pruning: 0.6190476190476191
The comparison shows that pruning raises accuracy substantially.
Lessons learned:
1. Errors crop up constantly while coding; only care, and more care, gets you to the finish line.
2. This project leans heavily on recursion, and my initially shaky grasp of the data structures cost me dearly.
3. I did not plan the classes well at the start, so integrating everything later took detours. Next time it is better to draw a class diagram first and only then write the code.
Finally, the complete code follows; the dataset was published in the previous article.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Box and arrow styles for the tree plot
decisionNode = dict(boxstyle="sawtooth", fc="0.8")
leafNode = dict(boxstyle="round4", fc="0.8")
arrow_args = dict(arrowstyle="<-")


# Draw an annotated arrow from parent to child
def plotNode(nodeTxt, centerPt, parentPt, nodeType):
    createPlot.ax1.annotate(nodeTxt, xy=parentPt, xycoords='axes fraction',
                            xytext=centerPt, textcoords='axes fraction',
                            va="center", ha="center", bbox=nodeType,
                            arrowprops=arrow_args)


# Fill in the text between a parent and a child node
def plotMidText(cntrPt, parentPt, txtString):
    xMid = (parentPt[0] - cntrPt[0]) / 2.0 + cntrPt[0]
    yMid = (parentPt[1] - cntrPt[1]) / 2.0 + cntrPt[1]
    createPlot.ax1.text(xMid, yMid, txtString)


# Number of leaves and depth of the tree
def getNumLeafs(myTree):
    numLeafs = 0
    firstStr = list(myTree.keys())[0]  # the first key of the dict
    secondDict = myTree[firstStr]
    for key in secondDict.keys():
        if type(secondDict[key]).__name__ == 'dict':
            numLeafs += getNumLeafs(secondDict[key])
        else:
            numLeafs += 1
    return numLeafs


def getTreeDepth(myTree):
    maxDepth = 0
    firstStr = list(myTree.keys())[0]
    secondDict = myTree[firstStr]
    for key in secondDict.keys():
        if type(secondDict[key]).__name__ == 'dict':
            thisDepth = 1 + getTreeDepth(secondDict[key])
        else:
            thisDepth = 1
        if thisDepth > maxDepth:
            maxDepth = thisDepth
    return maxDepth


def plotTree(myTree, parentPt, nodeTxt):
    # Width and height of the subtree
    numLeafs = getNumLeafs(myTree)
    depth = getTreeDepth(myTree)
    firstStr = list(myTree.keys())[0]
    cntrPt = (plotTree.xOff + (1.0 + float(numLeafs)) / 2.0 / plotTree.totalW, plotTree.yOff)
    # Label the branch to this node
    plotMidText(cntrPt, parentPt, nodeTxt)
    plotNode(firstStr, cntrPt, parentPt, decisionNode)
    secondDict = myTree[firstStr]
    # Move down one level
    plotTree.yOff = plotTree.yOff - 1.0 / plotTree.totalD
    for key in secondDict.keys():
        if type(secondDict[key]) == dict:
            plotTree(secondDict[key], cntrPt, str(key))
        else:
            plotTree.xOff = plotTree.xOff + 1.0 / plotTree.totalW
            plotNode(secondDict[key], (plotTree.xOff, plotTree.yOff), cntrPt, leafNode)
            plotMidText((plotTree.xOff, plotTree.yOff), cntrPt, str(key))
    plotTree.yOff = plotTree.yOff + 1.0 / plotTree.totalD


def process(data):
    # Data preprocessing (x/y variant, kept for reference)
    data.drop("Embarked", inplace=True, axis=1)
    x = data[["Age", "Pclass", "Sex", "SibSp", "Parch", "Fare"]].copy()
    y = data["Survived"]
    x["Pclass"] = x["Pclass"] - 1
    # Fill missing values with the column mean
    x["SibSp"].fillna(x["SibSp"].mean(), inplace=True)
    x["Parch"].fillna(x["Parch"].mean(), inplace=True)
    x["Age"].fillna(x["Age"].mean(), inplace=True)
    # Binarize Fare at the mean
    x["Fare"] = [1 if i > np.array(x["Fare"]).mean() else 0 for i in x["Fare"]]
    # Encode Sex as 0/1
    x["Sex"] = pd.factorize(x["Sex"])[0].astype(np.uint16)
    return x, y


def process_data(data):
    data = data[["Age", "Pclass", "Sex", "SibSp", "Parch", "Fare", "Survived"]].copy()
    # Make Pclass start from 0
    data["Pclass"] = data["Pclass"] - 1
    # Binarize Fare at half the maximum fare
    data["Fare"] = [1 if i > np.max(data["Fare"] / 2) else 0 for i in data["Fare"]]
    data["Sex"] = pd.factorize(data["Sex"])[0].astype(np.uint16)
    # Fill missing values with the column mean
    data["SibSp"].fillna(data["SibSp"].mean(), inplace=True)
    data["Parch"].fillna(data["Parch"].mean(), inplace=True)
    data["Age"].fillna(data["Age"].mean(), inplace=True)
    return data


# Entropy of a 0/1 label array
def entropy(data):
    total_len = len(data)
    len_1 = np.where(data == 1)[0].shape[0]
    len_0 = len(data) - len_1
    # Problem: some inputs returned nan.
    # Cause: when a probability is near zero, its log tends to negative infinity.
    # Fix: add a small epsilon (1e-5) to each probability.
    p1 = len_1 / total_len + 1e-5
    p0 = len_0 / total_len + 1e-5
    entro = -np.sum([p0 * np.log2(p0), p1 * np.log2(p1)])
    return entro


def process_continue_value(x, y, total_entropy):
    '''
    :param x: data: pd.DataFrame
    :param y: data["Survived"]: pd.Series
    :param total_entropy: float
    :return: the processed data
    '''
    # Keep the original row order so the indices below stay aligned with y
    ages = np.array(x["Age"])
    total_data = len(ages)
    mean_list = []
    gain_list = []
    unique_data = np.unique(ages)  # np.unique also sorts
    # Candidate thresholds: midpoints between adjacent unique values
    for i in range(len(unique_data) - 1):
        mean_list.append((unique_data[i] + unique_data[i + 1]) / 2)
    for v in mean_list:
        x1_index = np.where(ages >= v)[0]
        x2_index = np.where(ages < v)[0]
        kind_y1 = y[x1_index]
        kind_y2 = y[x2_index]
        len_kind1 = len(kind_y1)
        len_kind2 = len(kind_y2)
        # Information gain of splitting at this threshold
        part_gain = total_entropy - (len_kind1 / total_data) * entropy(kind_y1) \
                    - (len_kind2 / total_data) * entropy(kind_y2)
        gain_list.append(part_gain)
    # Binarize Age at the threshold with the largest gain
    x["Age"] = [0 if i < mean_list[np.argmax(gain_list)] else 1 for i in x["Age"]]
    return x


# Information gain of every feature
def gain(total_entropy, x, y):
    gain = []
    total_data = len(y)
    for feature in x:
        part_entropy = []
        for kind in np.unique(x[feature]):
            x_index = np.where(x[feature] == kind)[0]
            kind_y = y[x_index]
            len_kind = len(kind_y)
            part_entropy.append((len_kind / total_data) * entropy(kind_y))
        gain.append(total_entropy - np.sum(part_entropy))
    return gain


class TreeNode(object):
    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent
        self.children = {}

    # Show the node name when printed
    def __repr__(self):
        return "TreeNode(%s)" % self.name

    def add_child(self, child, idx):
        self.children[idx] = child

    # Depth-first printout, one level at a time
    def print_tree(self, root: "TreeNode", level=0):
        print("Level", level, "\n")
        if root.parent is None:
            print(root)
        if root.children:
            print(root.children.values())
        for child in root.children.values():
            self.print_tree(child, level + 1)
        print("\n")

    # Convert the node tree into a nested dict, guided by the data
    def get_tree_dict(self, root, data=None):
        def split_data(data, feature, value):
            inx = data[feature] == value
            return data[inx]

        # Not a TreeNode (i.e. "empty"): stop growing downward
        if not isinstance(root, TreeNode):
            return
        # No data reaches this node: mark the branch as empty
        if data.empty:
            return "empty"
        # Leaf node: label it with the majority class
        if root.children == {}:
            if len(data[data["Survived"] == 0]) >= len(data["Survived"]) / 2:
                return "not save"
            else:
                return "save"
        # All labels identical: stop splitting
        if len(data[data["Survived"] == 0]) == len(data["Survived"]):
            return "not save"
        elif len(data[data["Survived"] == 1]) == len(data["Survived"]):
            return "save"
        tree = {root.name: {}}
        for key in root.children:
            value = self.get_tree_dict(root.children[key], data=split_data(data, root.name, key))
            # Skip empty branches entirely
            if value == "empty":
                continue
            tree[root.name][key] = value
        return tree


class DecisionTree(object):
    def __init__(self, gain, data: "pd.DataFrame", target, max_depth=None, num_threshold=5):
        # target is the name of the label column
        self.prior_probability = len(np.where(data[target] == 1)[0]) / len(data[target])
        self.max_depth = max_depth
        self.num_threshold = num_threshold
        self.depth = 0
        self.data = data
        self.gain = np.array(gain)
        self.x = data.drop(target, axis=1)
        self.target = target

    # Build the tree depth-first, one feature per level, ordered by gain
    def init(self):
        def dfs(node, feature_map, level=1):
            # level 0 is the root
            if level < len(feature_map):
                cur_name = feature_map[level]
                node_children = np.unique(self.x[cur_name])
                cur_node = TreeNode(cur_name, node)
                # Every branch value points to the same child node; the data
                # split in get_tree_dict later differentiates the branches
                for i in node_children:
                    dfs(cur_node, feature_map, level + 1)
                    node.add_child(cur_node, i)
            else:
                return

        features_name = [name for name in self.x]
        # Feature indices sorted by gain, largest first
        features_index = self.gain.argsort()[::-1]
        features_map = {}
        # rank -> feature name
        for i, key in enumerate(features_index):
            features_map[i] = features_name[key]
        root = TreeNode(features_map[0])
        dfs(root, features_map)
        return root

    # Post-pruning: recursively walk the nested tree dict, merging leaves bottom-up
    def post_pruning(self, tree_dict, data, key=None):
        def processing_data(data, col_name, value):
            # Keep only the rows that took this branch
            inx = data[col_name] == value
            return data[inx]
        for k in tree_dict:
            # Internal node (sub-dict), not a leaf label
            if not isinstance(tree_dict[k], str):
                if key in self.data.columns:
                    # Recurse, filtering the data down this branch
                    flag, count = self.post_pruning(tree_dict[k], data=processing_data(data, key, k), key=k)
                else:
                    flag, count = self.post_pruning(tree_dict[k], data=data, key=k)
                # If the child leaves can be merged, return twice to reach the
                # grandparent and replace the parent node with "save"/"not save"
                if count == 1:
                    return flag, count + 1
                elif count == 2:
                    tree_dict[k] = flag
            # Leaf: too few samples under this node, collapse to the majority class
            elif data.shape[0] < self.num_threshold:
                if len(data[data[self.target] == 0]) >= len(data[self.target]) / 2:
                    return "not save", 1
                else:
                    return "save", 1
            # Leaf: all children agree, collapse to that label
            elif np.array([v == "save" for v in tree_dict.values()]).all():
                return "save", 1
            elif np.array([v == "not save" for v in tree_dict.values()]).all():
                return "not save", 1
        return tree_dict, 0

    def evaluator(self, tree_dict, test_data: "pd.DataFrame", target_name):
        # Reorder the columns by information gain, largest first, so they match
        # the order the tree was built in (assumes the feature columns precede
        # the target column, in the order used to compute self.gain)
        columns = test_data.columns
        new_columns = [columns[i] for i in self.gain.argsort()[::-1]]
        new_columns.append(target_name)
        test_data = test_data.reindex(columns=new_columns)
        right = 0
        # Walk the tree once per row of test_data
        for index, row in test_data.iterrows():
            temp_tree = tree_dict
            value = None
            # Choose a branch per feature; the target column is excluded
            for name in new_columns[:-1]:
                choice = row[name]
                # No branch for this value: the row cannot be classified
                if choice not in temp_tree[name].keys():
                    value = None
                    break
                value = temp_tree[name][choice]
                temp_tree = value
                if value in ["save", "not save"]:
                    # Map the leaf label to 0/1 so y and pred_y are comparable
                    value = 0 if value == "not save" else 1
                    break
            if value == row[target_name]:
                right += 1
        accuracy = right / len(test_data)
        return accuracy


def createPlot(inTree):
    fig = plt.figure(1, facecolor='white')
    fig.clf()
    axprops = dict(xticks=[], yticks=[])
    createPlot.ax1 = plt.subplot(111, frameon=False)  # no ticks
    plotTree.totalW = float(getNumLeafs(inTree))
    plotTree.totalD = float(getTreeDepth(inTree))
    plotTree.xOff = -0.5 / plotTree.totalW
    plotTree.yOff = 1.0
    plotTree(inTree, (0.5, 1.0), '')
    plt.show()


class DataLoader(object):
    def __init__(self, data: "pd.DataFrame", random_seed=None, test_size=0.33):
        self.data = data
        self.test_size = test_size
        self.random_seed = random_seed

    def __getitem__(self, index):
        return self.data.iloc[index, :]

    def __len__(self):
        return len(self.data)

    def split_data(self):
        if self.random_seed is not None:
            np.random.seed(self.random_seed)
        # Shuffle the row indices, then cut once, so train and test do not overlap
        shuffled_idx = np.random.permutation(len(self.data))
        test_len = int(len(self.data) * self.test_size)
        test_data = self.data.iloc[shuffled_idx[:test_len], :]
        train_data = self.data.iloc[shuffled_idx[test_len:], :]
        return train_data, test_data


if __name__ == "__main__":
    train_file = "D:/DataSet/titanic/titanic_train.csv"
    test_file = "D:/DataSet/titanic/titanic_test.csv"
    # Load and preprocess the data
    data = pd.read_csv(train_file)
    # test = pd.read_csv(test_file)
    data = process_data(data)
    target_name = "Survived"
    target = data[target_name]
    # Total entropy of the labels
    total_entropy = entropy(np.array(target))
    # Handle the continuous Age column
    data = process_continue_value(data, target, total_entropy)
    # Split into train and test sets
    dataloader = DataLoader(data, random_seed=1)
    train_data, test_data = dataloader.split_data()
    # Information gain of each feature on the training set
    # (reset the index so positional lookups in gain() stay aligned)
    train_y = train_data[target_name].reset_index(drop=True)
    gains = gain(total_entropy, train_data.drop(target_name, axis=1), train_y)
    # Build the tree; max depth and the per-node sample threshold are parameters
    tree = DecisionTree(np.array(gains), train_data, "Survived", num_threshold=10)
    root = tree.init()
    # Get the tree as a dict, for plotting and pruning
    tree_dict = root.get_tree_dict(root, tree.data)
    # Post-pruning; repeat so merges can propagate upward
    for i in range(3):
        tree_dict = tree.post_pruning(tree_dict, tree.data)[0]
    # Model evaluation
    accuracy = tree.evaluator(tree_dict, test_data, target_name)
    print(accuracy)
    # Plot the tree with matplotlib
    createPlot(tree_dict)
    # print(tree_dict)