Commit 33da8bda authored by Jan Kovář
Browse files

Merge branch 'develop' into 'main'

DNN playground 1.0

See merge request !2
parents 875904f7 129eb779
Loading
Loading
Loading
Loading

.gitignore

0 → 100644
+3 −0
Original line number Diff line number Diff line
custom_datasets/
model/
__pycache__/
 No newline at end of file

dataset.py

0 → 100644
+239 −0
Original line number Diff line number Diff line
import numpy as np
import math
import random
from textDatasetGenerator import generateTextDataset
import sys

def additional_input(k, x, y):
    """Return the k-th engineered feature derived from coordinates (x, y).

    Works element-wise on numpy arrays as well as on scalars. Any index
    outside the known feature set falls back to the product x*y.
    """
    feature_makers = (
        lambda a, b: a * a,                    # k == 0: x^2
        lambda a, b: b * b,                    # k == 1: y^2
        lambda a, b: a * b,                    # k == 2: x*y
        lambda a, b: np.sin(a) * np.cos(b),    # k == 3: sin(x)cos(y)
        lambda a, b: np.sin(b) * np.cos(a),    # k == 4: sin(y)cos(x)
    )
    if 0 <= k < len(feature_makers):
        return feature_makers[k](x, y)
    return x * y

class Dataset:
    """Labelled 2-D toy datasets for the DNN playground.

    Points live in [-1, 1]^2 and labels are +1 (class A) / -1 (class B).
    Engineered features (see additional_input) are stacked onto the raw
    coordinates, and the data is split into train/val/test subsets.
    """

    # Feature layout: 2 raw coordinates + (input_data_dimension - 2)
    # engineered features produced by additional_input().
    input_data_dimension = 4
    input_coord_dimension = 2
    output_data_dimension = 1

    # Probe grid for the heatmap (51x51 over [-1,1]^2);
    # y runs top-to-bottom to match imshow orientation.
    test_points_x = np.linspace(-1, 1, 51)
    test_points_y = np.linspace(1, -1, 51)

    def __init__(self, data_size, training_data_size, validation_data_size, test_data_size):
        """Validate the requested split and initialise per-instance storage.

        Exits with an error when train + val + test != data_size.
        """
        self.data_size = data_size
        self.train_data_size = training_data_size
        self.val_data_size = validation_data_size
        if test_data_size != data_size - training_data_size - validation_data_size:
            print("Your dataset is not split into training/val/test data properly. The sum must be the total number of datapoints!")
            sys.exit(1)
        self.test_data_size = data_size - training_data_size - validation_data_size

        # BUGFIX: these containers used to be *class* attributes, so every
        # Dataset instance shared (and kept mutating) the same objects.
        # They are now per-instance state.
        self.all_data = []
        self.labels = []
        self.train_indices = []
        self.train_data = []
        self.train_labels = []
        self.val_indices = []
        self.val_data = []
        self.val_labels = []
        self.test_indices = []
        self.test_data = []
        self.test_labels = []
        self.labelled_data_A = []
        self.labelled_data_B = []

    def initializeDataSet(self, type):
        """Generate, feature-augment and split a dataset of the given type.

        `type` (name kept for backward compatibility although it shadows the
        builtin) is one of the names understood by createAllData().
        """
        # BUGFIX: an unknown type used to be silently ignored, and the code
        # then crashed later on empty data; fail fast with a clear message.
        if not self.createAllData(type):
            print("Unknown dataset type: " + str(type))
            sys.exit(1)
        self.addAdditionalInputs()
        self.splitToClasses()
        self.splitDataToTrainingTest()

    def norm(self, x, y):
        """Euclidean norm of (x, y); works element-wise on arrays."""
        return np.sqrt(x**2 + y**2)

    def addAdditionalInputs(self):
        """Append the engineered feature columns to the raw coordinates."""
        for j in range(self.input_data_dimension - self.input_coord_dimension):
            extra = additional_input(j, self.all_data[:, 0], self.all_data[:, 1])
            self.all_data = np.column_stack((self.all_data, extra))

    def createAllData(self, type):
        """Dispatch to the generator for `type`; return False when unknown."""
        builders = {
            "linear": self.createLinear,
            "elipse": self.createElipse,
            "two_elipses": self.createTwoElipses,
            "sinus": self.createSinus,
            "half_elipse": self.createHalfElipse,
            "spiral": self.createSpiral,
            "custom": self.createCustomDatasetFromFiles,
        }
        builder = builders.get(type)
        if builder is None:
            return False
        builder()
        return True

    def splitDataToTrainingTest(self):
        """Randomly split all_data/labels into disjoint train/val/test sets."""
        self.train_indices = random.sample(range(self.data_size), self.train_data_size)
        self.train_data = self.all_data[self.train_indices, :]
        self.train_labels = self.labels[self.train_indices]

        # Validation indices are drawn only from points not used for training.
        self.val_indices = random.sample(list(np.delete(range(self.data_size), self.train_indices, axis=0)), self.val_data_size)
        self.val_data = self.all_data[self.val_indices, :]
        self.val_labels = self.labels[self.val_indices]

        # Whatever remains becomes the test set.
        self.test_indices = np.delete(range(self.data_size), self.train_indices + self.val_indices, axis=0)
        self.test_data = self.all_data[self.test_indices, :]
        self.test_labels = self.labels[self.test_indices]

    def splitToClasses(self):
        """Group points by label for plotting (A: label +1, B: label -1)."""
        self.labelled_data_A = self.all_data[self.labels == 1]
        self.labelled_data_B = self.all_data[self.labels == -1]

    def createLinear(self):
        """Uniform points in [-1,1]^2 labelled by the sign of y (y<0 -> +1)."""
        self.all_data = 2*np.random.rand(self.data_size, self.input_coord_dimension)-np.ones((self.data_size, self.input_coord_dimension))
        # (a per-point loop that reassigned each y to itself was removed)
        self.labels = 2*(self.all_data[:, 1] < 0)-1

    def createElipse(self):
        """Disc of radius 0.7 (class +1) inside a uniform square (class -1).

        Points inside the disc are shrunk towards the origin to open a
        visible margin between the two classes.
        """
        self.all_data = 2*np.random.rand(self.data_size, self.input_coord_dimension)-np.ones((self.data_size, self.input_coord_dimension))
        for i in range(self.data_size):
            if self.norm(self.all_data[i][0], self.all_data[i][1]) < 0.7:
                self.all_data[i][0] = 0.5*self.all_data[i][0]
                self.all_data[i][1] = 0.5*self.all_data[i][1]
        self.labels = 2*(self.norm(self.all_data[:, 0], self.all_data[:, 1]) < 0.7)-1

    def createSinus(self):
        """Points labelled by side of the curve y = 0.5*sin(4x) (below -> +1)."""
        self.all_data = 2*np.random.rand(self.data_size, self.input_coord_dimension)-np.ones((self.data_size, self.input_coord_dimension))
        # (a per-point loop that added/subtracted 0.0 was removed)
        self.labels = 2*(self.all_data[:, 1] < 0.5*np.sin(4*self.all_data[:, 0]))-1

    def createHalfElipse(self):
        """Half-disc of radius 1.2 centred at (0, -1.2); inside -> class +1.

        Interior points are nudged away from the boundary (and clamped to
        y >= -1) to open a margin between the classes.
        """
        self.all_data = 2*np.random.rand(self.data_size, self.input_coord_dimension)-np.ones((self.data_size, self.input_coord_dimension))
        for i in range(self.data_size):
            if self.norm(self.all_data[i][0], self.all_data[i][1]+1.2) < 1.2:
                if self.all_data[i][0] > 0:
                    self.all_data[i][0] -= 0.2
                else:
                    self.all_data[i][0] += 0.2
                if self.all_data[i][1] > 0:
                    self.all_data[i][1] += 0.2
                else:
                    self.all_data[i][1] = np.maximum(-1, self.all_data[i][1]-0.1)
        self.labels = 2*(self.norm(self.all_data[:, 0], self.all_data[:, 1]+1.2) < 1.2)-1

    def createSpiral(self):
        """Two interleaved noisy spiral arms, one per class."""
        # Centre of the spiral, starting angle, arm spacing and jitter.
        center_x = 0
        center_y = 0
        angle = 0.5
        distance = 0.1
        noise = 0.04

        self.all_data = np.zeros((self.data_size, 2))
        self.labels = np.zeros(self.data_size)

        # Each iteration emits one point per arm; the second arm is the
        # first one rotated by pi.
        for i in range(math.floor(self.data_size/2)):
            x = center_x + distance * math.cos(angle) + noise*np.random.standard_normal()
            y = center_y + distance * math.sin(angle) + noise*np.random.standard_normal()

            x_2 = center_x + distance * math.cos(angle + math.pi) + noise*np.random.standard_normal()
            y_2 = center_y + distance * math.sin(angle + math.pi) + noise*np.random.standard_normal()

            self.all_data[2*i] = [x, y]
            self.labels[2*i] = -1
            self.all_data[2*i+1] = [x_2, y_2]
            self.labels[2*i+1] = 1

            # Sweep the angle and grow the radius for the next pair.
            angle += 4*math.pi/self.data_size
            distance += 1.5/self.data_size

    def createTwoElipses(self):
        """Two discs (class +1) inside a uniform square (class -1)."""
        self.all_data = 2*np.random.rand(self.data_size, self.input_coord_dimension)-np.ones((self.data_size, self.input_coord_dimension))
        self.labels = np.zeros(self.data_size)

        center_x1 = 0.4
        center_y1 = 0.4
        range1 = 0.5
        center_x2 = -0.8
        center_y2 = -0.8
        range2 = 0.4

        for i in range(self.data_size):
            # Shrink interior points towards each disc centre for a margin.
            if self.norm(self.all_data[i][0]-center_x1, self.all_data[i][1]-center_y1) < range1:
                self.all_data[i][0] = 0.8*(self.all_data[i][0]-center_x1) + center_x1
                self.all_data[i][1] = 0.8*(self.all_data[i][1]-center_y1) + center_y1
                self.labels[i] = 1
            if self.norm(self.all_data[i][0]-center_x2, self.all_data[i][1]-center_y2) < range2:
                self.all_data[i][0] = 0.8*(self.all_data[i][0]-center_x2) + center_x2
                self.all_data[i][1] = 0.8*(self.all_data[i][1]-center_y2) + center_y2
                self.labels[i] = 1
        # Map {0, 1} onto {-1, +1}.
        self.labels = 2*self.labels-1

    def createCustomDatasetFromFiles(self):
        """Load a user-drawn dataset produced by generateTextDataset().

        Class A points come first in all_data (label +1), followed by class B
        (label -1). Exits when data_size is odd.
        """
        if not generateTextDataset(self.data_size):
            print("Datasize for custom dataset must be even number.")
            sys.exit()
        self.labelled_data_A = np.load('custom_datasets/classA.npy')
        self.labelled_data_B = np.load('custom_datasets/classB.npy')
        self.all_data = np.zeros((self.data_size, self.input_coord_dimension))
        self.labels = np.zeros(self.data_size)

        for i in range(len(self.labelled_data_A)):
            self.labels[i] = 1
            self.all_data[i] = self.labelled_data_A[i]

        for i in range(len(self.labelled_data_B)):
            self.all_data[i+len(self.labelled_data_A)] = self.labelled_data_B[i]

        # Map {0, 1} onto {-1, +1}.
        self.labels = 2*self.labels-1

layer.py

0 → 100644
+38 −0
Original line number Diff line number Diff line
import numpy as np

class Layer:
    """One fully connected layer: weights, bias, and activation state."""

    def __init__(self, dimension, prev_layer_dimension):
        """Create a layer of `dimension` neurons fed by a layer of
        `prev_layer_dimension` neurons (this fixes the weight-matrix shape).
        """
        self.dimension = dimension
        self.prev_layer_dimension = prev_layer_dimension

        # Weights start uniform in [-1, 1); everything else starts at zero.
        self.weights = 2*np.random.rand(dimension, prev_layer_dimension)-1
        self.bias = np.zeros(dimension)
        self.neurons = np.zeros(dimension)
        self.neurons_notActivated = np.zeros(dimension)
        self.gradient = np.zeros((dimension, prev_layer_dimension))
        self.gradient_bias = np.zeros(dimension)

    def activationFunction(self, num):
        """Hidden-layer activation: ReLU.

        (Identity, tanh and sigmoid are alternative choices here.)
        """
        return np.maximum(num, 0)

    def activationOutput(self, num):
        """Output-layer activation: identity.

        (ReLU, tanh and sigmoid are alternative choices here.)
        """
        return num

    def activateLayer(self, prev_layer_data):
        """Hidden-layer forward pass: z = W·x + b, a = ReLU(z)."""
        pre_activation = self.weights @ prev_layer_data + self.bias
        self.neurons_notActivated = pre_activation
        self.neurons = self.activationFunction(pre_activation)

    def activateOutputLayer(self, prev_layer_data):
        """Output-layer forward pass: z = W·x + b, a = z."""
        pre_activation = self.weights @ prev_layer_data + self.bias
        self.neurons_notActivated = pre_activation
        self.neurons = self.activationOutput(pre_activation)

neuralNetwork.py

0 → 100644
+211 −0
Original line number Diff line number Diff line
from layer import Layer
from dataset import additional_input
import numpy as np
import os
import shutil
import sys

class NeuralNetwork:
    """Fully connected feed-forward net trained by backprop + mini-batch SGD.

    self.layer[0] is the input layer; self.layer[depth-1] is the output
    layer (identity activation); hidden layers use ReLU.
    """

    # Default hyper-parameters. These scalars are safe as class attributes:
    # setLearningParameters() rebinds them on the instance.
    learning_rate = 0.01
    l2_regularization = 0.0
    loss_function = 0.0

    def __init__(self, structure):
        """structure: sequence of layer sizes, e.g. [4, 8, 8, 1]."""
        # Number of layers (input layer included)
        self.depth = len(structure)
        # Layer structure vector - number of neurons in the layers
        self.structure = structure
        self.layer = []
        # BUGFIX: `errors` (and the unused `gradient`) used to be *class*
        # attributes, so every instance - and every loadModel(), which calls
        # __init__ again - appended into the same shared list. They are now
        # per-instance state.
        self.errors = []
        self.gradient = []

        self.setNeuralNetwork()

    def setLearningParameters(self, learning_rate, l2_reg):
        """Set the SGD step size and L2 weight-decay coefficient."""
        self.learning_rate = learning_rate
        self.l2_regularization = l2_reg

    def setNeuralNetwork(self):
        """Build the Layer objects plus matching backprop error vectors."""
        self.layer.append(Layer(self.structure[0], 1))
        self.errors.append(np.zeros(self.structure[0]))
        for i in range(self.depth-1):
            self.layer.append(Layer(self.structure[i+1], self.structure[i]))
            self.errors.append(np.zeros(self.structure[i+1]))

    def activation(self, input_data):
        """Forward pass; returns 1 on success, 0 on input-dimension mismatch."""
        if len(input_data) != self.layer[0].dimension:
            return 0
        else:
            self.layer[0].neurons = input_data
            for i in range(1, self.depth-1):
                self.layer[i].activateLayer(self.layer[i-1].neurons)
            # The output layer uses the identity activation.
            self.layer[self.depth-1].activateOutputLayer(self.layer[self.depth-2].neurons)
        return 1

    def calculateErrors(self, output):
        """Backpropagate the deltas for the current forward pass and target."""
        # Output delta: (prediction - target) * f'(z); f is the identity here.
        self.errors[self.depth-1] = np.multiply((self.layer[self.depth-1].neurons - output), self.derivativeOfActivationOutput(self.layer[self.depth-1].neurons_notActivated))
        # Hidden deltas, propagated backwards through the transposed weights.
        for i in range(self.depth-2, 0, -1):
            self.errors[i] = np.multiply(np.matmul(np.transpose(self.layer[i+1].weights), self.errors[i+1]), self.derivativeOfActivationFunction(self.layer[i].neurons_notActivated))

    def computeLossFunction(self, output):
        """Accumulate 0.5 * ||prediction - target||^2 into loss_function."""
        self.loss_function += 0.5*np.dot(self.layer[self.depth-1].neurons - output, self.layer[self.depth-1].neurons - output)

    def updateGradient(self, batch_size):
        """Accumulate batch-averaged gradients from the current deltas."""
        for n in range(self.depth-1, 0, -1):
            self.layer[n].gradient += (1.0/batch_size)*np.outer(self.errors[n], self.layer[n-1].neurons)
            self.layer[n].gradient_bias += (1.0/batch_size)*self.errors[n]

    def resetBeforeEpoch(self):
        """Zero the accumulated loss and all per-layer gradients."""
        self.loss_function = 0.0
        for n in range(self.depth):
            self.layer[n].gradient = np.zeros((self.layer[n].dimension, self.layer[n].prev_layer_dimension))
            self.layer[n].gradient_bias = np.zeros(self.layer[n].dimension)

    def derivativeOfActivationFunction(self, num):
        """Derivative of the hidden activation (ReLU): 1 for z > 0, else 0."""
        return (num > 0) * 1

    def derivativeOfActivationOutput(self, num):
        """Derivative of the output activation (identity): constant 1."""
        return 1

    def updateWeights(self):
        """One SGD step; L2 decay is applied to weights only, not biases."""
        for n in range(self.depth-1, 0, -1):
            self.layer[n].weights -= self.learning_rate*(self.layer[n].gradient + self.l2_regularization*self.layer[n].weights)
            self.layer[n].bias -= self.learning_rate*self.layer[n].gradient_bias

    def getResult(self, input):
        """Run a forward pass and return the output-layer activations.

        Exits when the input dimension does not match the input layer.
        """
        if len(input) != self.structure[0]:
            print("Difference in dimension! The input layer has dimension "+str(self.structure[0])+", while the data have dimension "+str(len(input))+".")
            sys.exit(1)
        self.activation(input)
        return self.layer[self.depth-1].neurons

    def updateLossFunction(self, input, output):
        """Return the total loss over a dataset, resetting the accumulator."""
        for i in range(len(input)):
            self.activation(input[i])
            self.computeLossFunction(output[i])
        value = self.loss_function
        self.loss_function = 0.0
        return value

    def getAccuracy(self, input, output):
        """Percentage of samples whose prediction sign matches the label sign.

        Samples with a prediction of exactly 0 count as wrong (as before).
        """
        right = 0
        for i in range(len(input)):
            # Perf fix: run the forward pass once per sample instead of twice.
            prediction = self.getResult(input[i])
            if (prediction > 0 and output[i] > 0) or (prediction < 0 and output[i] < 0):
                right += 1
        return right/len(input)*100  # percentage

    def computeHeatMap(self, map_coord_x, map_coord_y):
        """Evaluate the network over a coordinate grid.

        The engineered features are appended to each grid point exactly as
        Dataset.addAdditionalInputs() does for training data.
        """
        x = len(map_coord_x)
        y = len(map_coord_y)
        heatmap = np.zeros((x, y))
        for i in range(x):
            for j in range(y):
                point = [map_coord_x[j], map_coord_y[i]]
                for k in range(len(self.layer[0].neurons)-2):
                    add = additional_input(k, map_coord_x[j], map_coord_y[i])
                    point.append(add)

                heatmap[i][j] = self.getResult(point)
        return heatmap

    def getResultFromNeuron(self, layer_index, neuron_index):
        """Return the current activation of one specific neuron."""
        return self.layer[layer_index].neurons[neuron_index]

    def computeWholeNeuralNetworkHeatMapForOnePoint(self, point):
        """Forward one point and return every layer's activation vector."""
        resultsInAllNeurons = []
        self.activation(point)

        for i in range(self.depth):
            resultsInAllNeurons.append(self.layer[i].neurons)

        return resultsInAllNeurons

    def computeWholeNeuralNetworkHeatMap(self, map_coord_x, map_coord_y, grid_x, grid_y):
        """Per-neuron heatmaps over the grid.

        Slice k + depth*l of the result holds neuron l of layer k - the same
        indexing the plotter uses to lay out its subplot grid.
        """
        x = len(map_coord_x)
        y = len(map_coord_y)
        heatmap = np.zeros((grid_x*grid_y, x, y))

        for i in range(x):
            for j in range(y):
                point = [map_coord_x[j], map_coord_y[i]]
                for k in range(len(self.layer[0].neurons)-2):
                    add = additional_input(k, map_coord_x[j], map_coord_y[i])
                    point.append(add)
                values = self.computeWholeNeuralNetworkHeatMapForOnePoint(point)
                for k in range(self.depth):
                    for l in range(len(self.layer[k].neurons)):
                        heatmap[k+self.depth*l][i][j] = values[k][l]
        return heatmap

    def printNetwork(self):
        """Debug print of every layer's activations."""
        for i in range(self.depth):
            print(self.layer[i].neurons)
            print("\n")

    def printWeights(self):
        """Debug print of every layer's weight matrix."""
        for i in range(self.depth):
            print(self.layer[i].weights)
            print("\n")

    def printBiases(self):
        """Debug print of every layer's bias vector."""
        for i in range(self.depth):
            print(self.layer[i].bias)
            print("\n")

    def printResult(self):
        """Debug print of the output-layer activations."""
        print(self.layer[self.depth-1].neurons)

    def summary(self):
        """Print a Keras-style model summary.

        NOTE: only weight parameters are counted; biases are not included.
        """
        print()
        print("Model summary:")
        print("____________________________________")
        print("Neurons in layer 1: \t"+str(self.structure[0]))
        total_parameters = 0
        for i in range(1, self.depth, 1):
            print("Parameters between: \t"+str(self.structure[i]*self.structure[i-1]))
            print("Neurons in layer "+str(i+1)+": \t"+str(self.structure[i]))
            total_parameters += self.structure[i]*self.structure[i-1]
        print("____________________________________")
        print("Total number of parameters: "+str(total_parameters))

    def saveModel(self, folder_path):
        """Save structure, weights and biases as .npy files under folder_path.

        Existing weights/biases sub-folders are wiped first.
        """
        path_weights = folder_path+"weights/"
        if os.path.exists(path_weights):
            shutil.rmtree(path_weights)
        os.makedirs(path_weights, exist_ok=True)

        path_biases = folder_path+"biases/"
        if os.path.exists(path_biases):
            shutil.rmtree(path_biases)
        os.makedirs(path_biases, exist_ok=True)

        np.save(folder_path+"structure", self.structure)

        for i in range(1, self.depth):
            np.save(path_weights+"layer"+str(i), self.layer[i].weights)
            np.save(path_biases+"layer"+str(i), self.layer[i].bias)

    def loadModel(self, folder_path):
        """Re-initialise the network from files written by saveModel()."""
        structure = np.load(folder_path+"structure.npy")
        self.__init__(structure)

        path_weights = folder_path+"weights/"
        path_biases = folder_path+"biases/"

        for i in range(1, self.depth):
            self.layer[i].weights = np.load(path_weights+"layer"+str(i)+".npy")
            self.layer[i].bias = np.load(path_biases+"layer"+str(i)+".npy")

        
        
                

plotter.py

0 → 100644
+85 −0
Original line number Diff line number Diff line
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.patches import ConnectionPatch
import numpy as np

class Plotter:
    """Live matplotlib visualisation of training state, loss and accuracy."""

    # Default output folder for the saved figures.
    folder_path = "model/"

    def __init__(self, folder_path):
        """Open an interactive figure with state / loss / accuracy axes."""
        plt.ion()
        self.fig, (self.state, self.loss, self.accuracy) = plt.subplots(1, 3, figsize=(15, 5))
        self.folder_path = folder_path

    def plotStateAfterEpoch(self, dataset, heatmap, train_loss_history, test_loss_history, train_accuracy_history, test_accuracy_history, epoch):
        """Redraw the decision heatmap, data points and the history curves.

        The history arguments are sequences of (epoch, value) pairs.
        """
        self.loss.plot(*zip(*train_loss_history), 'k-', label='Train loss')
        self.loss.plot(*zip(*test_loss_history), 'b-', label='Test loss')
        self.accuracy.plot(*zip(*train_accuracy_history), 'k-', label='Train accuracy')
        self.accuracy.plot(*zip(*test_accuracy_history), 'b-', label='Test accuracy')

        self.state.imshow(heatmap, cmap='RdYlGn', extent=([-1, 1, -1, 1]), interpolation='bilinear', vmin=-1, vmax=1)
        self.state.plot(dataset.labelled_data_A[:, 0], dataset.labelled_data_A[:, 1], 'go', label="Class A")
        self.state.plot(dataset.labelled_data_B[:, 0], dataset.labelled_data_B[:, 1], 'ro', label="Class B")
        self.state.plot(dataset.train_data[:, 0], dataset.train_data[:, 1], 'k.', label="Training data")

        # Legends and axis labels only need to be created once.
        if(epoch == 0):
            self.state.legend(loc='lower center', bbox_to_anchor=(0.5, 1.0), ncols=3)
            self.loss.legend(loc='lower center', bbox_to_anchor=(0.5, 1.0), ncols=2)
            self.accuracy.legend(loc='lower center', bbox_to_anchor=(0.5, 1.0), ncols=2)
            self.state.set_xlabel("x [-]")
            # BUGFIX: set_xlabel was called twice; the y axis was never labelled.
            self.state.set_ylabel("y [-]")

            self.loss.set_xlabel("Epoch")
            self.loss.set_ylabel("Loss function / baseline [-]")

            self.accuracy.set_xlabel("Epoch")
            self.accuracy.set_ylabel("Accuracy [%]")

        self.fig.canvas.draw()
        self.fig.canvas.flush_events()

    def plotNetworkWithWeights(self, network, dataset):
        """Draw one heatmap per neuron plus the weighted connections between them.

        Green dashed lines are positive weights, red negative; the line width
        scales with weight / max weight over the whole network.
        """
        maxNeuronsInLayer = np.max(network.structure)

        networkMap = network.computeWholeNeuralNetworkHeatMap(dataset.test_points_x, dataset.test_points_y, len(network.structure), maxNeuronsInLayer)

        # NOTE(review): normalises by the max weight, not max |weight| -
        # strongly negative weights can exceed lw 3.0; confirm intended.
        max_weights_in_layer = []
        for i in range(len(network.structure)):
            max_weights_in_layer.append(np.max(network.layer[i].weights))
        max_weight = np.max(np.array(max_weights_in_layer))

        subfigs = []
        fig2 = plt.figure(figsize=(4*len(network.structure), 4*maxNeuronsInLayer))

        for i in range(len(network.structure)):
            for j in range(network.structure[i]):
                subfig = fig2.add_subplot(maxNeuronsInLayer, len(network.structure), i+len(network.structure)*j+1)
                subfig.imshow(networkMap[i+len(network.structure)*j], cmap='RdYlGn', extent=([-1, 1, -1, 1]), vmin=-1, vmax=1)
                if(i > 0):
                    subfig.set_xticks([])  # Remove x-axis tick labels
                    subfig.set_yticks([])  # Remove y-axis tick labels

                subfigs.append(subfig)

                # Index of the first subplot that belongs to the previous layer
                # (subfigs is filled layer by layer).
                index_of_first_neuron_in_previous_layer = int(np.sum(network.structure[:i-1]))
                if(i > 0):
                    xy1 = [1, 0]
                    xy2 = [-1, 0]

                    for index in range(network.structure[i-1]):
                        weight = network.layer[i].weights[j][index]/max_weight
                        if weight >= 0:
                            con = ConnectionPatch(xyA=xy2, xyB=xy1, coordsA="data", coordsB="data", axesA=subfig, axesB=subfigs[index_of_first_neuron_in_previous_layer+index], color="green", linestyle='dashed', lw=3.0*weight)
                        else:
                            con = ConnectionPatch(xyA=xy2, xyB=xy1, coordsA="data", coordsB="data", axesA=subfig, axesB=subfigs[index_of_first_neuron_in_previous_layer+index], color="red", linestyle='dashed', lw=-3.0*weight)

                        subfig.add_artist(con)

        fig2.canvas.draw()
        fig2.canvas.flush_events()

        fig2.savefig(self.folder_path+"heatmap", dpi=500)
        self.fig.savefig(self.folder_path+"final_state", dpi=300)

        # BUGFIX: block='false' is a truthy *string* that would block the
        # script; pass the boolean False to keep the window non-blocking.
        plt.show(block=False)
 No newline at end of file
Loading