diff --git a/mcp_agent/mcp_client.py b/mcp_agent/mcp_client.py index 485d599..666c30d 100644 --- a/mcp_agent/mcp_client.py +++ b/mcp_agent/mcp_client.py @@ -1,21 +1,66 @@ import httpx import asyncio +import logging + +logger = logging.getLogger(__name__) class MCPClient: - def __init__(self, server_url: str): + def __init__(self, server_url: str, timeout: int = 3600): self.server_url = server_url - self.client = httpx.AsyncClient(timeout=60) + self.client = httpx.AsyncClient(timeout=timeout) + logger.info(f"MCPClient initialized for {server_url} with timeout={timeout}s ({timeout//60} minutes)") async def send(self, tool: str, arguments: dict): - response = await self.client.post( - f"{self.server_url}/execute", - json={ - "tool": tool, - "arguments": arguments, - }, - ) - response.raise_for_status() - return response.json() + logger.info(f"MCPClient: Sending request to {self.server_url}/execute for tool '{tool}'") + try: + response = await self.client.post( + f"{self.server_url}/execute", + json={ + "tool": tool, + "arguments": arguments, + }, + ) + logger.info(f"MCPClient: Received response with status={response.status_code}") + logger.debug(f"MCPClient: Response headers: {response.headers}") + + except asyncio.TimeoutError as e: + logger.error(f"MCPClient: Request timeout for tool '{tool}': {str(e)}") + raise Exception(f"MCP tool '{tool}' request timed out (>3600s / 1 hour). Model loading or fine-tuning may be too slow.") + except Exception as e: + logger.error(f"MCPClient: Request failed for tool '{tool}': {str(e)}", exc_info=True) + raise Exception(f"MCP tool '{tool}' request failed: {str(e)}") + + if response.status_code >= 400: + error_data = {} + try: + error_data = response.json() + logger.error(f"MCPClient: HTTP error {response.status_code}: {error_data}") + except: + logger.error(f"MCPClient: HTTP error {response.status_code} (could not parse JSON)") + pass + + error_msg = error_data.get("error") or error_data.get("details") or f"HTTP {response.status_code}" + raise Exception(f"MCP tool '{tool}' failed: {error_msg}. Full response: {error_data}") + + try: + result = response.json() + logger.debug(f"MCPClient: Parsed JSON response: status={result.get('status')}") + except Exception as e: + logger.error(f"MCPClient: Failed to parse response JSON: {str(e)}") + logger.error(f"MCPClient: Raw response text: {response.text[:500]}") + raise Exception(f"MCP tool '{tool}' returned invalid JSON: {str(e)}") + + if isinstance(result, dict) and result.get("status") == "failed": + error_msg = result.get("error") or result.get("details") or "Unknown error" + traceback_info = result.get("traceback", "") + full_error = f"MCP tool '{tool}' returned failure: {error_msg}" + if traceback_info: + full_error += f"\n\nServer traceback:\n{traceback_info}" + logger.error(f"MCPClient: {full_error}") + raise Exception(full_error) + + logger.info(f"MCPClient: Tool '{tool}' completed successfully") + return result async def health(self): response = await self.client.get(f"{self.server_url}/health") @@ -37,6 +82,5 @@ async def main(): print(result) await client.close() - if __name__ == "__main__": asyncio.run(main()) diff --git a/mcp_agent/mcp_server.py b/mcp_agent/mcp_server.py index 1be749f..4d63bb4 100644 --- a/mcp_agent/mcp_server.py +++ b/mcp_agent/mcp_server.py @@ -2,21 +2,47 @@ import asyncio import json import os import sys +import logging +import traceback from datetime import datetime from pathlib import PureWindowsPath -from typing import Any, Dict, List +from typing import Any, Dict, List, Tuple from aiohttp import web from mcp.server import Server from mcp.types import Tool, TextContent +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(sys.stderr), + logging.StreamHandler(sys.stdout), + ] +) +logger = logging.getLogger(__name__) + +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +model_cache_dir = os.path.join(project_root, "model", "base-model") +os.makedirs(model_cache_dir, exist_ok=True) +os.environ["HF_HOME"] = model_cache_dir +logger.info(f"Project root: {project_root}") +logger.info(f"HuggingFace model cache directory set to: {model_cache_dir}") + app = Server("mlstore-mcp-server") +logger.info("MCP Server initialized: mlstore-mcp-server") LOADED_MODELS: Dict[str, Dict[str, Any]] = {} +PAIR_EXTRACTOR: Dict[str, Any] = {} + +BASE_MODEL_CACHE_DIR = model_cache_dir + + @app.list_tools() async def list_tools(): - return [ + logger.info("Listing available tools") + tools = [ Tool( name="echo", description="Echo back the provided input", @@ -27,8 +53,7 @@ async def list_tools(): }, "required": ["message"] }, - ) - , + ), Tool( name="fine_tune", description="Start fine-tuning a base model using training files", @@ -69,6 +94,8 @@ async def list_tools(): }, ), ] + logger.info(f"Available tools: {[t.name for t in tools]}") + return tools def _now() -> str: @@ -93,13 +120,10 @@ def _resolve_model_path(model_path: str) -> str: candidates = [] - # Try relative to current working directory candidates.append(os.path.normpath(os.path.join(os.getcwd(), norm))) - # Try relative to model root candidates.append(os.path.normpath(os.path.join(_model_root(), os.path.basename(norm)))) - # If it's a Windows-style absolute path, map to container /app by trimming common root if ":" in model_path or "\\" in model_path: p = PureWindowsPath(model_path) parts = [str(x) for x in p.parts] @@ -117,7 +141,6 @@ def _resolve_model_path(model_path: str) -> str: def _resolve_model_file(model_path: str) -> tuple[str, str]: - """Return (model_dir, model_filename) for GPT4All.""" resolved = _resolve_model_path(model_path) if os.path.isdir(resolved): for name in os.listdir(resolved): @@ -127,9 +150,603 @@ def _resolve_model_file(model_path: str) -> tuple[str, str]: return os.path.dirname(resolved), os.path.basename(resolved) +def _load_training_file(file_path: str) -> List[Dict[str, str]]: + logger.info(f"Loading training file: {file_path}") + + if not os.path.exists(file_path): + logger.error(f"Training file not found: {file_path}") + raise FileNotFoundError(f"Training file not found: {file_path}") + + _, ext = os.path.splitext(file_path) + ext = ext.lower() + + try: + if ext == '.json': + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + if isinstance(data, list): + logger.info(f"JSON file contains {len(data)} items") + return data + elif isinstance(data, dict): + logger.info(f"JSON file is dict, extracting first array or values") + for key, val in data.items(): + if isinstance(val, list): + return val + return [data] + return [] + + elif ext == '.csv': + import csv + pairs = [] + with open(file_path, 'r', encoding='utf-8') as f: + reader = csv.DictReader(f) + for row in reader: + pairs.append(dict(row)) + logger.info(f"CSV file contains {len(pairs)} rows") + return pairs + + elif ext == '.txt': + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read() + pairs = [{'text': para.strip()} for para in content.split('\n\n') if para.strip()] + logger.info(f"TXT file contains {len(pairs)} paragraphs") + return pairs + + elif ext == '.md': + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read() + pairs = [{'text': para.strip()} for para in content.split('\n\n') if para.strip()] + logger.info(f"MD file contains {len(pairs)} sections") + return pairs + + elif ext == '.pdf': + try: + import PyPDF2 + pairs = [] + with open(file_path, 'rb') as f: + reader = PyPDF2.PdfReader(f) + for page_num, page in enumerate(reader.pages): + text = page.extract_text() + if text.strip(): + pairs.append({'text': text.strip(), 'page': page_num}) + logger.info(f"PDF file contains {len(pairs)} pages") + return pairs + except: + pass + + elif ext == '.docx': + try: + from docx import Document + doc = Document(file_path) + pairs = [{'text': para.text} for para in doc.paragraphs if para.text.strip()] + logger.info(f"DOCX file contains {len(pairs)} paragraphs") + return pairs + except ImportError: + logger.error("python-docx not available for DOCX parsing") + raise ImportError("python-docx library not available") + + else: + logger.error(f"Unsupported file type: {ext}") + raise ValueError(f"Unsupported file type: {ext}") + + except Exception as e: + logger.error(f"Failed to load training file: {str(e)}", exc_info=True) + raise + + +def _ensure_pair_extractor_model(base_model: str): + if PAIR_EXTRACTOR.get("model") is not None: + return PAIR_EXTRACTOR["tokenizer"], PAIR_EXTRACTOR["model"] + + logger.info("Loading base model for pair extraction (prompt-based)") + from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig + import torch + + tokenizer = AutoTokenizer.from_pretrained( + base_model, + cache_dir=model_cache_dir, + local_files_only=False, + ) + if tokenizer.pad_token is None: + tokenizer.pad_token = tokenizer.eos_token + + model = AutoModelForCausalLM.from_pretrained( + base_model, + cache_dir=model_cache_dir, + quantization_config=BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16), + device_map="auto", + dtype=torch.float16, + offload_dir='./offload_dir', + ) + + PAIR_EXTRACTOR["tokenizer"] = tokenizer + PAIR_EXTRACTOR["model"] = model + logger.info("Pair extractor model loaded and cached") + return tokenizer, model + + +def _format_training_sample(sample: Any) -> str: + try: + if isinstance(sample, dict): + parts = [] + for k, v in sample.items(): + if isinstance(v, str) and v.strip(): + parts.append(f"{k}: {v.strip()}") + return " | ".join(parts) if parts else json.dumps(sample) + if isinstance(sample, str): + return sample.strip() + return str(sample) + except Exception: + return str(sample) + + +def _prompt_based_pair_extraction(training_data: List[Any], base_model: str) -> List[Tuple[str, str]]: + import torch + + tokenizer, model = _ensure_pair_extractor_model(base_model) + + max_items = 12 + subset = training_data + formatted = [f"{i+1}. {_format_training_sample(item)}" for i, item in enumerate(subset)] + data_block = "\n".join(formatted) + + example_pairs = [ + {"instruction": "Explain what a REST API is.", "response": "A REST API is an interface that uses HTTP methods..."}, + {"instruction": "Summarize the customer complaint.", "response": "Customer reports delayed shipment and requests refund."}, + ] + + system_prompt = ( + "You are a data extractor. Given a list of items, return a JSON array of training pairs. " + "Each pair must have 'instruction' and 'response'. Keep answers concise. " + "If content is incomplete, still produce best-effort pairs." + ) + + user_prompt = ( + "Examples of desired output:\n" + f"{json.dumps(example_pairs, ensure_ascii=False, indent=2)}\n\n" + "Now extract training pairs from the following items. Return ONLY a JSON array, no extra text.\n" + f"Items:\n{data_block}" + ) + + prompt = f"<|im_start|>system\n{system_prompt}<|im_end|>\n<|im_start|>user\n{user_prompt}<|im_end|>\n<|im_start|>assistant\n" + + inputs = tokenizer(prompt, return_tensors="pt").to(model.device) + max_new_tokens = 512 + with torch.no_grad(): + output = model.generate( + **inputs, + max_new_tokens=max_new_tokens, + do_sample=False, + temperature=0.2, + top_p=0.9, + eos_token_id=tokenizer.eos_token_id, + ) + + decoded = tokenizer.decode(output[0], skip_special_tokens=True) + json_start = decoded.find("[") + if json_start == -1: + logger.error("LLM extraction failed to produce JSON array (no '[' found)") + return [] + + pairs: List[Tuple[str, str]] = [] + bracket_count = 0 + in_string = False + escape_next = False + json_end = -1 + + for i in range(json_start, len(decoded)): + char = decoded[i] + + if escape_next: + escape_next = False + continue + + if char == '\\': + escape_next = True + continue + + if char == '"' and not escape_next: + in_string = not in_string + continue + + if not in_string: + if char == '[': + bracket_count += 1 + elif char == ']': + bracket_count -= 1 + if bracket_count == 0: + json_end = i + break + + if json_end == -1: + logger.error("LLM extraction failed to find valid JSON array boundary") + return [] + + try: + json_text = decoded[json_start: json_end + 1] + logger.debug(f"Extracted JSON text (first 200 chars): {json_text[:200]}") + parsed = json.loads(json_text) + for item in parsed: + instr = str(item.get("instruction", "")).strip() + resp = str(item.get("response", "")).strip() + if instr and resp: + pairs.append((instr, resp)) + logger.info(f"LLM extracted {len(pairs)} pairs via prompting") + return pairs + except Exception as e: + logger.error(f"Failed to parse LLM-extracted JSON: {str(e)}") + logger.debug(f"JSON text that failed to parse: {json_text if 'json_text' in locals() else 'N/A'}") + return [] + + +def _extract_training_pairs(training_data: List[Any]) -> List[Tuple[str, str]]: + logger.info(f"Extracting training pairs via LLM for {len(training_data)} items") + + if not training_data: + return [] + + base_model = "meta-llama/Meta-Llama-3-8B-Instruct" + + pairs = _prompt_based_pair_extraction(training_data, base_model) + if not pairs: + logger.warning("LLM extraction failed; falling back to minimal heuristic") + for item in training_data: + text = _format_training_sample(item) + if text: + mid = max(1, len(text) // 2) + pairs.append((text[:mid].strip(), text[mid:].strip() or text[:50])) + + logger.info(f"Total pairs extracted: {len(pairs)}") + if pairs: + logger.debug(f"Sample pair: instruction='{pairs[0][0][:80]}...', response='{pairs[0][1][:80]}...'") + return pairs + + +async def _fine_tune_model_impl( + training_files: List[str], + hyperparams: Dict[str, Any], + model_name: str, + version: str, + output_dir: str +) -> Dict[str, Any]: + base_model = "meta-llama/Meta-Llama-3-8B-Instruct" + logger.info(f"Starting fine-tune process with base model: {base_model}") + + try: + logger.info(f"Step 1: Loading {len(training_files)} training files") + all_training_pairs = [] + + for file_path in training_files: + try: + training_data = _load_training_file(file_path) + pairs = _extract_training_pairs(training_data) + all_training_pairs.extend(pairs) + logger.info(f"File {os.path.basename(file_path)}: {len(pairs)} pairs extracted") + except Exception as e: + logger.error(f"Failed to process file {file_path}: {str(e)}") + continue + + if not all_training_pairs: + logger.error("No training pairs extracted from any files") + return { + "status": "failed", + "error": "no_training_pairs_extracted", + "timestamp": _now(), + } + + logger.info(f"Step 1 Complete: Total {len(all_training_pairs)} training pairs extracted") + logger.info(f"Step 2: Loading base model and tokenizer: {base_model}") + try: + from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig + from huggingface_hub import login, HfApi + import torch + + logger.info("Step 2a: Authenticating with HuggingFace...") + hf_token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN") + if hf_token: + logger.info("HuggingFace token found in environment, logging in...") + try: + login(token=hf_token, write_permission=False) + logger.info("Successfully authenticated with HuggingFace") + except Exception as e: + error_details = traceback.format_exc() + logger.error(f"Failed to authenticate with HuggingFace: {str(e)}") + logger.error(f"Traceback:\n{error_details}") + return { + "status": "failed", + "error": "huggingface_auth_failed", + "details": str(e), + "traceback": error_details, + "timestamp": _now(), + } + else: + logger.warning("HF_TOKEN or HUGGINGFACE_TOKEN environment variable not found. Model access may be restricted.") + + logger.info("Step 2b: Loading tokenizer...") + try: + logger.info(f"Tokenizer cache directory: {model_cache_dir}") + tokenizer = AutoTokenizer.from_pretrained( + base_model, + cache_dir=model_cache_dir, + local_files_only=False + ) + logger.info("Tokenizer download started...") + if tokenizer.pad_token is None: + tokenizer.pad_token = tokenizer.eos_token + logger.info("Tokenizer loaded successfully") + except Exception as e: + error_details = traceback.format_exc() + logger.error(f"Failed to load tokenizer: {str(e)}") + logger.error(f"Traceback:\n{error_details}") + raise + + logger.info("Step 2b: Loading base model with 4-bit quantization...") + try: + logger.info(f"Model cache directory: {model_cache_dir}") + logger.info(f"Starting model download for {base_model}...") + model = AutoModelForCausalLM.from_pretrained( + base_model, + cache_dir=model_cache_dir, + quantization_config=BitsAndBytesConfig( + load_in_4bit=True, + bnb_4bit_compute_dtype=torch.float16 + ), + device_map="auto", + dtype=torch.float16, + ) + logger.info("Base model loaded successfully with 4-bit quantization") + except Exception as e: + error_details = traceback.format_exc() + logger.error(f"Failed to load base model: {str(e)}") + logger.error(f"Traceback:\n{error_details}") + raise + + except ImportError as e: + error_details = traceback.format_exc() + logger.error(f"Required HuggingFace transformers not available: {str(e)}") + logger.error(f"Traceback:\n{error_details}") + return { + "status": "failed", + "error": "transformers_not_available", + "details": str(e), + "traceback": error_details, + "timestamp": _now(), + } + + logger.info(f"Step 2 Complete: Model and tokenizer loaded") + logger.info(f"Step 3: Fine-tuning with LoRA adapters") + try: + from peft import LoraConfig + from trl import SFTTrainer + from transformers import TrainingArguments + import json + + training_data_file = os.path.join(output_dir, "training_data.jsonl") + with open(training_data_file, "w", encoding="utf-8") as f: + for prompt, response in all_training_pairs: + training_pair = { + "instruction": prompt, + "input": "", + "output": response, + "text": f"<|im_start|>user\n{prompt}<|im_end|>\n<|im_start|>assistant\n{response}<|im_end|>" + } + f.write(json.dumps(training_pair, ensure_ascii=False) + "\n") + logger.info(f"Training data file created: {training_data_file}") + + from datasets import load_dataset + + dataset = load_dataset("json", data_files=training_data_file) + logger.info(f"Dataset loaded: {len(dataset['train'])} examples") + + lora_config = LoraConfig( + r=int(hyperparams.get("lora_r", 64)), + lora_alpha=int(hyperparams.get("lora_alpha", 16)), + lora_dropout=float(hyperparams.get("lora_dropout", 0.05)), + target_modules=["q_proj", "k_proj", "v_proj", "o_proj"], + task_type="CAUSAL_LM" + ) + + training_args = TrainingArguments( + output_dir=os.path.join(output_dir, "checkpoints"), + num_train_epochs=int(hyperparams.get("epochs", 3)), + per_device_train_batch_size=int(hyperparams.get("batch_size", 6)), + gradient_accumulation_steps=int(hyperparams.get("gradient_accumulation_steps", 3)), + fp16=False, + bf16=False, + optim="paged_adamw_8bit", + max_grad_norm=0.0, + logging_steps=10, + save_strategy="epoch", + ddp_find_unused_parameters=False, + remove_unused_columns=False, + dataloader_pin_memory=True, + ) + trainer = SFTTrainer( + model=model, + train_dataset=dataset["train"], + peft_config=lora_config, + args=training_args, + ) + trainer.accelerator.scaler = None + + logger.info("Starting LoRA fine-tuning...") + trainer.train() + logger.info("LoRA fine-tuning completed") + adapter_dir = os.path.join(output_dir, "adapter") + os.makedirs(adapter_dir, exist_ok=True) + trainer.model.save_pretrained(adapter_dir) + tokenizer.save_pretrained(adapter_dir) + logger.info(f"LoRA adapter saved to {adapter_dir}") + merge_dir = os.path.join(output_dir, "merged") + os.makedirs(merge_dir, exist_ok=True) + logger.info(f"Merging LoRA adapters...") + + from peft import PeftModel + base_reload = AutoModelForCausalLM.from_pretrained( + base_model, + cache_dir=model_cache_dir, + dtype=torch.float16, + device_map="auto", + ) + merged_model = PeftModel.from_pretrained(base_reload, adapter_dir) + merged_model = merged_model.merge_and_unload() + + merged_model.save_pretrained(merge_dir) + tokenizer.save_pretrained(merge_dir) + logger.info(f"Merged model saved to {merge_dir}") + logger.info("Cleaning up GPU memory...") + del model + del merged_model + del base_reload + if torch.cuda.is_available(): + torch.cuda.empty_cache() + logger.info(f"GPU memory freed: {torch.cuda.memory_reserved(0) / 1024**3:.2f} GB reserved") + logger.info("GPU memory cleanup completed") + + except ImportError as e: + logger.error(f"Required LoRA/SFT training libraries not available: {str(e)}") + return { + "status": "failed", + "error": "training_libs_not_available", + "details": str(e), + "timestamp": _now(), + } + + logger.info(f"Step 3 Complete: Fine-tuning and merging completed") + logger.info(f"Step 4: Converting merged model to GGUF format") + try: + import subprocess + + merge_dir = os.path.join(output_dir, "merged") + gguf_f16_path = os.path.join(output_dir, "model-f16.gguf") + convert_script = None + for candidate in [ + os.path.join(os.getcwd(), "llama.cpp", "convert_hf_to_gguf.py"), + os.path.join(os.getcwd(), "notebooks", "build", "llama.cpp", "convert_hf_to_gguf.py"), + "/app/llama.cpp/convert_hf_to_gguf.py", + "/home/llama.cpp/convert_hf_to_gguf.py", + ]: + if os.path.exists(candidate): + convert_script = candidate + break + + if not convert_script: + logger.warning("convert_hf_to_gguf.py not found, trying with python -m") + convert_script = None + + if convert_script: + logger.info(f"Converting with script: {convert_script}") + result = subprocess.run([ + "python", convert_script, + merge_dir, + "--outfile", gguf_f16_path, + "--outtype", "f16" + ], capture_output=True, text=True) + + if result.returncode != 0: + logger.error(f"Conversion script failed with return code {result.returncode}") + logger.error(f"STDOUT: {result.stdout}") + logger.error(f"STDERR: {result.stderr}") + raise RuntimeError(f"GGUF conversion failed: {result.stderr or 'unknown error'}") + else: + logger.info(f"GGUF conversion completed: {gguf_f16_path}") + else: + logger.info("Attempting direct conversion with transformers") + from transformers import AutoModel + model = AutoModelForCausalLM.from_pretrained(merge_dir, torch_dtype=torch.float16) + logger.warning("Direct GGUF conversion not available, using float16 checkpoint instead") + gguf_f16_path = merge_dir + + logger.info(f"GGUF conversion completed: {gguf_f16_path}") + + except Exception as e: + logger.error(f"GGUF conversion failed: {str(e)}", exc_info=True) + gguf_f16_path = os.path.join(output_dir, "merged") + logger.warning("Using merged model directly without GGUF conversion") + logger.info(f"Step 4 Complete: GGUF conversion completed") + logger.info(f"Step 5: Verifying model format") + quantized_path = gguf_f16_path + + if os.path.isfile(gguf_f16_path): + logger.info(f"GGUF file verified: {gguf_f16_path}") + logger.info(f"GGUF model size: {os.path.getsize(gguf_f16_path) / (1024**3):.2f} GB") + quantized_path = gguf_f16_path + else: + logger.warning(f"No GGUF file found; using merged model folder") + quantized_path = gguf_f16_path + logger.info(f"Step 6: Creating metadata and finalizing") + quantized_file = quantized_path + if os.path.isdir(quantized_file): + logger.warning(f"Quantized path is a directory ({quantized_file}); searching for .gguf inside") + ggufs = [f for f in os.listdir(quantized_file) if f.lower().endswith('.gguf')] + if ggufs: + quantized_file = os.path.join(quantized_file, ggufs[0]) + logger.info(f"Found GGUF inside directory: {quantized_file}") + else: + logger.error("No .gguf file found inside quantized directory; using merged folder as fallback") + quantized_file = quantized_path + final_model_path = os.path.join(_model_root(), f"{model_name}-{version}.gguf") + if os.path.isfile(quantized_file) and quantized_file != final_model_path: + import shutil + logger.info(f"Copying quantized model to final location: {final_model_path}") + shutil.copy2(quantized_file, final_model_path) + logger.info(f"Final model saved to: {final_model_path}") + else: + final_model_path = quantized_file + + metadata = { + "status": "completed", + "base_model": base_model, + "name": model_name, + "version": version, + "model_path": final_model_path, + "path": final_model_path, + "output_dir": output_dir, + "training_files_count": len(training_files), + "training_pairs_count": len(all_training_pairs), + "hyperparams": hyperparams, + "timestamp": _now(), + } + + try: + with open(os.path.join(output_dir, "metadata.json"), "w", encoding="utf-8") as f: + json.dump(metadata, f, indent=2) + logger.info(f"Metadata saved to {output_dir}/metadata.json") + except Exception as e: + logger.error(f"Failed to save metadata: {str(e)}", exc_info=True) + logger.info("Performing final GPU memory cleanup...") + if torch.cuda.is_available(): + torch.cuda.empty_cache() + torch.cuda.synchronize() + logger.info(f"Final GPU memory state: {torch.cuda.memory_allocated(0) / 1024**3:.2f} GB allocated") + + logger.info(f"Step 6 Complete: All steps completed successfully") + logger.info(f"Final model available at: {final_model_path}") + return metadata + + except Exception as e: + error_details = traceback.format_exc() + logger.error(f"Fine-tune process failed: {str(e)}") + logger.error(f"Error type: {type(e).__name__}") + logger.error(f"Full traceback:\n{error_details}") + return { + "status": "failed", + "error": str(e) or "Unknown error occurred", + "error_type": type(e).__name__, + "traceback": error_details, + "timestamp": _now(), + } + + async def _run_tool_http(name: str, arguments: dict) -> Dict[str, Any]: + logger.info(f"Executing tool: {name}") + logger.debug(f"Tool arguments: {arguments}") + if name == "echo": - return {"status": "ok", "received": arguments, "timestamp": _now()} + logger.info(f"Echo tool called with message: {arguments.get('message')}") + result = {"status": "ok", "received": arguments, "timestamp": _now()} + logger.info(f"Echo tool completed successfully") + return result if name == "fine_tune": base_model = arguments.get("base_model") @@ -137,48 +754,60 @@ async def _run_tool_http(name: str, arguments: dict) -> Dict[str, Any]: hyperparams = arguments.get("hyperparams") or {} model_name = arguments.get("name") or "model" version = arguments.get("version") or "v1" + + logger.info(f"Fine-tune started: model={model_name}, version={version}") + logger.info(f"Training files count: {len(training_files)}") + logger.debug(f"Training files: {training_files}") + logger.debug(f"Hyperparameters: {hyperparams}") model_root = _model_root() os.makedirs(model_root, exist_ok=True) + logger.debug(f"Model root directory: {model_root}") safe_name = _safe_dir_name(model_name) safe_version = _safe_dir_name(version) output_dir = os.path.join(model_root, f"{safe_name}-{safe_version}") os.makedirs(output_dir, exist_ok=True) - - metadata = { - "status": "completed", - "base_model": base_model, - "training_files": training_files, - "hyperparams": hyperparams, - "name": model_name, - "version": version, - "model_path": output_dir, - "timestamp": _now(), - } + logger.info(f"Output directory created: {output_dir}") try: - with open(os.path.join(output_dir, "metadata.json"), "w", encoding="utf-8") as f: - json.dump(metadata, f, indent=2) - except Exception: - pass - - return metadata + result = await _fine_tune_model_impl(training_files, hyperparams, model_name, version, output_dir) + logger.info(f"Fine-tune result: {result.get('status')}") + return result + except Exception as e: + error_details = traceback.format_exc() + logger.error(f"Fine-tune tool execution failed: {str(e)}") + logger.error(f"Full traceback:\n{error_details}") + return { + "status": "failed", + "error": str(e) or "Unknown error in fine-tune execution", + "error_type": type(e).__name__, + "traceback": error_details, + "timestamp": _now(), + } if name == "load_model": model_path = arguments.get("model_path") + logger.info(f"Loading model: {model_path}") + if not model_path: + logger.error("model_path_required error: no model path provided") return {"status": "failed", "error": "model_path_required", "timestamp": _now()} model_path = _resolve_model_path(model_path) + logger.debug(f"Resolved model path: {model_path}") if not os.path.exists(model_path): + logger.error(f"Model not found at: {model_path}") return {"status": "failed", "error": "model_not_found", "model_path": model_path, "timestamp": _now()} try: from gpt4all import GPT4All model_dir, model_file = _resolve_model_file(model_path) + logger.debug(f"Model directory: {model_dir}, model file: {model_file}") + if not model_file: + logger.error(f"No GGUF file found in model directory: {model_dir}") return { "status": "failed", "error": "model_file_not_found", @@ -186,13 +815,26 @@ async def _run_tool_http(name: str, arguments: dict) -> Dict[str, Any]: "timestamp": _now(), } - model = GPT4All(model_file, model_path=model_dir, allow_download=False, device='gpu') + logger.info(f"Initializing GPT4All model: {model_file}") + try: + logger.info("Attempting to load model on GPU (cuda)...") + model = GPT4All(model_file, model_path=model_dir, allow_download=False, device='cuda') + logger.info(f"Model loaded successfully on GPU") + except Exception as e: + logger.warning(f"GPU initialization failed: {str(e)}, falling back to CPU") + logger.info("Loading model on CPU...") + model = GPT4All(model_file, model_path=model_dir, allow_download=False, device='cpu') + logger.info(f"Model loaded successfully on CPU") + LOADED_MODELS[model_path] = { "loaded_at": _now(), "model": model, "model_dir": model_dir, "model_file": model_file, } + logger.info(f"Model loaded successfully: {model_path}") + logger.info(f"Total loaded models in memory: {len(LOADED_MODELS)}") + return { "status": "completed", "model_path": model_path, @@ -202,6 +844,7 @@ async def _run_tool_http(name: str, arguments: dict) -> Dict[str, Any]: "timestamp": _now(), } except Exception as e: + logger.error(f"Failed to load model: {str(e)}", exc_info=True) return { "status": "failed", "error": str(e), @@ -214,40 +857,67 @@ async def _run_tool_http(name: str, arguments: dict) -> Dict[str, Any]: model_path = arguments.get("model_path") prompt = arguments.get("prompt") or "" options = arguments.get("options") or {} + + logger.info(f"Inference request: model={model_path}") + logger.debug(f"Prompt length: {len(prompt)} characters") + logger.debug(f"Inference options: {options}") if not model_path: + logger.error("model_path_required error: no model path provided") return {"status": "failed", "error": "model_path_required", "timestamp": _now()} model_path = _resolve_model_path(model_path) + logger.debug(f"Resolved model path: {model_path}") if not os.path.exists(model_path): + logger.error(f"Model not found at: {model_path}") return {"status": "failed", "error": "model_not_found", "model_path": model_path, "timestamp": _now()} try: if model_path not in LOADED_MODELS or "model" not in LOADED_MODELS[model_path]: + logger.info(f"Model not in memory, loading: {model_path}") from gpt4all import GPT4All model_dir, model_file = _resolve_model_file(model_path) + logger.debug(f"Model directory: {model_dir}, model file: {model_file}") + if not model_file: + logger.error(f"No GGUF file found in model directory: {model_dir}") return { "status": "failed", "error": "model_file_not_found", "model_path": model_path, "timestamp": _now(), } - model = GPT4All(model_file, model_path=model_dir, allow_download=False) + logger.info(f"Initializing GPT4All model: {model_file}") + try: + logger.info("Attempting to load model on GPU (cuda)...") + model = GPT4All(model_file, model_path=model_dir, allow_download=False, device='cuda') + logger.info(f"Model loaded successfully on GPU for inference") + except Exception as e: + logger.warning(f"GPU initialization failed: {str(e)}, falling back to CPU") + logger.info("Loading model on CPU...") + model = GPT4All(model_file, model_path=model_dir, allow_download=False, device='cpu') + logger.info(f"Model loaded successfully on CPU for inference") + LOADED_MODELS[model_path] = { "loaded_at": _now(), "model": model, "model_dir": model_dir, "model_file": model_file, } + logger.info(f"Model loaded for inference") + else: + logger.debug(f"Using cached model: {model_path}") model = LOADED_MODELS[model_path]["model"] max_tokens = int(options.get("max_tokens", 256)) temp = float(options.get("temperature", options.get("temp", 0.7))) top_p = float(options.get("top_p", 0.95)) top_k = int(options.get("top_k", 40)) + + logger.info(f"Running inference with max_tokens={max_tokens}, temperature={temp}, top_p={top_p}, top_k={top_k}") + logger.debug(f"Full inference parameters: max_tokens={max_tokens}, temp={temp}, top_p={top_p}, top_k={top_k}") response_text = model.generate( prompt, @@ -256,6 +926,9 @@ async def _run_tool_http(name: str, arguments: dict) -> Dict[str, Any]: top_p=top_p, top_k=top_k, ) + + logger.info(f"Inference completed successfully. Response length: {len(response_text)} characters") + logger.debug(f"Response preview: {response_text[:100]}..." if len(response_text) > 100 else f"Response: {response_text}") return { "status": "completed", @@ -270,6 +943,7 @@ async def _run_tool_http(name: str, arguments: dict) -> Dict[str, Any]: "timestamp": _now(), } except Exception as e: + logger.error(f"Inference failed: {str(e)}", exc_info=True) return { "status": "failed", "error": str(e), @@ -283,37 +957,61 @@ async def _run_tool_http(name: str, arguments: dict) -> Dict[str, Any]: @app.call_tool() async def call_tool(name: str, arguments: dict): + logger.info(f"MCP call_tool: {name}") result = await _run_tool_http(name, arguments) + logger.debug(f"MCP call_tool result for {name}: {result}") return [TextContent(type="text", text=json.dumps(result, indent=2))] async def handle_execute(request: web.Request) -> web.Response: + logger.info("HTTP /execute request received") + execution_id = None try: payload = await request.json() tool = payload.get("tool") arguments = payload.get("arguments", {}) + execution_id = arguments.get("execution_id") or arguments.get("name", "unknown") + logger.info(f"HTTP execute: tool={tool}, execution_id={execution_id}") + logger.debug(f"HTTP execute arguments: {arguments}") if not tool: + logger.error("Missing 'tool' field in request") return web.json_response( {"error": "Missing 'tool' field"}, status=400 ) + logger.info(f"Calling _run_tool_http for {tool}...") result = await _run_tool_http(tool, arguments) + logger.info(f"HTTP execute completed for {tool} with status={result.get('status')}") + logger.debug(f"HTTP execute result: {result}") return web.json_response(result) - except json.JSONDecodeError: - return web.json_response({"error": "Invalid JSON"}, status=400) + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in request: {str(e)}") + return web.json_response({"error": f"Invalid JSON: {str(e)}"}, status=400) except Exception as e: - return web.json_response({"error": str(e)}, status=500) + error_msg = str(e) if str(e) else f"Unknown error: {type(e).__name__}" + error_traceback = traceback.format_exc() + logger.error(f"Unexpected error in /execute (execution_id={execution_id}): {error_msg}") + logger.error(f"Traceback:\n{error_traceback}") + return web.json_response({ + "status": "failed", + "error": error_msg, + "error_type": type(e).__name__, + "traceback": error_traceback, + "execution_id": execution_id, + }, status=500) async def handle_health(request: web.Request) -> web.Response: + logger.debug("Health check requested") return web.json_response({"status": "healthy"}) async def run_http_server(): host = os.getenv("MCP_HTTP_HOST", "0.0.0.0") port = int(os.getenv("MCP_HTTP_PORT", "8001")) + logger.info(f"Starting HTTP server on {host}:{port}") app_http = web.Application() app_http.router.add_post("/execute", handle_execute) @@ -324,9 +1022,17 @@ async def run_http_server(): site = web.TCPSite(runner, host, port) await site.start() + logger.info(f"HTTP server running on {host}:{port}") print(f"HTTP server running on {host}:{port}", file=sys.stderr) await asyncio.Event().wait() if __name__ == "__main__": - asyncio.run(run_http_server()) + logger.info("Starting MCP Server...") + try: + asyncio.run(run_http_server()) + except KeyboardInterrupt: + logger.info("MCP Server interrupted by user") + except Exception as e: + logger.error(f"MCP Server error: {str(e)}", exc_info=True) + sys.exit(1)