RafaelJaime committed on
Commit
d4174bb
·
1 Parent(s): a941c51

some tools, and openai framework to generate image

Browse files
mcp_server.py CHANGED
@@ -2,6 +2,9 @@ from mcp.server.fastmcp import FastMCP
2
  from src.utils.change_format import change_format
3
  from src.utils.image_helpers import remove_background_from_url
4
  from src.utils.visualize_image import visualize_base64_image
 
 
 
5
 
6
  mcp = FastMCP("Youtube Service")
7
 
@@ -21,6 +24,9 @@ def say_hello(name: str) -> str:
21
  mcp.add_tool(remove_background_from_url)
22
  mcp.add_tool(change_format)
23
  mcp.add_tool(visualize_base64_image)
 
 
 
24
 
25
  if __name__ == "__main__":
26
  mcp.run()
 
2
  from src.utils.change_format import change_format
3
  from src.utils.image_helpers import remove_background_from_url
4
  from src.utils.visualize_image import visualize_base64_image
5
+ from src.utils.generate_image import generate_image
6
+ from src.utils.apply_filter import apply_filter
7
+ from src.utils.add_text import add_text_to_image
8
 
9
  mcp = FastMCP("Youtube Service")
10
 
 
24
  mcp.add_tool(remove_background_from_url)
25
  mcp.add_tool(change_format)
26
  mcp.add_tool(visualize_base64_image)
27
+ mcp.add_tool(generate_image)
28
+ mcp.add_tool(apply_filter)
29
+ mcp.add_tool(add_text_to_image)
30
 
31
  if __name__ == "__main__":
32
  mcp.run()
requirements.txt CHANGED
@@ -2,4 +2,5 @@ fastmcp
2
  requests
3
  Pillow
4
  rembg
5
- onnxruntime
 
 
2
  requests
3
  Pillow
4
  rembg
5
+ onnxruntime
6
+ openai
src/utils/add_text.py ADDED
@@ -0,0 +1,68 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from PIL import Image, ImageDraw, ImageFont
2
+ import os
3
+ from typing import Optional, Tuple, Dict, Any
4
+
5
def add_text_to_image(
    image_path: str,
    text: str,
    color: Optional[Tuple[int, int, int]] = None,
    output_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Add centered text to an image and save the result in the same folder.

    If no output_name is provided, '_edited' is appended to the original
    filename. If no color is provided, black is used by default.

    Fix: the text is now centered both horizontally and vertically — the
    previous placement used the raw image midpoint for y without subtracting
    the text height, so the text hung below center.

    Args:
        image_path: Path to the original image.
        text: Text to write on the image.
        color: Optional RGB color of the text. Defaults to black.
        output_name: Optional output filename (without extension).

    Returns:
        Dict with 'success' plus either output info or an 'error' message.
    """
    try:
        if color is None:
            color = (0, 0, 0)

        # Context manager releases the underlying file handle even on error.
        with Image.open(image_path) as image:
            draw = ImageDraw.Draw(image)
            font = ImageFont.load_default()

            # textbbox yields both width and height so we can truly center.
            left, top, right, bottom = draw.textbbox((0, 0), text, font=font)
            text_width = right - left
            text_height = bottom - top
            x = (image.width - text_width) / 2
            y = (image.height - text_height) / 2

            draw.text((x, y), text, fill=color, font=font)

            base_dir = os.path.dirname(image_path)
            base_name, ext = os.path.splitext(os.path.basename(image_path))

            if output_name:
                new_filename = f"{output_name}{ext}"
            else:
                new_filename = f"{base_name}_edited{ext}"

            new_path = os.path.join(base_dir, new_filename)
            image.save(new_path)

        output_size = os.path.getsize(new_path)

        return {
            "success": True,
            "message": "Text added successfully to the image",
            "input_path": image_path,
            "output_path": new_path,
            "output_size_bytes": output_size,
            "text": text,
            "color": color
        }

    except Exception as e:
        # Tool-style API: report failure in the payload instead of raising.
        return {
            "success": False,
            "error": str(e),
            "input_path": image_path,
            "output_path": None
        }
src/utils/add_watermark.py ADDED
@@ -0,0 +1,56 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from PIL import Image, ImageDraw, ImageFont
2
+ import os
3
+ from typing import Dict, Any
4
+
5
def add_watermark(image_path: str, watermark_text: str) -> Dict[str, Any]:
    """
    Draw a semi-transparent centered watermark on an image and save a copy.

    The output is written next to the input as '<name>_watermarked<ext>'.

    Args:
        image_path: Path to the source image.
        watermark_text: Text to stamp across the center.

    Returns:
        Dict with 'success' plus either output info or an 'error' message.
    """
    try:
        image = Image.open(image_path)

        # Fully transparent overlay so the watermark can be alpha-blended.
        overlay = Image.new('RGBA', image.size, (255, 255, 255, 0))
        draw = ImageDraw.Draw(overlay)

        try:
            # Scale the font to ~5% of the smaller image dimension.
            font_size = min(image.width, image.height) // 20
            font = ImageFont.truetype("arial.ttf", font_size)
        except OSError:
            # arial.ttf is not present on this system (common outside
            # Windows); fall back to PIL's built-in bitmap font. Was a bare
            # `except:`, which would also have swallowed KeyboardInterrupt.
            font = ImageFont.load_default()

        bbox = draw.textbbox((0, 0), watermark_text, font=font)
        text_width = bbox[2] - bbox[0]
        text_height = bbox[3] - bbox[1]

        x = (image.width - text_width) // 2
        y = (image.height - text_height) // 2

        # 50%-alpha white text over a faint dark drop shadow for contrast.
        text_color = (255, 255, 255, 128)
        draw.text((x - 2, y - 2), watermark_text, fill=(0, 0, 0, 64), font=font)
        draw.text((x, y), watermark_text, fill=text_color, font=font)

        # Blend the overlay onto the image, then drop alpha for saving.
        watermarked = Image.alpha_composite(image.convert('RGBA'), overlay)
        final_image = watermarked.convert('RGB')

        base_dir = os.path.dirname(image_path)
        base_name, ext = os.path.splitext(os.path.basename(image_path))
        new_filename = f"{base_name}_watermarked{ext}"
        new_path = os.path.join(base_dir, new_filename)

        # 'quality' only affects lossy formats (e.g. JPEG); harmless otherwise.
        final_image.save(new_path, quality=95)
        output_size = os.path.getsize(new_path)

        return {
            "success": True,
            "message": "Watermark added successfully",
            "input_path": image_path,
            "output_path": new_path,
            "output_size_bytes": output_size,
            "watermark_text": watermark_text
        }

    except Exception as e:
        # Tool-style API: report failure in the payload instead of raising.
        return {
            "success": False,
            "error": str(e),
            "input_path": image_path,
            "output_path": None
        }
src/utils/apply_filter.py ADDED
@@ -0,0 +1,123 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from PIL import Image, ImageFilter, ImageEnhance
2
+ from io import BytesIO
3
+ import requests
4
+ import base64
5
+
6
def apply_filter(image_url: str, filter_type: str, intensity: float = 1.0, output_format: str = 'JPEG') -> str:
    """
    Apply various filters to an image from a URL.

    Args:
        image_url: The URL of the input image.
        filter_type: Type of filter to apply ('blur', 'sharpen', 'emboss', 'edge', 'smooth',
                     'brightness', 'contrast', 'saturation', 'sepia', 'grayscale').
        intensity: Filter intensity (0.1 to 3.0, default 1.0). Only used by
                   'blur', 'sharpen', 'brightness', 'contrast' and 'saturation'.
        output_format: The desired output format (e.g., 'JPEG', 'PNG').

    Returns:
        The filtered image as a base64-encoded string.

    Raises:
        ValueError: If filter_type is not one of the supported names.
            (Previously an unknown name silently returned the image
            unfiltered, hiding typos from the caller.)
        requests.HTTPError: If the image download fails.
    """
    valid_filters = ('blur', 'sharpen', 'emboss', 'edge', 'smooth',
                     'brightness', 'contrast', 'saturation', 'sepia', 'grayscale')
    if filter_type not in valid_filters:
        raise ValueError(f"Unknown filter_type {filter_type!r}; expected one of {valid_filters}")

    response = requests.get(image_url, timeout=30)
    response.raise_for_status()

    img = Image.open(BytesIO(response.content))

    # Normalize to RGB so every filter (and JPEG saving) works uniformly.
    if img.mode != 'RGB':
        img = img.convert('RGB')

    if filter_type == 'blur':
        img = img.filter(ImageFilter.GaussianBlur(radius=intensity))
    elif filter_type == 'sharpen':
        img = img.filter(ImageFilter.UnsharpMask(radius=2, percent=int(intensity * 150), threshold=3))
    elif filter_type == 'emboss':
        img = img.filter(ImageFilter.EMBOSS)
    elif filter_type == 'edge':
        img = img.filter(ImageFilter.FIND_EDGES)
    elif filter_type == 'smooth':
        img = img.filter(ImageFilter.SMOOTH_MORE)
    elif filter_type == 'brightness':
        img = ImageEnhance.Brightness(img).enhance(intensity)
    elif filter_type == 'contrast':
        img = ImageEnhance.Contrast(img).enhance(intensity)
    elif filter_type == 'saturation':
        img = ImageEnhance.Color(img).enhance(intensity)
    elif filter_type == 'sepia':
        img = apply_sepia_filter(img)
    elif filter_type == 'grayscale':
        # Round-trip through 'L' to desaturate while keeping an RGB result.
        img = img.convert('L').convert('RGB')

    output = BytesIO()
    img.save(output, format=output_format, quality=95)
    output.seek(0)

    return base64.b64encode(output.getvalue()).decode('utf-8')
+
61
def apply_sepia_filter(img: Image.Image) -> Image.Image:
    """
    Apply a sepia tone effect to an image in place and return it.

    Uses the standard sepia weighting matrix, truncating to int and clamping
    each channel at 255.

    NOTE(review): assumes a 3-channel RGB image (the pixel unpacking below
    fails on RGBA/L modes) — callers in this module convert to RGB first.

    Args:
        img: PIL Image object (RGB).

    Returns:
        The same Image object with the sepia effect applied.
    """
    toned = []
    for r, g, b in img.getdata():
        sep_r = int(0.393 * r + 0.769 * g + 0.189 * b)
        sep_g = int(0.349 * r + 0.686 * g + 0.168 * b)
        sep_b = int(0.272 * r + 0.534 * g + 0.131 * b)
        toned.append((min(255, sep_r), min(255, sep_g), min(255, sep_b)))

    # putdata rewrites the pixel buffer of the same image object, matching
    # the original in-place mutation via load().
    img.putdata(toned)
    return img
86
+
87
def apply_vintage_filter(image_url: str, output_format: str = 'JPEG') -> str:
    """
    Apply a vintage effect combining multiple filters.

    Pipeline: mute contrast (0.8) -> lift brightness (1.1) -> sepia tone ->
    slight Gaussian blur (radius 0.5).

    Args:
        image_url: The URL of the input image.
        output_format: The desired output format.

    Returns:
        The vintage-filtered image as a base64-encoded string.
    """
    resp = requests.get(image_url, timeout=30)
    resp.raise_for_status()

    photo = Image.open(BytesIO(resp.content))
    if photo.mode != 'RGB':
        photo = photo.convert('RGB')

    photo = ImageEnhance.Contrast(photo).enhance(0.8)
    photo = ImageEnhance.Brightness(photo).enhance(1.1)
    photo = apply_sepia_filter(photo)
    photo = photo.filter(ImageFilter.GaussianBlur(radius=0.5))

    buffer = BytesIO()
    photo.save(buffer, format=output_format, quality=90)
    buffer.seek(0)
    return base64.b64encode(buffer.getvalue()).decode('utf-8')
src/utils/generate_image.py ADDED
@@ -0,0 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import base64
3
+ from typing import Dict, Any
4
+ from openai import OpenAI
5
+
6
async def generate_image(
    prompt: str,
    output_path: str = "generated_image.png",
    width: int = 1024,
    height: int = 1024,
    num_inference_steps: int = 28,
    negative_prompt: str = "",
    seed: int = -1
) -> Dict[str, Any]:
    """
    Generate an image using the Nebius API (flux-dev model) and save it to disk.

    NOTE(review): declared async for the tool interface, but the OpenAI client
    call below is synchronous and blocks while the request runs.

    Args:
        prompt: Text prompt for image generation
        output_path: Path where to save the generated image
        width: Image width
        height: Image height
        num_inference_steps: Number of inference steps
        negative_prompt: Negative prompt for generation
        seed: Random seed (-1 for random)

    Returns:
        Dictionary with result information
    """
    # Fail fast with a clear error instead of letting the client raise and
    # then substring-matching the exception text. This also removes the
    # previous error payload's 'user' field, which leaked the local $USER.
    api_key = os.environ.get("NEBIUS_API_KEY")
    if not api_key:
        return {
            "success": False,
            "error": "NEBIUS_API_KEY environment variable not set",
            "output_path": None
        }

    try:
        client = OpenAI(
            base_url="https://api.studio.nebius.com/v1/",
            api_key=api_key
        )

        # Nebius-specific parameters go through extra_body; the OpenAI SDK
        # only models the standard fields.
        response = client.images.generate(
            model="black-forest-labs/flux-dev",
            response_format="b64_json",
            extra_body={
                "response_extension": "png",
                "width": width,
                "height": height,
                "num_inference_steps": num_inference_steps,
                "negative_prompt": negative_prompt,
                "seed": seed
            },
            prompt=prompt
        )

        image_data = base64.b64decode(response.data[0].b64_json)

        with open(output_path, 'wb') as output_file:
            output_file.write(image_data)

        output_size = os.path.getsize(output_path)

        return {
            "success": True,
            "message": "Image generated successfully",
            "prompt": prompt,
            "output_path": output_path,
            "output_size_bytes": output_size,
            "generation_params": {
                "width": width,
                "height": height,
                "num_inference_steps": num_inference_steps,
                "negative_prompt": negative_prompt,
                "seed": seed
            }
        }

    except Exception as e:
        # Tool-style API: report failure in the payload instead of raising.
        return {
            "success": False,
            "error": f"Failed to generate image: {str(e)}",
            "output_path": None
        }