Commit 411e15f0 authored by Jan Kovář's avatar Jan Kovář
Browse files

First version with wrong batch creation - each epoch training on 1 randomly...

First version with incorrect batch creation — each epoch trains multiple times on a single randomly chosen batch.
parent ab3621ff
Loading
Loading
Loading
Loading

Data.py

0 → 100644
+170 −0
Original line number Diff line number Diff line
import numpy as np
import math
import random

def additional_input(k, x, y):
    """Return the k-th derived polynomial feature for the point (x, y).

    Feature 0 is x^2, feature 1 is y^2; index 2 and every other index
    map to the cross term x*y. Works element-wise on numpy arrays too.
    """
    if k == 0:
        return x * x
    if k == 1:
        return y * y
    # k == 2 and any out-of-range index both yield the cross term.
    return x * y

class Data:
    """Generates 2-D toy classification data sets with tags in {-1, +1}.

    Raw coordinates live in [-1, 1]^2; extra polynomial features
    (x^2, y^2, ...) are appended as additional input columns by
    addAdditionalInputs().
    """

    # Total number of input columns fed to the network (coords + features).
    input_data_dimension = 4
    # Number of raw coordinate columns (x, y).
    input_coord_dimension = 2
    output_data_dimension = 1
    # Defaults; overwritten per instance in __init__.
    data_size = 1
    training_data_size = 1

    def __init__(self, data_size, training_data_size):
        self.data_size = data_size
        self.training_data_size = training_data_size
        # Per-instance containers. These used to be class-level mutable
        # attributes, which would have been shared by every Data instance.
        self.all_data = []
        self.training_indices = []
        self.training_data = []
        self.tagged_data = []

    def norm(self, x, y):
        """Euclidean norm of (x, y); works element-wise on arrays."""
        return np.sqrt(x**2 + y**2)

    def addAdditionalInputs(self):
        """Append the derived feature columns (x^2, y^2, ...) to all_data."""
        for j in range(self.input_data_dimension - self.input_coord_dimension):
            feature = additional_input(j, self.all_data[:, 0], self.all_data[:, 1])
            self.all_data = np.column_stack((self.all_data, feature))

    def createAllData(self, type):
        """Create the raw coordinates and tags for the named data set.

        Returns True on success, False when the name is unknown.
        """
        if type == "linear":
            self.createLinear()
            return True
        elif type == "elipse":
            self.createElipse()
            return True
        elif type == "two_elipses":
            self.createTwoElipses()
            return True
        elif type == "sinus":
            self.createSinus()
            return True
        elif type == "half_elipse":
            self.createHalfElipse()
            return True
        elif type == "spiral":
            self.createSpiral()
            return True
        else:
            return False

    def chooseTrainingData(self):
        """Pick training_data_size distinct random rows as the training set."""
        self.training_indices = random.sample(range(self.data_size), self.training_data_size)
        self.training_data = self.all_data[self.training_indices, :]

    def initializeDataSet(self, type):
        """Create a data set, add features and choose the training subset.

        Returns True on success, False if the data-set name is unknown.
        (Previously this returned None implicitly, so callers comparing
        the result with False could never detect an invalid name.)
        """
        if not self.createAllData(type):
            return False
        self.addAdditionalInputs()
        self.chooseTrainingData()
        return True

    def createLinear(self):
        """Uniform points in [-1, 1]^2, tagged +1 below the x-axis, -1 above."""
        self.all_data = 2*np.random.rand(self.data_size, self.input_coord_dimension) - np.ones((self.data_size, self.input_coord_dimension))
        # (A previous loop here reassigned each y to itself — a no-op, removed.)
        self.tagged_data = 2*(self.all_data[:, 1] < 0) - 1

    def createElipse(self):
        """Disk of radius 0.7 around the origin; inside points get tag +1.

        Points inside are shrunk toward the centre to widen the margin
        around the decision boundary.
        """
        self.all_data = 2*np.random.rand(self.data_size, self.input_coord_dimension) - np.ones((self.data_size, self.input_coord_dimension))
        for i in range(self.data_size):
            if self.norm(self.all_data[i][0], self.all_data[i][1]) < 0.7:
                self.all_data[i][0] = 0.5*self.all_data[i][0]
                self.all_data[i][1] = 0.5*self.all_data[i][1]
        self.tagged_data = 2*(self.norm(self.all_data[:, 0], self.all_data[:, 1]) < 0.7) - 1

    def createSinus(self):
        """Points tagged +1 below the curve y = 0.5*sin(4x), -1 above it."""
        self.all_data = 2*np.random.rand(self.data_size, self.input_coord_dimension) - np.ones((self.data_size, self.input_coord_dimension))
        # (A previous loop here shifted each y by +/-0.0 — a no-op, removed.)
        self.tagged_data = 2*(self.all_data[:, 1] < 0.5*np.sin(4*self.all_data[:, 0])) - 1

    def createHalfElipse(self):
        """Half-disc of radius 1.2 centred at (0, -1.2); inside gets tag +1.

        Inside points are nudged away from the boundary to create a margin.
        """
        self.all_data = 2*np.random.rand(self.data_size, self.input_coord_dimension) - np.ones((self.data_size, self.input_coord_dimension))
        for i in range(self.data_size):
            if self.norm(self.all_data[i][0], self.all_data[i][1]+1.2) < 1.2:
                # Push x toward the vertical axis...
                if self.all_data[i][0] > 0:
                    self.all_data[i][0] -= 0.2
                else:
                    self.all_data[i][0] += 0.2
                # ...and y away from the arc, clamped to stay inside [-1, 1].
                if self.all_data[i][1] > 0:
                    self.all_data[i][1] += 0.2
                else:
                    self.all_data[i][1] = np.maximum(-1, self.all_data[i][1]-0.1)
        self.tagged_data = 2*(self.norm(self.all_data[:, 0], self.all_data[:, 1]+1.2) < 1.2) - 1

    def createSpiral(self):
        """Two interleaved spiral arms, one per class.

        NOTE(review): only 2*floor(data_size/2) rows are filled — with an
        odd data_size the last row stays (0, 0) with tag 0; confirm callers
        always pass an even data_size.
        """
        # Centre of the spiral, starting angle, radius step and jitter.
        center_x = 0
        center_y = 0
        angle = 0.5
        distance = 0.1
        noise = 0.02

        self.all_data = np.zeros((self.data_size, 2))
        self.tagged_data = np.zeros(self.data_size)

        # Each iteration emits one point per arm; the second arm is the
        # first one rotated by pi.
        for i in range(math.floor(self.data_size/2)):
            x = center_x + distance * math.cos(angle) + noise*np.random.standard_normal()
            y = center_y + distance * math.sin(angle) + noise*np.random.standard_normal()

            x_2 = center_x + distance * math.cos(angle + math.pi) + noise*np.random.standard_normal()
            y_2 = center_y + distance * math.sin(angle + math.pi) + noise*np.random.standard_normal()

            self.all_data[2*i] = [x, y]
            self.tagged_data[2*i] = -1
            self.all_data[2*i+1] = [x_2, y_2]
            self.tagged_data[2*i+1] = 1

            # Unwind the spiral: 3*pi total rotation, radius grows to ~0.85.
            angle += 6*math.pi/self.data_size
            distance += 1.5/self.data_size

    def createTwoElipses(self):
        """Two discs of class +1 on a background of class -1.

        Points inside either disc are shrunk toward that disc's centre to
        widen the margin.
        """
        self.all_data = 2*np.random.rand(self.data_size, self.input_coord_dimension) - np.ones((self.data_size, self.input_coord_dimension))
        self.tagged_data = np.zeros(self.data_size)

        center_x1 = 0.4
        center_y1 = 0.4
        range1 = 0.4
        center_x2 = -0.8
        center_y2 = -0.8
        range2 = 0.5

        for i in range(self.data_size):
            if self.norm(self.all_data[i][0]-center_x1, self.all_data[i][1]-center_y1) < range1:
                self.all_data[i][0] = 0.8*(self.all_data[i][0]-center_x1) + center_x1
                self.all_data[i][1] = 0.8*(self.all_data[i][1]-center_y1) + center_y1
                self.tagged_data[i] = 1
            if self.norm(self.all_data[i][0]-center_x2, self.all_data[i][1]-center_y2) < range2:
                self.all_data[i][0] = 0.8*(self.all_data[i][0]-center_x2) + center_x2
                self.all_data[i][1] = 0.8*(self.all_data[i][1]-center_y2) + center_y2
                self.tagged_data[i] = 1
        # Map {0, 1} tags to {-1, +1}.
        self.tagged_data = 2*self.tagged_data - 1

Layer.py

0 → 100644
+38 −0
Original line number Diff line number Diff line
import numpy as np

class Layer:
    """A single fully connected network layer."""

    def __init__(self, dimension, prev_layer_dimension):
        # Neuron count of this layer and of the layer feeding into it;
        # the latter fixes the width of the weight matrix.
        self.dimension = dimension
        self.prev_layer_dimension = prev_layer_dimension

        # Activations, pre-activations, trainable parameters and their
        # gradient accumulators. Weights start uniform in [-1, 1).
        self.neurons = np.zeros(dimension)
        self.neurons_notActivated = np.zeros(dimension)
        self.weights = 2*np.random.rand(dimension, prev_layer_dimension)-1
        self.gradient = np.zeros((dimension, prev_layer_dimension))
        self.bias = np.zeros(dimension)
        self.gradient_bias = np.zeros(dimension)

    def activationFunction(self, num):
        """ReLU nonlinearity used by the hidden layers."""
        return np.maximum(num, 0)

    def activationOutput(self, num):
        """tanh nonlinearity used by the output layer."""
        return np.tanh(num)

    def activateLayer(self, prev_layer_data):
        """Hidden-layer forward step: affine transform, then ReLU."""
        self.neurons_notActivated = self.weights @ prev_layer_data + self.bias
        self.neurons = self.activationFunction(self.neurons_notActivated)

    def activateOutputLayer(self, prev_layer_data):
        """Output-layer forward step: affine transform, then tanh."""
        self.neurons_notActivated = self.weights @ prev_layer_data + self.bias
        self.neurons = self.activationOutput(self.neurons_notActivated)

LearningController.py

0 → 100644
+160 −0
Original line number Diff line number Diff line
import numpy as np
from NeuralNetwork import NeuralNetwork
from Data import Data
import matplotlib as mpl
import matplotlib.pyplot as plt
import sys 
import random


class LearningController:
    """Drives mini-batch gradient-descent training of a NeuralNetwork."""

    # Hyperparameters shared by all controllers.
    learning_rate = 0.1
    l2_regularization = 0.0
    batch_size = 30
    max_epochs = 2000

    def __init__(self, network):
        """Keep a reference to the network and push the learning
        parameters into it."""
        self.network = network
        network.setLearningParameters(self.learning_rate, self.l2_regularization)

    def trainOnBatch(self, batch, tags, iterationsOnOneBatch):
        """Run iterationsOnOneBatch full gradient steps on one fixed batch.

        Each iteration resets the accumulated gradients, back-propagates
        every sample of the batch, then applies a single weight update.
        """
        sample_count = len(batch)
        for _ in range(iterationsOnOneBatch):
            self.network.resetBeforeBatchLearning()
            for position, sample in enumerate(batch):
                self.network.activation(sample)
                self.network.calculateErrors(tags[position])
                self.network.updateGradient(sample_count)
            self.network.updateWeights()
        



# ---------------------------------------------------------------------------
# Training script: builds a toy data set, trains the network, and live-plots
# the decision heat map plus the normalized cost-function curves.
# NOTE: as the commit message states, batch creation is wrong here — every
# epoch draws ONE random batch and trains on it 10 times instead of sweeping
# the whole training set.
# ---------------------------------------------------------------------------
data_size = 1000
training_data_size = 300

data = Data(data_size, training_data_size)
# Relies on Data.initializeDataSet returning a boolean for unknown set names.
if data.initializeDataSet("spiral") == False:
    print("Data set not defined")
    sys.exit(1)

# Layer widths: inputs (x, y, x^2, y^2), four hidden layers, one output.
structure = [data.input_data_dimension, 8, 4, 4, 2, data.output_data_dimension]
network = NeuralNetwork(structure)

learning_controller = LearningController(network)

# Interactive figure: left axes = tagged points + heat map, right = cost curves.
plt.ion()
fig, (tag, cost) = plt.subplots(1, 2, figsize=(10, 5))

costFunction_training = []
costFunction_testing = []

# Grid on which the decision heat map is evaluated.
test_points_x = np.linspace(-1, 1, 101)
test_points_y = np.linspace(1, -1, 101)

# Split points by class for plotting. Boolean masks replace the old
# fill-zeros-then-filter approach, which could drop a genuine all-zero row.
tagged_data_A = data.all_data[data.tagged_data > 0]
tagged_data_B = data.all_data[data.tagged_data <= 0]

epoch = 0
epsilon = 0.01  # stop once the normalized training cost drops below this
costFunction = 1.0
costFunction_training_value = 1.0

while epoch < learning_controller.max_epochs and costFunction_training_value >= epsilon:
    print("Epoch "+str(epoch)+":")

    # One random batch per epoch, trained on 10 times (the known flaw).
    # A redundant duplicate sampling line that was immediately overwritten
    # has been removed.
    batch_indexes = random.sample(data.training_indices, learning_controller.batch_size)
    batch = data.all_data[batch_indexes, :]
    tags = data.tagged_data[batch_indexes]

    learning_controller.trainOnBatch(batch, tags, 10)

    # Cost over the training subset. updateCostFunction accumulates, so the
    # testing cost further below is recovered by subtraction.
    network.updateCostFunction(data.all_data[data.training_indices], data.tagged_data[data.training_indices])

    if epoch == 0:
        cost_function_training_baseline = network.cost_function

    costFunction_training.append((epoch, network.cost_function/cost_function_training_baseline))

    training_value = network.cost_function

    # Cost over the held-out points (all rows outside the training indices).
    network.updateCostFunction(np.delete(data.all_data, data.training_indices, axis=0), np.delete(data.tagged_data, data.training_indices))

    if epoch == 0:
        cost_function_testing_baseline = network.cost_function - cost_function_training_baseline

    cost_function_baseline = cost_function_training_baseline + cost_function_testing_baseline

    costFunction_testing.append((epoch, (network.cost_function - training_value)/cost_function_testing_baseline))

    print("Cost function: "+str(network.cost_function/cost_function_baseline))
    print("Training cost function: "+str(training_value/cost_function_training_baseline))

    costFunction = network.cost_function/cost_function_baseline
    costFunction_training_value = training_value/cost_function_training_baseline

    cost.plot(*zip(*costFunction_training), 'k-', label='Training data')
    cost.plot(*zip(*costFunction_testing), 'b-', label='Testing data')

    # The heat map is expensive — only redraw it every 20th epoch.
    if epoch % 20 == 0:
        colormap = network.computeHeatMap(test_points_x, test_points_y)

        tag.imshow(colormap, cmap='RdYlGn', extent=([-1, 1, -1, 1]), interpolation='bilinear', vmin=-1, vmax=1)
        tag.plot(tagged_data_A[:, 0], tagged_data_A[:, 1], 'go', label="Class A")
        tag.plot(tagged_data_B[:, 0], tagged_data_B[:, 1], 'ro', label="Class B")
        tag.plot(data.training_data[:, 0], data.training_data[:, 1], 'k.', label="Training data")

        if epoch == 0:
            tag.legend(loc='lower center', bbox_to_anchor=(0.5, 1.0), ncols=3)
            cost.legend(loc='lower center', bbox_to_anchor=(0.5, 1.0), ncols=2)

        fig.canvas.draw()
        fig.canvas.flush_events()

    epoch += 1

# Final heat map after training has finished.
colormap = network.computeHeatMap(test_points_x, test_points_y)

tag.imshow(colormap, cmap='RdYlGn', extent=([-1, 1, -1, 1]), interpolation='bilinear', vmin=-1, vmax=1)
tag.plot(tagged_data_A[:, 0], tagged_data_A[:, 1], 'go', label="Class A")
tag.plot(tagged_data_B[:, 0], tagged_data_B[:, 1], 'ro', label="Class B")
tag.plot(data.training_data[:, 0], data.training_data[:, 1], 'k.', label="Training data")

numberOfNeurons = np.sum(structure)
maxNeuronsInLayer = np.max(structure)

# Per-neuron heat maps: one subplot per neuron, laid out layer-by-layer.
networkMap = network.computeWholeNeuralNetworkHeatMap(test_points_x, test_points_y, len(structure), maxNeuronsInLayer)

fig2 = plt.figure(figsize=(4*len(structure), 4*maxNeuronsInLayer))
for i in range(len(structure)):
    for j in range(structure[i]):
        subfig = fig2.add_subplot(maxNeuronsInLayer, len(structure), i+len(structure)*j+1)
        subfig.imshow(networkMap[i+len(structure)*j], cmap='RdYlGn', extent=([-1, 1, -1, 1]), vmin=-1, vmax=1)

fig2.canvas.draw()
fig2.canvas.flush_events()

# BUG FIX: block='false' was a truthy *string*, silently forcing a blocking
# show. Pass the boolean False (use block=True if the windows should stay
# open until closed by the user).
plt.show(block=False)

network.printWeights()

network.printBiases()

NeuralNetwork.py

0 → 100644
+151 −0
Original line number Diff line number Diff line
from Layer import Layer
from Data import additional_input
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import time

class NeuralNetwork:
    """Fully connected feed-forward network trained by backpropagation.

    Hidden layers use ReLU, the output layer uses tanh, and the cost is the
    summed squared error 0.5*||out - target||^2 accumulated over samples.
    """

    # Default hyperparameters; overridden via setLearningParameters().
    learning_rate = 0.01
    l2_regularization = 0.0
    # Accumulated cost; zeroed by resetBeforeBatchLearning() and summed up
    # by computeCostFunction()/updateCostFunction().
    cost_function = 0.0

    def __init__(self, structure):
        # Number of layers (input and output included).
        self.depth = len(structure)
        # Layer structure vector - number of neurons in each layer.
        self.structure = structure
        self.layer = []
        # Per-instance backprop error buffers. These used to be class-level
        # lists shared by every instance, so constructing a second network
        # appended to (and corrupted) the first one's state. An unused
        # class-level `gradient` list was removed for the same reason.
        self.errors = []

        self.setNeuralNetwork()

    def setLearningParameters(self, learning_rate, l2_reg):
        """Set the gradient-descent step size and the L2 penalty weight."""
        self.learning_rate = learning_rate
        self.l2_regularization = l2_reg

    def setNeuralNetwork(self):
        """Create all layers and their matching error buffers."""
        # The input layer gets a dummy previous dimension of 1; its weights
        # are never used in the forward pass.
        self.layer.append(Layer(self.structure[0], 1))
        self.errors.append(np.zeros(self.structure[0]))
        for i in range(self.depth-1):
            self.layer.append(Layer(self.structure[i+1], self.structure[i]))
            self.errors.append(np.zeros(self.structure[i+1]))

    def activation(self, input_data):
        """Forward pass. Returns 1 on success, 0 if the input size is wrong."""
        if len(input_data) != self.layer[0].dimension:
            return 0
        self.layer[0].neurons = input_data
        for i in range(1, self.depth-1):
            self.layer[i].activateLayer(self.layer[i-1].neurons)
        self.layer[self.depth-1].activateOutputLayer(self.layer[self.depth-2].neurons)
        return 1

    def calculateErrors(self, output):
        """Backward pass: fill self.errors with the per-layer deltas."""
        self.errors[self.depth-1] = np.multiply(
            (self.layer[self.depth-1].neurons - output),
            self.derivativeOfActivationOutput(self.layer[self.depth-1].neurons_notActivated))
        for i in range(self.depth-2, 0, -1):
            self.errors[i] = np.multiply(
                np.matmul(np.transpose(self.layer[i+1].weights), self.errors[i+1]),
                self.derivativeOfActivationFunction(self.layer[i].neurons_notActivated))

    def computeCostFunction(self, output):
        """Accumulate 0.5*||prediction - output||^2 into self.cost_function."""
        diff = self.layer[self.depth-1].neurons - output
        self.cost_function += 0.5*np.dot(diff, diff)

    def updateGradient(self, batch_size):
        """Accumulate the batch-averaged gradient of the current sample."""
        for n in range(self.depth-1, 0, -1):
            self.layer[n].gradient += (1.0/batch_size)*np.outer(self.errors[n], self.layer[n-1].neurons)
            self.layer[n].gradient_bias += (1.0/batch_size)*self.errors[n]

    def resetBeforeBatchLearning(self):
        """Zero the accumulated cost and all per-layer gradient buffers."""
        self.cost_function = 0.0
        for n in range(self.depth):
            self.layer[n].gradient = np.zeros((self.layer[n].dimension, self.layer[n].prev_layer_dimension))
            self.layer[n].gradient_bias = np.zeros(self.layer[n].dimension)

    def derivativeOfActivationFunction(self, num):
        """Derivative of the hidden-layer ReLU: 1 where num > 0, else 0."""
        return (num > 0) * 1

    def derivativeOfActivationOutput(self, num):
        """Derivative of the output tanh: 1 - tanh(num)^2."""
        return 1-pow(np.tanh(num), 2)

    def updateWeights(self):
        """Gradient-descent step with L2 weight decay (biases not decayed)."""
        for n in range(self.depth-1, 0, -1):
            self.layer[n].weights -= self.learning_rate*(self.layer[n].gradient + self.l2_regularization*self.layer[n].weights)
            self.layer[n].bias -= self.learning_rate*self.layer[n].gradient_bias

    def getResult(self, input):
        """Run a forward pass and return the output-layer activations."""
        self.activation(input)
        return self.layer[self.depth-1].neurons

    def updateCostFunction(self, input, output):
        """Accumulate the cost over a whole data set.

        NOTE: does NOT reset cost_function first — callers rely on the
        accumulation to recover testing cost by subtraction.
        """
        for i in range(len(input)):
            self.activation(input[i])
            self.computeCostFunction(output[i])

    def computeHeatMap(self, map_coord_x, map_coord_y):
        """Evaluate the network output over a 2-D grid.

        Returns an array indexed [row=i over map_coord_y, col=j over
        map_coord_x].
        """
        x = len(map_coord_x)
        y = len(map_coord_y)
        heatmap = np.zeros((x, y))
        for i in range(x):
            for j in range(y):
                point = [map_coord_x[j], map_coord_y[i]]
                # Append the derived features exactly as Data does.
                for k in range(len(self.layer[0].neurons)-2):
                    point.append(additional_input(k, map_coord_x[j], map_coord_y[i]))
                heatmap[i][j] = self.getResult(point)
        return heatmap

    def getResultFromNeuron(self, layer_index, neuron_index):
        """Return one neuron's activation after the last forward pass."""
        return self.layer[layer_index].neurons[neuron_index]

    def computeWholeNeuralNetworkHeatMapForOnePoint(self, point):
        """Forward-propagate one point and return every layer's activations."""
        resultsInAllNeurons = []
        self.activation(point)
        for i in range(self.depth):
            resultsInAllNeurons.append(self.layer[i].neurons)
        return resultsInAllNeurons

    def computeWholeNeuralNetworkHeatMap(self, map_coord_x, map_coord_y, grid_x, grid_y):
        """Per-neuron heat maps over the grid.

        The map of neuron l in layer k is stored at index k + depth*l.
        """
        x = len(map_coord_x)
        y = len(map_coord_y)
        heatmap = np.zeros((grid_x*grid_y, x, y))

        for i in range(x):
            for j in range(y):
                point = [map_coord_x[j], map_coord_y[i]]
                for k in range(len(self.layer[0].neurons)-2):
                    point.append(additional_input(k, map_coord_x[j], map_coord_y[i]))
                values = self.computeWholeNeuralNetworkHeatMapForOnePoint(point)
                for k in range(self.depth):
                    for l in range(len(self.layer[k].neurons)):
                        heatmap[k+self.depth*l][i][j] = values[k][l]
        return heatmap

    def printNetwork(self):
        """Print every layer's activations (debugging aid)."""
        for i in range(self.depth):
            print(self.layer[i].neurons)
            print("\n")

    def printWeights(self):
        """Print every layer's weight matrix."""
        for i in range(self.depth):
            print(self.layer[i].weights)
            print("\n")

    def printBiases(self):
        """Print every layer's bias vector."""
        for i in range(self.depth):
            print(self.layer[i].bias)
            print("\n")

    def printResult(self):
        """Print the output-layer activations."""
        print(self.layer[self.depth-1].neurons)