Using Autoencoders to Identify and Reject Out-of-Domain Examples

My focus has shifted to identifying out-of-domain samples to reject with my autoencoder. Toward this goal I have implemented a variational version of my MNIST digits autoencoder that achieves similar error on the task (within ~100 MSE on most runs, and in particular for the model evaluated below). I use both types of autoencoder to evaluate testing samples and reject those whose reconstruction error is above the 95th percentile of the training data. In this way I give the model the option to discard samples it believes it cannot classify.
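The rejection rule above can be sketched as follows. This is a minimal illustration on toy data; `reconstruction_errors` and `reject_mask` are illustrative helper names, not functions from the linked repo.

```python
import numpy as np

def reconstruction_errors(reconstruct, x):
    """Per-sample MSE between inputs and their reconstructions."""
    recon = reconstruct(x)
    return np.mean((x - recon) ** 2, axis=tuple(range(1, x.ndim)))

def reject_mask(reconstruct, x_train, x_test, percentile=95):
    """True where a test sample's error exceeds the training percentile."""
    threshold = np.percentile(reconstruction_errors(reconstruct, x_train), percentile)
    return reconstruction_errors(reconstruct, x_test) > threshold

# toy usage with an identity "autoencoder": every error is zero, so nothing is rejected
x = np.random.default_rng(0).random((100, 28, 28))
mask = reject_mask(lambda a: a, x, x)
```

In the real experiment `reconstruct` would be the trained autoencoder's forward pass and the threshold would be computed once over the full training set.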

Below are shown my results. All models evaluated were trained on a "pure" training sample; that is, if any of the training samples were to appear at testing time they would not be rejected, since they do not exceed the reconstruction error threshold. This is done so that the training and testing distributions are as similar as possible (the stationarity assumption is common in the theoretical justification for most machine learning models).

From the results it can be seen that the variational autoencoder is much more tolerant, in terms of reconstruction error, to the noise in the testing sample. This turns out to be quite problematic, as it also produces latent space representations that are quite brittle for the classifiers: the noise has an enormous negative effect on classification accuracy compared to the latent representation produced by the deterministic autoencoder. Conversely, the deterministic autoencoder is much less tolerant to the noise in the testing sample, which can be interpreted as making it much more adept at identifying the distribution shift.

Both rejection strategies help the neural network achieve higher performance on the testing set. However, not all models benefit equally: for the decision tree and the random forest, the variational rejection strategy actually makes performance worse. The deterministic rejection strategy, by contrast, provides a significant improvement across all classification strategies.

In the future, I plan to test training on the noisy examples, training to denoise, and whether the AE can recognize other forms of distribution shift or reject adversarial inputs when trained on clean data. The code is available on my GitHub, and most of it is documented at the time of posting.

Running:

"python main.py --nn --tree --forest --noise --load --newnn"

and "python main.py --nn --tree --forest --noise --load --newnn --var"

will produce the outputs for this experiment.


print('no code this time :)')
"""
# code can be found at https://github.com/JayRothenberger/AETree
# Results: ( all numbers given are categorical accuracy = correct predictions / total predictions )

            Decision Tree (depth 16, gini):
            (Deterministic)
baseline     - train, test: 0.9405166666666667, 0.84
noisy        - train, test: 0.56495, 0.5548
noisy int    - train, test: 0.5778666666666666, 0.5656
noisy reject - noise, int : 0.6563769293257514, 0.6608929007074896
rejected samples: 6307, 5901
            (Variational)
baseline     - train, test: 0.9307333333333333, 0.8506
noisy        - train, test: 0.4554666666666667, 0.4496
noisy int    - train, test: 0.4670166666666667, 0.4603
noisy reject - noise, int : 0.4266710461487929, 0.44140077821011675
rejected samples: 3911, 3575

            Random Forest (250 trees, max depth 9):
            (Deterministic)
baseline     - train, test: 0.9358333333333333, 0.9169
noisy        - train, test: 0.77185, 0.7722
noisy int    - train, test: 0.7824333333333333, 0.7836
noisy reject - noise, int : 0.8711075006769564, 0.8728958282507929
rejected samples: 6307, 5901
            (Variational)
baseline     - train, test: 0.9259666666666667, 0.9131
noisy        - train, test: 0.5807666666666667, 0.5727
noisy int    - train, test: 0.5900333333333333, 0.5821
noisy reject - noise, int : 0.5332566923961242, 0.549727626459144
rejected samples: 3911, 3575

            Neural Network ( Dense(10, softmax)(Dense(49, selu)) ):
            (Deterministic)
baseline     - train, test: 0.9345666766166687, 0.9358000159263611
noisy        - train, test: 0.7833666801452637, 0.7821000218391418
noisy int    - train, test: 0.7911499738693237, 0.7914999723434448
noisy reject - noise, int : 0.8329271674156189, 0.834837794303894
rejected samples: 6307, 5901
            (Variational)
baseline     - train, test: 0.9498833417892456, 0.9491999745368958
noisy        - train, test: 0.8301666378974915, 0.8342999815940857
noisy int    - train, test: 0.8414499759674072, 0.8468999862670898
noisy reject - noise, int : 0.8856955170631409, 0.8915175199508667
rejected samples: 3911, 3575

"""
        
No fun pictures this time :(
Autoencoder Reconstruction Error as a Reject Criterion

Using my MNIST digits autoencoder I wanted to see if I could detect, by their reconstruction error, examples that would be difficult to classify in the latent space. Intuitively, if it is difficult for the decoder to decode the latent representation, then the features must define some atypical example that is not representative of the original training distribution, and would therefore be difficult for a classifier to classify. I thought that the reconstruction error of an AE learned on the training set might identify such examples at test time, so that by rejecting them we could achieve better generalization performance. In this way I hoped to make typically brittle classifiers (decision trees, neural networks, random forests) more robust to some input perturbation (in this case Gaussian noise).

In the first image you can see the effect of adding random noise on the reconstruction error. Adding noise to the image usually (but not always) increases the reconstruction error by some amount. In the first figure, noise with mean 0 and standard deviation 10 was added to the pixel values of the image input to the encoder; values were then clipped to remain in the valid [0, 255] range. In the second figure, pixel values were not only clipped but also cast to integers to remain in the valid integer domain Z/(256)Z. The threshold in both figures was chosen as the 95th percentile reconstruction error over the training set. The threshold represents the level a testing sample's reconstruction error has to exceed in order for it to be rejected (not classified).
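The perturbation procedure can be sketched as follows. This is a minimal illustration of the noise, clipping, and integer casting described above; `perturb` is an illustrative helper, and the exact script lives in the linked repo.

```python
import numpy as np

def perturb(images, std=10.0, cast_to_int=False, rng=None):
    """Add zero-mean Gaussian noise (std 10 by default) to pixel values,
    clip back into the valid [0, 255] range, and optionally cast to
    integers so the result stays in the integer domain Z/(256)Z."""
    rng = rng or np.random.default_rng()
    noisy = images.astype(np.float64) + rng.normal(0.0, std, images.shape)
    noisy = np.clip(noisy, 0, 255)
    return noisy.astype(np.uint8) if cast_to_int else noisy

x = np.zeros((2, 28, 28), dtype=np.uint8)  # toy stand-in for MNIST images
x_noisy = perturb(x, rng=np.random.default_rng(0))
x_noisy_int = perturb(x, cast_to_int=True, rng=np.random.default_rng(0))
```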

From the text results below you can see that the technique never decreases test accuracy, and in all cases yields some nominal increase.

The second image's rows should be interpreted as follows:

Rows 1-4 : [5 best samples rejected, " (with corresp. noise), 5 worst samples rejected, " (with corresp. noise)]
Rows 5-8 : [5 best samples kept, " (with corresp. noise), 5 worst samples kept, " (with corresp. noise)]
Integer casting is not shown, as it is visually imperceptible.

"Best" and "worst" refer to the magnitude of the reconstruction error.
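Selecting those panels is just a matter of sorting by per-sample reconstruction error. A minimal sketch, assuming `errors` is the per-sample error array and `images` the matching samples (`best_and_worst` is an illustrative helper, not from the repo):

```python
import numpy as np

def best_and_worst(images, errors, k=5):
    """Return the k samples with the lowest ('best') and highest ('worst')
    reconstruction error, in that order."""
    order = np.argsort(errors)  # indices sorted by ascending error
    return images[order[:k]], images[order[-k:]]

# toy usage: errors 0..9, so the 'best' samples are simply the first five
imgs = np.arange(10).reshape(10, 1, 1)
errs = np.arange(10, dtype=float)
best, worst = best_and_worst(imgs, errs)
```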

From this figure and our accuracy results we can draw a few interesting conclusions:

First, it seems that reconstruction error is largely dominated by the difficulty of classifying the original image. The rejected samples are visually more difficult to identify as their respective digits than the samples that were kept. This is particularly noticeable when considering that the very best kept samples are all single-line 1s.

Second, interestingly, the most brittle classifiers are those with the fewest parameters. This is slightly counterintuitive given the typical wisdom about overfitting and regularization, but in this case the noise has the greatest effect on the tree-based classifiers. This could also be because the neural network computes a continuous function, and is thus not affected as much by the smaller perturbations present in Gaussian noise.

In the future I think I will try different types of noise. Perhaps salt-and-pepper noise or uniform noise will yield different brittleness results for the different classifiers. It also may be useful to perform the thresholding on the training set as well, as it seems like some of the worst-reconstructed digits are rather nasty examples. It would also be nice to try to find minimum adversarial perturbations for each of these models. This is feasible for the neural network, but is at least exponentially hard for the decision tree models.

The code is available on my GitHub if you would like to try it yourself, although at the time of writing I have not yet documented it. Running "python main.py --tree --nn --forest --noise" with the correct directory structure should perform the experiment and show the figures.


print('no code this time :)')
"""
# code can be found at https://github.com/JayRothenberger/AETree
# Results: ( all numbers given are categorical accuracy = correct predictions / total predictions )

            Decision Tree (depth 16, gini):
baseline     - train, test: 0.9699333333333333, 0.8633
noisy        - train, test: 0.65415, 0.6477
noisy int    - train, test: 0.6680333333333334, 0.6559
noisy reject - noise, int : 0.6525575044638168, 0.6604247306203578
rejected samples: 479, 441

            Random Forest (25 trees, max depth 5):
baseline     - train, test: 0.7857333333333333, 0.7869
noisy        - train, test: 0.6893666666666667, 0.6983
noisy int    - train, test: 0.6961, 0.7057
noisy reject - noise, int : 0.7081188950740468, 0.7150329532377864
rejected samples: 479, 441

            Neural Network ( Dense(10, softmax)(Dense(49, selu)) ):
baseline     - train, test: 0.95333331823349, 0.9519000053405762
noisy        - train, test: 0.9205499887466431, 0.9215999841690063
noisy int    - train, test: 0.9244999885559082, 0.9243000149726868
noisy reject - noise, int : 0.9278436899185181, 0.9302228093147278
rejected samples: 479, 441
"""
        
Output:
MNIST Digits Autoencoder Feature Map Visualizations

Using the following code on my MNIST digits autoencoder I obtained the average activations displayed below as images. The three rows correspond to the three depthwise-separable convolutional layers that perform the scaling in the autoencoder. Each row has 49 columns, one for each feature map in the corresponding layer.


# Assumes the trained `encoder` and the `x_test` data from the MNIST script in the post below.
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import array_to_img

# keep only 4-D (spatial) outputs, skipping the input and the stem convolution
layer_names = [layer.name for layer in encoder.layers if len(layer.output.shape) == 4][2:]
layer_outputs = [layer.output for layer in encoder.layers if len(layer.output.shape) == 4][2:]

feature_map_model = tf.keras.models.Model(inputs=[encoder.input], outputs=layer_outputs)
feature_map_model.compile()

feature_maps = [np.average(feature_map, axis=0) for feature_map in feature_map_model.predict(x_test)]
print([feature_map.shape for feature_map in feature_maps])

ncols = 49
nrows = len(layer_names)
fig = plt.figure(figsize=(ncols, nrows))
row = 0
for layer_name, feature_map in zip(layer_names, feature_maps):
    k = feature_map.shape[-1]
    size = feature_map.shape[1]
    for i in range(k):
        # iterating over a feature map of a particular layer to separate all feature images.
        feature_image = feature_map[:, :, i]
        feature_image -= feature_image.mean()
        feature_image /= feature_image.std()
        feature_image *= 64
        feature_image += 128
        feature_image = np.clip(feature_image, 0, 255).astype('uint8')
        ax = fig.add_subplot(nrows, ncols, row * ncols + i + 1)
        ax.axes.xaxis.set_ticks([])
        ax.axes.yaxis.set_ticks([])
        plt.imshow(array_to_img(np.expand_dims(feature_image, axis=-1)))

    row += 1
plt.show()
        
Output:
New Experiment: MNIST Digits

I put my autoencoding skills to the test on an easier task (MNIST digits). More to come on this project, as I have some experiments I want to try when performing classification from the encoded representation. Below I show the results for the model I have built, as well as the clean code I wrote (and then proceeded not to document). Top is the representation that has been passed through the autoencoder; bottom is the ground truth.


import tensorflow as tf
from tensorflow.keras import layers
from sklearn import tree
from time import time
from tensorflow.keras.preprocessing.image import array_to_img
import matplotlib.pyplot as plt
import numpy as np
import argparse
import os
import re

# mnist digits
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(path="mnist.npz")

batch_size = 32

input_shape = (28, 28, 1)
print(input_shape)

def build_AE(input_shape, activation='selu', encoder_filter_size=(4, 4), init=tf.keras.initializers.LecunNormal(),
             learning_rate=0.001):

    enc_inputs = layers.Input(input_shape, name='enc_in')
    # 28x28
    x = layers.Conv2D(1, encoder_filter_size, padding='same', activation=activation, kernel_initializer=init)(enc_inputs)
    x = layers.SeparableConv2D(49, encoder_filter_size, (2, 2), padding='same',
                               kernel_initializer=init, activation=activation)(x)
    # 14x14
    x = layers.SeparableConv2D(49, encoder_filter_size, (2, 2), padding='same',
                               kernel_initializer=init, activation=activation)(x)
    # 7x7
    x = layers.Conv2D(49, encoder_filter_size, (2, 2), padding='same',
                               kernel_initializer=init, activation=activation)(x)
    x = layers.GlobalAveragePooling2D()(x)
    # 49 output features
    enc_outputs = x

    dec_inputs = layers.Input((49,), name='dec_in')

    x = layers.Reshape((7, 7, 1))(dec_inputs)
    # 7x7
    x = layers.SeparableConv2D(49, encoder_filter_size, padding='same',
                               kernel_initializer=init, activation=activation)(x)
    x = layers.UpSampling2D(2)(x)
    # 14x14
    x = layers.SeparableConv2D(49, encoder_filter_size, padding='same',
                               kernel_initializer=init, activation=activation)(x)
    x = layers.UpSampling2D(2)(x)
    # 28x28
    x = layers.SeparableConv2D(49, encoder_filter_size, padding='same',
                               kernel_initializer=init, activation=activation)(x)

    x = layers.Conv2D(1, (3, 3), padding='same', activation=activation)(x)

    dec_outputs = x

    encoder = tf.keras.Model(inputs=[enc_inputs], outputs=[enc_outputs], name='mnist_enc')

    decoder = tf.keras.Model(inputs=[dec_inputs], outputs=[dec_outputs], name='mnist_dec')

    model = tf.keras.Model(inputs=[enc_inputs], outputs=decoder(encoder(enc_inputs)), name='mnist_ae')

    opt = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    model.compile(loss='mse', optimizer=opt)

    print(model.summary())

    return model, encoder, decoder


def load_recent_model(dirname, filebase):
    print(os.listdir(dirname))
    files = [f for f in os.listdir(dirname) if re.match(r'%s' % filebase, f)]
    files = sorted(files, key=lambda x: float(x.split('_')[-1]))

    return tf.keras.models.load_model(os.path.join(dirname, files[-1]))  # load the file with the latest timestamp suffix


def create_parser():
    parser = argparse.ArgumentParser(description='MNIST experiment')
    parser.add_argument('--load', action='store_true', help='load a model rather than creating a new one')

    return parser


if __name__ == '__main__':
    parser = create_parser()
    args = parser.parse_args()

    model, encoder, decoder = None, None, None

    if args.load:
        model, encoder, decoder = load_recent_model('.', 'autoenc'), load_recent_model('.', 'enc'), load_recent_model('.', 'dec')
    else:
        model, encoder, decoder = build_AE(input_shape)
        model.fit(x_train, x_train, batch_size=batch_size, epochs=25)

        tf.keras.models.save_model(model, f'autoenc_{time()}')
        tf.keras.models.save_model(encoder, f'enc_{time()}')
        tf.keras.models.save_model(decoder, f'dec_{time()}')

    inds = [0, 1, 2, 3, 4, 5, 6]
    ncols = len(inds)
    nrows = 2

    fig = plt.figure(figsize=(ncols, nrows), dpi=300)

    for i in inds:
        ax = fig.add_subplot(nrows, ncols, i + 1)
        ax.axes.xaxis.set_ticks([])
        ax.axes.yaxis.set_ticks([])
        encoded = encoder.predict(np.array([x_test[inds[i]], ]))
        decoded = decoder.predict(encoded)
        plt.imshow(array_to_img(decoded[0]))
        ax = fig.add_subplot(nrows, ncols, i + 1 + ncols)
        ax.axes.xaxis.set_ticks([])
        ax.axes.yaxis.set_ticks([])
        plt.imshow(array_to_img(np.expand_dims(x_test[inds[i]], axis=-1)))

    plt.show()
        
Output:
Updating my Convolutional Autoencoder (Less params, larger filters)

Finally I was able to recreate a result with similar performance to the original convolutional AE using a model with many fewer parameters: this model uses ~800,000 compared to the original ~2.5 million. You can see that the output even achieves the color that was hard for previous models to capture. In terms of MSE this model achieves ~1500, which is competitive with (though slightly higher than) the error achieved by the model in my first post. For this model I have only included the function I used to build the model and the corresponding call. The issue was the Conv2D layer I was using to merge the depthwise separable layers. I have also removed the maxpool layers in favor of larger strides, increased the filter sizes, and removed the skip connections in the flat blocks.


import os

import tensorflow as tf
from tensorflow import keras
#import tensorflow_addons as tfa
from tensorflow.keras import layers
from time import time

from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.preprocessing.image import array_to_img

from os import listdir
from os.path import isfile, join

import matplotlib.pyplot as plt
import numpy as np

from math import log2 as log

batch_size = 4

df = np.load('pkmn_dset_numpy.npy')

input_shape = (256, 256, 3)


def build_separable_conv_model(input_shape, patch_blocks=1, flat_blocks=1,
                               AE=False,
                               activation='relu',
                               learning_rate=0.001,
                               flat_filters=256,
                               scaling_filters=32,
                               patch_size=(8, 8),
                               decoder_filter_size=(4, 4),
                               encoder_filter_size=(8, 8)):
    init = tf.keras.initializers.LecunNormal()

    enc_inputs = layers.Input(input_shape, name='inputs')  # does not include batch size
    skip = layers.Conv2D(32, (8, 8), (2, 2), 'same', activation=activation, kernel_initializer=init)(enc_inputs)
    x = skip

    for block in range(patch_blocks):  # each downscales by 4
        x = layers.Conv2D(scaling_filters, encoder_filter_size, (4, 4), padding='same')(x)  # patchify convolution
        x = layers.SeparableConv2D(scaling_filters, encoder_filter_size, padding='same', kernel_initializer=init)(x)  # patches should interact
        x = layers.Activation(activation)(x)  # activate
        skip = layers.Conv2D(scaling_filters, encoder_filter_size, (4, 4), padding='same')(skip)
        x = layers.Add()([x, skip])

    for block in range(flat_blocks):
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(flat_filters, encoder_filter_size, padding='same', kernel_initializer=init)(x)
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(flat_filters, encoder_filter_size, padding='same', kernel_initializer=init)(x)
        #x = layers.Conv2D(flat_filters, (4, 4),  kernel_initializer=init)(x)  # bottleneck
        # skip = layers.Conv2D(flat_filters, (1, 1), padding='same')(skip)
        # x = layers.Add()([x, skip])

    enc_outputs = layers.GlobalAveragePooling2D()(x)

    if AE:
        encoder = tf.keras.Model(inputs=[enc_inputs], outputs=[enc_outputs],
                              name='xceptional_smaller_encoder')

        input_shape_dec = (flat_filters,)
        output_shape_dec = input_shape

        dec_inputs = layers.Input(input_shape_dec, name='dec_inputs')
        dim = int(input_shape_dec[0] ** (1 / 2))
        x = layers.Reshape((dim, dim, 1))(dec_inputs)

        scaling_blocks_dec = int(log(output_shape_dec[0]) - log(input_shape_dec[0] ** (1 / 2)))

        skip = x

        for block in range(scaling_blocks_dec):
            x = layers.UpSampling2D(2)(x)
            # filters halve each upscaling block: 2**((scaling_blocks_dec - block) + 4)
            x = layers.SeparableConv2D(2 ** ((scaling_blocks_dec - block) + 4), decoder_filter_size, strides=(1, 1), padding='same',
                                       kernel_initializer=init)(x)
            x = layers.Activation(activation)(x)
            x = layers.SeparableConv2D(2 ** ((scaling_blocks_dec - block) + 4), decoder_filter_size, strides=(1, 1), padding='same',
                                       kernel_initializer=init)(x)
            x = layers.Activation(activation)(x)
            x = layers.SeparableConv2D(2 ** ((scaling_blocks_dec - block) + 4), decoder_filter_size, strides=(1, 1), padding='same',
                                       kernel_initializer=init)(x)
            #x = layers.Conv2D(2 ** ((scaling_blocks_dec - block) + 4), (4, 4), kernel_initializer=init, padding='same')(x)  # bottleneck
            skip = layers.Conv2D(2 ** ((scaling_blocks_dec - block) + 4), (1, 1), padding='same')(
                layers.UpSampling2D(2)(skip))
            x = layers.Add()([x, skip])

        x = layers.Conv2D(3, (3, 3), padding='same', activation=activation)(x)
        dec_outputs = layers.Conv2D(3, (1, 1))(x)

        decoder = tf.keras.Model(inputs=dec_inputs, outputs=dec_outputs, name='xceptional_smaller_decoder')

        model = tf.keras.Model(inputs=[enc_inputs], outputs=decoder(encoder(enc_inputs)),
                            name=f'xceptional_smaller_{patch_blocks}_AE_{AE}')

        opt = tf.keras.optimizers.Adam(learning_rate=learning_rate)

        model.compile(loss='mse', optimizer=opt)

        print(model.summary())

        return model, encoder, decoder

    opt = tf.keras.optimizers.Nadam(learning_rate=learning_rate, beta_1=0.9,
                                    beta_2=0.999, epsilon=None, decay=0.0)

    model = tf.keras.Model(inputs=enc_inputs, outputs=enc_outputs,
                        name=f'xceptional_smaller_{patch_blocks}_AE_{AE}')

    model.compile(loss='mse', optimizer=opt)

    print(model.summary())

    return model

model, encoder, decoder = build_separable_conv_model(input_shape, patch_blocks=2, flat_blocks=2, AE=True)
        
Output:
Convolutional Autoencoder using Xception-like Depthwise Separable Convolutions and a Patchify Layer

Below I have included the code for my fourth attempt at constructing an autoencoder capable of reconstructing pokemon from a learned latent space. In this iteration, in the spirit of the newly released ConvNeXt architecture, I make use of a patchify layer on the input. Effectively this separates the input image into patches instead of using the traditional maxpooling downsampling approach. I am not confident I have implemented this in any useful way, and as of posting the architecture below has not proven suitable for this task. I still make use of the familiar skip connections and depthwise separable convolutions, but this model is much smaller. I had hoped that the previous approaches were simply too complicated or deep, and that the blurriness evident in the earlier outputs was a result of the typical loss of definition that comes with depth in CNNs; however, it appears that was not so. Loss on this model has not been encouraging in preliminary tests, and I believe I may have to resign myself to the fact that 1400 images simply are not enough to learn this task. This may be my last attempt at an architecture for this task, as I think it would be more gratifying to practice these techniques on an easier task such as classification.


import os

import tensorflow as tf
from tensorflow import keras
#import tensorflow_addons as tfa
from tensorflow.keras import layers
from time import time

from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.preprocessing.image import array_to_img

from os import listdir
from os.path import isfile, join

import matplotlib.pyplot as plt
import numpy as np

from math import log2 as log

# os.environ['TF_GPU_ALLOCATOR'] = 'cuda_malloc_async'

mypath = './pokemon_img_jpg/'
"""
df = []
for f in listdir(mypath):
  if '-' not in f:
    if isfile(join(mypath, f)):
      try:
        arr_img = img_to_array(load_img(join(mypath, f)))
        df += [arr_img, np.flipud(arr_img)]
        #df += [arr_img, np.flipud(arr_img), np.fliplr(arr_img), np.fliplr(np.flipud(arr_img)), np.rot90(arr_img), np.rot90(arr_img, k=3)]
        #ones = np.ones_like(arr_img)
        #zeroes = np.zeros_like(arr_img)
        #twofiftyfives = ones*255
        #df += [np.minimum(twofiftyfives, arr_img + i*ones) for i in range(4, 9, 4)]
        #df += [np.maximum(zeroes, arr_img - i*ones) for i in range(4, 9, 4)]

      except FileNotFoundError as e:
        print(e)
print(len(df))
df = np.asarray(df)
np.random.shuffle(df)

df = df.astype(np.uint8)

df = tf.data.Dataset.from_tensor_slices((df, df))
tf.data.experimental.save(df, 'pkmn_dset_tf_uint8')
"""
batch_size = 4

df = np.load('pkmn_dset_numpy.npy')

input_shape = (256, 256, 3)


def build_separable_conv_model(input_shape, patch_blocks=1, flat_blocks=1,
                               AE=False,
                               activation='relu',
                               learning_rate=0.001,
                               flat_filters=256,
                               scaling_filters=32,
                               patch_size=(4, 4),
                               decoder_filter_size=(3, 3),
                               encoder_filter_size=(4, 4)):
    init = tf.keras.initializers.LecunNormal()

    enc_inputs = layers.Input(input_shape, name='inputs')  # does not include batch size
    skip = enc_inputs

    x = skip
    for block in range(patch_blocks):  # each downscales by 4
        x = layers.Conv2D(scaling_filters, (4, 4), (4, 4), padding='same')(x)  # patchify convolution
        x = layers.SeparableConv2D(scaling_filters, encoder_filter_size, padding='same', kernel_initializer=init)(x)  # patches should interact
        x = layers.Activation(activation)(x)  # activate
        skip = layers.Conv2D(scaling_filters, (4, 4), (4, 4), padding='same')(skip)
        x = layers.Add()([x, skip])

    for block in range(flat_blocks):
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(flat_filters, encoder_filter_size, padding='same', kernel_initializer=init)(x)
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(flat_filters, encoder_filter_size, padding='same', kernel_initializer=init)(x)
        x = layers.Conv2D(flat_filters, (1, 1),  kernel_initializer=init)(x)  # bottleneck
        skip = layers.Conv2D(flat_filters, (1, 1), padding='same')(skip)
        x = layers.Add()([x, skip])

    enc_outputs = layers.GlobalAveragePooling2D()(x)

    if AE:
        encoder = tf.keras.Model(inputs=[enc_inputs], outputs=[enc_outputs],
                              name='xceptional_smaller_encoder')

        input_shape_dec = (flat_filters,)
        output_shape_dec = input_shape

        dec_inputs = layers.Input(input_shape_dec, name='dec_inputs')
        dim = int(input_shape_dec[0] ** (1 / 2))
        x = layers.Reshape((dim, dim, 1))(dec_inputs)

        scaling_blocks_dec = int(log(output_shape_dec[0]) - log(input_shape_dec[0] ** (1 / 2)))

        skip = x

        for block in range(scaling_blocks_dec):
            x = layers.UpSampling2D(2)(x)
            # 128 filters in every upscaling block
            x = layers.SeparableConv2D(128, decoder_filter_size, strides=(1, 1), padding='same',
                                       kernel_initializer=init)(x)
            x = layers.Activation(activation)(x)
            x = layers.SeparableConv2D(128, decoder_filter_size, strides=(1, 1), padding='same',
                                       kernel_initializer=init)(x)
            x = layers.Activation(activation)(x)
            x = layers.SeparableConv2D(128, decoder_filter_size, strides=(1, 1), padding='same',
                                       kernel_initializer=init)(x)
            x = layers.Conv2D(128, (1, 1), kernel_initializer=init)(x)  # bottleneck
            skip = layers.Conv2D(128, (1, 1), padding='same')(
                layers.UpSampling2D(2)(skip))
            x = layers.Add()([x, skip])

        x = layers.Conv2D(3, (3, 3), padding='same', activation=activation)(x)
        dec_outputs = layers.Conv2D(3, (1, 1))(x)

        decoder = tf.keras.Model(inputs=dec_inputs, outputs=dec_outputs, name='xceptional_smaller_decoder')

        model = tf.keras.Model(inputs=[enc_inputs], outputs=decoder(encoder(enc_inputs)),
                            name=f'xceptional_smaller_{patch_blocks}_AE_{AE}')

        opt = tf.keras.optimizers.Adam(learning_rate=learning_rate)

        model.compile(loss='mse', optimizer=opt)

        print(model.summary())

        return model, encoder, decoder

    opt = tf.keras.optimizers.Nadam(learning_rate=learning_rate, beta_1=0.9,
                                    beta_2=0.999, epsilon=None, decay=0.0)

    model = tf.keras.Model(inputs=enc_inputs, outputs=enc_outputs,
                        name=f'xceptional_smaller_{patch_blocks}_AE_{AE}')

    model.compile(loss='mse', optimizer=opt)

    print(model.summary())

    return model


model, encoder, decoder = build_separable_conv_model(input_shape, patch_blocks=2, flat_blocks=3, AE=True)

cont = input('proceed with experiment? ')

if cont == 'stop':
    raise SystemExit('experiment aborted by user')

model.fit(df, df, batch_size=batch_size, epochs=50)

tf.keras.models.save_model(model, f'autoenc_{time()}')
tf.keras.models.save_model(encoder, f'enc_{time()}')
tf.keras.models.save_model(decoder, f'dec_{time()}')

inds = [0, 1, 2, 3, 4, 5, 6]
ncols = len(inds)
nrows = 2

fig = plt.figure(figsize=(ncols, nrows), dpi=300)

for i in inds:
    ax = fig.add_subplot(nrows, ncols, i + 1)
    ax.axes.xaxis.set_ticks([])
    ax.axes.yaxis.set_ticks([])
    prediction = model.predict(np.array([df[inds[i]], ]))
    plt.imshow(array_to_img(prediction[0]))
    ax = fig.add_subplot(nrows, ncols, i + 1 + ncols)
    ax.axes.xaxis.set_ticks([])
    ax.axes.yaxis.set_ticks([])
    plt.imshow(array_to_img(df[inds[i]]))

plt.show()

        
No Output For This Model
Variational Fully Convolutional Autoencoder using Xception-like Depthwise Separable Convolutions

Below I have included the code for my third attempt at a convolutional autoencoder for learning a representation of images of pokemon. The model below never achieved good enough performance to produce an output even as visually recognizable as the previous two, which is really a shame because I used some pretty slick techniques to construct it. I have included a basic sampling layer (this is the code suggested by the Keras documentation and Aurélien Géron) to make this a variational autoencoder, as well as some updates to the functions I use to construct my encoder and decoder; this code makes changing hyperparameters a more sensible task. My inputs to the variational layer are also in the style of fully convolutional neural networks, making use of global average pooling layers. I continue to use the residual connections present in the previous models.


import os

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from time import time

from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.preprocessing.image import array_to_img

from os import listdir
from os.path import isfile, join

import matplotlib.pyplot as plt
import numpy as np

from math import log2 as log

# os.environ['TF_GPU_ALLOCATOR'] = 'cuda_malloc_async'

mypath = './pokemon_img_jpg/'
"""
df = []
for f in listdir(mypath):
  if '-' not in f:
    if isfile(join(mypath, f)):
      try:
        arr_img = img_to_array(load_img(join(mypath, f)))
        df += [arr_img, np.flipud(arr_img)]
        #df += [arr_img, np.flipud(arr_img), np.fliplr(arr_img), np.fliplr(np.flipud(arr_img)), np.rot90(arr_img), np.rot90(arr_img, k=3)]
        #ones = np.ones_like(arr_img)
        #zeroes = np.zeros_like(arr_img)
        #twofiftyfives = ones*255
        #df += [np.minimum(twofiftyfives, arr_img + i*ones) for i in range(4, 9, 4)]
        #df += [np.maximum(zeroes, arr_img - i*ones) for i in range(4, 9, 4)]

      except FileNotFoundError as e:
        print(e)
print(len(df))
df = np.asarray(df)
np.random.shuffle(df)

df = df.astype(np.uint8)

df = tf.data.Dataset.from_tensor_slices((df, df))
tf.data.experimental.save(df, 'pkmn_dset_tf_uint8')
"""
batch_size = 4

df = np.load('pkmn_dset_numpy.npy')

input_shape = (256, 256, 3)


class Sampling(layers.Layer):
    def call(self, inputs):
        mean, log_var = inputs
        # Reparameterization trick: z = mean + exp(log_var / 2) * eps, eps ~ N(0, I).
        # (A full VAE would also add the KL divergence term to the loss.)
        return tf.keras.backend.random_normal(tf.shape(log_var)) * tf.keras.backend.exp(log_var / 2) + mean


def build_separable_conv_model(input_shape, scaling_blocks=1, flat_blocks=1,
                               AE=False,
                               activation='selu',
                               learning_rate=0.001,
                               flat_filters=256,
                               scaling_filters=128,
                               decoder_filter_size=(3, 3),
                               encoder_filter_size=(3, 3)):
    init = tf.keras.initializers.LecunNormal()

    enc_inputs = layers.Input(input_shape, name='inputs')  # does not include batch size
    x = layers.Conv2D(32, (3, 3), (2, 2), 'same', activation=activation, kernel_initializer=init)(enc_inputs)
    skip = layers.Conv2D(68, (3, 3), (1, 1), 'same', activation=activation, kernel_initializer=init)(x)

    x = skip
    for block in range(scaling_blocks):
        x = layers.SeparableConv2D(scaling_filters, encoder_filter_size, strides=(1, 1), padding='same', kernel_initializer=init)(x)
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(scaling_filters, encoder_filter_size, strides=(1, 1), padding='same', kernel_initializer=init)(x)
        x = layers.MaxPooling2D(encoder_filter_size, (2, 2), padding='same')(x)
        skip = layers.Conv2D(scaling_filters, (1, 1), (2, 2), padding='same')(skip)
        x = layers.Add()([x, skip])

    for block in range(flat_blocks):
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(flat_filters, encoder_filter_size, padding='same', kernel_initializer=init)(x)
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(flat_filters, encoder_filter_size, padding='same', kernel_initializer=init)(x)
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(flat_filters, encoder_filter_size, padding='same', kernel_initializer=init)(x)
        x = layers.Conv2D(flat_filters, (1, 1),  kernel_initializer=init)(x)  # bottleneck
        skip = layers.Conv2D(flat_filters, (1, 1), padding='same')(skip)
        x = layers.Add()([x, skip])

    mu = layers.SeparableConv2D(flat_filters, (3, 3), padding='same', kernel_initializer=init)(x)
    gamma = layers.SeparableConv2D(flat_filters, (3, 3), padding='same', kernel_initializer=init)(x)

    mu = layers.GlobalAveragePooling2D()(mu)
    gamma = layers.GlobalAveragePooling2D()(gamma)

    enc_outputs = Sampling()([mu, gamma])

    if AE:
        encoder = tf.keras.Model(inputs=[enc_inputs], outputs=[enc_outputs],
                              name='xceptional_encoder')

        input_shape_dec = (flat_filters,)
        output_shape_dec = input_shape

        dec_inputs = layers.Input(input_shape_dec, name='dec_inputs')
        dim = int(input_shape_dec[0] ** (1/2))
        x = layers.Reshape((dim, dim, 1))(dec_inputs)

        scaling_blocks_dec = int(log(output_shape_dec[0]) - log(input_shape_dec[0] ** (1 / 2)))

        skip = x

        for block in range(scaling_blocks_dec):
            x = layers.UpSampling2D(2)(x)
            x = layers.SeparableConv2D(128, decoder_filter_size, strides=(1, 1), padding='same',
                                       kernel_initializer=init)(x)
            x = layers.Activation(activation)(x)
            x = layers.SeparableConv2D(128, decoder_filter_size, strides=(1, 1), padding='same',
                                       kernel_initializer=init)(x)
            x = layers.Activation(activation)(x)
            x = layers.SeparableConv2D(128, decoder_filter_size, strides=(1, 1), padding='same',
                                       kernel_initializer=init)(x)
            x = layers.Conv2D(128, (1, 1), kernel_initializer=init)(x)  # bottleneck
            skip = layers.Conv2D(128, (1, 1), padding='same')(
                layers.UpSampling2D(2)(skip))
            x = layers.Add()([x, skip])

        x = layers.Conv2D(3, (3, 3), padding='same', activation=activation)(x)
        dec_outputs = layers.Conv2D(3, (1, 1))(x)

        decoder = tf.keras.Model(inputs=dec_inputs, outputs=dec_outputs, name='decoder')

        model = tf.keras.Model(inputs=[enc_inputs], outputs=decoder(encoder(enc_inputs)),
                            name=f'xceptional_{scaling_blocks}_AE_{AE}')

        opt = tf.keras.optimizers.Nadam(learning_rate=learning_rate, beta_1=0.9,
                                        beta_2=0.999, epsilon=None, decay=0.0)

        model.compile(loss='mse', optimizer=opt)

        print(model.summary())

        return model, encoder, decoder

    opt = tf.keras.optimizers.Nadam(learning_rate=learning_rate, beta_1=0.9,
                                    beta_2=0.999, epsilon=None, decay=0.0)

    model = tf.keras.Model(inputs=enc_inputs, outputs=enc_outputs,
                        name=f'xceptional_{scaling_blocks}_AE_{AE}')

    model.compile(loss='mse', optimizer=opt)

    print(model.summary())

    return model


model, encoder, decoder = build_separable_conv_model(input_shape, scaling_blocks=4, flat_blocks=2, AE=True)

cont = input('proceed with experiment?')

if cont == 'stop':
    raise ValueError()

model.fit(df, df, batch_size=batch_size, epochs=150)

tf.keras.models.save_model(model, f'autoenc_{time()}')
tf.keras.models.save_model(encoder, f'enc_{time()}')
tf.keras.models.save_model(decoder, f'dec_{time()}')

inds = [0, 1, 2, 3, 4, 5, 6]
ncols = len(inds)
nrows = 2

fig = plt.figure(figsize=(ncols, nrows), dpi=300)

for i in inds:
    ax = fig.add_subplot(nrows, ncols, i + 1)
    ax.axes.xaxis.set_ticks([])
    ax.axes.yaxis.set_ticks([])
    prediction = model.predict(np.array([df[inds[i]], ]))
    plt.imshow(array_to_img(prediction[0]))
    ax = fig.add_subplot(nrows, ncols, i + 1 + ncols)
    ax.axes.xaxis.set_ticks([])
    ax.axes.yaxis.set_ticks([])
    plt.imshow(array_to_img(df[inds[i]]))

plt.show()

        
No Output For This Model
Convolutional Autoencoder using Xception-like Depthwise Separable Convolutions

Below I have included the code for my second attempt at a convolutional autoencoder that I built to learn a latent space representation for pokemon, and then of course reconstruct them from that latent representation. This model is a bit cooler than the last. Motivated by the Xception network, I utilize depthwise separable convolutions to operate individually on the different channels of the image, both in the encoder and the decoder. Utilizing this strategy we can make the network deeper while reducing the number of parameters at each level. Reducing the parameters has the dual benefit of also reducing the space required to store the intermediate outputs that are needed to compute the gradient during the backward pass, so this version is also more RAM-friendly, taking only about 10 GB (according to Windows Task Manager) to execute the code shown below for some training epochs. Despite this benefit, I was running the code on my local machine, so I only had access to about the amount of RAM I was already using, and consequently I did have to reduce the size of the dataset. I chose to construct the dataset as shown, with only the original images and their flipped versions. This amounts to about 500 MB of uint8 256-by-256-pixel 3-channel RGB images. The configuration shown below is my best effort at reconstructing the optimal hyperparameters I found; at the time of uploading I have lost the image of the best result I had obtained, and potentially some of the best hyperparameters. With about half the parameters of the simple convolutional approach, the xceptional model achieved very comparable performance. The screenshot I have included below is worse: as you can see, the model has failed to capture the colors of the original image. This is very common in my experience with these models on these tasks and occurs at lower epoch counts, so perhaps I just need to train for more than 50 epochs.
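The parameter savings from depthwise separable convolutions are easy to verify with a back-of-the-envelope count. The two helper functions below are mine, written to match how Keras counts weights for `Conv2D` and `SeparableConv2D` (assuming the default `depth_multiplier=1` and a single bias vector on the output):

```python
def conv2d_params(k, c_in, c_out, bias=True):
    """Parameters in a standard k x k convolution."""
    return k * k * c_in * c_out + (c_out if bias else 0)

def separable_conv2d_params(k, c_in, c_out, bias=True):
    """Depthwise (one k x k filter per input channel) followed by a
    pointwise 1 x 1 convolution that mixes channels."""
    depthwise = k * k * c_in
    pointwise = c_in * c_out
    return depthwise + pointwise + (c_out if bias else 0)

# A 3x3, 128 -> 128 layer like the ones in the encoder blocks:
print(conv2d_params(3, 128, 128))            # 147584
print(separable_conv2d_params(3, 128, 128))  # 17664
```

For the 128-filter blocks used here, the separable version needs roughly an eighth of the weights, which is where both the parameter and activation-memory savings come from.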
As you will notice, the model makes use of "skip" or "residual" connections between blocks, as well as layers I call "bottlenecks", which force the depthwise separable convolutional layers to merge after each block. I have also implemented a variational version of this architecture using only a slight modification, which will be included in the next post. The variational version gave me much more difficulty in training, never achieving reasonable performance (at this point I assume I simply do not have enough data for this task).
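The 1x1 "bottleneck" convolutions act as per-pixel linear maps over the channel axis, which is also what lets the skip branch be projected to the right channel count before the Add. A NumPy sketch of that idea (the shapes and the `conv1x1` helper are illustrative, not taken from the script):

```python
import numpy as np

def conv1x1(x, w):
    # A 1x1 convolution is a per-pixel matrix multiply over channels:
    # (H, W, C_in) @ (C_in, C_out) -> (H, W, C_out)
    return np.einsum('hwc,cd->hwd', x, w)

rng = np.random.default_rng(0)
x = rng.standard_normal((8, 8, 256))     # output of a separable-conv block
skip = rng.standard_normal((8, 8, 128))  # skip branch with fewer channels

w_bottleneck = 0.01 * rng.standard_normal((256, 256))
w_project = 0.01 * rng.standard_normal((128, 256))  # project skip to 256 channels

out = conv1x1(x, w_bottleneck) + conv1x1(skip, w_project)  # residual add
print(out.shape)  # (8, 8, 256)
```

Without the 1x1 projection the element-wise Add would fail on the channel dimension, which is why every residual path in the model ends in one of these layers.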


import os

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from time import time

from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.preprocessing.image import array_to_img

from os import listdir
from os.path import isfile, join

import matplotlib.pyplot as plt
import numpy as np

from math import log2 as log

# os.environ['TF_GPU_ALLOCATOR'] = 'cuda_malloc_async'

mypath = './pokemon_img_jpg/'
"""
df = []
for f in listdir(mypath):
  if '-' not in f:
    if isfile(join(mypath, f)):
      try:
        arr_img = img_to_array(load_img(join(mypath, f)))
        df += [arr_img, np.flipud(arr_img)]
        #df += [arr_img, np.flipud(arr_img), np.fliplr(arr_img), np.fliplr(np.flipud(arr_img)), np.rot90(arr_img), np.rot90(arr_img, k=3)]
        #ones = np.ones_like(arr_img)
        #zeroes = np.zeros_like(arr_img)
        #twofiftyfives = ones*255
        #df += [np.minimum(twofiftyfives, arr_img + i*ones) for i in range(4, 9, 4)]
        #df += [np.maximum(zeroes, arr_img - i*ones) for i in range(4, 9, 4)]

      except FileNotFoundError as e:
        print(e)
print(len(df))
df = np.asarray(df)
np.random.shuffle(df)

df = df.astype(np.uint8)

np.save('pkmn_dset_numpy', df)
"""
batch_size = 8

df = np.load('pkmn_dset_numpy.npy')

input_shape = (256, 256, 3)


def build_decoder_model(input_shape, output_shape, activation):
    init = tf.keras.initializers.LecunNormal()

    dec_inputs = layers.Input(input_shape, name='dec_inputs')
    x = layers.Reshape((16, 16, 1))(dec_inputs)

    scaling_blocks = int(log(output_shape[0]) - log(input_shape[0] ** (1 / 2)))

    skip = x

    for block in range(scaling_blocks):
        x = layers.UpSampling2D(2)(x)
        # 2**(block + 4) = [16, 32, 64, 128]
        x = layers.SeparableConv2D(2 ** (block + 4), (3, 3), strides=(1, 1), padding='same', kernel_initializer=init)(x)
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(2 ** (block + 4), (3, 3), strides=(1, 1), padding='same', kernel_initializer=init)(x)
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(2 ** (block + 4), (3, 3), strides=(1, 1), padding='same', kernel_initializer=init)(x)
        x = layers.Conv2D(2 ** (block + 4), (1, 1),  kernel_initializer=init)(x)  # bottleneck
        skip = layers.Conv2D(2 ** (block + 4), (1, 1), padding='same')(layers.UpSampling2D(2)(skip))
        x = layers.Add()([x, skip])

    x = layers.Conv2D(3, (3, 3), padding='same', activation=activation)(x)
    dec_outputs = layers.Conv2D(3, (1, 1))(x)

    decoder = tf.keras.Model(inputs=dec_inputs, outputs=dec_outputs, name='decoder')
    print(decoder.summary())
    return decoder


def build_separable_conv_model(input_shape, scaling_blocks=1, flat_blocks=1,
                               AE=False,
                               activation='selu',
                               learning_rate=0.001):
    init = tf.keras.initializers.LecunNormal()

    enc_inputs = layers.Input(input_shape, name='inputs')  # does not include batch size
    x = layers.Conv2D(32, (3, 3), (2, 2), 'same', activation=activation, kernel_initializer=init)(enc_inputs)
    skip = layers.Conv2D(68, (3, 3), (1, 1), 'same', activation=activation, kernel_initializer=init)(x)

    x = skip
    for block in range(scaling_blocks):
        x = layers.SeparableConv2D(128, (3, 3), strides=(1, 1), padding='same', kernel_initializer=init)(x)
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(128, (3, 3), strides=(1, 1), padding='same', kernel_initializer=init)(x)
        x = layers.MaxPooling2D((3, 3), (2, 2), padding='same')(x)
        skip = layers.Conv2D(128, (1, 1), (2, 2), padding='same')(skip)
        x = layers.Add()([x, skip])

    for block in range(flat_blocks):
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(256, (3, 3), padding='same', kernel_initializer=init)(x)
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(256, (3, 3), padding='same', kernel_initializer=init)(x)
        x = layers.Activation(activation)(x)
        x = layers.SeparableConv2D(256, (3, 3), padding='same', kernel_initializer=init)(x)
        x = layers.Conv2D(256, (1, 1),  kernel_initializer=init)(x)  # bottleneck
        skip = layers.Conv2D(256, (1, 1), padding='same')(skip)
        x = layers.Add()([x, skip])

    enc_outputs = layers.GlobalAveragePooling2D()(x)

    if AE:
        opt = tf.keras.optimizers.Nadam(learning_rate=learning_rate, beta_1=0.9,
                                        beta_2=0.999, epsilon=None, decay=0.0)

        encoder = tf.keras.Model(inputs=enc_inputs, outputs=enc_outputs,
                              name='xceptional_encoder')

        decoder = build_decoder_model((256,), input_shape, activation)

        model = tf.keras.Model(inputs=encoder.input, outputs=decoder(encoder.output),
                            name=f'xceptional_{scaling_blocks}_AE_{AE}')

        model.compile(loss='mse', optimizer=opt)

        print(model.summary())

        return model, encoder, decoder

    opt = tf.keras.optimizers.Nadam(learning_rate=learning_rate, beta_1=0.9,
                                    beta_2=0.999, epsilon=None)

    model = tf.keras.Model(inputs=enc_inputs, outputs=enc_outputs,
                        name=f'xceptional_{scaling_blocks}_AE_{AE}')

    model.compile(loss='mse', optimizer=opt)

    print(model.summary())

    return model


model, encoder, decoder = build_separable_conv_model(input_shape, scaling_blocks=4, flat_blocks=4, AE=True)

cont = input('proceed with experiment?')

if cont == 'stop':
    raise ValueError()

model.fit(df, df, batch_size=batch_size, epochs=65)

tf.keras.models.save_model(model, f'autoenc_{time()}')
tf.keras.models.save_model(encoder, f'enc_{time()}')
tf.keras.models.save_model(decoder, f'dec_{time()}')

inds = [0, 1, 2, 3, 4, 5, 6]
ncols = len(inds)
nrows = 2

fig = plt.figure(figsize=(ncols, nrows), dpi=300)

for i in inds:
    ax = fig.add_subplot(nrows, ncols, i + 1)
    ax.axes.xaxis.set_ticks([])
    ax.axes.yaxis.set_ticks([])
    prediction = model.predict(np.array([df[inds[i]], ]))
    plt.imshow(array_to_img(prediction[0]))
    ax = fig.add_subplot(nrows, ncols, i + 1 + ncols)
    ax.axes.xaxis.set_ticks([])
    ax.axes.yaxis.set_ticks([])
    plt.imshow(array_to_img(df[inds[i]]))

plt.show()

        
Output:
First Attempt at a Convolutional Autoencoder

Below I have included the code for a simple convolutional autoencoder that I built to try to learn a hidden representation for pokemon, and then of course reconstruct them from that hidden representation. Below the code is a figure displaying 6 randomly chosen pokemon from my dataset, with their corresponding reconstructions above them. In this first attempt my memory management and my architecture were both pretty sloppy. I was using a numpy array of float32s to store my images (which are 3-channel 0-255 RGB). In later examples I changed this to uint8s, although the majority of RAM usage comes from the intermediate values held by tensorflow to reconstruct the gradient in the backward pass. In this example I build a series of stacked convolutional layers and a dense section. I am aware that this could be fully convolutional, and in later iterations you will see that I switch to that approach. I perform dataset augmentation by rotating the images and flipping them. I also add and subtract constant values from all pixels across all channels (ensuring the pixel channels stay in the valid range), although using a dataset of this size quickly becomes infeasible due to the aforementioned RAM usage. I tried several different activations, kernel sizes, latent space sizes, and scaling techniques, but this configuration produced the best results (both visually and in terms of MSE loss). As you can see, the reconstruction is visually identifiable, but lacks definition. It is definitely unsuitable for the task of generating new pokemon, which is what I had hoped to accomplish.



import keras
from time import time
from keras import layers
from keras import backend as K
import tensorflow as tf

from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import array_to_img

from os import listdir
from os.path import isfile, join

import matplotlib.pyplot as plt
import numpy as np

mypath = '/pokemon_img_jpg'

df = []
for f in listdir(mypath):
  if '-' not in f:
    if isfile(join(mypath, f)):
      try:
        arr_img = img_to_array(load_img(join(mypath, f)))
        df += [arr_img, np.flipud(arr_img), np.fliplr(arr_img), np.fliplr(np.flipud(arr_img)), np.rot90(arr_img), np.rot90(arr_img, k=3)]
        ones = np.ones_like(arr_img)
        zeroes = np.zeros_like(arr_img)
        twofiftyfives = ones*255
        df += [np.minimum(twofiftyfives, arr_img + i*ones) for i in range(4, 9, 4)]
        df += [np.maximum(zeroes, arr_img - i*ones) for i in range(4, 9, 4)]

      except FileNotFoundError as e:
        print(e)
print(len(df))
df = np.asarray(df)
np.random.shuffle(df)

x_train = df

x_train.shape[1:]

activation = 'selu'
init = keras.initializers.LecunNormal()

encoder_inputs = keras.Input(shape=x_train.shape[1:])

conv_1_1 = layers.Conv2D(128, 16, strides=4, padding='same', activation=activation, kernel_initializer=init, bias_initializer=init)(encoder_inputs)
pool_1 = layers.MaxPooling2D(2, strides=1)(conv_1_1)
norm_1 = layers.BatchNormalization()(pool_1)

conv_2_1 = layers.Conv2D(64, 8, strides=2, padding='same', activation=activation, kernel_initializer=init, bias_initializer=init)(norm_1)
pool_2 = layers.MaxPooling2D(2, strides=1)(conv_2_1)
norm_2 = layers.BatchNormalization()(pool_2)

conv_3_1 = layers.Conv2D(64, 4, strides=2, padding='same', activation=activation, kernel_initializer=init, bias_initializer=init)(norm_2)
pool_3 = layers.MaxPooling2D(2, strides=1)(conv_3_1)
norm_3 = layers.BatchNormalization()(pool_3)

conv_4_1 = layers.Conv2D(32, 4, strides=2, padding='same', activation=activation, kernel_initializer=init, bias_initializer=init)(norm_3)
pool_4 = layers.MaxPooling2D(2, strides=2)(conv_4_1)
norm_4 = layers.BatchNormalization()(pool_4)

conv_5_1 = layers.Conv2D(16, 2, strides=1, padding='same', activation=activation, kernel_initializer=init, bias_initializer=init)(norm_4)
pool_5 = layers.MaxPooling2D(2, strides=2)(conv_5_1)
norm_5 = layers.BatchNormalization()(pool_5)

flatten = keras.layers.Flatten()(norm_5)

encoder_outputs = keras.layers.Dense(512, activation=activation, kernel_initializer=init, bias_initializer=init)(flatten)

encoder = keras.Model(inputs = encoder_inputs, outputs=encoder_outputs, name='encoder')

decoder_inputs = keras.Input(shape=encoder.output_shape[1:])

dec_dense_1 = keras.layers.Dense(1024, activation=activation, kernel_initializer=init, bias_initializer=init)(decoder_inputs)
dec_reshape_1 = layers.Reshape((32,32,1))(dec_dense_1)
dec_up_1 = layers.UpSampling2D(2)(dec_reshape_1)

dec_conv_1_1 = layers.Conv2D(64, 4, strides=1, padding='same', activation=activation, kernel_initializer=init, bias_initializer=init)(dec_up_1)
dec_up_2 = layers.UpSampling2D(2)(dec_conv_1_1)

dec_conv_2_1 = layers.Conv2D(64, 8, strides=2, padding='same', activation=activation, kernel_initializer=init, bias_initializer=init)(dec_up_2)
dec_up_3 = layers.UpSampling2D(2)(dec_conv_2_1)

dec_conv_3_1 = layers.Conv2D(32, 8, strides=2, padding='same', activation=activation, kernel_initializer=init, bias_initializer=init)(dec_up_3)
#dec_conv_2_2 = layers.Conv2D(32, 4, strides=1, padding='same', activation=activation, kernel_initializer=init, bias_initializer=init)(dec_conv_2_1)
dec_up_4 = layers.UpSampling2D(2)(dec_conv_3_1)

dec_conv_4_1 = layers.Conv2D(32, 4, strides=1, padding='same', activation=activation, kernel_initializer=init, bias_initializer=init)(dec_up_4)
dec_up_5 = layers.UpSampling2D(2)(dec_conv_4_1)

decoder_outputs = layers.Conv2D(3, 2, strides=1, padding='same', activation=activation, kernel_initializer=init, bias_initializer=init)(dec_up_5)
#decoder_outputs = layers.Conv2D(3, 2, strides=1, padding='same', activation=activation, kernel_initializer=init, bias_initializer=init)(dec_conv_4_2)

decoder = keras.Model(inputs = decoder_inputs, outputs = decoder_outputs, name='decoder')

decoder.summary()

conv_autoencoder = keras.Model(inputs=encoder.input, outputs=decoder(encoder.output))
conv_autoencoder.summary()

conv_autoencoder.compile(optimizer='adam', loss=keras.losses.mean_squared_error)

model_save_path = join(mypath, 'autoencoder-1-17-10-46')

if isfile(model_save_path):
  conv_autoencoder = keras.models.load_model(model_save_path)
  K.set_value(conv_autoencoder.optimizer.learning_rate, 7e-4)

conv_autoencoder.fit(x_train, x_train, batch_size=16, epochs=25, validation_split=.1, verbose='auto')

conv_autoencoder.save(model_save_path)
print(model_save_path)

inds = [0, 1, 2, 3, 4, 5, 6]
ncols = len(inds)
nrows = 2
fig = plt.figure(figsize=(ncols,nrows), dpi=300)

for i in inds:
  ax = fig.add_subplot(nrows, ncols, i + 1)
  ax.axes.xaxis.set_ticks([])
  ax.axes.yaxis.set_ticks([])
  prediction = encoder.predict( np.array( [x_train[inds[i]],] )  )
  decoded = decoder.predict(prediction)
  plt.imshow(array_to_img(decoded[0]))
  ax = fig.add_subplot(nrows, ncols, i + 1 + ncols)
  ax.axes.xaxis.set_ticks([])
  ax.axes.yaxis.set_ticks([])
  plt.imshow(array_to_img(x_train[inds[i]]))
        
Output: