
Backpropagation (2): Defining Python Classes

Published:

By nob

Category: Posts

Tags: Machine Learning, Neural Networks, ディープラーニングがわかる数学入門, Backpropagation, Python

Defining Python Classes

Let's implement the model as Python classes.

import logging

import numpy as np
from fastprogress.fastprogress import master_bar, progress_bar


class Sigmoid:
    def sigmoid(self, x):
        # clip the input so that np.exp cannot overflow a float64
        return 1 / (1 + np.exp(-1 * np.clip(x, -709, 709)))

    def activate(self, z):
        return self.sigmoid(z)

    def deactivate(self, a):
        # sigmoid derivative expressed in terms of the activation a:
        # sigma'(z) = sigma(z) * (1 - sigma(z)) = a * (1 - a)
        return a * (1 - a)


class MeanSquaredError:

    def error(self, x, y):
        # squared error cost: C = sum((y - x)^2) / 2
        logging.debug("mean squared error error input x=%s y=%s", x, y)
        e = np.sum((y - x) ** 2) / 2
        logging.debug("mean squared error error output %s", e)
        return e

    def gradient(self, x, y):
        # gradient of the cost with respect to the prediction: dC/dx = x - y
        logging.debug("mean squared error gradient input x=%s y=%s", x, y)
        g = x - y
        logging.debug("mean squared error gradient output %s", g)
        return g


class InputLayer:

    def __init__(self, inputs):
        self.inputs = inputs
        # placeholder parameters; the input layer has nothing to learn
        self.weight = np.ones((inputs, inputs))
        self.bias = np.zeros(inputs)
        self.learning_rate = 0

    def forward(self, x):
        # the input layer passes values through unchanged
        return x

    def backward(self, delta):
        return delta

    def update_params(self):
        pass


class ActivationLayer:

    def __init__(self, activation):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.activation = activation

    def forward(self, x):

        self.logger.debug("activation forward input %s", x.shape)
        self.logger.debug(x)

        self.a = self.activation.activate(x)

        self.logger.debug("activation forward output %s", self.a.shape)
        self.logger.debug(self.a)

        return self.a

    def backward(self, delta):

        self.logger.debug("activation backward input %s", delta.shape)
        self.logger.debug(delta)

        # chain rule: multiply the upstream delta by the activation's derivative
        d = delta * self.activation.deactivate(self.a)

        self.logger.debug("activation backward output %s", d.shape)
        self.logger.debug(d)

        return d

    def update_params(self):
        pass


class FullyConnectedLayer:

    def __init__(self, inputs, neurons, rng, learning_rate=0.1):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.inputs = inputs
        self.neurons = neurons
        self.weight = rng.random(size=(neurons, inputs)) - 0.5
        self.bias = rng.random(neurons) - 0.5

        self.logger.debug("fullyconnectedlayer weights %s", self.weight.shape)
        self.logger.debug(self.weight)
        self.logger.debug("fullyconnectedlayer bias %s", self.bias.shape)
        self.logger.debug(self.bias)

        self.weight_delta = np.zeros_like(self.weight)
        self.bias_delta = np.zeros_like(self.bias)
        self.learning_rate = learning_rate

    def sigma(self, w, x, b):
        # weighted input of the layer: z = Wx + b
        return np.dot(w, x) + b

    def forward(self, x):

        self.logger.debug("fullyconnectedlayer forward input %s", x.shape)
        self.logger.debug(x)

        self.x = x
        z = self.sigma(self.weight, self.x, self.bias)

        self.logger.debug("fullyconnectedlayer forward output %s", z.shape)
        self.logger.debug(z)

        return z

    def backward(self, delta):

        self.logger.debug(
            "%s backward input %s", self.__class__.__name__, delta.shape
        )
        self.logger.debug(delta)

        # dC/dW: outer product of the upstream delta and this layer's input
        dw = delta[:, np.newaxis] * self.x

        self.logger.debug("weight delta %s", dw.shape)
        self.logger.debug(dw)

        self.weight_delta += dw

        self.logger.debug("bias delta %s", delta.shape)
        self.logger.debug(delta)

        self.bias_delta += delta

        # delta propagated to the previous layer: np.dot(delta, W) == W.T @ delta
        d = np.dot(delta, self.weight)

        self.logger.debug("fullyconnectedlayer backward output %s", d.shape)
        self.logger.debug(d)

        return d

    def update_params(self):
        self.logger.debug(
            "fullyconnectedlayer weight delta %s", self.weight_delta.shape
        )
        self.logger.debug(self.weight_delta)

        self.weight -= self.learning_rate * self.weight_delta
        self.logger.debug(
            "fullyconnectedlayer weight updated %s", self.weight.shape
        )
        self.logger.debug(self.weight)

        self.logger.debug(
            "fullyconnectedlayer bias delta %s", self.bias_delta.shape
        )
        self.logger.debug(self.bias_delta)

        self.bias -= self.learning_rate * self.bias_delta

        self.logger.debug(
            "fullyconnectedlayer bias updated %s", self.bias.shape
        )
        self.logger.debug(self.bias)

        self.weight_delta.fill(0)
        self.bias_delta.fill(0)


class MultiLayerPerceptron:
    def __init__(
        self,
        layers,
        cost,
        epochs=100,
        num_batch=1,
        learning_rate=0.1,
    ):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.layers = layers
        self.cost = cost
        self.epochs = epochs
        self.num_batch = num_batch
        self.learning_rate = learning_rate

    def predict_proba(self, X):
        proba = []
        for x in master_bar(X):
            a = x
            for layer in self.layers:
                a = layer.forward(a)
            proba.append(a)
        return np.array(proba)

    def predict(self, X):
        proba = self.predict_proba(X)
        if proba.shape[1] == 1:
            # single output neuron: threshold the probability at 0.5
            return np.where(proba > 0.5, 1, 0)
        # multiple output neurons: one-hot encode the argmax of each row
        pred = np.zeros_like(proba)
        pred[np.arange(proba.shape[0]), np.argmax(proba, axis=1)] = 1
        return pred

    def fit(self, X, y):

        for layer in self.layers:
            layer.learning_rate = self.learning_rate

        history = []

        n = 0
        batch_size = X.shape[0] // self.num_batch

        mb = master_bar(range(1, self.epochs + 1))
        for epoch in mb:

            self.logger.debug("%s %s", str(epoch), "-" * 60)

            c = 0

            for x, t in progress_bar(
                zip(X, y), parent=mb, total=X.shape[0], leave=False
            ):

                a = x

                for layer in self.layers:
                    a = layer.forward(a)

                c = c + self.cost.error(a, t)

                delta = self.cost.gradient(a, t)

                for layer in reversed(self.layers):
                    delta = layer.backward(delta)

                n += 1
                if n % batch_size == 0:
                    # update parameters once per batch, using the deltas
                    # accumulated by backward()
                    for layer in self.layers:
                        layer.update_params()
                    history.append(c)

                    self.logger.debug(
                        "epoch %s batch %s cost %s", epoch, n // batch_size, c
                    )

                    c = 0

            for i, layer in enumerate(self.layers):
                self.logger.debug(
                    "epoch %s layer %s weight %s",
                    epoch,
                    i,
                    (
                        layer.weight.shape
                        if hasattr(layer, "weight")
                        else "**no weight**"
                    ),
                )
                self.logger.debug(
                    layer.weight
                    if hasattr(layer, "weight")
                    else "**no weight**"
                )
                self.logger.debug(
                    "epoch %s layer %s bias %s",
                    epoch,
                    i,
                    (
                        layer.bias.shape
                        if hasattr(layer, "bias")
                        else "**no bias"
                    ),
                )
                self.logger.debug(
                    layer.bias if hasattr(layer, "bias") else "**no bias**"
                )

        self.history = np.array(history)

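As a quick sanity check (my own addition, not part of the original model), the analytic gradient accumulated by backward() can be compared with a numerical gradient obtained by central differences. The layer sizes, input values, and variable names below are arbitrary choices of mine; the two printed values should agree to several decimal places.

import numpy as np

rng = np.random.default_rng(0)
layer = FullyConnectedLayer(2, 2, rng)
act = ActivationLayer(Sigmoid())
cost = MeanSquaredError()

x = np.array([0.3, 0.7])
t = np.array([1.0, 0.0])

# analytic gradient of the cost with respect to weight[0, 0] via backward()
a = act.forward(layer.forward(x))
layer.backward(act.backward(cost.gradient(a, t)))
analytic = layer.weight_delta[0, 0]

# numerical gradient of the same weight via central differences
eps = 1e-6
w = layer.weight[0, 0]
layer.weight[0, 0] = w + eps
c_plus = cost.error(act.forward(layer.forward(x)), t)
layer.weight[0, 0] = w - eps
c_minus = cost.error(act.forward(layer.forward(x)), t)
layer.weight[0, 0] = w
numerical = (c_plus - c_minus) / (2 * eps)

print(analytic, numerical)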
Let's try classifying XOR

import numpy as np

np.set_printoptions(precision=3)

data = np.array(
    [
        [0, 0, 0],
        [1, 0, 1],
        [0, 1, 1],
        [1, 1, 0],
    ]
)

x = data[:, :-1]
y = data[:, -1:]

rng = np.random.default_rng(123)

mlp = MultiLayerPerceptron(
    [
        InputLayer(2),
        FullyConnectedLayer(2, 2, rng),
        ActivationLayer(Sigmoid()),
        FullyConnectedLayer(2, 1, rng),
        ActivationLayer(Sigmoid()),
    ],
    MeanSquaredError(),
    epochs=3000,
    learning_rate=0.5,
)
mlp.fit(x, y)
print(mlp.predict_proba(x))
print(mlp.predict(x))
[[0.043]
 [0.962]
 [0.951]
 [0.037]]
[[0]
 [1]
 [1]
 [0]]
import matplotlib.pyplot as plt

fig = plt.figure(figsize=(10, 5))
ax = fig.add_subplot()
ax.plot(mlp.history, marker="o", markersize=2)
ax.grid()
plt.minorticks_on()
plt.show()

[fig-1: plot of the cost history over training]
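One more check that is not in the post above: since predict_proba accepts any array of 2-D points, the trained network can also be evaluated on a grid to visualise the decision boundary it has learned for XOR. This is only a sketch; the grid range, resolution, and colour map are arbitrary choices of mine, and it assumes the mlp, x, and y objects from the example above are still in scope.

import numpy as np
import matplotlib.pyplot as plt

# evaluate the trained mlp on a grid covering the four XOR points
xx, yy = np.meshgrid(np.linspace(-0.5, 1.5, 101), np.linspace(-0.5, 1.5, 101))
grid = np.column_stack([xx.ravel(), yy.ravel()])
proba = mlp.predict_proba(grid).reshape(xx.shape)

fig = plt.figure(figsize=(5, 5))
ax = fig.add_subplot()
cs = ax.contourf(xx, yy, proba, levels=20, cmap="coolwarm")
fig.colorbar(cs)
ax.scatter(x[:, 0], x[:, 1], c=y.ravel(), edgecolors="k")  # the four XOR points
plt.show()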