from math import log2 from math import sqrt from queue import PriorityQueue import numpy as np from sklearn.metrics import confusion_matrix class TreeNode(): def __init__(self, attr_list_for_split, depth, is_leaf=False, attr=None, continuous_attr_value=None, parent=None, classification=None, is_root=False): self.attr_list_for_split = attr_list_for_split # the currently remained attributes that can used for building tree self.depth = depth self.is_leaf = is_leaf self.attr = attr # the chosen attribute for growing sub-trees self.continuous_attr_value = continuous_attr_value # if self.attr is continuous, record the split value of it self.parent = parent self.classification = classification # classification result for leaf node self.descendants = {} # dict, keys for attr values, values for child nodes self.y_frequence_map = None # counts the occurrence number of all kinds of y, used in method 'pruning' self.is_root = is_root # used for TreeNode sort in pruning process def __lt__(self, other): return self.depth > other.depth class DTC45(): def __init__(self, max_depth=35, min_samples_split=2, max_continuous_attr_splits=150): self.max_depth = max_depth self.min_samples_split = min_samples_split self.max_continuous_attr_splits = max_continuous_attr_splits print("Tree parameter settings: max_depth={}, min_samples_split={}, max_continuous_attr_splits={}".format( max_depth, min_samples_split, max_continuous_attr_splits)) self.root = None self.built = False # only after the tree is built, can pruning be performed def fit(self, X_train, y_train, attr_list, attr_is_discrete, attr_discrete_values=None, verbose=0): self.y_kinds = set(y_train) self.attr_list = attr_list self.attr_position_map = dict(zip(attr_list, range(len(attr_list)))) # if the elements in *attr_is_discrete* are [True, False] format, convert them to [1, 0] format if attr_is_discrete[0] in [True, False]: attr_is_discrete = list([1 if x==True else 0 for x in attr_is_discrete]) self.attr_is_discrete_map = dict(zip(attr_list, attr_is_discrete)) if attr_discrete_values != None: self.attr_discrete_values = attr_discrete_values else: self.attr_discrete_values = {} for i in range(len(attr_list)): attr = attr_list[i] if self.attr_is_discrete_map[attr]: self.attr_discrete_values[attr] = set([x[i] for x in X_train]) self.root = TreeNode(attr_list_for_split=attr_list, depth=0, is_root=True) print('# Building tree, wait please...') print('Number of samples used for building:', len(X_train)) self.root = self._build_tree(self.root, X_train, y_train, verbose=verbose) self.built = True def _build_tree(self, node, X_train, y_train, verbose=0): if verbose: print("Sub-tree size: {}, depth: {},".format(len(y_train), node.depth)) assert len(y_train) > 0 # all left samples have the same classification result cur_y_all_same = True cur_y = y_train[0] for y in y_train[1:]: if y != cur_y: cur_y_all_same = False break if cur_y_all_same: if verbose: print("Set leaf node for y values are all same.") node.is_leaf = True node.classification = cur_y return node # all leaf samples have the same values on the leaf attributes for building tree cur_X_all_same = True for attr in node.attr_list_for_split: if cur_X_all_same: attr_pos = self.attr_position_map[attr] attr_v = X_train[0][attr_pos] for x in X_train[1:]: if x[attr_pos] != attr_v: cur_X_all_same = False break else: break if cur_X_all_same: if verbose: print("Set leaf node for X values are all same on the current leaf attributes.") node.is_leaf = True node.classification = self._find_most_frequent_value(y_train) return node # the attribute list for split is empty if (len(node.attr_list_for_split) == 0): if verbose: print("Set leaf node for empty attri_list to split.") node.is_leaf = True node.classification = self._find_most_frequent_value(y_train) return node # the maximum of tree depth is reached if (node.depth == self.max_depth): if verbose: print("Set leaf node for maximal tree depth is reached.") node.is_leaf = True node.classification = self._find_most_frequent_value(y_train) return node # the number of left samples is less than the minimum split threshold if (len(X_train) < self.min_samples_split): if verbose: print("Set leaf node for minimal sample split is reached.") node.is_leaf = True node.classification = self._find_most_frequent_value(y_train) return node # find current best attribute for growth # if the best_attr is a continuous numeric attribute, # then best_attr_split_value will be the best binary split position of this attribute, # otherwise, the best_attr_split_value will be None best_attr, best_attr_split_value = self._choose_best_split_attr(X_train, y_train, attr_list=node.attr_list_for_split) if verbose: print("Best attr: {}, attr_value: {}".format(best_attr, best_attr_split_value)) node.attr = best_attr if (not self.attr_is_discrete_map[best_attr]): node.continuous_attr_value = best_attr_split_value # build sub-trees # process discrete attribute if (self.attr_is_discrete_map[best_attr]): best_attr_values = self.attr_discrete_values[best_attr] descendants_X = {} descendants_y = {} for attr_value in best_attr_values: descendants_X[attr_value] = [] descendants_y[attr_value] = [] attr_pos = self.attr_position_map[best_attr] for i in range(len(X_train)): descendants_X[X_train[i][attr_pos]].append(X_train[i]) descendants_y[X_train[i][attr_pos]].append(y_train[i]) reduced_attr_list_for_split = node.attr_list_for_split[:] reduced_attr_list_for_split.remove(best_attr) for attr_v in best_attr_values: if len(descendants_X[attr_v]) > 0: child_node = TreeNode(attr_list_for_split=reduced_attr_list_for_split, depth=node.depth + 1, parent=node) node.descendants[attr_v] = self._build_tree(child_node, descendants_X[attr_v], descendants_y[attr_v], verbose=verbose) else: # no samples has this attribute value, so set child node to leaf node. # for the classification result, use the most frequent y value in y_train child_node = TreeNode(attr_list_for_split=reduced_attr_list_for_split, depth=node.depth + 1, parent=node, is_leaf=True, classification=self._find_most_frequent_value(y_train)) node.descendants[attr_v] = child_node # process continuous numeric attribute elif (not self.attr_is_discrete_map[best_attr]): best_attr_values = ['less', 'greater'] descendants_X = {} descendants_y = {} for attr_value in best_attr_values: descendants_X[attr_value] = [] descendants_y[attr_value] = [] attr_pos = self.attr_position_map[best_attr] for i in range(len(X_train)): if X_train[i][attr_pos] < best_attr_split_value: descendants_X['less'].append(X_train[i]) descendants_y['less'].append(y_train[i]) else: descendants_X['greater'].append(X_train[i]) descendants_y['greater'].append(y_train[i]) for attr_v in best_attr_values: if len(descendants_X[attr_v]) > 0: child_node = TreeNode(attr_list_for_split=node.attr_list_for_split, depth=node.depth + 1, parent=node) node.descendants[attr_v] = self._build_tree(child_node, descendants_X[attr_v], descendants_y[attr_v], verbose=verbose) else: # no samples has this attribute value, so set child node to leaf node. # for the classification result, use the most frequent y value in y_train child_node = TreeNode(attr_list_for_split=node.attr_list_for_split, depth=node.depth + 1, parent=node, is_leaf=True, classification=self._find_most_frequent_value(y_train)) node.descendants[attr_v] = child_node return node def _find_most_frequent_value(self, values): value_number_map = {} for v in values: if v not in value_number_map: value_number_map[v] = 1 else: value_number_map[v] += 1 max_number = 0 most_frequent_value = None for key in value_number_map: if value_number_map[key] > max_number: max_number = value_number_map[v] most_frequent_value = v if most_frequent_value is None: print("We've found a None y list !!") assert most_frequent_value != None return most_frequent_value def _choose_best_split_attr(self, X, y, attr_list): best_attr = None best_attr_gain_ratio = 0 best_attr_split_value = None # calculate entropy value of current tree node y_occurrence = dict(zip(self.y_kinds, [0] * len(self.y_kinds))) for yi in y: y_occurrence[yi] = y_occurrence[yi] + 1 ent = self._calculate_entropy_from_frequency(y_occurrence, len(y)) # deal with each attribute X_y = [list(xi) + [yi] for (xi, yi) in zip(X, y)] for attr in attr_list: attr_pos = self.attr_position_map[attr] # two dimensional map for query the occurrences of given attribute value and y value attr_value_y = {} # for discrete attributes if (self.attr_is_discrete_map[attr]): attr_v_occurrence = dict( zip(self.attr_discrete_values[attr], [0] * len(self.attr_discrete_values[attr]))) for attr_v in self.attr_discrete_values[attr]: attr_value_y[attr_v] = dict(zip(self.y_kinds, [0] * len(self.y_kinds))) for x_y in X_y: # print(x_y) attr_value_y[x_y[attr_pos]][x_y[-1]] = attr_value_y[x_y[attr_pos]][x_y[-1]] + 1 attr_v_occurrence[x_y[attr_pos]] = attr_v_occurrence[x_y[attr_pos]] + 1 ent_attr = 0 for attr_v in self.attr_discrete_values[attr]: ent_attr = ent_attr + attr_v_occurrence[attr_v] / (len(y)) * self._calculate_entropy_from_frequency( attr_value_y[attr_v], attr_v_occurrence[attr_v]) intrinsic_value = self._calculate_entropy_from_frequency(attr_v_occurrence, len(y)) # update best gain_ratio if intrinsic_value > 0: gain_ratio = (ent - ent_attr) / intrinsic_value if (gain_ratio > best_attr_gain_ratio): best_attr = attr best_attr_gain_ratio = gain_ratio # for continuous numeric attributes else: attr_split_value_list = list(set([x_y[attr_pos] for x_y in X_y])) attr_split_value_list.sort() # cut down the size of split value candidates while (len(attr_split_value_list) > self.max_continuous_attr_splits): attr_split_value_list = [attr_split_value_list[i] for i in range(len(attr_split_value_list)) if i % 3 == 0] # process each split value of the current attribute for attr_split_value in attr_split_value_list: attr_v_occurrence = dict(zip(['less', 'greater'], [0] * 2)) for attr_v in ['less', 'greater']: attr_value_y[attr_v] = dict(zip(self.y_kinds, [0] * len(self.y_kinds))) for x_y in X_y: # print(x_y) if x_y[attr_pos] < attr_split_value: attr_value_y['less'][x_y[-1]] = attr_value_y['less'][x_y[-1]] + 1 attr_v_occurrence['less'] = attr_v_occurrence['less'] + 1 else: attr_value_y['greater'][x_y[-1]] = attr_value_y['greater'][x_y[-1]] + 1 attr_v_occurrence['greater'] = attr_v_occurrence['greater'] + 1 ent_attr = 0 for attr_v in ['less', 'greater']: ent_attr = ent_attr + attr_v_occurrence[attr_v] / ( len(y)) * self._calculate_entropy_from_frequency( attr_value_y[attr_v], attr_v_occurrence[attr_v]) intrinsic_value = self._calculate_entropy_from_frequency(attr_v_occurrence, len(y)) # update best gain_ratio if intrinsic_value > 0: gain_ratio = (ent - ent_attr) / intrinsic_value if (gain_ratio > best_attr_gain_ratio): best_attr = attr best_attr_gain_ratio = gain_ratio if (not self.attr_is_discrete_map[attr]): best_attr_split_value = attr_split_value if best_attr == None: print("None best_attr found.") best_attr = attr_list[0] best_attr_split_value = X[0][self.attr_position_map[best_attr]] if self.attr_is_discrete_map[best_attr]: return best_attr, None else: return best_attr, best_attr_split_value def _calculate_entropy_from_frequency(self, y_occurrence_map, total_len): entropy = 0.0 if total_len > 0: for yi in y_occurrence_map: prob = y_occurrence_map[yi] / total_len if prob > 0: entropy = entropy - prob * log2(prob) return entropy def predict(self, X_test): if not self.built: print("You should build the tree first by calling the 'fit' method with some train samples.") return None y_predict = [] for x in X_test: cur_node = self.root while (not cur_node.is_leaf): x_attr_value = x[self.attr_position_map[cur_node.attr]] if (self.attr_is_discrete_map[cur_node.attr]): cur_node = cur_node.descendants[x_attr_value] else: if x_attr_value < cur_node.continuous_attr_value: cur_node = cur_node.descendants['less'] else: cur_node = cur_node.descendants['greater'] y_predict.append(cur_node.classification) return y_predict def evaluate(self, X_test, y_test, detailed_result=0): y_predict = self.predict(X_test) return self._calculate_metrics(y_predict, y_test, detailed_result) def _calculate_metrics(self, y_pred, y_true, detailed_result): """ If parameter detailed_result is False or 0, only prediction accuracy (Acc) will be returned. Otherwise, the returned result will be confusion matrix and prediction metrics list, in which only [Acc] for multiple classification and [Acc, Sn, Sp, Precision, MCC] for binary classification. """ y_right = [1 for (y_p, y_t) in zip(y_pred, y_true) if y_p == y_t] acc = len(y_right) / len(y_pred) if not detailed_result: return acc con_matrix = confusion_matrix(y_pred, y_true) if len(self.y_kinds) > 2: return con_matrix, [acc] else: tn = con_matrix[0][0] fp = con_matrix[0][1] fn = con_matrix[1][0] tp = con_matrix[1][1] p = tp + fn n = tn + fp sn = tp / p if p > 0 else None sp = tn / n if n > 0 else None pre = (tp) / (tp + fp) if (tp + fp) > 0 else None mcc = 0 tmp = sqrt(tp + fp) * sqrt(tp + fn) * sqrt(tn + fp) * sqrt(tn + fn) if tmp != 0: mcc = (tp * tn - fp * fn) / tmp return con_matrix, [acc, sn, sp, pre, mcc] def pruning(self, X_validation, y_validation, verbose=0): print('# Pruning tree, wait please...') print('Number of samples used for pruning:', len(X_validation)) self._count_y_frequence_of_each_node(X_validation, y_validation) # store the currently unprocessed nodes node_queue = PriorityQueue() node_set = set() # get all leaf nodes leaf_node_list = self._get_all_leaf_nodes(self.root) # count tree node number all_node_num, leaf_node_num = self._count_node_number(self.root) print("There are {} tree nodes and {} leaf nodes before pruning.".format(all_node_num, leaf_node_num)) # get the parents of all leaf nodes for leaf in leaf_node_list: parent = leaf.parent if parent != None and parent not in node_set: node_set.add(parent) node_queue.put(parent) # pruning best_acc = self.evaluate(X_validation, y_validation, detailed_result=0) while len(node_set) > 0: # get the deepest unprocessed node node = node_queue.get() node_queue.task_done() node_set.remove(node) if node.y_frequence_map != None: # set the current node to leaf node node.is_leaf = True # find the most frequent y value of the current node most_frequent_y = \ sorted([(x, node.y_frequence_map[x]) for x in node.y_frequence_map], key=lambda x: x[1])[-1][0] node.classification = most_frequent_y # performance validation val_acc = self.evaluate(X_validation, y_validation, detailed_result=0) if verbose: print("Dealing with a TreeNode with depth being {}".format(node.depth)) print("Val Acc: ", val_acc) # update the best Acc if val_acc < best_acc: # <= 0.94366 / < # rollback, if val_acc doesn't improve node.is_leaf = False node.classification = None else: best_acc = val_acc if verbose: print("This node has been translated into a leaf node.") print("Current Best Acc: ", best_acc) # add the parent of current node to the unprocessed node queue if node.parent not in node_set: if node.parent != None: node_set.add(node.parent) node_queue.put(node.parent) elif node.parent is None and node.is_root == False: print("We've found a non-root node has NO Parent!") # count tree node number all_node_num, leaf_node_num = self._count_node_number(self.root) print("There are {} tree nodes and {} leaf nodes after pruning.".format(all_node_num, leaf_node_num)) def _get_all_leaf_nodes(self, root): leaf_node_list = [] if root.is_leaf: leaf_node_list.append(root) else: for attr in root.descendants: leaf_node_list += self._get_all_leaf_nodes(root.descendants[attr]) return leaf_node_list def _count_node_number(self, root): all_node_num = 1 leaf_node_num = 0 if root.is_leaf: leaf_node_num = 1 else: for attr in root.descendants: child_node_num, child_leaf_num = self._count_node_number(root.descendants[attr]) all_node_num += child_node_num leaf_node_num += child_leaf_num return all_node_num, leaf_node_num def _count_y_frequence_of_each_node(self, X, y): if not self.built: print("You should build the tree first by calling the 'fit' method with some train samples.") return None X_y = [xi + [yi] for (xi, yi) in zip(X, y)] for x_y in X_y: cur_node = self.root while (not cur_node.is_leaf): # update the occurrence times of all kinds of y if cur_node.y_frequence_map is None: cur_node.y_frequence_map = dict(zip(self.y_kinds, [0] * len(self.y_kinds))) cur_node.y_frequence_map[x_y[-1]] += 1 x_attr_value = x_y[self.attr_position_map[cur_node.attr]] if (self.attr_is_discrete_map[cur_node.attr]): cur_node = cur_node.descendants[x_attr_value] else: if x_attr_value < cur_node.continuous_attr_value: cur_node = cur_node.descendants['less'] else: cur_node = cur_node.descendants['greater'] # performance test def main(): data_type_path = './datatypes.csv' # a list to record whether an attribute is discrete or not train_data_path = './btrain.csv' test_data_path = './bvalidate.csv' random_seed = 0 max_tree_depth = 35 min_samples_split = 4 max_continuous_attr_splits = 20 validation_sample_num = 5000 verbose = 0 with open(data_type_path, 'r') as data_type: attr_type =',') attr_type = [0 if x.lower() == 'false' else 1 for x in attr_type if len(x) > 0] def load_data(data_path, return_data_attrs=False): with open(data_path, 'r') as train_data: head = True samples = [] completed_samples = [] data_attrs = None for line in train_data: if head: data_attrs = line.strip().split(',') head = False else: sample = line.strip().split(',') samples.append(sample) if '?' not in sample: sample = [x if x.isalpha() else eval(x) for x in sample] completed_samples.append(sample) if return_data_attrs: return completed_samples, data_attrs else: return completed_samples samples, data_attrs = load_data(train_data_path, return_data_attrs=True) test_samples = load_data(test_data_path) # shuffle samples np.random.seed(random_seed) np.random.shuffle(samples) validation_samples = samples[-validation_sample_num:] train_samples = samples[0:-validation_sample_num] X_train = [x[0:-1] for x in train_samples] y_train = [x[-1] for x in train_samples] X_validation = [x[0:-1] for x in validation_samples] y_validation = [x[-1] for x in validation_samples] X_test = [x[0:-1] for x in test_samples] y_test = [x[-1] for x in test_samples] tree = DTC45(max_depth=max_tree_depth, min_samples_split=min_samples_split, max_continuous_attr_splits=max_continuous_attr_splits), y_train, attr_list=data_attrs[0:-1], attr_is_discrete=attr_type, verbose=verbose) print("Train Acc: {}".format(tree.evaluate(X_train, y_train, detailed_result=0))) con_matrix_before_pruning, performances_before_pruning = tree.evaluate(X_test, y_test, detailed_result=1) print("\nTest Acc before pruning: {}".format(performances_before_pruning[0])) print("Classification confusion_matrix:\n{}".format(con_matrix_before_pruning)) print("Detailed performances [Acc, Sn, Sp, Pre, MCC]:\n{}\n".format(performances_before_pruning)) tree.pruning(X_validation, y_validation, verbose=verbose) con_matrix_after_pruning, performances_after_pruning = tree.evaluate(X_test, y_test, detailed_result=1) print("\nTest Acc after pruning: {}".format(performances_after_pruning[0])) print("Classification confusion_matrix:\n{}".format(con_matrix_after_pruning)) print("Detailed performances [Acc, Sn, Sp, Pre, MCC]:\n{}".format(performances_after_pruning)) if __name__ == '__main__': main()
Bank Marketing
Data Mining classification homework

