A guide on Recurrent Neural Networks: Character-level Text Generator
A simple RNN model built with PyTorch to familiarize ourselves with the PyTorch library and get started with RNNs. The goal is to build a model that can complete your sentence based on a few characters or a word used as input. In the second part (the next post), we will train this model on Amazon SageMaker.
Character-level text generator with PyTorch using Long Short-Term Memory units
In this notebook we will be implementing a simple RNN character model with PyTorch to familiarize ourselves with the PyTorch library and get started with RNNs. The goal is to build a model that can complete your sentence based on a few characters or a word used as input.
The model will be fed with a word and will predict what the next character in the sentence will be. This process will repeat itself until we generate a sentence of our desired length.
import torch
from torch import nn
import torch.nn.functional as F
from torch.autograd import Variable
import os
import random as rnd
import numpy as np
import pickle
import time
import string
from tqdm import tqdm
Loading the dataset
First, we'll define the sentences that we want our model to output when fed with the first word or the first few characters. Our dataset is a text file containing Shakespeare's plays, from which we will extract sequences of characters to use as input to our model. Our model will then learn how to complete sentences the way Shakespeare would.
This dataset can be downloaded from Karpathy's Github account: https://github.com/karpathy/char-rnn/blob/master/data/tinyshakespeare/input.txt.
As in many of my notebooks, we set some variables pointing to the data directory and filenames. If you want to run this code in your own environment, you may need to change these values:
# Set the root folder
root_folder='.'
# Set the folder with the dataset
data_folder_name='data'
model_folder_name='model'
# Set the filename
filename='input.txt'
# Path to the data folder
DATA_PATH = os.path.abspath(os.path.join(root_folder, data_folder_name))
model_dir = os.path.abspath(os.path.join(root_folder, model_folder_name))
# Set the path where the text for training is stored
train_path = os.path.join(DATA_PATH, filename)
# Set a seed
seed = 1
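If the file is not already in the data folder, it can be downloaded from the notebook itself. This is only a convenience sketch: the raw-file URL below is an assumption derived from the GitHub page linked above, so adjust it if the repository layout changes.
# Optionally download the dataset if it is not present yet
if not os.path.exists(train_path):
    import urllib.request
    os.makedirs(DATA_PATH, exist_ok=True)
    # Raw-file URL derived from the GitHub link above (assumption)
    url = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'
    urllib.request.urlretrieve(url, train_path)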
def load_text_data(filename, init_dialog=False):
''' Load the texts from the filename, splitting by lines and removing empty strings.
    Setting init_dialog = False will remove the lines that indicate which character is about to speak (lines containing ':')
'''
sentences = []
with open(filename, 'r') as reader:
#sentences = reader.readlines()
for line in reader:
#if ':' not in line and line !='\n':
if init_dialog or ':' not in line:
# Append the line to the sentences, removing the end of line character
sentences.append(line[:-1])
return sentences
Loading the input data, sentences from Shakespeare's plays.
sentences = load_text_data(train_path)
print('Number of sentences: ', len(sentences))
print(sentences[:20])
def clean_text(sentences, alpha=False):
''' Cleaning process of the text'''
if alpha:
        # Remove non-alphabetic characters
cleaned_text = [''.join([t.lower() for t in text if t.isalpha() or t.isspace()]) for text in sentences]
else:
# Simply lower the characters
cleaned_text = [t.lower() for t in sentences]
    # Remove any empty string
cleaned_text = [t for t in cleaned_text if t!='']
return cleaned_text
# Clean the sentences
sentences = clean_text(sentences, False)
# Join all the sentences into one long string
sentences = ' '.join(sentences)
print('Number of characters: ', len(sentences))
print(sentences[:100])
class CharVocab:
    ''' Create a character-level vocabulary with optional special tokens '''
def __init__(self, type_vocab,pad_token='<PAD>', eos_token='<EOS>', unk_token='<UNK>'): #Initialization of the type of vocabulary
self.type = type_vocab
#self.int2char ={}
self.int2char = []
if pad_token !=None:
self.int2char += [pad_token]
if eos_token !=None:
self.int2char += [eos_token]
if unk_token !=None:
self.int2char += [unk_token]
self.char2int = {}
    def __call__(self, text): # When called, build the vocabulary (char-to-int and int-to-char mappings) from the text
# Join all the sentences together and extract the unique characters from the combined sentences
chars = set(''.join(text))
# Creating a dictionary that maps integers to the characters
self.int2char += list(chars)
# Creating another dictionary that maps characters to integers
self.char2int = {char: ind for ind, char in enumerate(self.int2char)}
vocab = CharVocab('char',None,None,'<UNK>')
vocab(sentences)
print('Length of vocabulary: ', len(vocab.int2char))
print('Int to Char: ', vocab.int2char)
print('Char to Int: ', vocab.char2int)
# Check or create the directory where dictionary will be saved
if not os.path.exists(DATA_PATH): # Make sure that the folder exists
os.makedirs(DATA_PATH)
# Save the dictionary to data path dir
with open(os.path.join(DATA_PATH, 'char_dict.pkl'), "wb") as f:
pickle.dump(vocab.char2int, f)
with open(os.path.join(DATA_PATH, 'int_dict.pkl'), "wb") as f:
pickle.dump(vocab.int2char, f)
Create the input data and labels for training
As we're going to predict the next character in the sequence at each time step, we'll have to divide each sentence into
- Input data: The last input character should be excluded as it does not need to be fed into the model
- Target/Ground Truth Label: One time-step ahead of the Input data as this will be the "correct answer" for the model at each time step corresponding to the input data
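As a small illustration of this split (the toy sequence below is arbitrary, not taken from our dataset):
# Toy example of the input/target split on an already-encoded sequence
toy_seq = np.array([3, 7, 1, 4, 9])
toy_input = toy_seq[:-1]   # [3, 7, 1, 4]  -> fed into the model
toy_target = toy_seq[1:]   # [7, 1, 4, 9]  -> what the model should predict at each step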
def one_hot_encode(indices, dict_size):
''' Define one hot encode matrix for our sequences'''
# Creating a multi-dimensional array with the desired output shape
# Encode every integer with its one hot representation
features = np.eye(dict_size, dtype=np.float32)[indices.flatten()]
# Finally reshape it to get back to the original array
features = features.reshape((*indices.shape, dict_size))
return features
def encode_text(input_text, vocab, one_hot = False):
''' Encode the input_text replacing the char by its integer number based on the dictionary vocab'''
# Replace every char by its integer value based on the vocabulary
output = [vocab.char2int.get(character,0) for character in input_text]
if one_hot:
# One hot encode every integer of the sequence
dict_size = len(vocab.char2int)
        return one_hot_encode(np.array(output), dict_size)
else:
return np.array(output)
Now we can encode our text, replacing every character by its integer value in the dictionary. When we have our dataset unified and prepared, we should do a quick check and look at an example of the data our model will be trained on. This is generally a good idea as it allows you to see how each of the processing steps affects the data, and it also ensures that the data has been loaded correctly.
# Encode the train dataset
train_data = encode_text(sentences, vocab, one_hot = False)
# Create the input sequence, from 0 to len-1
input_seq=train_data[:-1]
# Create the target sequence, from 1 to len. It is right-shifted one place
target_seq=train_data[1:]
print('\nOriginal text:')
print(sentences[:100])
print('\nEncoded text:')
print(train_data[:100])
print('\nInput sequence:')
print(input_seq[:100])
print('\nTarget sequence:')
print(target_seq[:100])
Now we can save our encoded dataset to a file, so we can restore it whenever necessary. It is important to note the format of the saved data, as we will need to know it when we write the training code. In our case, we will save the dataset as a pickle object: an array containing the whole dataset, with every character encoded as its integer value.
# Save the encoded text to a file
encoded_data = os.path.join(DATA_PATH, 'input_data.pkl')
with open(encoded_data, 'wb') as fp:
pickle.dump(train_data, fp)
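Whenever we need the encoded dataset again, for example in the training script of the next post, it can be restored with a simple pickle load (a minimal sketch):
# Restore the encoded dataset from disk when needed
with open(encoded_data, 'rb') as fp:
    train_data = pickle.load(fp)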
Let's check the one-hot-encode function that we will use later during the training phase:
print('Encoded characters: ',train_data[100:102])
print('One-hot-encoded characters: ',one_hot_encode(train_data[100:102], len(vocab.int2char)))
Create a batch data generator
When training on the dataset, we need to extract a batch of examples from the inputs and targets, run the forward and backward passes of the RNN, and then repeat the iteration with another batch. A batch generator will help us extract these batches from our datasets.
First, we will use only a small portion of the training dataset as a sample. It would be very time consuming to train the model completely in the notebook, as we do not have access to a GPU and the compute instance we are using is not particularly powerful. However, we can work on a small bit of the data to get a feel for how our training script is behaving.
def batch_generator_sequence(features_seq, label_seq, batch_size, seq_len):
"""Generator function that yields batches of data (input and target)
Args:
        features_seq: sequence of characters (encoded as integers), the features of our model.
        label_seq: sequence of characters (encoded as integers), the target labels of our model.
        batch_size (int): number of example sequences per batch.
        seq_len (int): length of each sequence in a batch.
    Yields:
        x_batch: array of features for one batch, shape [batch_size, seq_len].
        y_batch: array of labels for one batch, shape [batch_size, seq_len].
"""
# calculate the number of batches we can supply
num_batches = len(features_seq) // (batch_size * seq_len)
if num_batches == 0:
raise ValueError("No batches created. Use smaller batch size or sequence length.")
# calculate effective length of text to use
rounded_len = num_batches * batch_size * seq_len
    # Reshape the features matrix to [batch_size, num_batches * seq_len]
    x = np.reshape(features_seq[: rounded_len], [batch_size, num_batches * seq_len])
    # Reshape the target matrix to [batch_size, num_batches * seq_len]
    y = np.reshape(label_seq[: rounded_len], [batch_size, num_batches * seq_len])
epoch = 0
while True:
# roll so that no need to reset rnn states over epochs
x_epoch = np.split(np.roll(x, -epoch, axis=0), num_batches, axis=1)
y_epoch = np.split(np.roll(y, -epoch, axis=0), num_batches, axis=1)
for batch in range(num_batches):
yield x_epoch[batch], y_epoch[batch]
epoch += 1
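A quick way to sanity-check the generator is to run it on a small toy sequence and inspect the shape of the first batch (the numbers below are arbitrary):
# Sanity check of the batch generator on toy data
toy_features = np.arange(1000)
toy_labels = np.arange(1, 1001)
toy_gen = batch_generator_sequence(toy_features, toy_labels, batch_size=4, seq_len=10)
x_batch, y_batch = next(toy_gen)
print(x_batch.shape, y_batch.shape)  # expected: (4, 10) (4, 10)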
Define the RNN model
The model is very simple:
- An LSTM layer to encode the input (there is no need for an embedding layer because the data is one-hot-encoded)
- A dropout layer to reduce overfitting
- The decoder, a fully connected layer mapping the hidden state to vocabulary-size outputs
The output provides, for every item in the vocabulary, the probability of being the next character.
class RNNModel(nn.Module):
def __init__(self, vocab_size, embedding_size, hidden_dim, n_layers, drop_rate=0.2):
super(RNNModel, self).__init__()
# Defining some parameters
self.hidden_dim = hidden_dim
self.embedding_size = embedding_size
self.n_layers = n_layers
self.vocab_size = vocab_size
self.drop_rate = drop_rate
self.char2int = None
self.int2char = None
#Defining the layers
# Define the encoder as an Embedding layer
#self.encoder = nn.Embedding(vocab_size, embedding_size)
# Dropout layer
self.dropout = nn.Dropout(drop_rate)
# RNN Layer
self.rnn = nn.LSTM(embedding_size, hidden_dim, n_layers, dropout=drop_rate, batch_first = True)
# Fully connected layer
self.decoder = nn.Linear(hidden_dim, vocab_size)
def forward(self, x, state):
# input shape: [batch_size, seq_len, embedding_size]
# Apply the embedding layer and dropout
#embed_seq = self.dropout(self.encoder(x))
#print('Input RNN shape: ', embed_seq.shape)
# shape: [batch_size, seq_len, embedding_size]
rnn_out, state = self.rnn(x, state)
#print('Out RNN shape: ', rnn_out.shape)
# rnn_out shape: [batch_size, seq_len, rnn_size]
        # state is a tuple of two tensors, each of shape [n_layers, batch_size, hidden_dim]
        rnn_out = self.dropout(rnn_out)
        # shape: [batch_size, seq_len, hidden_dim]
# Stack up LSTM outputs using view
# you may need to use contiguous to reshape the output
rnn_out = rnn_out.contiguous().view(-1, self.hidden_dim)
logits = self.decoder(rnn_out)
        # output shape: [batch_size * seq_len, vocab_size]
#print('Output model shape: ', logits.shape)
return logits, state
def init_state(self, device, batch_size=1):
"""
initialises rnn states.
"""
#return (Variable(torch.zeros(self.n_layers, batch_size, self.hidden_dim)),
# Variable(torch.zeros(self.n_layers, batch_size, self.hidden_dim)))
return (torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device),
torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device))
    def predict(self, input, state=None):
        # input shape: [batch_size, seq_len, vocab_size] (one-hot encoded)
        if state is None:
            state = self.init_state(input.device, input.size(0))
        logits, hidden = self.forward(input, state)
        # logits shape: [batch_size * seq_len, vocab_size]
        probs = F.softmax(logits, dim=1)
        # shape: [batch_size * seq_len, vocab_size]
        probs = probs.view(input.size(0), input.size(1), probs.size(1))
        # output shape: [batch_size, seq_len, vocab_size]
        return probs, hidden
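Before writing the training loop, a minimal shape check with random one-hot input can confirm that the model wires up as expected. The sizes below are arbitrary and only used for this quick test.
# Minimal shape check of the model with random one-hot input (arbitrary sizes)
_vocab = 30
_model = RNNModel(vocab_size=_vocab, embedding_size=_vocab, hidden_dim=16, n_layers=1)
_state = _model.init_state(torch.device('cpu'), batch_size=2)
_x = torch.from_numpy(one_hot_encode(np.random.randint(0, _vocab, (2, 5)), _vocab))
_logits, _state = _model(_x, _state)
print(_logits.shape)  # expected: torch.Size([10, 30]), i.e. [batch_size * seq_len, vocab_size]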
After defining the RNN model, we can code the main training function. It is very simple and the steps involved are the usual ones for training any neural network: in every epoch, get the next batch of data, move the tensors to the device, call the model (forward pass), calculate the loss, compute the gradients and update the weights.
def train_main(model, optimizer, loss_fn, batch_data, num_batches, val_batches, batch_size, seq_len, n_epochs, clip_norm, device):
# Training Run
for epoch in range(1, n_epochs + 1):
# Store the loss in every batch iteration
#epoch_losses = torch.Tensor(num_batches)
epoch_losses = []
# Init the hidden state
hidden = model.init_state(device, batch_size)
# Train all the batches in every epoch
        for i in tqdm(range(num_batches-val_batches), desc="Epoch {}/{}".format(epoch, n_epochs)):
#print('Batch :', i)
# Get the next batch data for input and target
input_batch, target_batch = next(batch_data)
            # One-hot encode the input data
            input_batch = one_hot_encode(input_batch, model.vocab_size)
            # Transform to tensors
input_data = torch.from_numpy(input_batch)
target_data = torch.from_numpy(target_batch)
            # Detach the hidden state from its history so we do not backpropagate through the whole dataset
hidden = tuple(([Variable(var.data) for var in hidden]))
# Move the input data to the device
input_data = input_data.to(device)
#print('Input shape: ', input_data.shape)
#print('Hidden shape: ', hidden[0].shape, hidden[1].shape)
# Set the model to train and prepare the gradients
model.train()
            optimizer.zero_grad() # Clear existing gradients from the previous batch
            # Forward pass through the RNN
output, hidden = model(input_data, hidden)
#print('Output shape: ', output.shape)
output = output.to(device)
#print('Output shape: ', output.shape)
#print('Target shape; ', target_data.shape)
# Move the target data to the device
target_data = target_data.to(device)
#print('Target shape; ', target_data.shape)
target_data = torch.reshape(target_data, (batch_size*seq_len,))
#print('Target shape; ', target_data.shape)
loss = loss_fn(output, target_data.view(batch_size*seq_len))
#print(loss)
# Save the loss
#epoch_losses[i] = loss.item() #data[0]
epoch_losses.append(loss.item()) #data[0]
loss.backward() # Does backpropagation and calculates gradients
# clip gradient norm
nn.utils.clip_grad_norm_(model.parameters(), clip_norm)
optimizer.step() # Updates the weights accordingly
# Now, when epoch is finished, evaluate the model on validation data
model.eval()
val_hidden = model.init_state(device, batch_size)
val_losses = []
        for i in tqdm(range(val_batches), desc="Val Epoch {}/{}".format(epoch, n_epochs)):
# Get the next batch data for input and target
input_batch, target_batch = next(batch_data)
            # One-hot encode the input data
            input_batch = one_hot_encode(input_batch, model.vocab_size)
            # Transform to tensors
input_data = torch.from_numpy(input_batch)
target_data = torch.from_numpy(target_batch)
            # Detach the hidden state; no gradients are needed for validation
            val_hidden = tuple([Variable(var.data) for var in val_hidden])
# Move the input data to the device
input_data = input_data.to(device)
            # Forward pass through the RNN
            output, val_hidden = model(input_data, val_hidden)
#print('Output shape: ', output.shape)
output = output.to(device)
#print('Output shape: ', output.shape)
#print('Target shape; ', target_data.shape)
# Move the target data to the device
target_data = target_data.to(device)
#print('Target shape; ', target_data.shape)
target_data = torch.reshape(target_data, (batch_size*seq_len,))
#print('Target shape; ', target_data.shape)
loss = loss_fn(output, target_data.view(batch_size*seq_len))
#print(loss)
# Save the loss
val_losses.append(loss.item()) #data[0]
model.train()
#if epoch%2 == 0:
print('Epoch: {}/{}.............'.format(epoch, n_epochs), end=' ')
print("Train Loss: {:.4f}".format(np.mean(epoch_losses)), end=' ')
print("Val Loss: {:.4f}".format(np.mean(val_losses)))
return epoch_losses
Before we start building the model, let's use a built-in feature of PyTorch to check the device we're running on (CPU or GPU). This implementation does not require a GPU as the training is really simple. However, as you progress to large datasets and models with millions of trainable parameters, using a GPU will be very important to speed up your training.
def set_device():
# torch.cuda.is_available() checks and returns a Boolean True if a GPU is available, else it'll return False
is_cuda = torch.cuda.is_available()
# If we have a GPU available, we'll set our device to GPU. We'll use this device variable later in our code.
if is_cuda:
device = torch.device("cuda")
print("GPU is available")
else:
device = torch.device("cpu")
print("GPU not available, CPU used")
return device
After defining the model above, we'll have to instantiate it with the relevant parameters and define our hyperparameters as well. The hyperparameters we're defining below are:
- n_epochs: Number of Epochs --> This refers to the number of times our model will go through the entire training dataset
- lr: Learning Rate --> This affects the rate at which our model updates the weights each time backpropagation is done
- A smaller learning rate means that the model changes the values of the weight with a smaller magnitude
- A larger learning rate means that the weights are updated to a larger extent for each time step
- batch_size: Number of examples to train on every train step
- maxlen: Length of the input sequence of char
- embedding_size: set to the vocabulary size, because the input is one-hot encoded
- hidden_dim: the number of hidden units in our LSTM module
- n_layers: number of layers of our LSTM module
# Define hyperparameters for training
n_epochs = 5
lr=0.01
batch_size=64
maxlen=64
clip_norm=5
val_fraction = 0.1
# Define hyperparameters of the model
hidden_dim = 64
n_layers = 1
embedding_size=len(vocab.char2int)
dict_size = len(vocab.char2int)
drop_rate = 0.2
# Set the device for training
device = set_device()
print('Device: ', device)
# Set a seed to reproduce experiments
torch.manual_seed(seed)
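Only PyTorch's random number generator is used here, but if you also rely on NumPy or Python's random module, they can be seeded too (optional):
# Optionally seed NumPy and Python's random module as well for full reproducibility
np.random.seed(seed)
rnd.seed(seed)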
Similar to other neural networks, we have to define the optimizer and loss function as well. We'll be using CrossEntropyLoss, as predicting the next character is basically a classification task over the vocabulary.
# Instantiate the model with hyperparameters
model = RNNModel(dict_size,embedding_size, hidden_dim, n_layers)
# We'll also set the model to the device that we defined earlier (default is CPU)
model = model.to(device)
print(model)
# Define Loss, Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
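Since the output is a classification over the vocabulary, a useful (optional) sanity check is that the loss of the untrained model should start near the cross-entropy of a uniform guess, roughly ln(vocab_size):
# The initial loss of an untrained model should be close to ln(vocab_size)
import math
print('Expected initial loss ~ {:.2f}'.format(math.log(dict_size)))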
# Limit the size of our input sequence to reduce the training time; we are just testing the model
input_seq = input_seq[:100000]
target_seq = target_seq[:100000]
print(len(input_seq))
# Calculate the number of batches to train
num_batches = len(input_seq) // (batch_size*maxlen)
val_batches = int(num_batches*val_fraction)
# Create the batch data generator
batch_data = batch_generator_sequence(input_seq, target_seq, batch_size, maxlen)
losses = train_main(model, optimizer, criterion, batch_data, num_batches, val_batches, batch_size,
maxlen, n_epochs, clip_norm, device)
Once the model is trained, we save it to disk so we can reload it later and use it for prediction. We also save the parameters used to construct the model; they will be needed to recreate it when necessary.
# Save the parameters used to construct the model
model_info_path = os.path.join(model_dir, 'model_info.pth')
with open(model_info_path, 'wb') as f:
model_info = {
'n_layers': n_layers,
'embedding_dim': embedding_size,
'hidden_dim': hidden_dim,
'vocab_size': dict_size,
'drop_rate': drop_rate
}
torch.save(model_info, f)
# Save the model parameters
model_path = os.path.join(model_dir, 'model.pth')
with open(model_path, 'wb') as f:
torch.save(model.state_dict(), f)
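As a sketch of how these files could be used later, the model can be rebuilt from model_info.pth and its weights restored from model.pth:
# Sketch: recreate the model from the saved configuration and weights
with open(model_info_path, 'rb') as f:
    saved_info = torch.load(f)
loaded_model = RNNModel(saved_info['vocab_size'], saved_info['embedding_dim'],
                        saved_info['hidden_dim'], saved_info['n_layers'],
                        saved_info['drop_rate'])
with open(model_path, 'rb') as f:
    loaded_model.load_state_dict(torch.load(f))
loaded_model = loaded_model.to(device)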
def sample_from_probs(probs, top_n=10):
"""
truncated weighted random choice.
"""
_, indices = torch.sort(probs)
# set probabilities after top_n to 0
probs[indices.data[:-top_n]] = 0
#print(probs.shape)
sampled_index = torch.multinomial(probs, 1)
return sampled_index
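To see what the truncated sampling does, we can run it on a made-up probability vector: with top_n=2 only the two most likely indices can ever be drawn.
# Toy check of sample_from_probs (the probabilities are made up)
toy_probs = torch.tensor([0.05, 0.30, 0.10, 0.40, 0.15])
print(sample_from_probs(toy_probs.clone(), top_n=2))  # only index 1 or 3 can be sampled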
def predict_probs(model, hidden, character, vocab):
# One-hot encoding our input to fit into the model
character = np.array([[vocab.char2int[c] for c in character]])
#character = one_hot_encode(character, len(vocab.char2int), character.shape[1], 1)
character = one_hot_encode(character, model.vocab_size)
character = torch.from_numpy(character)
character = character.to(device)
out, hidden = model(character, hidden)
prob = nn.functional.softmax(out[-1], dim=0).data
return prob, hidden
Let’s test our model now and see what kind of output we will get.
def generate_from_text(model, out_len, vocab, top_n=1, start='hey'):
model.eval() # eval mode
start = start.lower()
# First off, run through the starting characters
chars = [ch for ch in start]
size = out_len - len(chars)
# Generate the initial hidden state
device = set_device()
state = model.init_state(device, 1)
# Warm up the initial state, predicting on the initial string
for ch in chars:
#char, state = predict(model, ch, state, top_n=top_k)
probs, state = predict_probs(model, state, ch, vocab)
next_index = sample_from_probs(probs, top_n)
# Now pass in the previous characters and get a new one
for ii in range(size):
#char, h = predict_char(model, chars, vocab)
probs, state = predict_probs(model, state, chars, vocab)
next_index = sample_from_probs(probs, top_n)
# append to sequence
chars.append(vocab.int2char[next_index.data[0]])
return ''.join(chars)
text_predicted = generate_from_text(model, 30, vocab, 3, 'we want ')
print(text_predicted)
print(len(text_predicted))
The next function will feed our model one character at a time instead of providing it with the entire string of text.
def generate_from_char(model, out_len, vocab, top_n=1, start='hey'):
model.eval() # eval mode
start = start.lower()
# First off, run through the starting characters
chars = [ch for ch in start]
size = out_len - len(chars)
# Generate the initial hidden state
device = set_device()
state = model.init_state(device, 1)
# Warm up the initial state, predicting on the initial string
for ch in chars:
#char, state = predict(model, ch, state, top_n=top_k)
probs, state = predict_probs(model, state, ch, vocab)
next_index = sample_from_probs(probs, top_n)
        # Append the last predicted char to the output sequence
chars.append(vocab.int2char[next_index.data[0]])
# Now pass in the previous characters and get a new one
for ii in range(size-1):
#char, h = predict_char(model, chars, vocab)
probs, state = predict_probs(model, state, chars[-1], vocab)
next_index = sample_from_probs(probs, top_n)
# append to sequence
chars.append(vocab.int2char[next_index.data[0]])
return ''.join(chars)
text_predicted = generate_from_char(model, 30, vocab, 3, 'we want ')
print(text_predicted)
print(len(text_predicted))
We have also developed a function to predict the next char given an initial string:
def predict_char(model, character, vocab):
# One-hot encoding our input to fit into the model
character = np.array([[vocab.char2int[c] for c in character]])
#character = one_hot_encode(character, len(vocab.char2int), character.shape[1], 1)
character = one_hot_encode(character, model.vocab_size)
character = torch.from_numpy(character)
    # Set the device
device = set_device()
character = character.to(device)
model.eval() # eval mode
# Generate the initial hidden state
state = model.init_state(device, 1)
out, hidden = model(character, state)
prob = nn.functional.softmax(out[-1], dim=0).data
    # Take the class with the highest probability score from the output
    char_ind = torch.max(prob, dim=0)[1].item()
return vocab.int2char[char_ind], hidden
t,_ = predict_char(model, 'we want ', vocab)
print('Predicted next character: ', t)
At this point we are ready to train our model on Amazon SageMaker using the whole dataset, improving its performance by training for many epochs over a longer time.