Character-level Text Generator using PyTorch and Amazon SageMaker
Implementing a simple character-level LSTM model with PyTorch to familiarize ourselves with the PyTorch library and the Amazon SageMaker framework. We will cover how to use Amazon SageMaker to train a model, deploy it as an endpoint service, and invoke it to get predictions.
- Character-level text generator with PyTorch
- Using PyTorch and SageMaker
- General Outline
- Loading the libraries
- Step 1: Downloading and loading the data
- Step 2: Preparing and Processing the data
- Step 3: Upload the data to S3
- Step 4: Build and Train the PyTorch Model
- Create a batch data generator
- Step 5: Testing the model
- Step 6: Deploy the model for inference
- Step 7: Use the model for testing
Character-level text generator with PyTorch
Using PyTorch and SageMaker
Some parts of this notebook have been extracted or modified from a notebook of my exercises in the Machine Learning Engineer Nanodegree.
In this notebook we will implement a simple RNN character model with PyTorch to familiarize ourselves with the PyTorch library and get started with RNNs. The goal is to build a model that can complete your sentence based on a few characters or a word used as input. We will use AWS SageMaker to train, evaluate and deploy the model.
General Outline
Recall the general outline for SageMaker projects using a notebook instance.
- Download or otherwise retrieve the data.
- Process / Prepare the data.
- Upload the processed data to S3.
- Train a chosen model.
- Test the trained model (typically using a batch transform job).
- Deploy the trained model.
- Use the deployed model.
For this project, we will follow the steps in the general outline with some modifications.
First, we will not test the model in its own step. We will still test it, but we will do so by deploying the model and then sending the test data to the deployed endpoint. One of the reasons for doing this is to make sure that our deployed model works correctly before moving forward.
import os
import random as rnd
import numpy as np
import pickle
import time
Step 1: Downloading and loading the data
First, we'll define the sentences that we want our model to output when fed the first word or the first few characters. Our dataset is a text file containing Shakespeare's plays, from which we will extract sequences of characters to use as input to our model. Then our model will learn how to complete sentences the way Shakespeare would.
This dataset can be downloaded from Karpathy's Github account: https://github.com/karpathy/char-rnn/blob/master/data/tinyshakespeare/input.txt.
The dataset is stored on our notebook instance; it is small and easy to "move", so we do not need to store it in S3 or another cloud storage service.
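If the file is not already on the instance, a minimal sketch to fetch it (assuming the raw URL of the file in the repository linked above):
import os
import urllib.request

# Download the Tiny Shakespeare dataset into ./data if it is not already there
url = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'
os.makedirs('./data', exist_ok=True)
if not os.path.exists('./data/input.txt'):
    urllib.request.urlretrieve(url, './data/input.txt')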
As in many of my notebooks, we set some variables to the data directory and filenames. If you want to run this code in your own environment, you must change these values:
# Set the root folder
root_folder='.'
# Set the folder with the dataset
data_folder_name='data'
model_folder_name='model'
# Set the filename
filename='input.txt'
# Path to the data folder
DATA_PATH = os.path.abspath(os.path.join(root_folder, data_folder_name))
model_dir = os.path.abspath(os.path.join(root_folder, model_folder_name))
# Set the path where the text for training is stored
train_path = os.path.join(DATA_PATH, filename)
# Set a seed
seed = 1
def load_text_data(filename, init_dialog=False):
    ''' Load the text from filename, splitting it into lines and removing empty strings.
        Setting init_dialog=True will keep the lines where the character who is going to speak is indicated.
    '''
    sentences = []
    with open(filename, 'r') as reader:
        for line in reader:
            # Skip the speaker lines unless init_dialog is set
            if init_dialog or ':' not in line:
                # Append the line to the sentences, removing the end-of-line character
                sentences.append(line[:-1])
    return sentences
Loading the input data, sentences from Shakespeare's plays.
sentences = load_text_data(train_path)
print('Number of sentences: ', len(sentences))
print(sentences[:20])
def clean_text(sentences, alpha=False):
    ''' Cleaning process for the text '''
    if alpha:
        # Remove non-alphabetic characters and lowercase the rest
        cleaned_text = [''.join([t.lower() for t in text if t.isalpha() or t.isspace()]) for text in sentences]
    else:
        # Simply lowercase the characters
        cleaned_text = [t.lower() for t in sentences]
    # Remove any empty string
    cleaned_text = [t for t in cleaned_text if t != '']
    return cleaned_text
sentences = clean_text(sentences, False)
# Join all the sentences into one long string
sentences = ' '.join(sentences)
print('Number of characters: ', len(sentences))
print(sentences[:100])
Our input data is a sequence of about 900,000 characters. We will extract the label data from this sequence and split it into a training and a validation dataset, but we will do these tasks after encoding the text data.
class CharVocab:
    ''' Create a vocabulary for the characters in a corpus '''
    def __init__(self, type_vocab, pad_token='<PAD>', eos_token='<EOS>', unk_token='<UNK>'):
        # Initialize the type of vocabulary and the optional special tokens
        self.type = type_vocab
        self.int2char = []
        if pad_token is not None:
            self.int2char += [pad_token]
        if eos_token is not None:
            self.int2char += [eos_token]
        if unk_token is not None:
            self.int2char += [unk_token]
        self.char2int = {}

    def __call__(self, text):
        # When called, build the vocabulary from the text
        # Join all the sentences together and extract the unique characters from the combined sentences
        chars = set(''.join(text))
        # Extend the list that maps integers to the characters
        self.int2char += list(chars)
        # Create the dictionary that maps characters to integers
        self.char2int = {char: ind for ind, char in enumerate(self.int2char)}
vocab = CharVocab('char', None, None, '<UNK>')
vocab(sentences)
print('Length of vocabulary: ', len(vocab.int2char))
print('Int to Char: ', vocab.int2char)
print('Char to Int: ', vocab.char2int)
Save the dictionary
In this example it is not mandatory to save the dictionary immediately, because it is a fast and easy-to-reproduce task. But when dealing with a huge corpus and a large dictionary, we should save the dictionary so we can restore it later when running new experiments.
Later on, when we construct an endpoint which processes a submitted text, we will need to make use of the char2int and int2char dictionaries which we have created. As such, we will save them to a file now for future use.
# Check or create the directory where the dictionaries will be saved
if not os.path.exists(DATA_PATH):
    os.makedirs(DATA_PATH)
# Save the dictionaries to the data path dir
with open(os.path.join(DATA_PATH, 'char_dict.pkl'), "wb") as f:
    pickle.dump(vocab.char2int, f)
with open(os.path.join(DATA_PATH, 'int_dict.pkl'), "wb") as f:
    pickle.dump(vocab.int2char, f)
Create the input data and labels for training
As we're going to predict the next character in the sequence at each time step, we'll have to divide each sentence into:
- Input data: The last input character should be excluded as it does not need to be fed into the model (it is the target label for the last input character)
- Target/Ground Truth Label: One time-step ahead of the Input data as this will be the "correct answer" for the model at each time step corresponding to the input data
def one_hot_encode(indices, dict_size):
    ''' One-hot encode a matrix of integer sequences '''
    # Encode every integer with its one-hot representation
    features = np.eye(dict_size, dtype=np.float32)[indices.flatten()]
    # Finally reshape it to get back to the original array shape, plus the one-hot dimension
    features = features.reshape((*indices.shape, dict_size))
    return features
def encode_text(input_text, vocab, one_hot=False):
    ''' Encode a text as a sequence of integers, optionally one-hot encoded '''
    # Replace every char by its integer value based on the vocabulary
    output = [vocab.char2int.get(character, 0) for character in input_text]
    if one_hot:
        # One-hot encode every integer of the sequence
        dict_size = len(vocab.char2int)
        return one_hot_encode(np.array(output), dict_size)
    else:
        return np.array(output)
Now we can encode our text, replacing every character by its integer value in the dictionary. Once our dataset is unified and prepared, we should do a quick check and see an example of the data our model will be trained on. This is generally a good idea as it allows you to see how each of the processing steps affects the data, and it also ensures that the data has been loaded correctly.
# Encode the train dataset
train_data = encode_text(sentences, vocab, one_hot = False)
# Create the input sequence, from 0 to len-1
input_seq=train_data[:-1]
# Create the target sequence, from 1 to len. It is right-shifted one place
target_seq=train_data[1:]
print('\nOriginal text:')
print(sentences[:100])
print('\nEncoded text:')
print(train_data[:100])
print('\nInput sequence:')
print(input_seq[:100])
print('\nTarget sequence:')
print(target_seq[:100])
Let's check the one-hot encoding function that we will use later during the training phase:
print('Encoded characters: ',train_data[100:102])
print('One-hot-encoded characters: ',one_hot_encode(train_data[100:102], len(vocab.int2char)))
Step 3: Upload the data to S3
Now we need to upload the training dataset to S3 so that our training code can access it. First we will save it locally, and then upload it to S3.
Save the processed training dataset locally
It is important to note the format of the data that we are saving, as we will need to know it when we write the training code. In our case, we will save the dataset as a pickle object: an array containing the whole dataset, with every character encoded as an integer value.
# Save the encoded text to a file
encoded_data = os.path.join(DATA_PATH, 'input_data.pkl')
with open(encoded_data, 'wb') as fp:
    pickle.dump(train_data, fp)
import sagemaker
# Get the SageMaker session
sagemaker_session = sagemaker.Session()
# Get the bucket, in our example the default bucket
bucket = sagemaker_session.default_bucket()
# Set the S3 subfolder where our data will be stored
prefix = 'sagemaker/char_level_rnn'
# Get the role for permission
role = sagemaker.get_execution_role()
input_data = sagemaker_session.upload_data(path=DATA_PATH, bucket=bucket, key_prefix=prefix)
NOTE: The cell above uploads the entire contents of our data directory. This includes the char_dict.pkl and int_dict.pkl files. This is fortunate as we will need them later on when we create an endpoint that accepts an arbitrary input text. For now, we will just take note of the fact that they reside in the data directory (and so also in the S3 training bucket) and that we will need to make sure they get saved in the model directory.
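One way to ensure that (a sketch; args.data_dir and args.model_dir are illustrative names for the directories that train.py receives from SageMaker) is to copy the dictionaries into the model directory at the end of training:
import os
import shutil

# Inside train.py, after training: copy the vocabulary files from the
# training data directory into the model directory so that SageMaker
# packages them with the model artifacts.
for fname in ('char_dict.pkl', 'int_dict.pkl'):
    shutil.copyfile(os.path.join(args.data_dir, fname),
                    os.path.join(args.model_dir, fname))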
Step 4: Build and Train the PyTorch Model
A model in the SageMaker framework, in particular, comprises three objects:
- Model Artifacts,
- Training Code, and
- Inference Code,
each of which interact with one another.
We will start by implementing our own neural network in PyTorch along with a training script. For the purposes of this project we need to provide the model object implementation in the model.py
file, inside of the train
folder. You can see the provided implementation by running the cell below.
!pygmentize train/model.py
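For reference, here is a minimal sketch of what such a model can look like, consistent with how it is used later in this notebook (the constructor arguments and the init_state() signature match the calls below, but the actual train/model.py may differ in its details):
import torch
from torch import nn

class RNNModel(nn.Module):
    ''' Sketch of a character-level LSTM model '''
    def __init__(self, vocab_size, output_size, hidden_dim, n_layers, drop_rate=0.2):
        super(RNNModel, self).__init__()
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        # LSTM over one-hot encoded characters, input shape (batch, seq_len, vocab_size)
        self.lstm = nn.LSTM(vocab_size, hidden_dim, n_layers, batch_first=True,
                            dropout=drop_rate if n_layers > 1 else 0.0)
        # Fully connected layer mapping the hidden state to character scores
        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self, x, hidden):
        out, hidden = self.lstm(x, hidden)
        # Flatten to (batch * seq_len, hidden_dim) so the loss sees one row per character
        out = out.contiguous().view(-1, self.hidden_dim)
        return self.fc(out), hidden

    def init_state(self, device, batch_size=1):
        # Initial hidden and cell states, all zeros
        weight = next(self.parameters())
        return (weight.new_zeros(self.n_layers, batch_size, self.hidden_dim).to(device),
                weight.new_zeros(self.n_layers, batch_size, self.hidden_dim).to(device))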
Create a batch data generator
When training on the dataset, we need to extract a batch of examples from the inputs and targets, run the forward and backward passes of the RNN on them, and then repeat the iteration with another batch of examples. A batch generator will help us extract these batches from our datasets.
The next code defines our batch generator:
def batch_generator_sequence(features_seq, label_seq, batch_size, seq_len):
    """Generator function that yields batches of data (input and target).

    Args:
        features_seq (array): sequence of encoded input characters to group into batches.
        label_seq (array): sequence of encoded target characters to group into batches.
        batch_size (int): number of example sequences per batch.
        seq_len (int): length, in characters, of each example sequence.

    Yields:
        tuple: a batch of inputs and a batch of targets, each of shape (batch_size, seq_len).
    """
    # Calculate the number of batches we can supply
    num_batches = len(features_seq) // (batch_size * seq_len)
    if num_batches == 0:
        raise ValueError("No batches created. Use smaller batch size or sequence length.")
    # Calculate the effective length of text to use
    rounded_len = num_batches * batch_size * seq_len
    # Reshape the features matrix into shape (batch_size, num_batches * seq_len)
    x = np.reshape(features_seq[:rounded_len], [batch_size, num_batches * seq_len])
    # Reshape the target matrix into shape (batch_size, num_batches * seq_len)
    y = np.reshape(label_seq[:rounded_len], [batch_size, num_batches * seq_len])
    epoch = 0
    while True:
        # Roll the data so there is no need to reset the RNN states between epochs
        x_epoch = np.split(np.roll(x, -epoch, axis=0), num_batches, axis=1)
        y_epoch = np.split(np.roll(y, -epoch, axis=0), num_batches, axis=1)
        for batch in range(num_batches):
            yield x_epoch[batch], y_epoch[batch]
        epoch += 1
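As a quick sanity check of the generator (hypothetical toy values, just to inspect the shapes it yields):
# Toy sequence of 1,000 integer-encoded characters
toy_seq = np.arange(1000)
gen = batch_generator_sequence(toy_seq[:-1], toy_seq[1:], batch_size=4, seq_len=10)
x_batch, y_batch = next(gen)
# Each batch has shape (batch_size, seq_len)
print(x_batch.shape, y_batch.shape)  # (4, 10) (4, 10)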
def train_main(model, optimizer, loss_fn, batch_data, num_batches, val_batches, batch_size, seq_len, n_epochs, clip_norm, device):
    # Training run
    for epoch in range(1, n_epochs + 1):
        start_time = time.time()
        # Store the loss of every batch iteration
        epoch_losses = []
        # Init the hidden state
        hidden = model.init_state(device, batch_size)
        # Train on all the batches in every epoch
        for i in range(num_batches - val_batches):
            # Get the next batch of input and target data
            input_batch, target_batch = next(batch_data)
            # One-hot encode the input data
            input_batch = one_hot_encode(input_batch, model.vocab_size)
            # Transform to tensors
            input_data = torch.from_numpy(input_batch)
            target_data = torch.from_numpy(target_batch)
            # Detach the hidden state from its history, necessary to calculate the gradients
            hidden = tuple([Variable(var.data) for var in hidden])
            # Move the input data to the device
            input_data = input_data.to(device)
            # Set the model to train mode and clear existing gradients from the previous batch
            model.train()
            optimizer.zero_grad()
            # Forward pass of the RNN
            output, hidden = model(input_data, hidden)
            # Move the target data to the device and flatten it to (batch_size * seq_len,)
            target_data = target_data.to(device)
            target_data = torch.reshape(target_data, (batch_size * seq_len,))
            loss = loss_fn(output, target_data)
            # Save the loss
            epoch_losses.append(loss.item())
            # Backpropagation: calculate the gradients
            loss.backward()
            # Clip the gradient norm to avoid exploding gradients
            nn.utils.clip_grad_norm_(model.parameters(), clip_norm)
            # Update the weights accordingly
            optimizer.step()

        # When the epoch is finished, evaluate the model on the validation data
        model.eval()
        val_hidden = model.init_state(device, batch_size)
        val_losses = []
        with torch.no_grad():
            for i in range(val_batches):
                # Get the next batch of input and target data
                input_batch, target_batch = next(batch_data)
                # One-hot encode the input data
                input_batch = one_hot_encode(input_batch, model.vocab_size)
                # Transform to tensors and move them to the device
                input_data = torch.from_numpy(input_batch).to(device)
                target_data = torch.from_numpy(target_batch).to(device)
                # Detach the hidden state from its history
                val_hidden = tuple([Variable(var.data) for var in val_hidden])
                # Forward pass of the RNN
                output, val_hidden = model(input_data, val_hidden)
                # Flatten the target to (batch_size * seq_len,)
                target_data = torch.reshape(target_data, (batch_size * seq_len,))
                loss = loss_fn(output, target_data)
                # Save the loss
                val_losses.append(loss.item())
        model.train()

        print('Epoch: {}/{}.............'.format(epoch, n_epochs), end=' ')
        print('Time: {:.4f}'.format(time.time() - start_time), end=' ')
        print("Train Loss: {:.4f}".format(np.mean(epoch_losses)), end=' ')
        print("Val Loss: {:.4f}".format(np.mean(val_losses)))
    return epoch_losses
Assuming we have the training method above, we will test that it works by writing a bit of code in the notebook that executes it on a small sample training set. Because we are not using a GPU and we are just testing the training code, we take only 50,000 characters from the input data. The reason for doing this in the notebook is to have an opportunity to fix any errors that arise early, when they are easier to diagnose.
import torch
from torch import nn
from torch.autograd import Variable
from tqdm import tqdm
from train.model import RNNModel
# Set a seed to reproduce experiments
torch.manual_seed(seed)
# Set the device for training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Create the model: input and output size equal to the vocabulary size (38), hidden dim 16, 1 layer
model = RNNModel(38, 38, 16, 1).to(device)
# Define Loss, Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
# Limit the size of our input sequence for this simple test
input_seq = input_seq[:50000]
target_seq = target_seq[:50000]
# Calculate the number of batches to train
batch_size=32
maxlen=64
num_batches = len(input_seq) // (batch_size*maxlen)
# Calculate the validation batches
val_frac = 0.1
val_batches = int(num_batches*val_frac)
# Create the batch data generator
batch_data = batch_generator_sequence(input_seq, target_seq, batch_size, maxlen)
# Train the model for 5 epochs, clipping the gradient norm at 5
losses = train_main(model, optimizer, criterion, batch_data, num_batches, val_batches, batch_size, maxlen, 5, 5, device)
In order to construct a PyTorch model using SageMaker we must provide SageMaker with a training script. We may optionally include a directory which will be copied to the container and from which our training code will be run. When the training container is executed it will check the uploaded directory (if there is one) for a requirements.txt file and install any required Python libraries, after which the training script will be run.
In this example, we only require the numpy package.
Training the model
When a PyTorch model is constructed in SageMaker, an entry point must be specified. This is the Python file which will be executed when the model is trained. Inside of the train
directory is a file called train.py
which contains most of the necessary code to train our model.
NOTICE: The train_main() method written above has been pasted into the train/train.py file where required.
The way that SageMaker passes hyperparameters to the training script is by way of arguments. These arguments can then be parsed and used in the training script. To see how this is done take a look at the provided train/train.py
file.
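As a rough sketch of that parsing (the hyperparameter names match the ones we pass to the estimator below; SM_MODEL_DIR and SM_CHANNEL_TRAINING are environment variables set by the SageMaker PyTorch container, and the argument names here are illustrative):
import argparse
import os

parser = argparse.ArgumentParser()
# Hyperparameters arrive as command-line arguments
parser.add_argument('--epochs', type=int, default=10)
parser.add_argument('--hidden_dim', type=int, default=256)
parser.add_argument('--n_layers', type=int, default=2)
# SageMaker environment variables for the model and training data locations
parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
parser.add_argument('--data-dir', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
args = parser.parse_args()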
First, we need to set which type of instance will run our training:
- Local: We do not launch a real compute instance, just a container where our scripts run. This scenario is very useful to test that the training script works, because it is faster to spin up a container than a compute instance. Once we confirm that everything works, we must change the instance type to a "real" training instance.
- ml.m4.4xlarge: A CPU instance
- ml.p2.xlarge: A GPU instance to use when training on a big volume of data.
# Select the type of instance to use for training
#instance_type='ml.m4.4xlarge' # CPU instance
instance_type='ml.p2.xlarge' # GPU instance
#instance_type='local'
from sagemaker.pytorch import PyTorch
estimator = PyTorch(entry_point="train.py",
source_dir="train",
role=role,
framework_version='0.4.0',
train_instance_count=1,
train_instance_type=instance_type,
hyperparameters={
'epochs': 50,
'hidden_dim': 512,
'n_layers': 2,
})
estimator.fit({'training': input_data})
Step 6: Deploy the model for inference
Now that our model is trained, it's time to create some custom inference code so that we can send the model an initial string which has not been processed and generate the next characters of the string.
By default the estimator which we created, when deployed, will use the entry script and directory which we provided when creating the model. However, since we wish to accept a string as input and our model expects processed input, we need to write some custom inference code.
We will store the code that we write in the serve
directory. Provided in this directory is the model.py
file that we used to construct our model, a utils.py
file which contains the one-hot-encode
and encode_text
pre-processing functions which we used during the initial data processing, and predict.py
, the file which will contain our custom inference code. Note also that requirements.txt
is present which will tell SageMaker what Python libraries are required by our custom inference code.
When deploying a PyTorch model in SageMaker, you are expected to provide four functions which the SageMaker inference container will use.
- model_fn: This function is the same function that we used in the training script and it tells SageMaker how to load our model. It must be called model_fn() and takes as its only parameter a path to the directory where the model artifacts are stored. It must also be present in the Python file which we specified as the entry point. It also reads the saved dictionaries, because they may be used during the inference process.
- input_fn: This function receives the raw serialized input that has been sent to the model's endpoint, and its job is to de-serialize the input and make it available to the inference code. Later we will describe what our input_fn function does.
- output_fn: This function takes the output of the inference code, and its job is to serialize this output and return it to the caller of the model's endpoint.
- predict_fn: The heart of the inference script, this is where the actual prediction is done and is the function which you will need to complete.
For the simple example that we are constructing in this project, the input_fn and output_fn methods are relatively straightforward. We need to accept a string as input, composed of the desired length of the output and the initial string, and we expect to return a single string as output: the newly generated text. You might imagine though that in a more complex application the input or output may be image data or some other binary data which would require some effort to serialize.
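A minimal sketch of what these two methods can look like under that convention (the provided serve/predict.py may differ slightly):
def input_fn(serialized_input_data, content_type='text/plain'):
    # The endpoint receives plain text in the form '<output length>-<initial string>'
    return serialized_input_data.decode('utf-8')

def output_fn(prediction_output, accept='text/plain'):
    # The prediction is already a string, return it as-is
    return prediction_output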
Writing inference code
Before writing our custom inference code, we will begin by taking a look at the code which has been provided.
!pygmentize serve/predict.py
As mentioned earlier, the model_fn method is the same as the one provided in the training code, and the input_fn and output_fn methods are very simple. Finally, we must build a predict_fn method that will receive the input string, encode it (char2int), one-hot encode it and send it to the model. Every output will be decoded (int2char) and appended to the final output string.
Make sure that you save the completed file as predict.py
in the serve
directory.
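As a reference, here is a condensed sketch of that generation loop. It assumes model_fn has attached the loaded dictionaries to the model as model.char2int and model.int2char; that convention and the greedy decoding are illustrative, not necessarily what serve/predict.py does:
import numpy as np
import torch
from utils import one_hot_encode  # helper provided in serve/utils.py

def predict_fn(input_data, model):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # The input arrives as '<output length>-<initial string>'
    out_len, text = input_data.split('-', 1)
    chars = list(text)
    model.eval()
    with torch.no_grad():
        for _ in range(int(out_len)):
            # Encode the running text (char2int) and one-hot encode it: shape (1, len, vocab)
            seq = np.array([[model.char2int.get(c, 0) for c in chars]])
            inp = torch.from_numpy(one_hot_encode(seq, model.vocab_size)).to(device)
            hidden = model.init_state(device, 1)
            output, hidden = model(inp, hidden)
            # Greedily pick the most likely next character from the last time step and decode it
            chars.append(model.int2char[output[-1].argmax().item()])
    return ''.join(chars)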
Deploying the model
Now that the custom inference code has been written, we will create and deploy our model. To begin with, we need to construct a new PyTorchModel object which points to the model artifacts created during training and also points to the inference code that we wish to use. Then we can call the deploy method to launch the deployment container.
NOTE: The default behaviour for a deployed PyTorch model is to assume that any input passed to the predictor is a numpy array. In our case we want to send a string, so we need to construct a simple wrapper around the RealTimePredictor class to accommodate simple strings. In a more complicated situation you may want to provide a serialization object, for example if you wanted to send image data.
NOTE: When deploying a model you are asking SageMaker to launch a compute instance that will wait for data to be sent to it. As a result, this compute instance will continue to run until you shut it down. This is important to know since the cost of a deployed endpoint depends on how long it has been running.
In other words, if you are no longer using a deployed endpoint, shut it down!
Now, we can deploy our trained model
Loading a previously trained model
In many situations, you have trained the model in another execution of this notebook, or you shut down the notebook while the model was training. In that case the estimator variable is empty or undefined, and you want to restore a previous training job and deploy it. In the next cell, we attach that trained model to the estimator variable and continue with the necessary steps to launch and deploy the model.
# Attach the estimator to a previously trained job
from sagemaker.pytorch import PyTorch
my_training_job_name = 'sagemaker-pytorch-2020-09-02-19-49-57-475'
estimator = PyTorch.attach(my_training_job_name)
from sagemaker.predictor import RealTimePredictor
from sagemaker.pytorch import PyTorchModel
class StringPredictor(RealTimePredictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super(StringPredictor, self).__init__(endpoint_name, sagemaker_session, content_type='text/plain')
model = PyTorchModel(model_data=estimator.model_data,
role = role,
framework_version='0.4.0',
entry_point='predict.py',
source_dir='serve',
predictor_cls=StringPredictor)
predictor = model.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')
Step 7: Use the model for testing
Now that we have deployed our model with the custom inference code, we should test it to see if everything is working. Here we test our model by creating an initial string, sending it to the endpoint and collecting the result. But we also want to tell the inference code how long the expected output should be.
This means that we need to send our predict function not only the initial string but also the length of the output. As the input expected by the deserializing function, input_fn, is a string, and we are looking for a simple solution, our input data will be a string composed as: the length of the output + '-' + the initial string.
Now, it is time to test our model, sending a very common initial string: you are
.
test_text = '100-you are '
new_text = predictor.predict(test_text).decode('utf-8')
print(new_text)
Another example to try out is to send the model a text included in the training dataset and see what the model predicts:
print('Text: ',sentences[963:1148])
init_text = sentences[963:1148]
print('Init text: ', sentences[963:1020])
test_text = str(len(init_text))+'-'+init_text
new_text = predictor.predict(test_text).decode('utf-8')
print(new_text)
We can check that the model "remembers" the texts seen during training: it can mostly reproduce the original text.
Now that we know our endpoint is working as expected, we can set up a web page or app that will interact with it.
Make sure to skip down to the end of this notebook and shut down your endpoint. You can deploy it again when you come back.
predictor.delete_endpoint()