"""Gradio demo: character-level Transformer that rewrites noisy Nepali text.

Running this module loads a dataset subset, builds a character tokenizer,
loads a pretrained checkpoint and launches a Gradio interface.
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import gradio as gr
from torch.utils.data import Dataset
from transformers import PreTrainedModel, PretrainedConfig, Trainer, TrainingArguments
from datasets import load_dataset
import numpy as np

# =====================
# 1. Load Dataset Subsets
# =====================
dataset = load_dataset("bashyaldhiraj2067/500k_copy_error_dataset")
# Keep only the first 10% of each split.
train_subset = dataset["train"].select(range(int(len(dataset["train"]) * 0.1)))
test_subset = dataset["test"].select(range(int(len(dataset["test"]) * 0.1)))

print(f"Subset train size: {len(train_subset)}")
print(f"Subset test size: {len(test_subset)}")

# =====================
# 2. Tokenizer
# =====================
# NOTE(review): these four entries were identical empty strings, which made
# pad/unk/bos/eos share a single id (decoding then started from a token that
# is masked as padding). The names below are assumed -- confirm them against
# the tokenizer the checkpoint was trained with.
special_tokens = ["<pad>", "<unk>", "<s>", "</s>"]
nepali_chars = list("अआइईउऊऋॠऌॡऎएऐओऔकखगघङचछजझञटठडढणतथदधनपफबभमयरलवशषसह्ािीुूृॄेैोौंंःँ।०१२३४५६७८९,.;?!़ॅंःॊॅऒऽॉड़ॐ॥ऑऱफ़ढ़")
char_vocab = special_tokens + nepali_chars
# The character list repeats some code points, so each character keeps the
# index of its LAST occurrence and len(char2id) is smaller than the largest
# id. CharTokenizer.encode clamps ids for that reason.
char2id = {char: idx for idx, char in enumerate(char_vocab)}
id2char = {idx: char for char, idx in char2id.items()}
vocab_size = len(char2id)


class CharTokenizer:
    """Character-level tokenizer backed by a fixed char -> id table."""

    def __init__(self, char2id, id2char, vocab_size):
        self.char2id = char2id
        self.id2char = id2char
        self.pad_token_id = char2id["<pad>"]
        self.unk_token_id = char2id["<unk>"]
        self.bos_token_id = char2id["<s>"]
        self.eos_token_id = char2id["</s>"]
        self.vocab_size = vocab_size

    def encode(self, text, max_length=128):
        """Return exactly ``max_length`` ids for ``text``.

        Unknown characters map to the unk id, longer text is truncated and
        shorter text is right-padded with the pad id. Every id is clamped to
        ``vocab_size - 1`` so it can never index past the embedding table;
        this applies to inputs and labels alike.
        """
        max_id = self.vocab_size - 1
        ids = [
            min(self.char2id.get(ch, self.unk_token_id), max_id)
            for ch in text[:max_length]
        ]
        return ids + [self.pad_token_id] * (max_length - len(ids))

    def decode(self, ids):
        """Join the characters for ``ids``, skipping pad ids and unknown ids."""
        return ''.join(
            self.id2char.get(i, '') for i in ids if i != self.pad_token_id
        )

    def __call__(self, text, text_target=None, max_length=128):
        """Encode ``text`` (and optionally ``text_target``) into a feature dict.

        Returns a dict with ``input_ids`` and ``attention_mask`` (1 for
        non-pad positions, 0 for pad), plus ``labels`` whenever
        ``text_target`` is given.
        """
        input_ids = self.encode(text, max_length)
        result = {
            "input_ids": input_ids,
            "attention_mask": [int(i != self.pad_token_id) for i in input_ids],
        }
        # `is not None` rather than truthiness: an empty target string must
        # still produce a `labels` entry so every example has the same keys.
        if text_target is not None:
            result["labels"] = self.encode(text_target, max_length)
        return result


tokenizer = CharTokenizer(char2id, id2char, vocab_size=vocab_size)


# =====================
# 3. Dataset
# =====================
class CopyDataset(Dataset):
    """Wraps rows with 'incorrect' / 'correct' text fields as tokenized pairs."""

    def __init__(self, data, tokenizer, max_length=128):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Fetch the row once instead of indexing the dataset per field.
        row = self.data[idx]
        return self.tokenizer(
            row['incorrect'],
            text_target=row['correct'],
            max_length=self.max_length,
        )


train_dataset = CopyDataset(train_subset, tokenizer)
eval_dataset = CopyDataset(test_subset, tokenizer)


# =====================
# 4. Transformer with Copy Mechanism
# =====================
class TransformerCopyConfig(PretrainedConfig):
    """Configuration holding the vocabulary size of the model."""

    def __init__(self, vocab_size=len(char2id), **kwargs):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size


# --- Model Components ---
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a (batch, seq, d_model) input."""

    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * (-torch.log(torch.tensor(10000.0)) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # Registered as a buffer: follows the module across devices and is
        # stored in the state dict, but is not a trainable parameter.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]


class TransformerCopyModel(nn.Module):
    """Encoder-decoder Transformer over character ids."""

    def __init__(self, vocab_size, d_model=256, nhead=8, num_layers=4, dim_ff=512, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)

        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_ff, dropout)
        decoder_layer = nn.TransformerDecoderLayer(d_model, nhead, dim_ff, dropout)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers)

        # The copy branch never contributed to the returned logits or loss, so
        # forward() no longer evaluates it. The layers stay registered so that
        # existing checkpoints keep loading with the same parameter names.
        self.copy_attention = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.copy_gate = nn.Linear(d_model * 2, 1)
        self.output_layer = nn.Linear(d_model, vocab_size)

    def forward(self, input_ids, attention_mask=None, labels=None):
        """Run a teacher-forced pass and return ``{"loss", "logits"}``.

        Args:
            input_ids: (batch, src_len) source ids.
            attention_mask: accepted for interface compatibility but unused;
                padding is derived from the pad token id instead.
            labels: (batch, tgt_len) target ids. ``labels[:, :-1]`` feeds the
                decoder and ``labels[:, 1:]`` is the prediction target.

        Returns:
            Dict with the scalar cross-entropy ``loss`` (pad targets ignored)
            and ``logits`` of shape (batch, tgt_len - 1, vocab).

        Raises:
            ValueError: if ``labels`` is None, since the decoder input is
                taken from it.
        """
        if labels is None:
            raise ValueError(
                "labels are required: the decoder input is derived from them"
            )

        pad_id = tokenizer.pad_token_id
        decoder_in = labels[:, :-1]
        decoder_target = labels[:, 1:]

        # Boolean masks: True marks a position that must NOT be attended to.
        src_padding = input_ids == pad_id
        tgt_padding = decoder_in == pad_id
        tgt_len = decoder_in.size(1)
        # Causal mask so position i only attends to positions <= i. Without it
        # the decoder can read the very token it is asked to predict.
        # NOTE(review): a checkpoint trained without this mask may need
        # retraining to behave well with it.
        causal_mask = torch.triu(
            torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=decoder_in.device),
            diagonal=1,
        )

        # The Transformer layers are built with the default sequence-first
        # layout, hence the transposes to (seq, batch, d_model).
        src_embed = self.positional_encoding(self.embedding(input_ids)).transpose(0, 1)
        tgt_embed = self.positional_encoding(self.embedding(decoder_in)).transpose(0, 1)

        memory = self.encoder(src_embed, src_key_padding_mask=src_padding)
        output = self.decoder(
            tgt_embed,
            memory,
            tgt_mask=causal_mask,
            tgt_key_padding_mask=tgt_padding,
            memory_key_padding_mask=src_padding,
        )

        logits = self.output_layer(output).transpose(0, 1)
        loss = F.cross_entropy(
            logits.reshape(-1, logits.size(-1)),
            decoder_target.reshape(-1),
            ignore_index=pad_id,
        )
        return {"loss": loss, "logits": logits}


# --- HF Wrapper ---
class TransformerCopyHF(PreTrainedModel):
    """Hugging Face wrapper so the model can be loaded with ``from_pretrained``."""

    config_class = TransformerCopyConfig

    def __init__(self, config):
        super().__init__(config)
        self.model = TransformerCopyModel(config.vocab_size)

    def forward(self, input_ids, attention_mask=None, labels=None):
        return self.model(input_ids, attention_mask, labels)


model = TransformerCopyHF.from_pretrained("bashyaldhiraj2067/remove1_copy_transformer")
model.eval()
# Keep the tokenizer's id bound within the embedding table that was actually
# loaded, so encoded ids can never index past it.
tokenizer.vocab_size = min(tokenizer.vocab_size, model.config.vocab_size)


# =====================
# 5. Inference Function
# =====================
def generate_clean_text(input_text, max_length=128):
    """Greedily decode a cleaned version of ``input_text``.

    Args:
        input_text: noisy text; empty or None returns an empty string.
        max_length: source length after padding/truncation and the maximum
            number of characters generated.

    Returns:
        The decoded output string.
    """
    # An empty source would be entirely padding (every encoder position
    # masked), so there is nothing meaningful to decode.
    if not input_text:
        return ""

    # Encoding goes through the tokenizer so ids are clamped to the model's
    # vocabulary range.
    input_ids = torch.tensor([tokenizer.encode(input_text, max_length=max_length)])

    # NOTE(review): encode() never adds the BOS id to labels, so the model may
    # not have seen BOS as a start token during training -- confirm.
    decoder_input = torch.tensor([[tokenizer.bos_token_id]])
    # forward() drops the last label position to build the decoder input, so a
    # throwaway slot is appended on every step.
    filler = torch.zeros((1, 1), dtype=torch.long)
    stop_ids = {tokenizer.pad_token_id, tokenizer.eos_token_id}

    output_tokens = []
    with torch.no_grad():
        for _ in range(max_length):
            out = model(
                input_ids=input_ids,
                labels=torch.cat([decoder_input, filler], dim=1),
            )
            next_token = torch.argmax(out["logits"][:, -1, :], dim=-1)
            next_token_id = next_token.item()

            if next_token_id in stop_ids:
                break

            output_tokens.append(next_token_id)
            decoder_input = torch.cat([decoder_input, next_token.unsqueeze(0)], dim=1)

    return tokenizer.decode(output_tokens)


# Gradio Interface Setup
iface = gr.Interface(
    fn=generate_clean_text,
    inputs=gr.Textbox(label="Noisy Text"),
    outputs=gr.Textbox(label="Cleaned Text"),
    live=True
)

iface.launch(debug=True)