import torch
import torch.nn as nn
import torch.nn.functional as F
import gradio as gr
from torch.utils.data import Dataset
from transformers import PreTrainedModel, PretrainedConfig, Trainer, TrainingArguments
from datasets import load_dataset
import numpy as np
# =====================
# 1. Load Dataset Subsets
# =====================
dataset = load_dataset("bashyaldhiraj2067/500k_copy_error_dataset")
train_subset = dataset["train"].select(range(int(len(dataset["train"]) * 0.1)))
test_subset = dataset["test"].select(range(int(len(dataset["test"]) * 0.1)))

print(f"Subset train size: {len(train_subset)}")
print(f"Subset test size: {len(test_subset)}")
# =====================
# 2. Tokenizer
# =====================
special_tokens = ["<pad>", "<s>", "</s>", "<unk>"]
nepali_chars = list("अआइईउऊऋॠऌॡऎएऐओऔकखगघङचछजझञटठडढणतथदधनपफबभमयरलवशषसह्ािीुूृॄेैोौंंःँ।०१२३४५६७८९,.;?!़ॅंःॊॅऒऽॉड़ॐ॥ऑऱफ़ढ़")
char_vocab = special_tokens + nepali_chars
char2id = {char: idx for idx, char in enumerate(char_vocab)}
id2char = {idx: char for char, idx in char2id.items()}
vocab_size = len(char2id)
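# Note: a few characters in nepali_chars repeat (e.g. ं, ः, and ॅ each appear more than once),
# so char2id collapses duplicates and vocab_size ends up smaller than len(char_vocab);
# the clamp in CharTokenizer.__call__ below keeps input ids inside the embedding range.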
class CharTokenizer:
    def __init__(self, char2id, id2char, vocab_size):
        self.char2id = char2id
        self.id2char = id2char
        self.pad_token_id = char2id["<pad>"]
        self.unk_token_id = char2id["<unk>"]
        self.bos_token_id = char2id["<s>"]
        self.eos_token_id = char2id["</s>"]
        self.vocab_size = vocab_size

    def encode(self, text, max_length=128):
        # Map each character to its id (unknown characters -> <unk>),
        # then truncate and right-pad to max_length.
        ids = [self.char2id.get(ch, self.unk_token_id) for ch in text]
        ids = ids[:max_length]
        return ids + [self.pad_token_id] * (max_length - len(ids))

    def decode(self, ids):
        # Drop padding ids and join the remaining characters.
        return ''.join([self.id2char.get(i, '') for i in ids if i != self.pad_token_id])

    def __call__(self, text, text_target=None, max_length=128):
        input_ids = self.encode(text, max_length)
        # Safety clamp: keep ids inside the embedding range [0, vocab_size - 1].
        input_ids = torch.clamp(torch.tensor(input_ids), max=self.vocab_size - 1).tolist()
        result = {"input_ids": input_ids, "attention_mask": [1 if i != self.pad_token_id else 0 for i in input_ids]}
        if text_target:
            labels = self.encode(text_target, max_length)
            result["labels"] = labels
        return result

tokenizer = CharTokenizer(char2id, id2char, vocab_size=vocab_size)
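# Illustrative check (not part of the app): encode() pads/truncates to max_length and
# decode() drops padding, e.g.
#   enc = tokenizer("नेपाल", text_target="नेपाल", max_length=16)
#   tokenizer.decode(enc["labels"])  # -> "नेपाल"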
# =====================
# 3. Dataset
# =====================
class CopyDataset(Dataset):
    def __init__(self, data, tokenizer, max_length=128):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Each example pairs a noisy ('incorrect') string with its clean ('correct') form.
        noisy = self.data[idx]['incorrect']
        clean = self.data[idx]['correct']
        return self.tokenizer(noisy, text_target=clean, max_length=self.max_length)

train_dataset = CopyDataset(train_subset, tokenizer)
eval_dataset = CopyDataset(test_subset, tokenizer)
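# Note: CopyDataset, train_dataset, and eval_dataset (and the Trainer/TrainingArguments
# imports) are only needed for training; the Gradio demo below uses just the tokenizer
# and the pretrained model.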
# =====================
# 4. Transformer with Copy Mechanism
# =====================
class TransformerCopyConfig(PretrainedConfig):
    def __init__(self, vocab_size=len(char2id), **kwargs):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size

# --- Model Components ---
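# Standard sinusoidal positional encoding (Vaswani et al., 2017):
#   PE[pos, 2i]   = sin(pos / 10000^(2i / d_model))
#   PE[pos, 2i+1] = cos(pos / 10000^(2i / d_model))
# The encoding is precomputed for max_len positions and added to the embeddings.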
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-torch.log(torch.tensor(10000.0)) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]
class TransformerCopyModel(nn.Module):
    def __init__(self, vocab_size, d_model=256, nhead=8, num_layers=4, dim_ff=512, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_ff, dropout)
        decoder_layer = nn.TransformerDecoderLayer(d_model, nhead, dim_ff, dropout)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers)
        self.copy_attention = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.copy_gate = nn.Linear(d_model * 2, 1)
        self.output_layer = nn.Linear(d_model, vocab_size)
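    # nn.TransformerEncoderLayer/DecoderLayer default to batch_first=False, so the
    # forward pass below transposes to (seq_len, batch, d_model) before calling them.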
    def forward(self, input_ids, attention_mask=None, labels=None):
        # labels are required: they supply the decoder input (teacher forcing),
        # shifted so the model predicts the next character at each position.
        src = input_ids
        tgt = labels[:, :-1]
        tgt_y = labels[:, 1:]
        src_embed = self.embedding(src)
        tgt_embed = self.embedding(tgt)
        src_embed = self.positional_encoding(src_embed)
        tgt_embed = self.positional_encoding(tgt_embed)
        # Key-padding masks: True marks positions the attention should ignore.
        src_mask = (src == tokenizer.pad_token_id)
        tgt_mask = (tgt == tokenizer.pad_token_id)
        memory = self.encoder(src_embed.transpose(0, 1), src_key_padding_mask=src_mask)
        output = self.decoder(
            tgt_embed.transpose(0, 1),
            memory,
            tgt_key_padding_mask=tgt_mask,
            memory_key_padding_mask=src_mask
        )
        # Copy mechanism: attend from decoder states to the encoder memory and
        # compute a gate from the concatenated states.
        attn_output, attn_weights = self.copy_attention(output, memory, memory, key_padding_mask=src_mask)
        concat = torch.cat([output, attn_output], dim=-1)
        copy_prob = torch.sigmoid(self.copy_gate(concat))
        gen_logits = self.output_layer(output)
        gen_probs = F.softmax(gen_logits, dim=-1)
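        # Note: attn_weights, copy_prob, and gen_probs are computed but not folded into
        # the output distribution; the loss and returned logits below come from the
        # generation head alone.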
        loss = F.cross_entropy(
            gen_logits.transpose(0, 1).reshape(-1, gen_logits.size(-1)),
            tgt_y.reshape(-1),
            ignore_index=tokenizer.pad_token_id
        ) if labels is not None else None
        # Logits are returned batch-first: (batch, tgt_len, vocab_size).
        return {"loss": loss, "logits": gen_logits.transpose(0, 1)}
# --- HF Wrapper ---
class TransformerCopyHF(PreTrainedModel):
    config_class = TransformerCopyConfig

    def __init__(self, config):
        super().__init__(config)
        self.model = TransformerCopyModel(config.vocab_size)

    def forward(self, input_ids, attention_mask=None, labels=None):
        return self.model(input_ids, attention_mask, labels)

# Load the pretrained checkpoint from the Hub and switch to inference mode (disables dropout).
model = TransformerCopyHF.from_pretrained("bashyaldhiraj2067/remove1_copy_transformer")
model.eval()
# =====================
# 5. Inference Function
# =====================
def generate_clean_text(input_text, max_length=128):
    model_input = tokenizer.encode(input_text, max_length=max_length)
    input_ids = torch.tensor([model_input])
    # Start the decoder with just the <s> token.
    decoder_input = torch.tensor([[tokenizer.bos_token_id]])
    output_tokens = []
    for _ in range(max_length):
        with torch.no_grad():
            # The model shifts labels internally (labels[:, :-1] becomes the decoder
            # input), so a dummy <pad> (id 0) is appended to feed the full
            # decoder_input to the decoder.
            out = model(input_ids=input_ids, labels=torch.cat([decoder_input, torch.zeros((1, 1), dtype=torch.long)], dim=1))
        next_token_logits = out["logits"][:, -1, :]
        next_token = torch.argmax(next_token_logits, dim=-1)
        next_token_id = next_token.item()
        # Greedy decoding: stop as soon as the model emits padding.
        if next_token_id == tokenizer.pad_token_id:
            break
        output_tokens.append(next_token_id)
        decoder_input = torch.cat([decoder_input, next_token.unsqueeze(0)], dim=1)
    return tokenizer.decode(output_tokens)
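# Illustrative call (not executed here): generate_clean_text("<noisy Nepali text>")
# greedily decodes up to max_length characters and returns the corrected string.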
# =====================
# 6. Gradio Interface
# =====================
# live=True re-runs the function on every keystroke.
iface = gr.Interface(
    fn=generate_clean_text,
    inputs=gr.Textbox(label="Noisy Text"),
    outputs=gr.Textbox(label="Cleaned Text"),
    live=True
)

iface.launch(debug=True)