mcp-server实现图片自动转换

需求背景

将已有的网络图片地址转换为github镜像仓库的地址,插入markdown文档中

使用框架

mcp.server.fastmcp

代码逻辑

  1. 创建github仓库并获取对应仓库的token

  2. 判断为网络图片还是本地图片。

    1. 网络图片则首选获取其二进制内容
    2. 本地图片直接读取二进制内容
  3. 从地址或url中获取文件的格式并保存,如获取不到则默认为png

  4. 获取文件二进制内容的哈希值,作为文件名。

  5. 获取当前年月,按照’年-月/文件hash‘格式上传到代码仓库路径

  6. 上传文件前首先尝试调用githu api获取文件在githu仓库的hash值

    1. 如果上一步的hash存在,则是更新文件,需要在body体中添加’sha‘字段,
    2. 如果hash不存在则为新文件
  7. 发送文件上传请求,获取返回值

  8. 结果拼接

    1. download_url为文件的直接下载地址
    2. cdn_url用于展示加速,其拼接逻辑为:https://cdn.jsdelivr.net/gh/{repo}@{branch}/{target_path}
    3. 基于cdn_url拼接一个markdown格式文件标签
  9. 天机mcp协议支持

    1. 实例化mcp: mcp = FastMCP()
    2. 使用 @mcp.tool将需要转换的函数工具化
    3. 在main函数中实现mcp server,建议使用stdio本地调用方式,相对安全
  10. 在客户端实现调用 以 cherry studio为例,分别填下如下内容: 图 0

  11. 撰写prompt

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    任务:调用github upload 工具,将用户输入的图片地址或传入的图片附件传入到github仓库,并将返回的结果以j字符串形式输出
    执行要求:
    如果传入的是图片那么将图片地址直接传给工具进行使用,且不需要将图片传给模型。
    
    输出要求:
    不展示图片,返回工具的json格式执行结果
    
    
    /no_think
    
  12. 效果测试 图 1

代码全文

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176

import argparse
import urllib.request
import urllib.error
import urllib.parse
import json
import base64
import os
import re
import hashlib
from datetime import datetime
from mcp.server.fastmcp import FastMCP
mcp = FastMCP() 
# 内置配置
DEFAULT_REPO = ""  # 替换为你的仓库名
DEFAULT_TOKEN = ""       # 替换为你的GitHub个人访问令牌

def is_url(path):
    """改进的URL检测函数"""
    try:
        result = urllib.parse.urlparse(path)
        return all([result.scheme in ('http', 'https'), result.netloc])
    except:
        return False

def get_image_content(image_path):
    """改进的内容获取函数"""
    try:
        if is_url(image_path):
            # 对URL进行安全编码处理
            safe_url = urllib.parse.quote(image_path, safe=':/?&=')
            with urllib.request.urlopen(safe_url) as response:
                return response.read()
        else:
            with open(image_path, "rb") as f:
                return f.read()
    except Exception as e:
        raise ValueError(f"读取失败: {str(e)}")


def get_file_sha(repo, token, branch, path):
    """
    获取指定GitHub仓库中文件的SHA值。

    Args:
        repo (str): GitHub仓库的名称,格式为'用户名/仓库名'。
        token (str): GitHub的Personal Access Token。
        branch (str): 仓库的分支名。
        path (str): 文件在仓库中的路径。

    Returns:
        str or None: 文件的SHA值,如果获取失败则返回None。

    """
    url = f"https://api.github.com/repos/{repo}/contents/{path}"
    headers = {
        "Authorization": f"token {token}",
        "Accept": "application/vnd.github.v3+json"
    }
    params = {"ref": branch}
    # 手动拼接参数到URL
    query_string = urllib.parse.urlencode(params)
    url_with_params = f"{url}?{query_string}"
    rs =None
    req = urllib.request.Request(url_with_params, headers=headers, method="GET", data=None)
    try:
        with urllib.request.urlopen(req) as response:
            data = json.loads(response.read().decode('utf-8'))
            rs = data["sha"] if "sha" in data else None  
    finally:
        return rs

def calculate_sha1(content):
    """计算文件内容的SHA1哈希值"""
    sha1 = hashlib.sha1()
    sha1.update(content)
    return sha1.hexdigest()




@mcp.tool()
def upload_file(image_path, repo=DEFAULT_REPO, token=DEFAULT_TOKEN, branch="main"):
    """
    Uploads an image file to a specified GitHub repository.
    Args:
        image_path (str): The local file path or URL of the image to upload.
        repo (str, optional): The GitHub repository in the format 'owner/repo'. Defaults to DEFAULT_REPO.
        token (str, optional): The GitHub personal access token for authentication. Defaults to DEFAULT_TOKEN.
        branch (str, optional): The branch to upload the file to. Defaults to "main".
    Returns:
        dict: The JSON response from the GitHub API after uploading the file.
    Raises:
        RuntimeError: If the upload fails or the GitHub API returns an error.
    """
    """上传图片到GitHub仓库"""
    try:
        content = get_image_content(image_path)
        # 计算SHA1哈希值
        file_hash = calculate_sha1(content)

        if is_url(image_path):
            # 尝试从URL中提取扩展名
            parsed = urllib.parse.urlparse(image_path)
            base_name = os.path.basename(parsed.path)
            _, ext = os.path.splitext(base_name)
            filename = f"{file_hash}.{ext}" if ext else f"{file_hash}.png"
        else:
            # 从本地路径获取扩展名
            _, ext = os.path.splitext(image_path)
            filename = f"{file_hash}.{ext}" if ext else f"{file_hash}.png"
        now = datetime.now()
        year_month = now.strftime("%Y-%m")
        target_path = f"{year_month}/{filename}"
        url = f"https://api.github.com/repos/{repo}/contents/{target_path}"
        headers = {
            "Authorization": f"token {token}",
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": "Python-urllib"
        }

        data = {
            "message": f"Add {filename}",
            "content": base64.b64encode(content).decode('utf-8'),
            "branch": branch

        }
        current_sha = get_file_sha(repo, token, branch, target_path)
        # 如果文件不存在,current_sha 可能为 None,需要根据业务逻辑处理
        # 例如,可以决定创建一个新文件而不是更新一个不存在的文件
        if current_sha :
            data["sha"] = current_sha


        req = urllib.request.Request(
            url,
            data=json.dumps(data).encode("utf-8"),
            headers=headers,
            method="PUT"
        )

        with urllib.request.urlopen(req) as response:
            rs = json.loads(response.read().decode("utf-8"))
            file_url = rs['content']['download_url']
            cdn_url= f"https://cdn.jsdelivr.net/gh/{repo}@{branch}/{target_path}"



            return {
                'file_url' : file_url,
                'cdn_url':cdn_url,
                'md_cdn':f'![{filename}]({cdn_url})'
            }


    except urllib.error.HTTPError as e:
        error_msg = e.read().decode('utf-8')
        raise RuntimeError(f"GitHub API错误: {error_msg}")
    except Exception as e:
        raise RuntimeError(f"上传失败: {str(e)}")

def main():
    parser = argparse.ArgumentParser(description="GitHub图片上传工具")
    parser.add_argument("image_path", help="要上传的图片路径(本地路径或URL)")
    args = parser.parse_args()

    try:
        result = upload_file(args.image_path)
        print(result)
        print(f"图片上传成功: {result['content']['html_url']}")
    except Exception as e:
        print(f"错误: {str(e)}")


if __name__ == "__main__":
    mcp.run(transport='stdio')
使用 Hugo 构建
主题 StackJimmy 设计