import os

# -----------------------------
# ENVIRONMENT / CACHE
# -----------------------------
# Set cache/telemetry variables before transformers/datasets are imported, otherwise
# the libraries resolve their cache directories at import time and ignore them.
os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface_cache"
os.environ["HF_HOME"] = "/tmp/huggingface_cache"
os.environ["HF_DATASETS_CACHE"] = "/tmp/huggingface_cache"
os.environ["HF_METRICS_CACHE"] = "/tmp/huggingface_cache"
os.environ["WANDB_MODE"] = "disabled"

import json
import math

import torch
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
from datasets import Dataset
import transformers
from transformers import AutoModelForCausalLM, DataCollatorForSeq2Seq, Trainer, TrainingArguments
from peft import LoraConfig, get_peft_model
from sentence_transformers import SentenceTransformer, util

# -----------------------------
# SETTINGS
# -----------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
tokenizer = transformers.AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
if tokenizer.pad_token is None:
    # Fall back to EOS so batch padding works even if the checkpoint defines no pad token.
    tokenizer.pad_token = tokenizer.eos_token
embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
model = None  # populated by load_and_train()

# -----------------------------
# LoRA / MoE Modules
# -----------------------------
# A frozen linear layer plus a trainable low-rank (LoRA) update.
class LoraLinear(nn.Module):
    def __init__(self, in_features, out_features, r=8, lora_alpha=16, lora_dropout=0.05, bias=False):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.r = r
        self.scaling = lora_alpha / r if r > 0 else 1.0
        # Frozen base weight, initialised to zero: in the MoE setup below the shared
        # pretrained projection is applied by MoELoRALinear.base_linear, so each expert
        # should contribute only its LoRA delta (torch.empty would add garbage values).
        self.weight = nn.Parameter(torch.zeros(out_features, in_features), requires_grad=False)
        self.bias = nn.Parameter(torch.zeros(out_features), requires_grad=False) if bias else None
        if r > 0:
            self.lora_A = nn.Parameter(torch.zeros((r, in_features)))
            self.lora_B = nn.Parameter(torch.zeros((out_features, r)))
            nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
            nn.init.zeros_(self.lora_B)  # B starts at zero so the initial LoRA delta is zero
            self.lora_dropout = nn.Dropout(p=lora_dropout)
        else:
            self.lora_A, self.lora_B, self.lora_dropout = None, None, None

    def forward(self, x):
        result = F.linear(x, self.weight, self.bias)
        if self.r > 0:
            lora_out = self.lora_dropout(x) @ self.lora_A.T @ self.lora_B.T
            result = result + self.scaling * lora_out
        return result

# Wraps a pretrained nn.Linear and adds a gated mixture of LoRA experts on top of it.
class MoELoRALinear(nn.Module):
    def __init__(self, base_linear, r, num_experts=2, k=1, lora_alpha=16, lora_dropout=0.05):
        super().__init__()
        self.base_linear = base_linear
        self.num_experts = num_experts
        self.k = k  # kept for API compatibility; forward() uses dense soft routing over all experts
        self.experts = nn.ModuleList([
            LoraLinear(base_linear.in_features, base_linear.out_features,
                       r=r, lora_alpha=lora_alpha, lora_dropout=lora_dropout)
            for _ in range(num_experts)
        ])
        self.gate = nn.Linear(base_linear.in_features, num_experts)

    def forward(self, x):
        base_out = self.base_linear(x)
        gate_scores = torch.softmax(self.gate(x), dim=-1)
        expert_out = 0
        for i, expert in enumerate(self.experts):
            # Weight each expert's LoRA delta by its token-wise gate probability.
            expert_out += gate_scores[..., i:i+1] * expert(x)
        return base_out + expert_out

def replace_proj_with_moe_lora(model, r=8, num_experts=2, k=1, lora_alpha=16, lora_dropout=0.05):
    for layer in model.model.layers:
        for proj_name in ["up_proj", "down_proj"]:
            old = getattr(layer.mlp, proj_name)
            moe = MoELoRALinear(
                base_linear=old,
                r=r,
                num_experts=num_experts,
                k=k,
                lora_alpha=lora_alpha,
                lora_dropout=lora_dropout,
            ).to(next(old.parameters()).device)
            setattr(layer.mlp, proj_name, moe)
    return model

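# Illustrative helper (a sketch, not called anywhere above): after wrapping the MLP
# projections and applying PEFT, this shows how one could verify that only the LoRA
# adapters and MoE gates remain trainable.
def count_trainable_parameters(m):
    trainable = sum(p.numel() for p in m.parameters() if p.requires_grad)
    total = sum(p.numel() for p in m.parameters())
    print(f"trainable params: {trainable:,} / {total:,} ({100 * trainable / total:.2f}%)")
    return trainable, total
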
# -----------------------------
# DATA PREPROCESSING
# -----------------------------
def preprocess(example):
    tokens = tokenizer(example['text'], truncation=True, padding=False)
    text = example['text']
    # Mask everything before the "<|assistant|>" marker with -100 so the loss is
    # computed only on the assistant's answer tokens.
    assistant_index = text.find("<|assistant|>")
    prefix_ids = tokenizer(text[:assistant_index], add_special_tokens=False)['input_ids']
    prefix_len = len(prefix_ids)
    labels = tokens['input_ids'].copy()
    labels[:prefix_len] = [-100] * prefix_len
    tokens['labels'] = labels
    return tokens

# -----------------------------
# LOAD & TRAIN MODEL
# -----------------------------
def load_and_train(model_id="TinyLlama/TinyLlama-1.1B-Chat-v1.0"):
    global model
    current_dir = os.path.dirname(os.path.abspath(__file__))
    json_file_path = os.path.join(current_dir, 'makemytrip_qa_full.json')
    with open(json_file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    df = pd.DataFrame(data)
    print(f"Loaded dataset containing {len(df)} questions")

    system_prompt = "You are a helpful assistant that provides financial data from MakeMyTrip reports."
    training_data = [
        {"text": f"<|system|>\n{system_prompt}</s>\n<|user|>\n{row['question']}</s>\n<|assistant|>\n{row['answer']}</s>"}
        for _, row in df.iterrows()
    ]
    dataset = Dataset.from_list(training_data)
    tokenized_dataset = dataset.map(preprocess, remove_columns=["text"])

    base_model = AutoModelForCausalLM.from_pretrained(model_id, trust_remote_code=True).to(device)
    model = replace_proj_with_moe_lora(base_model)
    peft_config = LoraConfig(r=8, lora_alpha=16, lora_dropout=0.05, target_modules=["o_proj"],
                             bias="none", task_type="CAUSAL_LM")
    model = get_peft_model(model, peft_config)
    # get_peft_model leaves only "lora_*" parameters trainable; depending on the PEFT
    # version this also freezes the MoE gate layers added above, so re-enable them
    # explicitly if the router should be trained together with the expert adapters.
    for name, param in model.named_parameters():
        if ".gate." in name:
            param.requires_grad = True
    model.config.use_cache = False
    model.gradient_checkpointing_disable()

    # DataCollatorForSeq2Seq pads input_ids with the pad token and labels with -100,
    # preserving the prompt mask built in preprocess(). (DataCollatorForLanguageModeling
    # with mlm=False would rebuild labels from input_ids and discard that mask.)
    data_collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, label_pad_token_id=-100)

    training_args = TrainingArguments(
        learning_rate=1e-4,
        lr_scheduler_type="cosine",
        output_dir="./results",
        num_train_epochs=10,
        per_device_train_batch_size=1,
        gradient_accumulation_steps=4,
        logging_steps=1,
        save_steps=10,
        save_total_limit=2,
        fp16=torch.cuda.is_available(),  # mixed precision only when a GPU is present
        bf16=False,
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
        data_collator=data_collator,
    )

    print(torch.cuda.is_available())        # True if a GPU is detected
    print(next(model.parameters()).device)  # shows where the model lives
    print("Training started")
    trainer.train()
    model.eval()

# ---------------- Guardrails ----------------
BLOCKED_TERMS = ["weather", "cricket", "movie", "song", "football", "holiday",
                 "travel", "recipe", "music", "game", "sports", "politics", "election"]

FINANCE_DOMAINS = [
    "financial reporting", "balance sheet", "income statement",
    "assets and liabilities", "equity", "revenue", "profit and loss",
    "goodwill impairment", "cash flow", "dividends", "taxation",
    "investment", "valuation", "capital structure", "ownership interests",
    "subsidiaries", "shareholders equity", "expenses", "earnings",
    "debt", "amortization", "depreciation"
]

finance_embeds = embed_model.encode(FINANCE_DOMAINS, convert_to_tensor=True)

#--------------------------------------------------------------
# GUARD RAIL
#--------------------------------------------------------------
def validate_query(query: str, threshold: float = 0.5) -> bool:
    q_lower = query.lower()
    if any(bad in q_lower for bad in BLOCKED_TERMS):
        print("[Guardrail] Rejected by blocklist.")
        return False
    q_emb = embed_model.encode(query, convert_to_tensor=True)
    sim_scores = util.cos_sim(q_emb, finance_embeds)
    max_score = float(sim_scores.max())
    if max_score > threshold:
        print(f"[Guardrail] Accepted (semantic match {max_score:.2f})")
        return True
    else:
        print(f"[Guardrail] Rejected (low semantic score {max_score:.2f})")
        return False

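# Illustrative behaviour (a sketch; actual similarity scores depend on the MiniLM embeddings):
#   validate_query("What was the company's total revenue and profit?")  # -> True, semantic match
#   validate_query("Who won the cricket match yesterday?")              # -> False, blocklisted term
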
# -----------------------------
# GENERATE ANSWER
# -----------------------------
def generate_answer(prompt, max_tokens=200):
    if prompt.strip() == "":
        return "Please enter a prompt!"
    if model is None:
        return "Model not loaded yet: call load_and_train() first."
    if not validate_query(prompt):
        print("Query rejected: Not finance-related.")
        return "Query rejected: Please ask finance-related questions."

    system_prompt = "You are a helpful assistant that provides financial data from MakeMyTrip reports."
    messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}]
    input_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer(input_text, return_tensors="pt").to(device)

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            do_sample=True,
            top_p=0.9,
            temperature=0.7,
        )

    decoded_output = tokenizer.decode(outputs[0], skip_special_tokens=True)
    # The chat template places the reply after the final "<|assistant|>" marker,
    # so everything before it (system + user turns) is stripped off.
    answer_start_token = '<|assistant|>'
    answer_start_index = decoded_output.rfind(answer_start_token)
    if answer_start_index != -1:
        generated_answer = decoded_output[answer_start_index + len(answer_start_token):].strip()
        if generated_answer.endswith('</s>'):
            generated_answer = generated_answer[:-len('</s>')].strip()
    else:
        generated_answer = "Could not extract answer from model output."
    return generated_answer
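
# -----------------------------
# EXAMPLE USAGE (illustrative sketch; the question below is hypothetical)
# -----------------------------
if __name__ == "__main__":
    load_and_train()
    print(generate_answer("What was MakeMyTrip's total revenue for the latest fiscal year?"))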