Character-level text generator with PyTorch using Long Short-Term Memory Units

In this notebook we will implement a simple character-level RNN model (an LSTM) with PyTorch to familiarize ourselves with the PyTorch library and get started with RNNs. The goal is to build a model that can complete your sentence based on a few characters or a word used as input.

The model will be fed with a word and will predict what the next character in the sentence will be. This process will repeat itself until we generate a sentence of our desired length.
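
The predict-append-repeat loop described above can be sketched without any neural network. The block below is only an illustration: it replaces the LSTM with a hypothetical bigram counter (build_bigram_counts and generate are names invented for this sketch), but the generation loop has the same shape as the one used later in the notebook.

```python
import random
from collections import Counter, defaultdict

def build_bigram_counts(text):
    """Count how often each character follows each other character."""
    counts = defaultdict(Counter)
    for current_char, next_char in zip(text, text[1:]):
        counts[current_char][next_char] += 1
    return counts

def generate(counts, start, out_len, rng):
    """Repeatedly predict one character and append it to the sequence."""
    chars = list(start)
    while len(chars) < out_len:
        followers = counts.get(chars[-1])
        if not followers:
            break  # the last character was never followed by anything
        candidates = list(followers.keys())
        weights = list(followers.values())
        # Sample the next character in proportion to how often it was seen
        chars.append(rng.choices(candidates, weights=weights, k=1)[0])
    return ''.join(chars)

toy_text = "we want what we want when we want it"
counts = build_bigram_counts(toy_text)
sample = generate(counts, start="we", out_len=20, rng=random.Random(1))
print(sample)
```

The LSTM built in this notebook plays the role of counts here: it supplies the distribution over the next character, while the surrounding loop stays the same.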

Importing the libraries

import torch
from torch import nn
from torch.autograd import Variable

import os
import random as rnd
import numpy as np
import pickle
import time
import string

from tqdm import tqdm

Loading the dataset

First, we'll define the sentences that we want our model to output when fed with the first word or the first few characters. Our dataset is a text file containing Shakespeare's plays, from which we will extract sequences of characters to use as input to our model. Our model will then learn how to complete sentences like "Shakespeare would do".

This dataset can be downloaded from Karpathy's GitHub account: https://github.com/karpathy/char-rnn/blob/master/data/tinyshakespeare/input.txt.

As in many of my notebooks, we set some variables to the data directory and filenames. If you want to run this code in your own environment you must change these values:

# Set the root folder
root_folder='.'
# Set the folder with the dataset
data_folder_name='data'
model_folder_name='model'
# Set the filename
filename='input.txt'

# Path to the data folder
DATA_PATH = os.path.abspath(os.path.join(root_folder, data_folder_name))
model_dir = os.path.abspath(os.path.join(root_folder, model_folder_name))

# Set the path where the text for training is stored
train_path = os.path.join(DATA_PATH, filename)

# Set a seed
seed = 1
def load_text_data(filename, init_dialog=False):
    ''' Load the text from filename, one entry per line, without the end of line character.
        By default, lines containing ':' (where the character who is going to speak is indicated)
        are dropped; setting init_dialog = True keeps them.
        Empty lines are kept here and removed later, in the cleaning step.
    '''
    sentences = []
    with open(filename, 'r') as reader:
        for line in reader:
            if init_dialog or ':' not in line:
                # Append the line to the sentences, removing the end of line character.
                # rstrip does not cut a real character if the last line has no newline
                sentences.append(line.rstrip('\n'))
                
    return sentences

Loading the input data, sentences from Shakespeare's plays.

sentences = load_text_data(train_path)
print('Number of sentences: ', len(sentences))
print(sentences[:20])
Number of sentences:  29723
['Before we proceed any further, hear me speak.', '', 'Speak, speak.', '', 'You are all resolved rather to die than to famish?', '', 'Resolved. resolved.', '', 'First, you know Caius Marcius is chief enemy to the people.', '', "We know't, we know't.", '', "Let us kill him, and we'll have corn at our own price.", "Is't a verdict?", '', '', 'One word, good citizens.', '', 'We are accounted poor citizens, the patricians good.', 'would yield us but the superfluity, while it were']

Cleaning the input data

When working with text data, we usually need to perform some cleaning to prepare the data for our algorithm. This time we will start with a simple cleaning: convert the text to lowercase and, optionally (controlled by the alpha parameter), remove non-alphabetic characters.

def clean_text(sentences, alpha=False):
    ''' Cleaning process of the text'''
    if alpha:
        # Remove non alphabetic character
        cleaned_text = [''.join([t.lower() for t in text if t.isalpha() or t.isspace()]) for text in sentences]
    else:
        # Simply lower the characters
        cleaned_text = [t.lower() for t in sentences]
    # Remove any empty string
    cleaned_text = [t for t in cleaned_text if t!='']
    
    return cleaned_text
# Clean the sentences
sentences = clean_text(sentences, False)
# Join all the sentences in a one long string
sentences = ' '.join(sentences)
print('Number of characters: ', len(sentences))
print(sentences[:100])
Number of characters:  894876
before we proceed any further, hear me speak. speak, speak. you are all resolved rather to die than 

Creating the dictionary

Now we'll create a dictionary out of all the characters that we have in the sentences and map them to an integer. This will allow us to convert our input characters to their respective integers (char2int) and vice versa (int2char).

class CharVocab: 
    ''' Create a character-level vocabulary: int2char (list) and char2int (dict) '''
    def __init__(self, type_vocab,pad_token='<PAD>', eos_token='<EOS>', unk_token='<UNK>'): #Initialization of the type of vocabulary
        self.type = type_vocab
        # The special tokens, when given, take the first positions of the vocabulary
        self.int2char = []
        if pad_token is not None:
            self.int2char += [pad_token]
        if eos_token is not None:
            self.int2char += [eos_token]
        if unk_token is not None:
            self.int2char += [unk_token]
        self.char2int = {}
        
    def __call__(self, text):       #When called, builds both mappings from the characters found in text
        # Join all the sentences together and extract the unique characters from the combined sentences
        chars = set(''.join(text))

        # Creating a dictionary that maps integers to the characters
        self.int2char += list(chars)

        # Creating another dictionary that maps characters to integers
        self.char2int = {char: ind for ind, char in enumerate(self.int2char)}
        
vocab = CharVocab('char',None,None,'<UNK>')
vocab(sentences)
print('Length of vocabulary: ', len(vocab.int2char))
print('Int to Char: ', vocab.int2char)
print('Char to Int: ', vocab.char2int)
Length of vocabulary:  38
Int to Char:  ['<UNK>', 'l', 'g', 'b', 'z', '-', 'k', 'e', 'o', 'r', 'i', ',', 'w', "'", ';', 'f', 'x', '?', 'd', '$', '3', 'a', 's', 'v', 'm', '&', 't', 'h', 'u', 'p', 'n', 'y', ' ', 'c', 'q', '!', 'j', '.']
Char to Int:  {'<UNK>': 0, 'l': 1, 'g': 2, 'b': 3, 'z': 4, '-': 5, 'k': 6, 'e': 7, 'o': 8, 'r': 9, 'i': 10, ',': 11, 'w': 12, "'": 13, ';': 14, 'f': 15, 'x': 16, '?': 17, 'd': 18, '$': 19, '3': 20, 'a': 21, 's': 22, 'v': 23, 'm': 24, '&': 25, 't': 26, 'h': 27, 'u': 28, 'p': 29, 'n': 30, 'y': 31, ' ': 32, 'c': 33, 'q': 34, '!': 35, 'j': 36, '.': 37}
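
As a quick check of what the two mappings are for, the sketch below encodes a short string and decodes it back. It builds its own toy vocabulary rather than using CharVocab, and it sorts the characters so that the result is reproducible; that sorting is an assumption of this example, since the class above keeps Python's set order.

```python
text = "hello world"
# sorted() makes the mapping reproducible between runs (illustrative choice)
int2char = ['<UNK>'] + sorted(set(text))
char2int = {ch: i for i, ch in enumerate(int2char)}

# Characters missing from the vocabulary fall back to index 0 (<UNK>)
encoded = [char2int.get(ch, 0) for ch in text]
decoded = ''.join(int2char[i] for i in encoded)

print(encoded)   # [4, 3, 5, 5, 6, 1, 8, 6, 7, 5, 2]
print(decoded)   # hello world
```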

Save the dictionary

In this example it is not mandatory to save the dictionary immediately, because building it is a quick task. But when dealing with a huge corpus and a large dictionary, we should save the dictionary so that it can be restored later when new experiments are executed.

# Check or create the directory where dictionary will be saved
if not os.path.exists(DATA_PATH): # Make sure that the folder exists
    os.makedirs(DATA_PATH)
    
# Save the dictionary to data path dir  
with open(os.path.join(DATA_PATH, 'char_dict.pkl'), "wb") as f:
    pickle.dump(vocab.char2int, f)

with open(os.path.join(DATA_PATH, 'int_dict.pkl'), "wb") as f:
    pickle.dump(vocab.int2char, f)
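
Restoring the dictionaries later is the mirror operation. The following is a minimal sketch that writes a toy mapping to a temporary folder and reads it back; in the notebook the files would live in DATA_PATH instead, and the toy values are placeholders.

```python
import os
import pickle
import tempfile

char2int = {'<UNK>': 0, 'a': 1, 'b': 2}   # toy mapping for illustration
int2char = ['<UNK>', 'a', 'b']

with tempfile.TemporaryDirectory() as tmp_dir:
    char_path = os.path.join(tmp_dir, 'char_dict.pkl')
    int_path = os.path.join(tmp_dir, 'int_dict.pkl')
    # Save both mappings
    with open(char_path, 'wb') as f:
        pickle.dump(char2int, f)
    with open(int_path, 'wb') as f:
        pickle.dump(int2char, f)
    # Restore them, as a later experiment would do
    with open(char_path, 'rb') as f:
        restored_char2int = pickle.load(f)
    with open(int_path, 'rb') as f:
        restored_int2char = pickle.load(f)

print(restored_char2int == char2int, restored_int2char == int2char)
```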

Create the input data and labels for training

As we're going to predict the next character in the sequence at each time step, we'll have to divide each sentence into

  • Input data: The last input character should be excluded as it does not need to be fed into the model
  • Target/Ground Truth Label: One time-step ahead of the Input data as this will be the "correct answer" for the model at each time step corresponding to the input data
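
On a five-letter string, the split into input and target looks like this (toy_input and toy_target are names used only in this sketch):

```python
text = "hello"
toy_input = text[:-1]    # "hell": every character except the last one
toy_target = text[1:]    # "ello": the same text, one time step ahead

# At each time step the model sees the input char and must predict the target char
pairs = list(zip(toy_input, toy_target))
for x, y in pairs:
    print(f"input {x!r} -> target {y!r}")
```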
def one_hot_encode(indices, dict_size):
    ''' Define one hot encode matrix for our sequences'''
    # Creating a multi-dimensional array with the desired output shape
    # Encode every integer with its one hot representation
    features = np.eye(dict_size, dtype=np.float32)[indices.flatten()]
    
    # Finally reshape it to get back to the original array
    features = features.reshape((*indices.shape, dict_size))
            
    return features

def encode_text(input_text, vocab, one_hot = False):
    ''' Encode the input_text replacing the char by its integer number based on the dictionary vocab'''
    # Replace every char by its integer value based on the vocabulary
    # (unknown characters are mapped to index 0)
    output = np.array([vocab.char2int.get(character,0) for character in input_text])
    
    if one_hot:
        # One hot encode every integer of the sequence
        dict_size = len(vocab.char2int)
        return one_hot_encode(output, dict_size)
    else:
        return output

Now, we can encode our text, replacing every character by the integer value in the dictionary. When we have our dataset unified and prepared, we should do a quick check and see an example of the data our model will be trained on. This is generally a good idea as it allows you to see how each of the further processing steps affects the data and it also ensures that the data has been loaded correctly.

# Encode the train dataset
train_data = encode_text(sentences, vocab, one_hot = False)

# Create the input sequence, from 0 to len-1
input_seq=train_data[:-1]
# Create the target sequence, from 1 to len. It is the input shifted one step ahead
target_seq=train_data[1:]
print('\nOriginal text:')
print(sentences[:100])
print('\nEncoded text:')
print(train_data[:100])
print('\nInput sequence:')
print(input_seq[:100])
print('\nTarget sequence:')
print(target_seq[:100])
Original text:
before we proceed any further, hear me speak. speak, speak. you are all resolved rather to die than 

Encoded text:
[ 3  7 15  8  9  7 32 12  7 32 29  9  8 33  7  7 18 32 21 30 31 32 15 28
  9 26 27  7  9 11 32 27  7 21  9 32 24  7 32 22 29  7 21  6 37 32 22 29
  7 21  6 11 32 22 29  7 21  6 37 32 31  8 28 32 21  9  7 32 21  1  1 32
  9  7 22  8  1 23  7 18 32  9 21 26 27  7  9 32 26  8 32 18 10  7 32 26
 27 21 30 32]

Input sequence:
[ 3  7 15  8  9  7 32 12  7 32 29  9  8 33  7  7 18 32 21 30 31 32 15 28
  9 26 27  7  9 11 32 27  7 21  9 32 24  7 32 22 29  7 21  6 37 32 22 29
  7 21  6 11 32 22 29  7 21  6 37 32 31  8 28 32 21  9  7 32 21  1  1 32
  9  7 22  8  1 23  7 18 32  9 21 26 27  7  9 32 26  8 32 18 10  7 32 26
 27 21 30 32]

Target sequence:
[ 7 15  8  9  7 32 12  7 32 29  9  8 33  7  7 18 32 21 30 31 32 15 28  9
 26 27  7  9 11 32 27  7 21  9 32 24  7 32 22 29  7 21  6 37 32 22 29  7
 21  6 11 32 22 29  7 21  6 37 32 31  8 28 32 21  9  7 32 21  1  1 32  9
  7 22  8  1 23  7 18 32  9 21 26 27  7  9 32 26  8 32 18 10  7 32 26 27
 21 30 32 26]

Now we can save our encoded dataset to a file, so we can restore it whenever it is necessary. It is important to note the format of the data that we are saving, as we will need to know it when we write the training code. In our case, we will save the dataset as a pickle object: a single array containing the whole dataset, with every character encoded as an integer.

# Save the encoded text to a file
encoded_data = os.path.join(DATA_PATH, 'input_data.pkl')
with open(encoded_data, 'wb') as fp:
    pickle.dump(train_data, fp)

Let's check the one-hot-encode function that we will use later during the training phase:

print('Encoded characters: ',train_data[100:102])
print('One-hot-encoded characters: ',one_hot_encode(train_data[100:102], 28))
Encoded characters:  [26  8]
One-hot-encoded characters:  [[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0.]]

Create a batch data generator

When training on the dataset, we need to extract a batch of examples from the inputs and targets, run the forward and backward passes of the RNN and then repeat the iteration with another batch. A batch generator will help us to extract batch_size examples at a time from our dataset.

First we will load a small portion of the training data set to use as a sample. It would be very time consuming to try and train the model completely in the notebook as we do not have access to a GPU and the compute instance that we are using is not particularly powerful. However, we can work on a small bit of the data to get a feel for how our training script is behaving.

def batch_generator_sequence(features_seq, label_seq, batch_size, seq_len):
    """Generator function that yields batches of data (input and target)

    Args:
        features_seq: sequence of encoded characters, the feature of our model.
        label_seq: sequence of encoded characters, the target label of our model.
        batch_size (int): number of examples (in this case, sequences) per batch.
        seq_len (int): length of every sequence in the batch.

    Yields:
        x_epoch: sequence of features for the epoch
        y_epoch: sequence of labels for the epoch
    """
    # calculate the number of batches we can supply
    num_batches = len(features_seq) // (batch_size * seq_len)
    if num_batches == 0:
        raise ValueError("No batches created. Use smaller batch size or sequence length.")
    # calculate effective length of text to use
    rounded_len = num_batches * batch_size * seq_len
    # Reshape the features matrix in batch size x num_batches * seq_len
    x = np.reshape(features_seq[: rounded_len], [batch_size, num_batches * seq_len])
    
    # Reshape the target matrix in batch size x num_batches * seq_len
    y = np.reshape(label_seq[: rounded_len], [batch_size, num_batches * seq_len])
    
    epoch = 0
    while True:
        # roll so that no need to reset rnn states over epochs
        x_epoch = np.split(np.roll(x, -epoch, axis=0), num_batches, axis=1)
        y_epoch = np.split(np.roll(y, -epoch, axis=0), num_batches, axis=1)
        for batch in range(num_batches):
            yield x_epoch[batch], y_epoch[batch]
        epoch += 1
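
To see what the reshape and split in the generator do, the sketch below applies the same two steps to a toy array of 24 integers with a batch size of 2 and a sequence length of 3. It is a simplified illustration (no targets, no rolling across epochs), not a replacement for the function above.

```python
import numpy as np

batch_size, seq_len = 2, 3
toy_seq = np.arange(24)                      # stands in for the encoded text
num_batches = len(toy_seq) // (batch_size * seq_len)

# One row per example in the batch; each row is a contiguous slice of the text
x = toy_seq[: num_batches * batch_size * seq_len].reshape(batch_size, -1)
# Cut the rows into num_batches chunks of seq_len columns
batches = np.split(x, num_batches, axis=1)

print(batches[0])   # [[ 0  1  2] [12 13 14]]
print(batches[1])   # [[ 3  4  5] [15 16 17]]
```

Row i of one batch continues exactly where row i of the previous batch stopped, which is why the hidden state can be carried over from one batch to the next during training.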

Define the RNN model

The model is very simple:

  • An LSTM layer to encode the input (there is no need for an embedding layer because the data is one-hot-encoded)
  • A dropout layer to reduce overfitting
  • The decoder, a fully connected layer that maps every hidden state to vocab_size outputs

The output provides a score (logit) for every item in the vocabulary; applying a softmax to these scores gives the probability of each item being the next char.
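
The decoder returns unnormalised scores, and a softmax is what turns them into probabilities (the prediction functions later in the notebook use nn.functional.softmax for this). Below is a small NumPy sketch of that conversion on made-up scores for a vocabulary of three items.

```python
import numpy as np

def softmax(logits):
    """Convert scores into probabilities along the last axis."""
    # Subtracting the maximum avoids overflow and does not change the result
    shifted = logits - np.max(logits, axis=-1, keepdims=True)
    exp = np.exp(shifted)
    return exp / np.sum(exp, axis=-1, keepdims=True)

# Two time steps, three vocabulary items (made-up scores)
logits = np.array([[2.0, 1.0, 0.1],
                   [0.0, 0.0, 0.0]])
probs = softmax(logits)

print(probs.sum(axis=-1))      # every row sums to 1
print(probs.argmax(axis=-1))   # most likely item per time step
```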

class RNNModel(nn.Module):
    def __init__(self, vocab_size, embedding_size, hidden_dim, n_layers, drop_rate=0.2):
        
        super(RNNModel, self).__init__()

        # Defining some parameters
        self.hidden_dim = hidden_dim
        self.embedding_size = embedding_size
        self.n_layers = n_layers
        self.vocab_size = vocab_size
        self.drop_rate = drop_rate
        self.char2int = None
        self.int2char = None


        #Defining the layers
        # Define the encoder as an Embedding layer
        #self.encoder = nn.Embedding(vocab_size, embedding_size)
            
        # Dropout layer
        self.dropout = nn.Dropout(drop_rate)
        # RNN Layer
        self.rnn = nn.LSTM(embedding_size, hidden_dim, n_layers, dropout=drop_rate, batch_first = True)
        # Fully connected layer
        self.decoder = nn.Linear(hidden_dim, vocab_size)
    
    def forward(self, x, state):
        
        # input shape: [batch_size, seq_len, embedding_size]
        # Apply the embedding layer and dropout
        #embed_seq = self.dropout(self.encoder(x))
            
        #print('Input RNN shape: ', embed_seq.shape)
        # shape: [batch_size, seq_len, embedding_size]
        rnn_out, state = self.rnn(x, state)
        #print('Out RNN shape: ', rnn_out.shape)
        # rnn_out shape: [batch_size, seq_len, rnn_size]
        # hidden shape: [2, num_layers, batch_size, rnn_size]
        rnn_out = self.dropout(rnn_out)

        # shape: [batch_size, seq_len, rnn_size]
        # Stack up LSTM outputs using view
        # you may need to use contiguous to reshape the output
        rnn_out = rnn_out.contiguous().view(-1, self.hidden_dim)

        logits = self.decoder(rnn_out)
        # output shape: [batch_size * seq_len, vocab_size]
        #print('Output model shape: ', logits.shape)
        return logits, state
    
    def init_state(self, device, batch_size=1):
        """
        initialises rnn states.
        """
        #return (Variable(torch.zeros(self.n_layers, batch_size, self.hidden_dim)),
        #        Variable(torch.zeros(self.n_layers, batch_size, self.hidden_dim)))
        return (torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device),
                torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device))

    def predict(self, input, state):
        # input shape: [batch_size, seq_len, vocab_size] (one-hot encoded)
        logits, hidden = self.forward(input, state)
        # logits shape: [batch_size * seq_len, vocab_size]
        # hidden: tuple (h, c), each of shape [num_layers, batch_size, rnn_size]
        probs = nn.functional.softmax(logits, dim=1)
        # shape: [batch_size * seq_len, vocab_size]
        probs = probs.view(input.size(0), input.size(1), probs.size(1))
        # output shape: [batch_size, seq_len, vocab_size]
        return probs, hidden

Training the model

After defining an RNN model, we can code the main training function. It is very simple and the steps involved are the usual ones in any other training of a neural network: in every epoch get the next batch data, move the tensors to the device, call the model (forward pass), calculate the loss function, get the gradients and update the weights.

def train_main(model, optimizer, loss_fn, batch_data, num_batches, val_batches, batch_size, seq_len, n_epochs, clip_norm, device):
    # Training Run
    
    for epoch in range(1, n_epochs + 1):
        # Store the loss in every batch iteration
        #epoch_losses = torch.Tensor(num_batches)
        epoch_losses = []
        # Init the hidden state
        hidden = model.init_state(device, batch_size)
        # Train all the batches in every epoch
        for i in tqdm(range(num_batches-val_batches), desc="Epoch {}/{}".format(epoch, n_epochs+1)):
            #print('Batch :', i)
            # Get the next batch data for input and target
            input_batch, target_batch = next(batch_data)
            # One-hot encode the input data
            input_batch = one_hot_encode(input_batch, model.vocab_size)
            # Transform to tensors
            input_data = torch.from_numpy(input_batch)
            target_data = torch.from_numpy(target_batch)
            # Detach the hidden state from the previous batch, so that gradients
            # are not backpropagated through the whole history
            hidden = tuple(var.detach() for var in hidden)
            # Move the input data to the device
            input_data = input_data.to(device)
            #print('Input shape: ', input_data.shape)
            #print('Hidden shape: ', hidden[0].shape, hidden[1].shape)
            # Set the model to train and prepare the gradients
            model.train()
            optimizer.zero_grad() # Clears the gradients left over from the previous batch
            # Forward pass through the RNN
            output, hidden = model(input_data, hidden)
            #print('Output shape: ', output.shape)
            output = output.to(device)
            #print('Output shape: ', output.shape)
            #print('Target shape; ', target_data.shape)
            # Move the target data to the device
            target_data = target_data.to(device)
            #print('Target shape; ', target_data.shape)
            target_data = torch.reshape(target_data, (batch_size*seq_len,))
            #print('Target shape; ', target_data.shape)
            loss = loss_fn(output, target_data.view(batch_size*seq_len))
            #print(loss)
            # Save the loss
            #epoch_losses[i] = loss.item() #data[0]
            epoch_losses.append(loss.item()) #data[0]
        
            loss.backward() # Does backpropagation and calculates gradients
            # clip gradient norm
            nn.utils.clip_grad_norm_(model.parameters(), clip_norm)
            
            optimizer.step() # Updates the weights accordingly
    
        # Now, when epoch is finished, evaluate the model on validation data
        model.eval()
        val_hidden = model.init_state(device, batch_size)
        val_losses = []
        for i in tqdm(range(val_batches), desc="Val Epoch {}/{}".format(epoch, n_epochs+1)):
            # Get the next batch data for input and target
            input_batch, target_batch = next(batch_data)
            # One-hot encode the input data
            input_batch = one_hot_encode(input_batch, model.vocab_size)
            # Transform to tensors
            input_data = torch.from_numpy(input_batch)
            target_data = torch.from_numpy(target_batch)
            # Every validation batch starts from a detached copy of the initial (zero) state
            hidden = tuple(var.detach() for var in val_hidden)
            # Move the input data to the device
            input_data = input_data.to(device)
            # Forward pass through the RNN
            output, hidden = model(input_data, hidden)
            #print('Output shape: ', output.shape)
            output = output.to(device)
            #print('Output shape: ', output.shape)
            #print('Target shape; ', target_data.shape)
            # Move the target data to the device
            target_data = target_data.to(device)
            #print('Target shape; ', target_data.shape)
            target_data = torch.reshape(target_data, (batch_size*seq_len,))
            #print('Target shape; ', target_data.shape)
            loss = loss_fn(output, target_data.view(batch_size*seq_len))
            #print(loss)
            # Save the loss
            val_losses.append(loss.item()) #data[0]

        model.train()                  
        #if epoch%2 == 0:
        print('Epoch: {}/{}.............'.format(epoch, n_epochs), end=' ')
        print("Train Loss: {:.4f}".format(np.mean(epoch_losses)), end=' ')
        print("Val Loss: {:.4f}".format(np.mean(val_losses)))
        
    return epoch_losses
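
The training loop clips the gradient norm with nn.utils.clip_grad_norm_ before every optimizer step. As a rough illustration of the idea, the NumPy sketch below rescales a list of made-up gradient arrays so that their combined L2 norm does not exceed a maximum. The function name clip_by_global_norm is hypothetical and the sketch is not PyTorch's implementation.

```python
import numpy as np

def clip_by_global_norm(grads, max_norm):
    """Scale all gradients by the same factor if their joint norm is too large."""
    total_norm = np.sqrt(sum(np.sum(g ** 2) for g in grads))
    # Never scale up: the factor is capped at 1
    scale = min(1.0, max_norm / (total_norm + 1e-6))
    return [g * scale for g in grads], total_norm

# Made-up gradients of two parameters; their joint norm is sqrt(9 + 16 + 144) = 13
grads = [np.array([3.0, 4.0]), np.array([12.0])]
clipped, norm_before = clip_by_global_norm(grads, max_norm=5.0)
norm_after = np.sqrt(sum(np.sum(g ** 2) for g in clipped))

print(norm_before, norm_after)
```

Because every gradient is multiplied by the same factor, the direction of the update is preserved and only its size is limited, which helps against the exploding gradients that RNNs are prone to.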

Before we start building the model, let's use a built-in feature in PyTorch to check the device we're running on (CPU or GPU). This implementation will not require a GPU as the training is really simple. However, as you progress on to large datasets and models with millions of trainable parameters, using the GPU will be very important to speed up your training.

def set_device():
    # torch.cuda.is_available() checks and returns a Boolean True if a GPU is available, else it'll return False
    is_cuda = torch.cuda.is_available()

    # If we have a GPU available, we'll set our device to GPU. We'll use this device variable later in our code.
    if is_cuda:
        device = torch.device("cuda")
        print("GPU is available")
    else:
        device = torch.device("cpu")
        print("GPU not available, CPU used")
    
    return device

After defining the model above, we'll have to instantiate the model with the relevant parameters and define our hyperparameters as well. The hyperparameters we're defining below are:

  • n_epochs: Number of Epochs --> This refers to the number of times our model will go through the entire training dataset
  • lr: Learning Rate --> This affects the rate at which our model updates the weights in the cells each time backpropagation is done
    • A smaller learning rate means that the model changes the values of the weight with a smaller magnitude
    • A larger learning rate means that the weights are updated to a larger extent at each training step
  • batch_size: Number of examples to train on every train step
  • maxlen: Length of the input sequence of chars
  • clip_norm: maximum norm of the gradients, used for gradient clipping
  • val_fraction: fraction of the batches reserved for validation
  • embedding_size: the size of every input vector, equal to the vocab size because the input feature is one-hot-encoded
  • hidden_dim: the number of hidden units in our LSTM module
  • n_layers: number of layers of our LSTM module
# Define hyperparameters for training
n_epochs = 5
lr=0.01
batch_size=64
maxlen=64
clip_norm=5
val_fraction = 0.1

# Define hyperparameters of the model
hidden_dim = 64 #64
n_layers = 1
embedding_size=len(vocab.char2int)
dict_size = len(vocab.char2int)
drop_rate = 0.2

# Set the device for training
device = set_device()
print('Device: ', device)
# Set a seed to reproduce experiments
torch.manual_seed(seed)
GPU not available, CPU used
Device:  cpu
<torch._C.Generator at 0x7f5761eba3b0>

Similar to other neural networks, we have to define the optimizer and loss function as well. We’ll be using CrossEntropyLoss as the final output is basically a classification task.
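
To give the loss values some scale: for a single prediction, cross-entropy is the negative log of the probability assigned to the correct character. The sketch below computes it in plain Python for a model that spreads its probability uniformly over our 38 characters, which is roughly where an untrained model starts. Note that nn.CrossEntropyLoss itself takes the logits as input and, by default, averages this quantity over all predictions in the batch.

```python
import math

def cross_entropy(probs, target_index):
    """Negative log-probability assigned to the correct class."""
    return -math.log(probs[target_index])

vocab_size = 38
uniform = [1.0 / vocab_size] * vocab_size
baseline_loss = cross_entropy(uniform, target_index=5)   # any index gives the same value

print(round(baseline_loss, 4))   # 3.6376, i.e. ln(38)
```

A training loss clearly below this baseline indicates that the model has learned something about which characters tend to follow which.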

# Instantiate the model with hyperparameters
model = RNNModel(dict_size,embedding_size, hidden_dim, n_layers)
# We'll also set the model to the device that we defined earlier (default is CPU)
model = model.to(device)
print(model)
# Define Loss, Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
RNNModel(
  (dropout): Dropout(p=0.2, inplace=False)
  (rnn): LSTM(38, 64, batch_first=True, dropout=0.2)
  (decoder): Linear(in_features=64, out_features=38, bias=True)
)
/home/ec2-user/anaconda3/envs/pytorch_p36/lib/python3.6/site-packages/torch/nn/modules/rnn.py:50: UserWarning: dropout option adds dropout after all but last recurrent layer, so non-zero dropout expects num_layers greater than 1, but got dropout=0.2 and num_layers=1
  "num_layers={}".format(dropout, num_layers))
# Limit the size of our input sequence to limit the training time, we are just testing the model
input_seq = input_seq[:100000]
target_seq = target_seq[:100000]
print(len(input_seq))
100000
# Calculate the number of batches to train
num_batches = len(input_seq) // (batch_size*maxlen)
val_batches = int(num_batches*val_fraction)
# Create the batch data generator
batch_data = batch_generator_sequence(input_seq, target_seq, batch_size, maxlen)
losses = train_main(model, optimizer, criterion, batch_data, num_batches, val_batches, batch_size, 
                    maxlen, n_epochs, clip_norm, device)
Epoch 1/6: 100%|██████████| 22/22 [00:02<00:00,  8.41it/s]
Val Epoch 1/6: 100%|██████████| 2/2 [00:00<00:00, 61.46it/s]
Epoch 2/6:  14%|█▎        | 3/22 [00:00<00:00, 24.20it/s]
Epoch: 1/5............. Train Loss: 3.0507 Val Loss: 2.8581
Epoch 2/6: 100%|██████████| 22/22 [00:00<00:00, 24.67it/s]
Val Epoch 2/6: 100%|██████████| 2/2 [00:00<00:00, 61.66it/s]
Epoch 3/6:  14%|█▎        | 3/22 [00:00<00:00, 24.21it/s]
Epoch: 2/5............. Train Loss: 2.6476 Val Loss: 2.4374
Epoch 3/6: 100%|██████████| 22/22 [00:00<00:00, 24.60it/s]
Val Epoch 3/6: 100%|██████████| 2/2 [00:00<00:00, 62.49it/s]
Epoch 4/6:  14%|█▎        | 3/22 [00:00<00:00, 24.54it/s]
Epoch: 3/5............. Train Loss: 2.3630 Val Loss: 2.2691
Epoch 4/6: 100%|██████████| 22/22 [00:00<00:00, 24.14it/s]
Val Epoch 4/6: 100%|██████████| 2/2 [00:00<00:00, 56.03it/s]
Epoch 5/6:  14%|█▎        | 3/22 [00:00<00:00, 23.93it/s]
Epoch: 4/5............. Train Loss: 2.2456 Val Loss: 2.1838
Epoch 5/6: 100%|██████████| 22/22 [00:00<00:00, 24.33it/s]
Val Epoch 5/6: 100%|██████████| 2/2 [00:00<00:00, 61.45it/s]
Epoch: 5/5............. Train Loss: 2.1780 Val Loss: 2.1277
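
The progress bars show 22 training batches and 2 validation batches per epoch. These numbers follow directly from the hyperparameters and from the truncation of the input to 100000 characters, as this short calculation shows:

```python
n_chars = 100000       # length of the truncated input sequence
batch_size = 64
maxlen = 64
val_fraction = 0.1

# Every batch consumes batch_size * maxlen = 4096 characters
num_batches = n_chars // (batch_size * maxlen)
val_batches = int(num_batches * val_fraction)
train_batches = num_batches - val_batches
# Characters at the end of the sequence that do not fill a whole batch
unused_chars = n_chars - num_batches * batch_size * maxlen

print(num_batches, train_batches, val_batches, unused_chars)   # 24 22 2 1696
```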

Once the model is trained, we save it to disk so that we can reload it later and use it for prediction. We also save the parameters used to construct the model; they will be needed to recreate the model before loading the weights.

# Make sure that the model folder exists
os.makedirs(model_dir, exist_ok=True)

# Save the parameters used to construct the model
model_info_path = os.path.join(model_dir, 'model_info.pth')
with open(model_info_path, 'wb') as f:
    model_info = {
        'n_layers': n_layers,
        'embedding_dim': embedding_size,
        'hidden_dim': hidden_dim,
        'vocab_size': dict_size,
        'drop_rate': drop_rate
    }
    torch.save(model_info, f)

# Save the model parameters
model_path = os.path.join(model_dir, 'model.pth')
with open(model_path, 'wb') as f:
    torch.save(model.state_dict(), f)
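
Reloading follows the reverse order: read the construction parameters, rebuild the model, then load the weights into it. The sketch below shows that round trip in a temporary folder. To stay self-contained it uses a plain nn.LSTM as a stand-in for RNNModel, and build_model is a hypothetical helper; with the real class you would pass the saved values to RNNModel(...) instead.

```python
import os
import tempfile

import torch
from torch import nn

def build_model(info):
    # Stand-in for RNNModel(...): an LSTM created from the saved sizes
    return nn.LSTM(info['embedding_dim'], info['hidden_dim'],
                   info['n_layers'], batch_first=True)

model_info = {'n_layers': 1, 'embedding_dim': 38, 'hidden_dim': 64}
original = build_model(model_info)

with tempfile.TemporaryDirectory() as tmp_dir:
    info_path = os.path.join(tmp_dir, 'model_info.pth')
    weights_path = os.path.join(tmp_dir, 'model.pth')
    torch.save(model_info, info_path)
    torch.save(original.state_dict(), weights_path)

    # Later, possibly in another process: rebuild the model and load the weights
    restored_info = torch.load(info_path)
    restored = build_model(restored_info)
    restored.load_state_dict(torch.load(weights_path, map_location='cpu'))
    restored.eval()   # disable dropout for prediction

same_weights = all(torch.equal(a, b) for a, b in
                   zip(original.state_dict().values(), restored.state_dict().values()))
print(same_weights)
```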

Predict an input sequence

def sample_from_probs(probs, top_n=10):
    """
    truncated weighted random choice.
    """
    _, indices = torch.sort(probs)
    # set probabilities after top_n to 0
    probs[indices.data[:-top_n]] = 0
    #print(probs.shape)
    sampled_index = torch.multinomial(probs, 1)
    return sampled_index

def predict_probs(model, hidden, character, vocab):
    # One-hot encoding our input to fit into the model
    character = np.array([[vocab.char2int[c] for c in character]])
    #character = one_hot_encode(character, len(vocab.char2int), character.shape[1], 1)
    character = one_hot_encode(character, model.vocab_size)
    character = torch.from_numpy(character)
    character = character.to(device)
    
    out, hidden = model(character, hidden)

    prob = nn.functional.softmax(out[-1], dim=0).data

    return prob, hidden
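
The truncated sampling used by sample_from_probs keeps only the top_n most likely characters and draws one of them at random, weighted by probability. The NumPy sketch below illustrates the same idea on a made-up distribution. sample_top_n is a hypothetical name, and unlike torch.multinomial, NumPy's choice needs the remaining probabilities to be renormalised explicitly.

```python
import numpy as np

def sample_top_n(probs, top_n, rng):
    """Draw an index among the top_n most probable entries."""
    probs = np.asarray(probs, dtype=np.float64).copy()
    # Indices of everything except the top_n largest probabilities
    drop = np.argsort(probs)[:-top_n]
    probs[drop] = 0.0
    probs /= probs.sum()          # renormalise the survivors
    return int(rng.choice(len(probs), p=probs))

rng = np.random.default_rng(0)
probs = [0.05, 0.40, 0.10, 0.30, 0.15]
draws = [sample_top_n(probs, top_n=2, rng=rng) for _ in range(200)]

print(sorted(set(draws)))   # only indices 1 and 3 can be drawn
```

With top_n=1 the choice becomes deterministic (always the most likely character), while larger values trade some accuracy for more varied text.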

Let’s test our model now and see what kind of output we will get.

def generate_from_text(model, out_len, vocab, top_n=1, start='hey'):
    model.eval() # eval mode
    start = start.lower()
    # First off, run through the starting characters
    chars = [ch for ch in start]
    size = out_len - len(chars)
    # Generate the initial hidden state
    device = set_device()
    state = model.init_state(device, 1)
    
    # Warm up the initial state, predicting on the initial string
    for ch in chars:
        #char, state = predict(model, ch, state, top_n=top_k)
        probs, state = predict_probs(model, state, ch, vocab)
        next_index = sample_from_probs(probs, top_n)

    # Now pass in the previous characters and get a new one
    for ii in range(size):
        #char, h = predict_char(model, chars, vocab)
        probs, state = predict_probs(model, state, chars, vocab)
        next_index = sample_from_probs(probs, top_n)
        # append to sequence
        chars.append(vocab.int2char[next_index.data[0]])

    return ''.join(chars)
text_predicted = generate_from_text(model, 30, vocab, 3, 'we want ')
print(text_predicted)
print(len(text_predicted))
GPU not available, CPU used
we want the he ares. ar to hat
30

The next function will feed our model one character at a time instead of providing it with the entire string of text.

def generate_from_char(model, out_len, vocab, top_n=1, start='hey'):
    model.eval() # eval mode
    start = start.lower()
    # First off, run through the starting characters
    chars = [ch for ch in start]
    size = out_len - len(chars)
    # Generate the initial hidden state
    device = set_device()
    state = model.init_state(device, 1)
    # Warm up the initial state, predicting on the initial string
    for ch in chars:
        #char, state = predict(model, ch, state, top_n=top_k)
        probs, state = predict_probs(model, state, ch, vocab)
        next_index = sample_from_probs(probs, top_n)
        
    # Include the last char predicted to the predicted output
    chars.append(vocab.int2char[next_index.data[0]])   
    
    # Now pass in the previous characters and get a new one
    for ii in range(size-1):
        #char, h = predict_char(model, chars, vocab)
        probs, state = predict_probs(model, state, chars[-1], vocab)
        next_index = sample_from_probs(probs, top_n)
        # append to sequence
        chars.append(vocab.int2char[next_index.data[0]])

    return ''.join(chars)
text_predicted = generate_from_char(model, 30, vocab, 3, 'we want ')
print(text_predicted)
print(len(text_predicted))
GPU not available, CPU used
we want what, and and whin she
30

We have also developed a function to predict the next char given an initial string:

def predict_char(model, character, vocab):
    # One-hot encoding our input to fit into the model
    character = np.array([[vocab.char2int[c] for c in character]])
    #character = one_hot_encode(character, len(vocab.char2int), character.shape[1], 1)
    character = one_hot_encode(character, model.vocab_size)
    character = torch.from_numpy(character)
    # Set the device
    device = set_device()
    character = character.to(device)
    
    model.eval() # eval mode
    # Generate the initial hidden state
    state = model.init_state(device, 1)

    out, hidden = model(character, state)

    prob = nn.functional.softmax(out[-1], dim=0).data
    # Taking the class with the highest probability score from the output
    char_ind = torch.max(prob, dim=0)[1].item()

    return vocab.int2char[char_ind], hidden
t,_ = predict_char(model, 'we want ', vocab)
print('Initial string: ', t)
GPU not available, CPU used
Initial string:  t

At this point we are ready to train our model in Amazon SageMaker using the whole data set, improving its performance by training for more epochs.