Encoding DNA sequences for DL training

pytorch
statistics
Author

Temi

Published

September 29, 2023

Note

This post is still under construction; I am adding stuff as I get the time to.

When doing DL with DNA sequences, you need to represent the sequences in a numeric format the model can process. Since the four bases (A, C, G, T) can be thought of as categorical values with no natural ordering, a common approach is to one-hot encode them.
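
The idea is that each base becomes a length-4 vector with a single 1 in it. Here is a hand-rolled sketch (the A, C, G, T column order is just my choice for illustration; it happens to match the alphabetical ordering scikit-learn will use below):

show code
# map each base to a 4-dimensional one-hot vector
base_to_vec = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
[base_to_vec[b] for b in 'ACGT']
# [[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]]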

show code
import random
import numpy as np
import pandas as pd
show code
random.seed(2023)
np.random.seed(2023) # the sequence generator below uses np.random.choice, so seed numpy's RNG too

Here, I define a function to generate a random DNA sequence of a certain length.

show code
def generate_random_sequence_inputs(size=10):
    r_seq_list = np.random.choice(['A', 'G', 'T', 'C'], size)
    return ''.join(r_seq_list)
show code
generate_random_sequence_inputs(size=5)
'GGGCT'

Our data

I’ll generate a random DNA sequence of length 501.

show code
dna_one = generate_random_sequence_inputs(size=501)

Encoding the data

There are a number of ways sequence data can be encoded.

One-hot encoding

show code
from sklearn.preprocessing import OneHotEncoder
show code
encoder = OneHotEncoder(sparse_output=False)
show code
dna_one_array = np.array(list(dna_one)).reshape(-1, 1)
dna_one_array.shape
(501, 1)
show code
dna_one_enc = encoder.fit_transform(dna_one_array)
print(dna_one_enc)
[[1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 ...
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]]
show code
dna_one_array[0:4, ]
array([['A'],
       ['A'],
       ['T'],
       ['G']], dtype='<U1')
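
To read the encoded matrix, you need to know the column order. OneHotEncoder sorts string categories alphabetically, so the columns correspond to A, C, G, T; you can confirm this from the fitted encoder:

show code
encoder.categories_
# [array(['A', 'C', 'G', 'T'], dtype='<U1')]

This matches the first few rows above: the leading A's map to [1, 0, 0, 0] and the T maps to [0, 0, 0, 1].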

TF-IDF

Another option is TF-IDF (term frequency–inverse document frequency), a weighting scheme borrowed from text mining. We can treat each DNA sequence as a "document" and its k-mers (substrings of length k) as the "words", so that each sequence becomes a vector of k-mer weights.
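
For reference, the classic TF-IDF weight of a term (here, a k-mer) t in a document d is

\[\text{tfidf}(t, d) = \text{tf}(t, d) \times \log\frac{N}{\text{df}(t)}\]

where tf(t, d) is the number of times t occurs in d, N is the number of documents, and df(t) is the number of documents containing t. Note that scikit-learn's TfidfVectorizer uses a smoothed variant of the idf term, ln((1 + N) / (1 + df(t))) + 1, and L2-normalizes each document vector by default, so the numbers below won't match this textbook formula exactly.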

show code
dna_one = generate_random_sequence_inputs(size=5001)
dna_two = generate_random_sequence_inputs(size=5001)
show code
# k = 5
# offset = 3
# for i in range(0, len(dna_one), offset):
#     print(f'{i}:{i+k}')
show code
def create_kmers(seq, k=3, offset=1, space=True):
    if offset <= 0:
        raise ValueError('offset value must be greater than 0')
    # slide a window of length k along the sequence, moving `offset` bases each step
    out = [seq[i:(i + k)] for i in range(0, len(seq), offset)]
    if space:
        out = ' '.join(out)
    return out
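
On a short made-up string the function behaves like this; note that with these settings the last window runs off the end of the sequence, so the final "k-mer" can be shorter than k:

show code
create_kmers('ACGTACGT', k=3, offset=2)
# 'ACG GTA ACG GT'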
show code
dna_one_9mer = create_kmers(dna_one, k=9, offset=4)
dna_two_9mer = create_kmers(dna_two, k=9, offset=4)
dna_document = [dna_one_9mer, dna_two_9mer]
show code
docs = ['dna_one', 'dna_two']
show code
from sklearn.feature_extraction.text import TfidfVectorizer
show code
vectorizer = TfidfVectorizer()
show code
vector = vectorizer.fit_transform(dna_document)
show code
vectorizer.get_feature_names_out()
array(['aaaaactgg', 'aaaaatttg', 'aaaacatag', ..., 'ttttagcaa',
       'ttttcatcc', 'ttttctgtg'], dtype=object)
show code
tfidf_dt = pd.DataFrame(vector.toarray(), columns=vectorizer.get_feature_names_out(), index=docs)
tfidf_dt
aaaaactgg aaaaatttg aaaacatag aaaaccgcg aaaacgttg aaaagaccc aaaagcaaa aaaagcaag aaaagggtg aaaagtccc ... tttggcata tttgggtgc tttgtacaa tttgtggtt tttgttaag ttttaaaga ttttaaggg ttttagcaa ttttcatcc ttttctgtg
dna_one 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.028261 0.028261 0.000000 ... 0.000000 0.028261 0.000000 0.000000 0.028261 0.000000 0.028261 0.028261 0.000000 0.028261
dna_two 0.028261 0.028261 0.028261 0.028261 0.028261 0.028261 0.028261 0.000000 0.000000 0.028261 ... 0.028261 0.000000 0.028261 0.028261 0.000000 0.028261 0.000000 0.000000 0.028261 0.000000

2 rows × 2486 columns

show code
tfidf_dt.loc['kmer_frequency'] = (tfidf_dt > 0).sum() # in how many of the two sequences each k-mer appears
tfidf_dt.T
dna_one dna_two kmer_frequency
aaaaactgg 0.000000 0.028261 1.0
aaaaatttg 0.000000 0.028261 1.0
aaaacatag 0.000000 0.028261 1.0
aaaaccgcg 0.000000 0.028261 1.0
aaaacgttg 0.000000 0.028261 1.0
... ... ... ...
ttttaaaga 0.000000 0.028261 1.0
ttttaaggg 0.028261 0.000000 1.0
ttttagcaa 0.028261 0.000000 1.0
ttttcatcc 0.000000 0.028261 1.0
ttttctgtg 0.028261 0.000000 1.0

2486 rows × 3 columns

show code
x = np.array(tfidf_dt.T['dna_one'].tolist()).reshape(1,-1)
y = np.array(tfidf_dt.T['dna_two'].tolist()).reshape(1,-1)

Now we can measure how close these two DNA sequences are using, say, the cosine similarity. The cosine similarity between two vectors is defined as:

\[\text{cosine similarity}(\vec{a},\vec{b}) = \frac{\vec{a}\cdot\vec{b}}{|\vec{a}||\vec{b}|}\]

and the corresponding cosine distance is 1 minus the cosine similarity.

show code
from sklearn.metrics import pairwise
pairwise.cosine_similarity(x,y)
array([[0.00323466]])
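
As a sanity check, here is the same quantity computed directly from the definition, using the x and y vectors above:

show code
# cosine similarity by hand: dot product divided by the product of the norms
np.dot(x.ravel(), y.ravel()) / (np.linalg.norm(x) * np.linalg.norm(y))
# should print the same value as pairwise.cosine_similarity(x, y) above, ~0.0032

The similarity is close to zero because two random sequences share almost none of their 9-mers, so their TF-IDF vectors are nearly orthogonal.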

Continuous bag-of-words, or CBOW

CBOW is one of the two word2vec training methods (the other is skip-gram).

CBOW learns embeddings through a context-to-target mapping: you provide the surrounding context words, and the model is trained to predict the target (center) word.

Here is our input data

show code
import torch
import torch.nn as nn
import torch.optim as optim
import torchtext
import tqdm
import logging
import numpy as np
import dataclasses
show code
responses = ["Hold fast to dreams, for if dreams die, life is a broken-winged bird that cannot fly.", "No bird soars too high if he soars with his own wings.", "A bird does not sing because it has an answer, it sings because it has a song."]
print(responses[0])
Hold fast to dreams, for if dreams die, life is a broken-winged bird that cannot fly.

So, with a context of one word on each side, examples of context-target pairs from the first quote would be:

- “Hold”, “to” ==> “fast”
- “if”, “die” ==> “dreams”
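
A minimal sketch of enumerating such pairs with a plain sliding window (one word of context on each side, splitting on whitespace only):

show code
words = responses[0].split()
pairs = [([words[i - 1], words[i + 1]], words[i]) for i in range(1, len(words) - 1)]
pairs[:2]
# [(['Hold', 'to'], 'fast'), (['fast', 'dreams,'], 'to')]

The TrainingData class below does the same thing, but with a configurable context_length and with words mapped to integer ids from the vocabulary.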

show code
input_file = '../../../../../external_data/word2vec/on_the_sweeny_wire.txt'
show code
class TextPreProcessor:
    def __init__(self, input_file) -> None:
        self.input_file = input_file

    def generate_tokens(self):

        if isinstance(self.input_file, list):
            for line in self.input_file:
                yield line.split()
        else:
            with open(self.input_file, encoding="utf-8") as f:
                for line in f:
                    line = line.replace("\\", "")
                    yield line.strip().split()

    # def generate_corpus(self):

    def build_vocab(self):
        vlist = list(self.generate_tokens())
        #vlist = [f for f in vlist for f in f]
        vocab = torchtext.vocab.build_vocab_from_iterator(vlist, specials=["<unk>"], min_freq=1)
        return vocab
show code
text_corpus = TextPreProcessor([responses[0]])
text_vocab = text_corpus.build_vocab()
#list(text_corpus.generate_tokens())[0:3]
show code
def concat(*iterables):
    for iterable in iterables:
        yield from iterable

def one_hot_encode(id, vocab_size):
    res = [0] * vocab_size
    res[id] = 1
    return res
show code
one_hot_encode(4, 8)
[0, 0, 0, 0, 1, 0, 0, 0]
show code
class TrainingData:
    def __init__(self, tokens, word_to_id, context_length) -> None:
        self.tokens = tokens
        self.word_to_id = word_to_id
        self.context_length = context_length
        self.XY = list(self.generate_training_tokens())


    def one_hot_encode(self, id, vocab_size):
        res = [0] * vocab_size
        res[id] = 1
        return res
    
    def generate_training_words(self):
        self.data = []
        n_tokens = len(self.tokens)
        
        for i in range(self.context_length, n_tokens - self.context_length):
            context = (
                [self.tokens[i - j - 1] for j in range(self.context_length)]
                + [self.tokens[i + j + 1] for j in range(self.context_length)]
            )
            target = self.tokens[i]

            yield (context, target)

    def generate_training_tokens(self):
        for generated in self.generate_training_words():
            context, target = generated
            context = torch.asarray([self.word_to_id[t] for t in context])
            target = torch.asarray(self.word_to_id[target])

            yield (context, target)
        
    def generate_training_encoded(self):
        for generated in self.generate_training_tokens():
            context, target = generated
            context = [self.one_hot_encode(t, len(self.word_to_id)) for t in context]
            #print([t for t in context])
            target = self.one_hot_encode(target, len(self.word_to_id)) 

            yield (torch.asarray(context), torch.asarray(target))

    def __len__(self): # the dataloader needs to know the number of observations you have
        return len(self.XY)

    def __getitem__(self, idx): # this is what returns just one observation or one unit of training
        return(self.XY[idx][0], self.XY[idx][1]) # essentially, I am just slicing the np array
        
show code
tokens = [tok for line in text_corpus.generate_tokens() for tok in line] # flatten the token lists into one list
#tokens = list(text_corpus.generate_tokens())
word_to_id = text_corpus.build_vocab().get_stoi()
show code
training_data = TrainingData(tokens, word_to_id, context_length=2)
#list(training_data.generate_training_words())
show code
training_data.__len__(), training_data.__getitem__(3)
(12, (tensor([11,  8,  7,  6]), tensor(12)))
show code
list(training_data.generate_training_words())[0]
(['fast', 'Hold', 'dreams,', 'for'], 'to')

So, I can train an embedding either on the one-hot encoded text or directly on the integer tokens.
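
The two views are equivalent: looking up row i of an nn.Embedding weight matrix returns the same vector as multiplying the one-hot encoding of i by that matrix. A quick sketch (the sizes here are arbitrary):

show code
emb = nn.Embedding(num_embeddings=8, embedding_dim=4)            # toy embedding table
idx = torch.tensor([3])                                          # a token id
one_hot = torch.nn.functional.one_hot(idx, num_classes=8).float()
torch.allclose(emb(idx), one_hot @ emb.weight)                   # True

In practice it is cheaper to feed the integer token ids to nn.Embedding and let it do the lookup, which is what the CBOW model below does.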

show code
@dataclasses.dataclass
class HyperparametersConfig:
    num_epochs: int = 3
    context_size: int = 2
    embedding_dim: int = 100
    learning_rate: float = 0.001
    vocab_size: int = len(word_to_id)
    num_layers: int = 3
    device: str = 'cuda' if torch.cuda.is_available() else 'cpu'
    bias: bool = False

config = HyperparametersConfig()
config
HyperparametersConfig(num_epochs=3, context_size=2, embedding_dim=100, learning_rate=0.001, vocab_size=17, num_layers=3, device='cuda', bias=False)
show code
class CBOW(torch.nn.Module):
    def __init__(self, config):
        super(CBOW, self).__init__()
        self.config = config
        self.embeddings = nn.Embedding(num_embeddings=self.config.vocab_size, embedding_dim=self.config.embedding_dim)
        #self.linear = nn.Linear(in_features=self.config.embedding_dim, out_features=self.config.vocab_size)
        #self.relu_activation = nn.ReLU()

        # first layer maps the embedding to vocab size; hidden layers are Linear + ReLU
        # (note: nn.ReLU(nn.Linear(...)) would silently discard the Linear, so wrap them in nn.Sequential)
        self.linears = nn.ModuleList([nn.Linear(self.config.embedding_dim, self.config.vocab_size)])
        self.linears.extend([
            nn.Sequential(nn.Linear(self.config.vocab_size, self.config.vocab_size), nn.ReLU())
                for i in range(1, self.config.num_layers-1)
        ])
        self.linears.append(nn.Linear(self.config.vocab_size, self.config.vocab_size))
        

    def forward(self, x):
        x = self.embeddings(x)
        for layer in self.linears:
            x = layer(x)
        out = torch.nn.functional.log_softmax(x, dim=-1) # log-probabilities over the vocabulary dimension
        return(out)
show code
cbow_model = CBOW(config)
cbow_model
CBOW(
  (embeddings): Embedding(17, 100)
  (linears): ModuleList(
    (0): Linear(in_features=100, out_features=17, bias=True)
    (1): Sequential(
      (0): Linear(in_features=17, out_features=17, bias=True)
      (1): ReLU()
    )
    (2): Linear(in_features=17, out_features=17, bias=True)
  )
)
show code
X = list(training_data.generate_training_tokens())
X = torch.LongTensor(X[0][0])
X
tensor([ 9,  1,  8, 11])
show code
cbow_model(X)
tensor([[-1.3945, -0.9238, -1.3686, -1.0968, -1.1188, -1.3827, -1.3699, -1.6717,
         -1.0653, -1.6560, -1.2714, -1.3249, -1.8447, -1.0406, -1.4343, -1.3614,
         -1.2163],
        [-1.1671, -1.4844, -1.4347, -1.6444, -1.6739, -1.3932, -1.2698, -1.4112,
         -1.3617, -1.4124, -1.5198, -1.4879, -1.4759, -1.5775, -1.4362, -1.2266,
         -1.3263],
        [-1.5651, -2.0091, -1.4869, -1.4529, -1.3914, -1.3272, -1.6173, -1.0889,
         -1.8360, -1.1119, -1.7909, -1.4547, -1.0009, -2.0002, -1.5587, -1.5392,
         -1.6190],
        [-1.4623, -1.4179, -1.2684, -1.4311, -1.4394, -1.4456, -1.3222, -1.4625,
         -1.4284, -1.4413, -1.0966, -1.2915, -1.4032, -1.1876, -1.1601, -1.4445,
         -1.4266]], grad_fn=<LogSoftmaxBackward0>)
show code
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
show code
mydataloader = DataLoader(training_data, batch_size=3, shuffle=True)
show code
for i, batch in enumerate(mydataloader):
    print(f'batch {i}: number of observations and ground truth are {batch[0].size(0)} and {batch[1].size(0)} respectively')
batch 0: number of observations and ground truth are 3 and 3 respectively
batch 1: number of observations and ground truth are 3 and 3 respectively
batch 2: number of observations and ground truth are 3 and 3 respectively
batch 3: number of observations and ground truth are 3 and 3 respectively
show code
loss_function = nn.NLLLoss()
optimizer = optim.Adam(cbow_model.parameters(), lr=config.learning_rate)
losses = []

for epoch in range(config.num_epochs):
    print(f'INFO - Epoch {epoch}')
    for batch, data in enumerate(mydataloader):
        cbow_model.zero_grad()
        X, y = data
        # the model returns one set of log-probabilities per context word,
        # so average them over the context dimension before computing the loss
        log_probs = cbow_model(X).mean(dim=1)
        loss = loss_function(log_probs, y)

        loss.backward()
        optimizer.step()
        losses.append(loss.item())
show code
class CBOW(torch.nn.Module):
    def __init__(self): # hyperparameters are set inside the class rather than passed in
        super(CBOW, self).__init__()
        self.num_epochs = 3
        self.context_size = 2 # 2 words to the left, 2 words to the right
        self.embedding_dim = 100 # Size of your embedding vector
        self.learning_rate = 0.001
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.vocab = TextPreProcessor(input_file).build_vocab() # TextPreProcessor needs an input corpus
        self.word_to_ix = self.vocab.get_stoi()
        self.ix_to_word = self.vocab.get_itos()
        self.vocab_list = list(self.vocab.get_stoi().keys())
        self.vocab_size = len(self.vocab)
        print(f'Vocabulary size is {self.vocab_size}')

        self.model = None
        # out: 1 x embedding_dim
        self.embeddings = nn.Embedding(self.vocab_size, self.embedding_dim) # initialize an Embedding matrix based on our inputs
        self.linear1 = nn.Linear(self.embedding_dim, 128)
        self.activation_function1 = nn.ReLU()
        # out: 1 x vocab_size
        self.linear2 = nn.Linear(128, self.vocab_size)
        self.activation_function2 = nn.LogSoftmax(dim=-1)


    def make_context_vector(self, context, word_to_ix) -> torch.LongTensor:
        """
        For each word in the context (a sliding window of [-2, -1, 0, 1, 2] positions
        relative to the target word), look up its index and return the indices as a tensor.
        :param context: list of context words
        :return: torch.LongTensor
        """
        idxs = [word_to_ix[w] for w in context]
        return torch.LongTensor(idxs)
    
    def train_model(self):
        # Loss and optimizer
        self.model = CBOW().to(self.device)
        optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        loss_function = nn.NLLLoss()

        logging.warning('Building training data')
        data = self.generate_training_data()

        logging.warning('Starting forward pass')
        for epoch in tqdm.tqdm(range(self.num_epochs)):
            # we start tracking how accurate our initial words are
            total_loss = 0
            # for the x, y in the training data:
            for context, target in data:
                context_vector = self.make_context_vector(context, self.word_to_ix)
                # we look at loss
                log_probs = self.model(context_vector)
                # compare loss
                total_loss += loss_function(
                    log_probs, torch.tensor([self.word_to_ix[target]])
                )
            # optimize at the end of each epoch
            optimizer.zero_grad()
            total_loss.backward()
            optimizer.step()
            # Log out some metrics to see if loss decreases
            logging.warning("end of epoch {} | loss {:2.3f}".format(epoch, total_loss))
            torch.save(self.model.state_dict(), self.model_path)
            logging.warning(f'Save model to {self.model_path}')
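
Note that, as written, this class never defines generate_training_data() (or a forward() method), which is why the training call at the end of this post fails. A minimal forward consistent with the layers defined in __init__ might look like the sketch below; this is a placeholder of mine, meant to live inside the class, not the final version:

show code
# a sketch of the missing forward pass (to be added inside the CBOW class above)
def forward(self, inputs):
    # average the embeddings of the context words, then map to vocabulary log-probabilities
    embeds = self.embeddings(inputs).mean(dim=0)
    out = self.activation_function1(self.linear1(embeds))
    out = self.linear2(out)
    return self.activation_function2(out)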
show code
Vocabulary size is 9
show code
[([9, 1, 8, 11], 16),
 ([16, 9, 11, 12], 8),
 ([8, 16, 12, 7], 11),
 ([11, 8, 7, 6], 12),
 ([12, 11, 6, 14], 7),
 ([7, 12, 14, 13], 6),
 ([6, 7, 13, 2], 14),
 ([14, 6, 2, 4], 13),
 ([13, 14, 4, 3], 2),
 ([2, 13, 3, 15], 4),
 ([4, 2, 15, 5], 3),
 ([3, 4, 5, 10], 15)]
show code
cbow_model.train_model()
WARNING:root:Building training data
AttributeError: 'CBOW' object has no attribute 'generate_training_data'