Loading Data.py→Dataset.py +43 −29 Original line number Diff line number Diff line Loading @@ -16,28 +16,44 @@ def additional_input(k, x, y): else: return x*y class Data: input_data_dimension = 2 class Dataset: input_data_dimension = 4 input_coord_dimension = 2 output_data_dimension = 1 data_size = 1 all_data = [] labels = [] training_data_size = 1 training_indices = [] training_data = [] labelled_data = [] train_data_size = 1 train_indices = [] train_data = [] train_labels = [] test_data_size = 1 test_indices = [] test_data = [] test_labels = [] labelled_data_A = [] labelled_data_B = [] #test points for the heatmap - modification of the heatmap resolution test_points_x = np.linspace(-1,1,51) test_points_y = np.linspace(1,-1,51) def __init__(self, data_size, training_data_size): self.data_size = data_size self.training_data_size = training_data_size self.train_data_size = training_data_size self.test_data_size = data_size - training_data_size def initializeDataSet(self, type): self.createAllData(type) self.addAdditionalInputs() self.splitToClasses() self.splitDataToTrainingTest() def norm(self,x,y): return np.sqrt(x**2 + y**2) Loading Loading @@ -78,19 +94,17 @@ class Data: def splitDataToTrainingTest(self): self.training_indices = random.sample(range(self.data_size), self.training_data_size) self.training_data = self.all_data[self.training_indices,:] self.train_indices = random.sample(range(self.data_size), self.train_data_size) self.train_data = self.all_data[self.train_indices,:] self.train_labels = self.labels[self.train_indices] self.test_indices = np.delete(range(self.data_size), self.training_indices, axis=0) self.test_indices = np.delete(range(self.data_size), self.train_indices, axis=0) self.test_data = self.all_data[self.test_indices,:] self.test_labels = self.labels[self.test_indices] if len(self.test_data) == self.test_data_size: print("OK") def initializeDataSet(self, type): self.createAllData(type) self.addAdditionalInputs() self.splitDataToTrainingTest() def splitToClasses(self): self.labelled_data_A = self.all_data[self.labels == 1] self.labelled_data_B = self.all_data[self.labels == -1] def createLinear(self): self.all_data = 2*np.random.rand(self.data_size,self.input_coord_dimension)-np.ones((self.data_size,self.input_coord_dimension)) Loading @@ -99,7 +113,7 @@ class Data: self.all_data[i][1] = self.all_data[i][1] else: self.all_data[i][1] = self.all_data[i][1] self.labelled_data = 2*(self.all_data[:,1] < 0)-1 self.labels = 2*(self.all_data[:,1] < 0)-1 def createElipse(self): self.all_data = 2*np.random.rand(self.data_size,self.input_coord_dimension)-np.ones((self.data_size,self.input_coord_dimension)) Loading @@ -107,7 +121,7 @@ class Data: if self.norm((self.all_data[i][0])/1, (self.all_data[i][1])/1) < 0.7: self.all_data[i][0] = 0.5*self.all_data[i][0] self.all_data[i][1] = 0.5*self.all_data[i][1] self.labelled_data = 2*(self.norm((self.all_data[:,0]/1), (self.all_data[:,1])/1) < 0.7)-1 self.labels = 2*(self.norm((self.all_data[:,0]/1), (self.all_data[:,1])/1) < 0.7)-1 def createSinus(self): self.all_data = 2*np.random.rand(self.data_size,self.input_coord_dimension)-np.ones((self.data_size,self.input_coord_dimension)) Loading @@ -116,7 +130,7 @@ class Data: self.all_data[i][1] -= 0.0 else: self.all_data[i][1] += 0.0 self.labelled_data = 2*(self.all_data[:,1] < 0.5*np.sin(4*self.all_data[:,0]))-1 self.labels = 2*(self.all_data[:,1] < 0.5*np.sin(4*self.all_data[:,0]))-1 def createHalfElipse(self): self.all_data = 2*np.random.rand(self.data_size,self.input_coord_dimension)-np.ones((self.data_size,self.input_coord_dimension)) Loading @@ -130,7 +144,7 @@ class Data: self.all_data[i][1] += 0.2 else: self.all_data[i][1] = np.maximum(-1, self.all_data[i][1]-0.1) self.labelled_data = 2*(self.norm(self.all_data[:,0], self.all_data[:,1]+1.2) < 1.2)-1 self.labels = 2*(self.norm(self.all_data[:,0], self.all_data[:,1]+1.2) < 1.2)-1 def createSpiral(self): # Define the center of the spiral, as well as the starting angle and the distance between the lines Loading @@ -138,11 +152,11 @@ class Data: center_y = 0 angle = 0.5 distance = 0.1 noise = 0.02 noise = 0.04 # Create an empty list to store the points of the spiral self.all_data = np.zeros((self.data_size,2)) self.labelled_data = np.zeros(self.data_size) self.labels = np.zeros(self.data_size) # Create a loop to generate the points of the spiral for i in range(math.floor(self.data_size/2)): Loading @@ -155,9 +169,9 @@ class Data: # Add the point to the list of points self.all_data[2*i] = [x,y] self.labelled_data[2*i] = -1 self.labels[2*i] = -1 self.all_data[2*i+1] = [x_2,y_2] self.labelled_data[2*i+1] = 1 self.labels[2*i+1] = 1 # Increase the angle and distance for the next iteration angle += 4*math.pi/self.data_size Loading @@ -165,7 +179,7 @@ class Data: def createTwoElipses(self): self.all_data = 2*np.random.rand(self.data_size,self.input_coord_dimension)-np.ones((self.data_size,self.input_coord_dimension)) self.labelled_data = np.zeros(self.data_size) self.labels = np.zeros(self.data_size) center_x1 = 0.4 center_y1 = 0.4 Loading @@ -178,10 +192,10 @@ class Data: if self.norm((self.all_data[i][0]-center_x1)/1, (self.all_data[i][1]-center_y1)/1) < range1: self.all_data[i][0] = 0.8*(self.all_data[i][0]-center_x1) + center_x1 self.all_data[i][1] = 0.8*(self.all_data[i][1]-center_y1) + center_y1 self.labelled_data[i] = 1 self.labels[i] = 1 if self.norm((self.all_data[i][0]-center_x2)/1, (self.all_data[i][1]-center_y2)/1) < range2: self.all_data[i][0] = 0.8*(self.all_data[i][0]-center_x2) + center_x2 self.all_data[i][1] = 0.8*(self.all_data[i][1]-center_y2) + center_y2 self.labelled_data[i] = 1 self.labelled_data = 2*self.labelled_data-1 self.labels[i] = 1 self.labels = 2*self.labels-1 Layer.py +2 −2 Original line number Diff line number Diff line Loading @@ -21,9 +21,9 @@ class Layer: #return 1.0/(1+np.exp(-num)) def activationOutput(self,num): #return num return num #return np.maximum(num,0) return np.tanh(num) #return np.tanh(num) #print(1.0/(1+np.exp(-num))) #return 1.0/(1+np.exp(-num)) Loading LearningController.py +77 −138 Original line number Diff line number Diff line import numpy as np from NeuralNetwork import NeuralNetwork from Data import Data import matplotlib as mpl import matplotlib.pyplot as plt from Dataset import Dataset from Plotter import Plotter import sys import os import random from matplotlib.patches import ConnectionPatch class TrainingController: folder_path = "model/" learning_rate = 0.1 learning_rate = 0.05 l2_regularization = 0.0 batch_size = 10 max_epochs = 1000 max_epochs = 100 epoch = 0 epsilon = 0.05 train_loss = 1.0 train_loss_baseline = 1.0 test_loss = 1.0 test_loss_baseline = 1.0 total_loss = 1.0 total_loss_baseline = 1.0 epoch = 0 train_loss_history = [] test_loss_history = [] def __init__(self, network): train_accuracy = 1.0 test_accuracy = 1.0 train_accuracy_history =[] test_accuracy_history = [] def __init__(self, network: NeuralNetwork, dataset: Dataset): self.network = network self.network.setLearningParameters(self.learning_rate, self.l2_regularization) plt.ion() self.fig, (self.state,self.cost) = plt.subplots(1,2, figsize=(10, 5)) self.dataset = dataset self.plotter = Plotter(self.folder_path) def trainOnBatch(self, batch, labels): for j in range(len(batch)): Loading @@ -39,13 +41,13 @@ class TrainingController: self.network.calculateErrors(labels[j]) self.network.updateGradient(len(batch)) def trainOneEpochBGD(self, dataset: Data): def trainOneEpochBGD(self): self.network.resetBeforeEpoch() shuffled_indexes = np.arange(dataset.training_data_size) shuffled_indexes = np.arange(self.dataset.train_data_size) np.random.shuffle(shuffled_indexes) number_of_full_batches = dataset.training_data_size // self.batch_size number_of_full_batches = self.dataset.train_data_size // self.batch_size for batch_number in range(number_of_full_batches+1): Loading @@ -54,17 +56,17 @@ class TrainingController: else: batch_indexes = shuffled_indexes[batch_number*self.batch_size:] batch = dataset.all_data[batch_indexes,:] labels = dataset.labelled_data[batch_indexes] batch = self.dataset.all_data[batch_indexes,:] labels = self.dataset.labels[batch_indexes] self.trainOnBatch(batch, labels) self.network.updateWeights() def trainOneEpochMiniBGD(self, dataset: Data): shuffled_indexes = np.arange(dataset.training_data_size) def trainOneEpochMiniBGD(self): shuffled_indexes = np.arange(self.dataset.train_data_size) np.random.shuffle(shuffled_indexes) number_of_full_batches = dataset.training_data_size // self.batch_size number_of_full_batches = self.dataset.train_data_size // self.batch_size for batch_number in range(number_of_full_batches+1): self.network.resetBeforeEpoch() Loading @@ -73,25 +75,25 @@ class TrainingController: else: batch_indexes = shuffled_indexes[batch_number*self.batch_size:] batch = dataset.all_data[batch_indexes,:] labels = dataset.labelled_data[batch_indexes] batch = self.dataset.all_data[batch_indexes,:] labels = self.dataset.labels[batch_indexes] self.trainOnBatch(batch, labels) self.network.updateWeights() def trainOneEpochSGD(self, dataset: Data, multiple_passes): batch_indexes = random.sample(dataset.training_indices, training_controller.batch_size) batch = dataset.all_data[batch_indexes,:] def trainOneEpochSGD(self, multiple_passes): batch_indexes = random.sample(self.dataset.train_indices, self.batch_size) batch = self.dataset.all_data[batch_indexes,:] labels = dataset.labelled_data[batch_indexes] labels = self.dataset.labels[batch_indexes] for n in range(multiple_passes): self.network.resetBeforeEpoch() self.trainOnBatch(batch, labels) self.network.updateWeights() def updateLossFunctionAfterEpoch(self): self.train_loss = self.network.updateLossFunction(data.all_data[data.training_indices], data.labelled_data[data.training_indices]) self.test_loss = self.network.updateLossFunction(data.all_data[data.test_indices], data.labelled_data[data.test_indices]) def updateMetricsAfterEpoch(self): self.train_loss = self.network.updateLossFunction(self.dataset.train_data, self.dataset.train_labels) self.test_loss = self.network.updateLossFunction(self.dataset.test_data, self.dataset.test_labels) self.total_loss = self.train_loss+self.test_loss if self.epoch == 0: self.train_loss_baseline = self.train_loss Loading @@ -102,141 +104,78 @@ class TrainingController: self.test_loss /= self.test_loss_baseline self.total_loss /= self.total_loss_baseline self.train_loss_history.append((self.epoch, self.train_loss)) self.test_loss_history.append((self.epoch, (self.test_loss))) print("Cost function: "+str(self.total_loss)) print("Training cost function: "+str(self.train_loss)) self.train_accuracy = self.network.getAccuracy(self.dataset.train_data, self.dataset.train_labels) self.test_accuracy = self.network.getAccuracy(self.dataset.test_data, self.dataset.test_labels) def plotAfterEpoch(self): if self.epoch % 10 == 0: self.cost.plot(*zip(*self.train_loss_history), 'k-', label='Training data') self.cost.plot(*zip(*self.test_loss_history), 'b-', label='Testing data') self.train_accuracy_history.append((self.epoch, self.train_accuracy)) self.test_accuracy_history.append((self.epoch, self.test_accuracy)) colormap = self.network.computeHeatMap2(test_points_x, test_points_y) self.train_loss_history.append((self.epoch, self.train_loss)) self.test_loss_history.append((self.epoch, self.test_loss)) #heatmap = proc.imshow(colormap, cmap='RdYlGn', extent=([-1, 1, -1, 1]), interpolation='bilinear', vmin=-1, vmax=1) self.state.imshow(colormap, cmap='RdYlGn', extent=([-1, 1, -1, 1]), interpolation='bilinear', vmin=-1, vmax=1) self.state.plot(labelled_data_A[:,0], labelled_data_A[:,1], 'go', label="Class A") self.state.plot(labelled_data_B[:,0], labelled_data_B[:,1], 'ro', label="Class B") self.state.plot(data.training_data[:,0], data.training_data[:,1], 'k.', label="Training data") print("Train loss: "+str(np.round(self.train_loss,4))+", Test loss: "+str(np.round(self.test_loss,4))+", Train accuracy: "+str(np.round(self.train_accuracy,2))+" %, Test accuracy: "+str(np.round(self.test_accuracy,2))+" %") if(self.epoch == 0): self.state.legend(loc='lower center', bbox_to_anchor=(0.5, 1.0), ncols=3) #fig.colorbar(heatmap, orientation="horizontal") self.cost.legend(loc='lower center', bbox_to_anchor=(0.5, 1.0), ncols=2) def updateAfterEpoch(self): self.fig.canvas.draw() self.fig.canvas.flush_events() heatmap = self.network.computeHeatMap(self.dataset.test_points_x, self.dataset.test_points_y) def trainOnDataset(self,dataset,number_of_epochs): for i in range(number_of_epochs): self.trainOneEpoch(dataset) # do functions for graphics self.plotter.plotStateAfterEpoch(self.dataset, heatmap, self.train_loss_history, self.test_loss_history, self.train_accuracy_history, self.test_accuracy_history, self.epoch) def saveTrainedModel(self): #TODO saving plots, history in txt, weights in txt and numpy for network loading colormap = self.network.computeHeatMap(test_points_x, test_points_y) #heatmap = proc.imshow(colormap, cmap='RdYlGn', extent=([-1, 1, -1, 1]), interpolation='bilinear', vmin=-1, vmax=1) self.state.imshow(colormap, cmap='RdYlGn', extent=([-1, 1, -1, 1]), interpolation='bilinear', vmin=-1, vmax=1) self.state.plot(labelled_data_A[:,0], labelled_data_A[:,1], 'go', label="Class A") self.state.plot(labelled_data_B[:,0], labelled_data_B[:,1], 'ro', label="Class B") self.state.plot(data.training_data[:,0], data.training_data[:,1], 'k.', label="Training data") #numberOfNeurons = np.sum(structure) maxNeuronsInLayer = np.max(structure) networkMap = self.network.computeWholeNeuralNetworkHeatMap(test_points_x, test_points_y, len(structure), maxNeuronsInLayer) #fig2, subfigs = plt.subplots(maxNeuronsInLayer, len(structure), figsize=(4*maxNeuronsInLayer, 4*maxNeuronsInLayer)) subfigs = [] fig2 = plt.figure(figsize=(4*len(structure), 4*maxNeuronsInLayer)) max_weights_in_layer = [] for i in range(len(structure)): max_weights_in_layer.append(np.max(self.network.layer[i].weights)) max_weight = np.max(np.array(max_weights_in_layer)) for i in range(len(structure)): for j in range(structure[i]): subfig = fig2.add_subplot(maxNeuronsInLayer, len(structure), i+len(structure)*j+1) subfig.imshow(networkMap[i+len(structure)*j], cmap='RdYlGn', extent=([-1, 1, -1, 1]), vmin=-1, vmax=1) if(i > 0): subfig.set_xticks([]) # Remove x-axis tick labels subfig.set_yticks([]) # Remove y-axis tick labels subfigs.append(subfig) index_of_first_neuron_in_previous_layer = int(np.sum(structure[:i-1])) if(i > 0): xy1=[1,0] xy2=[-1,0] for index in range(structure[i-1]): weight = self.network.layer[i].weights[j][index]/max_weight if weight >= 0: con = ConnectionPatch(xyA=xy2, xyB=xy1, coordsA="data", coordsB="data", axesA=subfig, axesB=subfigs[index_of_first_neuron_in_previous_layer+index], color="red", linestyle='dashed', lw=3.0*weight) else: con = ConnectionPatch(xyA=xy2, xyB=xy1, coordsA="data", coordsB="data", axesA=subfig, axesB=subfigs[index_of_first_neuron_in_previous_layer+index], color="green", linestyle='dashed', lw=-3.0*weight) def trainOnDataset(self): os.makedirs(training_controller.folder_path, exist_ok=True) subfig.add_artist(con) while self.epoch < self.max_epochs and self.train_loss >= self.epsilon: print("Epoch "+str(self.epoch)+":") self.trainOneEpochSGD(10) #self.trainOneEpochMiniBGD(data) #self.trainOneEpochBGD(data) fig2.canvas.draw() fig2.canvas.flush_events() self.updateMetricsAfterEpoch() fig2.savefig(self.folder_path+"heatmap", dpi=500) self.fig.savefig(self.folder_path+"final_state", dpi=300) if self.epoch % 10 == 0: self.updateAfterEpoch() plt.show(block='false') self.epoch += 1 self.saveTrainedModel() data_size = 1000 training_data_size = 300 self.network.summary() data = Data(data_size, training_data_size) if(data.initializeDataSet("two_elipses") == False): print("Dataset not defined") sys.exit(1) structure = [data.input_data_dimension,5,5,5,data.output_data_dimension] network = NeuralNetwork(structure) def saveTrainedModel(self): #TODO saving plots, history in txt, weights in txt and numpy for network loading training_controller = TrainingController(network) self.updateAfterEpoch() os.makedirs(training_controller.folder_path, exist_ok=True) self.plotter.plotNetworkWithWeights(self.network, self.dataset) test_points_x = np.linspace(-1,1,101) test_points_y = np.linspace(1,-1,101) self.network.saveModel(self.folder_path) labelled_data_A = np.zeros((data_size, data.input_data_dimension)) labelled_data_B = np.zeros((data_size, data.input_data_dimension)) self.saveMetricsHistory() for k in range(data_size): if data.labelled_data[k] > 0: labelled_data_A[k] = data.all_data[k] else: labelled_data_B[k] = data.all_data[k] def saveMetricsHistory(self): ep, tr_l_h = zip(*self.test_loss_history) ep, te_l_h = zip(*self.test_loss_history) ep, tr_a_h = zip(*self.train_accuracy_history) ep, te_a_h = zip(*self.test_accuracy_history) labelled_data_A = labelled_data_A[~np.all(labelled_data_A == 0, axis=1)] labelled_data_B = labelled_data_B[~np.all(labelled_data_B == 0, axis=1)] np.savetxt(self.folder_path+"metrics_history", np.column_stack((ep, tr_l_h, te_l_h, tr_a_h, te_a_h))) epsilon = 0.05 costFunction = 1.0 costFunction_training_value = 1.0 while training_controller.epoch < training_controller.max_epochs and training_controller.train_loss >= epsilon: print("Epoch "+str(training_controller.epoch)+":") data_size = 1000 training_data_size = 300 training_controller.trainOneEpochSGD(data,10) #training_controller.trainOneEpochMiniBGD(data) #training_controller.trainOneEpochBGD(data) data = Dataset(data_size, training_data_size) if(data.initializeDataSet("spiral") == False): print("Dataset not defined") sys.exit(1) training_controller.updateLossFunctionAfterEpoch() training_controller.plotAfterEpoch() structure = [data.input_data_dimension,5,5,5,data.output_data_dimension] network = NeuralNetwork(structure) training_controller.epoch += 1 training_controller = TrainingController(network, data) training_controller.saveTrainedModel() No newline at end of file training_controller.trainOnDataset() No newline at end of file NeuralNetwork.py +40 −7 Original line number Diff line number Diff line from Layer import Layer from Data import additional_input from Dataset import additional_input import numpy as np import matplotlib as mpl import matplotlib.pyplot as plt import time import os class NeuralNetwork: errors = [] Loading Loading @@ -68,8 +66,8 @@ class NeuralNetwork: #return (1.0/(1+np.exp(-num)))*(1.0 - 1.0/(1+np.exp(-num))) def derivativeOfActivationOutput(self,num): #return 1 return 1-pow(np.tanh(num),2) return 1 #return 1-pow(np.tanh(num),2) #return (num > 0) * 1 #return (1.0/(1+np.exp(-num)))*(1.0 - 1.0/(1+np.exp(-num))) Loading @@ -90,6 +88,15 @@ class NeuralNetwork: self.loss_function = 0.0 return value def getAccuracy(self, input, output): right = 0 for i in range(len(input)): if self.getResult(input[i]) > 0 and output[i] > 0: right+=1 elif self.getResult(input[i]) < 0 and output[i] < 0: right+=1 return right/len(input)*100 #percentage def computeHeatMap(self, map_coord_x, map_coord_y): x = len(map_coord_x) y = len(map_coord_y) Loading Loading @@ -166,4 +173,30 @@ class NeuralNetwork: def printResult(self): print(self.layer[self.depth-1].neurons) def summary(self): print() print("Model summary:") print("____________________________________") print("Neurons in layer 1: \t"+str(self.structure[0])) total_parameters = 0 for i in range(1,self.depth,1): print("Parameters between: \t"+str(self.structure[i]*self.structure[i-1])) print("Neurons in layer "+str(i+1)+": \t"+str(self.structure[i])) total_parameters += self.structure[i]*self.structure[i-1] print("____________________________________") print("Total number of parameters: "+str(total_parameters)) def saveModel(self, folder_path): path_weights = folder_path+"weights/" os.makedirs(path_weights, exist_ok=True) np.save(folder_path+"structure", self.structure) for i in range(1, self.depth): np.save(path_weights+"layer"+str(i), self.layer[i].weights) Plotter.py 0 → 100644 +85 −0 File added.Preview size limit exceeded, changes collapsed. Show changes Loading
Data.py→Dataset.py +43 −29 Original line number Diff line number Diff line Loading @@ -16,28 +16,44 @@ def additional_input(k, x, y): else: return x*y class Data: input_data_dimension = 2 class Dataset: input_data_dimension = 4 input_coord_dimension = 2 output_data_dimension = 1 data_size = 1 all_data = [] labels = [] training_data_size = 1 training_indices = [] training_data = [] labelled_data = [] train_data_size = 1 train_indices = [] train_data = [] train_labels = [] test_data_size = 1 test_indices = [] test_data = [] test_labels = [] labelled_data_A = [] labelled_data_B = [] #test points for the heatmap - modification of the heatmap resolution test_points_x = np.linspace(-1,1,51) test_points_y = np.linspace(1,-1,51) def __init__(self, data_size, training_data_size): self.data_size = data_size self.training_data_size = training_data_size self.train_data_size = training_data_size self.test_data_size = data_size - training_data_size def initializeDataSet(self, type): self.createAllData(type) self.addAdditionalInputs() self.splitToClasses() self.splitDataToTrainingTest() def norm(self,x,y): return np.sqrt(x**2 + y**2) Loading Loading @@ -78,19 +94,17 @@ class Data: def splitDataToTrainingTest(self): self.training_indices = random.sample(range(self.data_size), self.training_data_size) self.training_data = self.all_data[self.training_indices,:] self.train_indices = random.sample(range(self.data_size), self.train_data_size) self.train_data = self.all_data[self.train_indices,:] self.train_labels = self.labels[self.train_indices] self.test_indices = np.delete(range(self.data_size), self.training_indices, axis=0) self.test_indices = np.delete(range(self.data_size), self.train_indices, axis=0) self.test_data = self.all_data[self.test_indices,:] self.test_labels = self.labels[self.test_indices] if len(self.test_data) == self.test_data_size: print("OK") def initializeDataSet(self, type): self.createAllData(type) self.addAdditionalInputs() self.splitDataToTrainingTest() def splitToClasses(self): self.labelled_data_A = self.all_data[self.labels == 1] self.labelled_data_B = self.all_data[self.labels == -1] def createLinear(self): self.all_data = 2*np.random.rand(self.data_size,self.input_coord_dimension)-np.ones((self.data_size,self.input_coord_dimension)) Loading @@ -99,7 +113,7 @@ class Data: self.all_data[i][1] = self.all_data[i][1] else: self.all_data[i][1] = self.all_data[i][1] self.labelled_data = 2*(self.all_data[:,1] < 0)-1 self.labels = 2*(self.all_data[:,1] < 0)-1 def createElipse(self): self.all_data = 2*np.random.rand(self.data_size,self.input_coord_dimension)-np.ones((self.data_size,self.input_coord_dimension)) Loading @@ -107,7 +121,7 @@ class Data: if self.norm((self.all_data[i][0])/1, (self.all_data[i][1])/1) < 0.7: self.all_data[i][0] = 0.5*self.all_data[i][0] self.all_data[i][1] = 0.5*self.all_data[i][1] self.labelled_data = 2*(self.norm((self.all_data[:,0]/1), (self.all_data[:,1])/1) < 0.7)-1 self.labels = 2*(self.norm((self.all_data[:,0]/1), (self.all_data[:,1])/1) < 0.7)-1 def createSinus(self): self.all_data = 2*np.random.rand(self.data_size,self.input_coord_dimension)-np.ones((self.data_size,self.input_coord_dimension)) Loading @@ -116,7 +130,7 @@ class Data: self.all_data[i][1] -= 0.0 else: self.all_data[i][1] += 0.0 self.labelled_data = 2*(self.all_data[:,1] < 0.5*np.sin(4*self.all_data[:,0]))-1 self.labels = 2*(self.all_data[:,1] < 0.5*np.sin(4*self.all_data[:,0]))-1 def createHalfElipse(self): self.all_data = 2*np.random.rand(self.data_size,self.input_coord_dimension)-np.ones((self.data_size,self.input_coord_dimension)) Loading @@ -130,7 +144,7 @@ class Data: self.all_data[i][1] += 0.2 else: self.all_data[i][1] = np.maximum(-1, self.all_data[i][1]-0.1) self.labelled_data = 2*(self.norm(self.all_data[:,0], self.all_data[:,1]+1.2) < 1.2)-1 self.labels = 2*(self.norm(self.all_data[:,0], self.all_data[:,1]+1.2) < 1.2)-1 def createSpiral(self): # Define the center of the spiral, as well as the starting angle and the distance between the lines Loading @@ -138,11 +152,11 @@ class Data: center_y = 0 angle = 0.5 distance = 0.1 noise = 0.02 noise = 0.04 # Create an empty list to store the points of the spiral self.all_data = np.zeros((self.data_size,2)) self.labelled_data = np.zeros(self.data_size) self.labels = np.zeros(self.data_size) # Create a loop to generate the points of the spiral for i in range(math.floor(self.data_size/2)): Loading @@ -155,9 +169,9 @@ class Data: # Add the point to the list of points self.all_data[2*i] = [x,y] self.labelled_data[2*i] = -1 self.labels[2*i] = -1 self.all_data[2*i+1] = [x_2,y_2] self.labelled_data[2*i+1] = 1 self.labels[2*i+1] = 1 # Increase the angle and distance for the next iteration angle += 4*math.pi/self.data_size Loading @@ -165,7 +179,7 @@ class Data: def createTwoElipses(self): self.all_data = 2*np.random.rand(self.data_size,self.input_coord_dimension)-np.ones((self.data_size,self.input_coord_dimension)) self.labelled_data = np.zeros(self.data_size) self.labels = np.zeros(self.data_size) center_x1 = 0.4 center_y1 = 0.4 Loading @@ -178,10 +192,10 @@ class Data: if self.norm((self.all_data[i][0]-center_x1)/1, (self.all_data[i][1]-center_y1)/1) < range1: self.all_data[i][0] = 0.8*(self.all_data[i][0]-center_x1) + center_x1 self.all_data[i][1] = 0.8*(self.all_data[i][1]-center_y1) + center_y1 self.labelled_data[i] = 1 self.labels[i] = 1 if self.norm((self.all_data[i][0]-center_x2)/1, (self.all_data[i][1]-center_y2)/1) < range2: self.all_data[i][0] = 0.8*(self.all_data[i][0]-center_x2) + center_x2 self.all_data[i][1] = 0.8*(self.all_data[i][1]-center_y2) + center_y2 self.labelled_data[i] = 1 self.labelled_data = 2*self.labelled_data-1 self.labels[i] = 1 self.labels = 2*self.labels-1
Layer.py +2 −2 Original line number Diff line number Diff line Loading @@ -21,9 +21,9 @@ class Layer: #return 1.0/(1+np.exp(-num)) def activationOutput(self,num): #return num return num #return np.maximum(num,0) return np.tanh(num) #return np.tanh(num) #print(1.0/(1+np.exp(-num))) #return 1.0/(1+np.exp(-num)) Loading
LearningController.py +77 −138 Original line number Diff line number Diff line import numpy as np from NeuralNetwork import NeuralNetwork from Data import Data import matplotlib as mpl import matplotlib.pyplot as plt from Dataset import Dataset from Plotter import Plotter import sys import os import random from matplotlib.patches import ConnectionPatch class TrainingController: folder_path = "model/" learning_rate = 0.1 learning_rate = 0.05 l2_regularization = 0.0 batch_size = 10 max_epochs = 1000 max_epochs = 100 epoch = 0 epsilon = 0.05 train_loss = 1.0 train_loss_baseline = 1.0 test_loss = 1.0 test_loss_baseline = 1.0 total_loss = 1.0 total_loss_baseline = 1.0 epoch = 0 train_loss_history = [] test_loss_history = [] def __init__(self, network): train_accuracy = 1.0 test_accuracy = 1.0 train_accuracy_history =[] test_accuracy_history = [] def __init__(self, network: NeuralNetwork, dataset: Dataset): self.network = network self.network.setLearningParameters(self.learning_rate, self.l2_regularization) plt.ion() self.fig, (self.state,self.cost) = plt.subplots(1,2, figsize=(10, 5)) self.dataset = dataset self.plotter = Plotter(self.folder_path) def trainOnBatch(self, batch, labels): for j in range(len(batch)): Loading @@ -39,13 +41,13 @@ class TrainingController: self.network.calculateErrors(labels[j]) self.network.updateGradient(len(batch)) def trainOneEpochBGD(self, dataset: Data): def trainOneEpochBGD(self): self.network.resetBeforeEpoch() shuffled_indexes = np.arange(dataset.training_data_size) shuffled_indexes = np.arange(self.dataset.train_data_size) np.random.shuffle(shuffled_indexes) number_of_full_batches = dataset.training_data_size // self.batch_size number_of_full_batches = self.dataset.train_data_size // self.batch_size for batch_number in range(number_of_full_batches+1): Loading @@ -54,17 +56,17 @@ class TrainingController: else: batch_indexes = shuffled_indexes[batch_number*self.batch_size:] batch = dataset.all_data[batch_indexes,:] labels = dataset.labelled_data[batch_indexes] batch = self.dataset.all_data[batch_indexes,:] labels = self.dataset.labels[batch_indexes] self.trainOnBatch(batch, labels) self.network.updateWeights() def trainOneEpochMiniBGD(self, dataset: Data): shuffled_indexes = np.arange(dataset.training_data_size) def trainOneEpochMiniBGD(self): shuffled_indexes = np.arange(self.dataset.train_data_size) np.random.shuffle(shuffled_indexes) number_of_full_batches = dataset.training_data_size // self.batch_size number_of_full_batches = self.dataset.train_data_size // self.batch_size for batch_number in range(number_of_full_batches+1): self.network.resetBeforeEpoch() Loading @@ -73,25 +75,25 @@ class TrainingController: else: batch_indexes = shuffled_indexes[batch_number*self.batch_size:] batch = dataset.all_data[batch_indexes,:] labels = dataset.labelled_data[batch_indexes] batch = self.dataset.all_data[batch_indexes,:] labels = self.dataset.labels[batch_indexes] self.trainOnBatch(batch, labels) self.network.updateWeights() def trainOneEpochSGD(self, dataset: Data, multiple_passes): batch_indexes = random.sample(dataset.training_indices, training_controller.batch_size) batch = dataset.all_data[batch_indexes,:] def trainOneEpochSGD(self, multiple_passes): batch_indexes = random.sample(self.dataset.train_indices, self.batch_size) batch = self.dataset.all_data[batch_indexes,:] labels = dataset.labelled_data[batch_indexes] labels = self.dataset.labels[batch_indexes] for n in range(multiple_passes): self.network.resetBeforeEpoch() self.trainOnBatch(batch, labels) self.network.updateWeights() def updateLossFunctionAfterEpoch(self): self.train_loss = self.network.updateLossFunction(data.all_data[data.training_indices], data.labelled_data[data.training_indices]) self.test_loss = self.network.updateLossFunction(data.all_data[data.test_indices], data.labelled_data[data.test_indices]) def updateMetricsAfterEpoch(self): self.train_loss = self.network.updateLossFunction(self.dataset.train_data, self.dataset.train_labels) self.test_loss = self.network.updateLossFunction(self.dataset.test_data, self.dataset.test_labels) self.total_loss = self.train_loss+self.test_loss if self.epoch == 0: self.train_loss_baseline = self.train_loss Loading @@ -102,141 +104,78 @@ class TrainingController: self.test_loss /= self.test_loss_baseline self.total_loss /= self.total_loss_baseline self.train_loss_history.append((self.epoch, self.train_loss)) self.test_loss_history.append((self.epoch, (self.test_loss))) print("Cost function: "+str(self.total_loss)) print("Training cost function: "+str(self.train_loss)) self.train_accuracy = self.network.getAccuracy(self.dataset.train_data, self.dataset.train_labels) self.test_accuracy = self.network.getAccuracy(self.dataset.test_data, self.dataset.test_labels) def plotAfterEpoch(self): if self.epoch % 10 == 0: self.cost.plot(*zip(*self.train_loss_history), 'k-', label='Training data') self.cost.plot(*zip(*self.test_loss_history), 'b-', label='Testing data') self.train_accuracy_history.append((self.epoch, self.train_accuracy)) self.test_accuracy_history.append((self.epoch, self.test_accuracy)) colormap = self.network.computeHeatMap2(test_points_x, test_points_y) self.train_loss_history.append((self.epoch, self.train_loss)) self.test_loss_history.append((self.epoch, self.test_loss)) #heatmap = proc.imshow(colormap, cmap='RdYlGn', extent=([-1, 1, -1, 1]), interpolation='bilinear', vmin=-1, vmax=1) self.state.imshow(colormap, cmap='RdYlGn', extent=([-1, 1, -1, 1]), interpolation='bilinear', vmin=-1, vmax=1) self.state.plot(labelled_data_A[:,0], labelled_data_A[:,1], 'go', label="Class A") self.state.plot(labelled_data_B[:,0], labelled_data_B[:,1], 'ro', label="Class B") self.state.plot(data.training_data[:,0], data.training_data[:,1], 'k.', label="Training data") print("Train loss: "+str(np.round(self.train_loss,4))+", Test loss: "+str(np.round(self.test_loss,4))+", Train accuracy: "+str(np.round(self.train_accuracy,2))+" %, Test accuracy: "+str(np.round(self.test_accuracy,2))+" %") if(self.epoch == 0): self.state.legend(loc='lower center', bbox_to_anchor=(0.5, 1.0), ncols=3) #fig.colorbar(heatmap, orientation="horizontal") self.cost.legend(loc='lower center', bbox_to_anchor=(0.5, 1.0), ncols=2) def updateAfterEpoch(self): self.fig.canvas.draw() self.fig.canvas.flush_events() heatmap = self.network.computeHeatMap(self.dataset.test_points_x, self.dataset.test_points_y) def trainOnDataset(self,dataset,number_of_epochs): for i in range(number_of_epochs): self.trainOneEpoch(dataset) # do functions for graphics self.plotter.plotStateAfterEpoch(self.dataset, heatmap, self.train_loss_history, self.test_loss_history, self.train_accuracy_history, self.test_accuracy_history, self.epoch) def saveTrainedModel(self): #TODO saving plots, history in txt, weights in txt and numpy for network loading colormap = self.network.computeHeatMap(test_points_x, test_points_y) #heatmap = proc.imshow(colormap, cmap='RdYlGn', extent=([-1, 1, -1, 1]), interpolation='bilinear', vmin=-1, vmax=1) self.state.imshow(colormap, cmap='RdYlGn', extent=([-1, 1, -1, 1]), interpolation='bilinear', vmin=-1, vmax=1) self.state.plot(labelled_data_A[:,0], labelled_data_A[:,1], 'go', label="Class A") self.state.plot(labelled_data_B[:,0], labelled_data_B[:,1], 'ro', label="Class B") self.state.plot(data.training_data[:,0], data.training_data[:,1], 'k.', label="Training data") #numberOfNeurons = np.sum(structure) maxNeuronsInLayer = np.max(structure) networkMap = self.network.computeWholeNeuralNetworkHeatMap(test_points_x, test_points_y, len(structure), maxNeuronsInLayer) #fig2, subfigs = plt.subplots(maxNeuronsInLayer, len(structure), figsize=(4*maxNeuronsInLayer, 4*maxNeuronsInLayer)) subfigs = [] fig2 = plt.figure(figsize=(4*len(structure), 4*maxNeuronsInLayer)) max_weights_in_layer = [] for i in range(len(structure)): max_weights_in_layer.append(np.max(self.network.layer[i].weights)) max_weight = np.max(np.array(max_weights_in_layer)) for i in range(len(structure)): for j in range(structure[i]): subfig = fig2.add_subplot(maxNeuronsInLayer, len(structure), i+len(structure)*j+1) subfig.imshow(networkMap[i+len(structure)*j], cmap='RdYlGn', extent=([-1, 1, -1, 1]), vmin=-1, vmax=1) if(i > 0): subfig.set_xticks([]) # Remove x-axis tick labels subfig.set_yticks([]) # Remove y-axis tick labels subfigs.append(subfig) index_of_first_neuron_in_previous_layer = int(np.sum(structure[:i-1])) if(i > 0): xy1=[1,0] xy2=[-1,0] for index in range(structure[i-1]): weight = self.network.layer[i].weights[j][index]/max_weight if weight >= 0: con = ConnectionPatch(xyA=xy2, xyB=xy1, coordsA="data", coordsB="data", axesA=subfig, axesB=subfigs[index_of_first_neuron_in_previous_layer+index], color="red", linestyle='dashed', lw=3.0*weight) else: con = ConnectionPatch(xyA=xy2, xyB=xy1, coordsA="data", coordsB="data", axesA=subfig, axesB=subfigs[index_of_first_neuron_in_previous_layer+index], color="green", linestyle='dashed', lw=-3.0*weight) def trainOnDataset(self): os.makedirs(training_controller.folder_path, exist_ok=True) subfig.add_artist(con) while self.epoch < self.max_epochs and self.train_loss >= self.epsilon: print("Epoch "+str(self.epoch)+":") self.trainOneEpochSGD(10) #self.trainOneEpochMiniBGD(data) #self.trainOneEpochBGD(data) fig2.canvas.draw() fig2.canvas.flush_events() self.updateMetricsAfterEpoch() fig2.savefig(self.folder_path+"heatmap", dpi=500) self.fig.savefig(self.folder_path+"final_state", dpi=300) if self.epoch % 10 == 0: self.updateAfterEpoch() plt.show(block='false') self.epoch += 1 self.saveTrainedModel() data_size = 1000 training_data_size = 300 self.network.summary() data = Data(data_size, training_data_size) if(data.initializeDataSet("two_elipses") == False): print("Dataset not defined") sys.exit(1) structure = [data.input_data_dimension,5,5,5,data.output_data_dimension] network = NeuralNetwork(structure) def saveTrainedModel(self): #TODO saving plots, history in txt, weights in txt and numpy for network loading training_controller = TrainingController(network) self.updateAfterEpoch() os.makedirs(training_controller.folder_path, exist_ok=True) self.plotter.plotNetworkWithWeights(self.network, self.dataset) test_points_x = np.linspace(-1,1,101) test_points_y = np.linspace(1,-1,101) self.network.saveModel(self.folder_path) labelled_data_A = np.zeros((data_size, data.input_data_dimension)) labelled_data_B = np.zeros((data_size, data.input_data_dimension)) self.saveMetricsHistory() for k in range(data_size): if data.labelled_data[k] > 0: labelled_data_A[k] = data.all_data[k] else: labelled_data_B[k] = data.all_data[k] def saveMetricsHistory(self): ep, tr_l_h = zip(*self.test_loss_history) ep, te_l_h = zip(*self.test_loss_history) ep, tr_a_h = zip(*self.train_accuracy_history) ep, te_a_h = zip(*self.test_accuracy_history) labelled_data_A = labelled_data_A[~np.all(labelled_data_A == 0, axis=1)] labelled_data_B = labelled_data_B[~np.all(labelled_data_B == 0, axis=1)] np.savetxt(self.folder_path+"metrics_history", np.column_stack((ep, tr_l_h, te_l_h, tr_a_h, te_a_h))) epsilon = 0.05 costFunction = 1.0 costFunction_training_value = 1.0 while training_controller.epoch < training_controller.max_epochs and training_controller.train_loss >= epsilon: print("Epoch "+str(training_controller.epoch)+":") data_size = 1000 training_data_size = 300 training_controller.trainOneEpochSGD(data,10) #training_controller.trainOneEpochMiniBGD(data) #training_controller.trainOneEpochBGD(data) data = Dataset(data_size, training_data_size) if(data.initializeDataSet("spiral") == False): print("Dataset not defined") sys.exit(1) training_controller.updateLossFunctionAfterEpoch() training_controller.plotAfterEpoch() structure = [data.input_data_dimension,5,5,5,data.output_data_dimension] network = NeuralNetwork(structure) training_controller.epoch += 1 training_controller = TrainingController(network, data) training_controller.saveTrainedModel() No newline at end of file training_controller.trainOnDataset() No newline at end of file
NeuralNetwork.py +40 −7 Original line number Diff line number Diff line from Layer import Layer from Data import additional_input from Dataset import additional_input import numpy as np import matplotlib as mpl import matplotlib.pyplot as plt import time import os class NeuralNetwork: errors = [] Loading Loading @@ -68,8 +66,8 @@ class NeuralNetwork: #return (1.0/(1+np.exp(-num)))*(1.0 - 1.0/(1+np.exp(-num))) def derivativeOfActivationOutput(self,num): #return 1 return 1-pow(np.tanh(num),2) return 1 #return 1-pow(np.tanh(num),2) #return (num > 0) * 1 #return (1.0/(1+np.exp(-num)))*(1.0 - 1.0/(1+np.exp(-num))) Loading @@ -90,6 +88,15 @@ class NeuralNetwork: self.loss_function = 0.0 return value def getAccuracy(self, input, output): right = 0 for i in range(len(input)): if self.getResult(input[i]) > 0 and output[i] > 0: right+=1 elif self.getResult(input[i]) < 0 and output[i] < 0: right+=1 return right/len(input)*100 #percentage def computeHeatMap(self, map_coord_x, map_coord_y): x = len(map_coord_x) y = len(map_coord_y) Loading Loading @@ -166,4 +173,30 @@ class NeuralNetwork: def printResult(self): print(self.layer[self.depth-1].neurons) def summary(self): print() print("Model summary:") print("____________________________________") print("Neurons in layer 1: \t"+str(self.structure[0])) total_parameters = 0 for i in range(1,self.depth,1): print("Parameters between: \t"+str(self.structure[i]*self.structure[i-1])) print("Neurons in layer "+str(i+1)+": \t"+str(self.structure[i])) total_parameters += self.structure[i]*self.structure[i-1] print("____________________________________") print("Total number of parameters: "+str(total_parameters)) def saveModel(self, folder_path): path_weights = folder_path+"weights/" os.makedirs(path_weights, exist_ok=True) np.save(folder_path+"structure", self.structure) for i in range(1, self.depth): np.save(path_weights+"layer"+str(i), self.layer[i].weights)