import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchtext
Temi
September 29, 2023
This post is still under construction; I am adding stuff as I get the time to.
When doing DL with DNA sequences, you want to represent the sequences in formats that the computer can process. Since DNA sequences (ACGT) can be thought of as categorical data, we need to one-hot encode them.
Here, I define a function to generate a sequence of a certain length.
I’ll generate a random DNA sequence of length 500
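The generating code is folded away; a minimal sketch of what such a function could look like (the name `generate_sequence` and the use of `random.choices` are my assumptions):

```python
def generate_sequence(length, alphabet="acgt"):
    """Return a random DNA sequence of the given length."""
    return "".join(random.choices(alphabet, k=length))

seq = generate_sequence(500)
```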
There are a number of ways sequence data can be encoded.
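The simplest is one-hot encoding, which is what produced the array below. A NumPy sketch using the `seq` generated above (the column order a, c, g, t is my assumption):

```python
base_to_index = {"a": 0, "c": 1, "g": 2, "t": 3}

# one row per base, one column per letter of the alphabet
one_hot = np.zeros((len(seq), 4))
one_hot[np.arange(len(seq)), [base_to_index[b] for b in seq]] = 1.0
print(one_hot)
```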
[[1. 0. 0. 0.]
[1. 0. 0. 0.]
[0. 0. 0. 1.]
...
[0. 1. 0. 0.]
[1. 0. 0. 0.]
[1. 0. 0. 0.]]
TF-IDF
We can also represent each sequence as a bag of k-mers: split it into overlapping substrings of length k (9-mers here), treat each k-mer as a "word", and weight the k-mers with TF-IDF.
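The folded code presumably splits each sequence into overlapping 9-mers and feeds them to a TF-IDF vectorizer. A sketch of how that could look, assuming scikit-learn's TfidfVectorizer and two random sequences `dna_one` and `dna_two` generated with the function sketched earlier:

```python
from sklearn.feature_extraction.text import TfidfVectorizer

def get_kmers(sequence, k=9):
    """Split a sequence into overlapping k-mers."""
    return [sequence[i:i + k] for i in range(len(sequence) - k + 1)]

# join the k-mers with spaces so the vectorizer treats each k-mer as a "word"
corpus = {"dna_one": " ".join(get_kmers(dna_one)), "dna_two": " ".join(get_kmers(dna_two))}

vectorizer = TfidfVectorizer()
tfidf = vectorizer.fit_transform(corpus.values())
vectorizer.get_feature_names_out()  # the k-mer vocabulary, shown below

tfidf_df = pd.DataFrame(tfidf.toarray(), index=list(corpus.keys()),
                        columns=vectorizer.get_feature_names_out())
tfidf_df
```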
array(['aaaaactgg', 'aaaaatttg', 'aaaacatag', ..., 'ttttagcaa',
'ttttcatcc', 'ttttctgtg'], dtype=object)
| | aaaaactgg | aaaaatttg | aaaacatag | aaaaccgcg | aaaacgttg | aaaagaccc | aaaagcaaa | aaaagcaag | aaaagggtg | aaaagtccc | ... | tttggcata | tttgggtgc | tttgtacaa | tttgtggtt | tttgttaag | ttttaaaga | ttttaaggg | ttttagcaa | ttttcatcc | ttttctgtg |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
dna_one | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 0.028261 | 0.028261 | 0.000000 | ... | 0.000000 | 0.028261 | 0.000000 | 0.000000 | 0.028261 | 0.000000 | 0.028261 | 0.028261 | 0.000000 | 0.028261 |
dna_two | 0.028261 | 0.028261 | 0.028261 | 0.028261 | 0.028261 | 0.028261 | 0.028261 | 0.000000 | 0.000000 | 0.028261 | ... | 0.028261 | 0.000000 | 0.028261 | 0.028261 | 0.000000 | 0.028261 | 0.000000 | 0.000000 | 0.028261 | 0.000000 |
2 rows × 2486 columns
| | dna_one | dna_two | kmer_frequency |
|---|---|---|---|
aaaaactgg | 0.000000 | 0.028261 | 1.0 |
aaaaatttg | 0.000000 | 0.028261 | 1.0 |
aaaacatag | 0.000000 | 0.028261 | 1.0 |
aaaaccgcg | 0.000000 | 0.028261 | 1.0 |
aaaacgttg | 0.000000 | 0.028261 | 1.0 |
... | ... | ... | ... |
ttttaaaga | 0.000000 | 0.028261 | 1.0 |
ttttaaggg | 0.028261 | 0.000000 | 1.0 |
ttttagcaa | 0.028261 | 0.000000 | 1.0 |
ttttcatcc | 0.000000 | 0.028261 | 1.0 |
ttttctgtg | 0.028261 | 0.000000 | 1.0 |
2486 rows × 3 columns
Now we can measure how close these two DNA sequences are using, say, the cosine similarity. The cosine similarity between two vectors is defined as follows:
\[\text{cosine similarity}(\vec{a},\vec{b}) = \frac{\vec{a}\cdot\vec{b}}{|\vec{a}||\vec{b}|}, \qquad \text{cosine distance}(\vec{a},\vec{b}) = 1 - \text{cosine similarity}(\vec{a},\vec{b})\]
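With the TF-IDF rows from the table above, this is a couple of lines in NumPy (a sketch; `tfidf_df` refers to the TF-IDF DataFrame sketched earlier):

```python
a = tfidf_df.loc["dna_one"].to_numpy()
b = tfidf_df.loc["dna_two"].to_numpy()

# cosine similarity and distance, following the formula above
cosine_similarity = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
cosine_distance = 1 - cosine_similarity
```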
Another way to get vector representations is to learn them. CBOW (continuous bag-of-words) learns embeddings through a context-to-target mapping: you provide the surrounding context words, and the model is expected to predict the target word.
Here is our input data:
Hold fast to dreams, for if dreams die, life is a broken-winged bird that cannot fly.
So, examples of context-target pairs would be:

- “Hold”, “to” ==> “fast”
- “if”, “die” ==> “dreams”
First, a small helper class to tokenize the input and build a torchtext vocabulary:

class TextPreProcessor:
    def __init__(self, input_file) -> None:
        self.input_file = input_file

    def generate_tokens(self):
        # accept either an in-memory list of lines or a path to a text file
        if isinstance(self.input_file, list):
            for line in self.input_file:
                yield line.split()
        else:
            with open(self.input_file, encoding="utf-8") as f:
                for line in f:
                    line = line.replace("\\", "")
                    yield line.strip().split()

    def build_vocab(self):
        # build a torchtext vocabulary from the lists of tokens
        vlist = list(self.generate_tokens())
        vocab = torchtext.vocab.build_vocab_from_iterator(vlist, specials=["<unk>"], min_freq=1)
        return vocab
Next, a dataset class that turns the token stream into (context, target) training pairs:

class TrainingData:
    def __init__(self, tokens, word_to_id, context_length) -> None:
        self.tokens = tokens
        self.word_to_id = word_to_id
        self.context_length = context_length
        # materialize the (context, target) pairs once so __getitem__ can index into them
        self.XY = list(self.generate_training_tokens())

    def one_hot_encode(self, token_id, vocab_size):
        res = [0] * vocab_size
        res[token_id] = 1
        return res

    def generate_training_words(self):
        # slide a window over the tokens and yield (context words, target word) pairs
        n_tokens = len(self.tokens)
        for i in range(self.context_length, n_tokens - self.context_length):
            context = (
                [self.tokens[i - j - 1] for j in range(self.context_length)]
                + [self.tokens[i + j + 1] for j in range(self.context_length)]
            )
            target = self.tokens[i]
            yield (context, target)

    def generate_training_tokens(self):
        # map the words to integer ids
        for context, target in self.generate_training_words():
            context = torch.asarray([self.word_to_id[t] for t in context])
            target = torch.asarray(self.word_to_id[target])
            yield (context, target)

    def generate_training_encoded(self):
        # one-hot encode both the context ids and the target id
        for context, target in self.generate_training_tokens():
            context = [self.one_hot_encode(t, len(self.word_to_id)) for t in context]
            target = self.one_hot_encode(target, len(self.word_to_id))
            yield (torch.asarray(context), torch.asarray(target))

    def __len__(self):  # the dataloader needs to know the number of observations you have
        return len(self.XY)

    def __getitem__(self, idx):  # this is what returns just one observation or one unit of training
        return (self.XY[idx][0], self.XY[idx][1])  # just index into the precomputed list of pairs
(12, (tensor([11, 8, 7, 6]), tensor(12)))
(['fast', 'Hold', 'dreams,', 'for'], 'to')
So I can train an embedding either on the one-hot encoded text or directly on the token indices.
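The two are equivalent: looking up a token index in an nn.Embedding is the same as multiplying the token's one-hot vector by the embedding matrix. A tiny sketch (the sizes here are arbitrary):

```python
emb = nn.Embedding(num_embeddings=17, embedding_dim=100)
tokens = torch.tensor([12])  # a batch with a single token id
one_hot = torch.nn.functional.one_hot(tokens, num_classes=17).float()

# an index lookup and a one-hot matrix multiplication give the same embedding vector
assert torch.allclose(emb(tokens), one_hot @ emb.weight)
```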
import dataclasses

@dataclasses.dataclass
class HyperparametersConfig:
    num_epochs: int = 3
    context_size: int = 2
    embedding_dim: int = 100
    learning_rate: float = 0.001
    vocab_size: int = len(word_to_id)  # word_to_id is the mapping built from the vocabulary above
    num_layers: int = 3
    device: str = 'cuda' if torch.cuda.is_available() else 'cpu'
    bias: bool = False

config = HyperparametersConfig()
config
HyperparametersConfig(num_epochs=3, context_size=2, embedding_dim=100, learning_rate=0.001, vocab_size=17, num_layers=3, device='cuda', bias=False)
class CBOW(torch.nn.Module):
    def __init__(self, config):
        super(CBOW, self).__init__()
        self.config = config
        self.embeddings = nn.Embedding(num_embeddings=self.config.vocab_size, embedding_dim=self.config.embedding_dim)
        # first layer maps from the embedding dimension down to the vocabulary size
        self.linears = nn.ModuleList([nn.Linear(self.config.embedding_dim, self.config.vocab_size)])
        # intermediate layers: a Linear followed by a ReLU activation
        self.linears.extend([
            nn.Sequential(nn.Linear(self.config.vocab_size, self.config.vocab_size), nn.ReLU())
            for _ in range(1, self.config.num_layers - 1)
        ])
        # output layer
        self.linears.append(nn.Linear(self.config.vocab_size, self.config.vocab_size))

    def forward(self, x):
        x = self.embeddings(x)
        for layer in self.linears:
            x = layer(x)
        # normalize over the vocabulary dimension, so each row is a log-probability distribution
        out = torch.nn.functional.log_softmax(x, dim=-1)
        return out
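The code that produced the printouts below is folded away; presumably it just instantiates the model from the config and pushes a single context window through it. A minimal sketch (the name `training_data` for the TrainingData instance is my assumption):

```python
model = CBOW(config)
print(model)                         # the module structure shown below

context, target = training_data[0]   # first (context, target) pair
print(context)
print(model(context))                # log-probabilities over the vocabulary, one row per context word
```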
CBOW(
  (embeddings): Embedding(17, 100)
  (linears): ModuleList(
    (0): Linear(in_features=100, out_features=17, bias=True)
    (1): Sequential(
      (0): Linear(in_features=17, out_features=17, bias=True)
      (1): ReLU()
    )
    (2): Linear(in_features=17, out_features=17, bias=True)
  )
)
tensor([ 9, 1, 8, 11])
tensor([[-1.3945, -0.9238, -1.3686, -1.0968, -1.1188, -1.3827, -1.3699, -1.6717,
-1.0653, -1.6560, -1.2714, -1.3249, -1.8447, -1.0406, -1.4343, -1.3614,
-1.2163],
[-1.1671, -1.4844, -1.4347, -1.6444, -1.6739, -1.3932, -1.2698, -1.4112,
-1.3617, -1.4124, -1.5198, -1.4879, -1.4759, -1.5775, -1.4362, -1.2266,
-1.3263],
[-1.5651, -2.0091, -1.4869, -1.4529, -1.3914, -1.3272, -1.6173, -1.0889,
-1.8360, -1.1119, -1.7909, -1.4547, -1.0009, -2.0002, -1.5587, -1.5392,
-1.6190],
[-1.4623, -1.4179, -1.2684, -1.4311, -1.4394, -1.4456, -1.3222, -1.4625,
-1.4284, -1.4413, -1.0966, -1.2915, -1.4032, -1.1876, -1.1601, -1.4445,
-1.4266]], grad_fn=<LogSoftmaxBackward0>)
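The batch printout below can be produced by wrapping the TrainingData object in a torch DataLoader; a sketch (batch_size=3 is inferred from the printout, and `training_data` is again my assumed variable name):

```python
from torch.utils.data import DataLoader

loader = DataLoader(training_data, batch_size=3, shuffle=False)
for i, (contexts, targets) in enumerate(loader):
    print(f"batch {i}: number of observations and ground truth are "
          f"{len(contexts)} and {len(targets)} respectively")
```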
batch 0: number of observations and ground truth are 3 and 3 respectively
batch 1: number of observations and ground truth are 3 and 3 respectively
batch 2: number of observations and ground truth are 3 and 3 respectively
batch 3: number of observations and ground truth are 3 and 3 respectively
Here is a more self-contained version of the CBOW class that also takes care of training:

import logging
import torch.optim as optim
from tqdm import tqdm

class CBOW(torch.nn.Module):
    def __init__(self):  # vocab_size and embedding_dim could also be passed in as hyperparams
        super(CBOW, self).__init__()
        self.num_epochs = 3
        self.context_size = 2  # 2 words to the left, 2 words to the right
        self.embedding_dim = 100  # size of your embedding vector
        self.learning_rate = 0.001
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.vocab = TextPreProcessor().build_vocab()  # TextPreProcessor needs the input text; the argument is omitted in this draft
        self.word_to_ix = self.vocab.get_stoi()
        self.ix_to_word = self.vocab.get_itos()
        self.vocab_list = list(self.vocab.get_stoi().keys())
        self.vocab_size = len(self.vocab)
        print(f'Vocabulary size is {self.vocab_size}')
        self.model = None
        # out: 1 x embedding_dim
        self.embeddings = nn.Embedding(self.vocab_size, self.embedding_dim)  # initialize an Embedding matrix based on our inputs
        self.linear1 = nn.Linear(self.embedding_dim, 128)
        self.activation_function1 = nn.ReLU()
        # out: 1 x vocab_size
        self.linear2 = nn.Linear(128, self.vocab_size)
        self.activation_function2 = nn.LogSoftmax(dim=-1)

    def forward(self, inputs):
        # the forward pass was missing from this draft; this is the standard CBOW forward
        # using the layers defined above: sum the context embeddings, then two linear layers
        embeds = self.embeddings(inputs).sum(dim=0).view(1, -1)
        out = self.activation_function1(self.linear1(embeds))
        out = self.linear2(out)
        return self.activation_function2(out)

    def make_context_vector(self, context, word_to_ix) -> torch.LongTensor:
        """
        For each word in the vocab, find sliding windows of [-2, -1, 0, 1, 2] indexes
        relative to the position of the word
        :param vocab: list of words in the vocab
        :return: torch.LongTensor
        """
        idxs = [word_to_ix[w] for w in context]
        return torch.LongTensor(idxs)

    def train_model(self):
        # Loss and optimizer
        self.model = CBOW().to(self.device)
        optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        loss_function = nn.NLLLoss()
        logging.warning('Building training data')
        data = self.generate_training_data()  # not shown in this draft; yields (context, target) pairs
        logging.warning('Starting forward pass')
        for epoch in tqdm(range(self.num_epochs)):
            # we start tracking how accurate our initial words are
            total_loss = 0
            # for the x, y in the training data:
            for context, target in data:
                context_vector = self.make_context_vector(context, self.word_to_ix)
                # we look at loss
                log_probs = self.model(context_vector)
                # compare loss
                total_loss += loss_function(
                    log_probs, torch.tensor([self.word_to_ix[target]])
                )
            # optimize at the end of each epoch
            optimizer.zero_grad()
            total_loss.backward()
            optimizer.step()
            # Log out some metrics to see if loss decreases
            logging.warning("end of epoch {} | loss {:2.3f}".format(epoch, total_loss))
        torch.save(self.model.state_dict(), self.model_path)  # assumes self.model_path is set elsewhere
        logging.warning(f'Save model to {self.model_path}')
Vocabulary size is 9
[([9, 1, 8, 11], 16),
([16, 9, 11, 12], 8),
([8, 16, 12, 7], 11),
([11, 8, 7, 6], 12),
([12, 11, 6, 14], 7),
([7, 12, 14, 13], 6),
([6, 7, 13, 2], 14),
([14, 6, 2, 4], 13),
([13, 14, 4, 3], 2),
([2, 13, 3, 15], 4),
([4, 2, 15, 5], 3),
([3, 4, 5, 10], 15)]
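Once the remaining pieces are filled in (the input text for TextPreProcessor, the generate_training_data method, and a model_path to save to), training would be kicked off with something like this sketch:

```python
cbow = CBOW()
cbow.train_model()

# afterwards, the learned word vectors live in the trained model's embedding matrix
word_vectors = cbow.model.embeddings.weight.detach()
print(word_vectors[cbow.word_to_ix["dreams,"]])  # embedding for one of the tokens
```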