# NOTE(review): removed non-Python web-scrape artifacts that preceded this line
# (rendered file size, repeated commit hashes, and a line-number gutter) —
# they were extraction residue and made the file unparseable.
from collections import defaultdict
from typing import Dict

import torchcodec
from markdown_it import MarkdownIt
from smolagents.tools import Tool
from transformers import pipeline
class VisitWebpageTool(Tool):
    """Fetch a web page, store it locally as markdown, and report its section tree."""

    name = "visit_webpage"
    description = (
        "Visits a web page at the given url and reads its content as a markdown string and store it to a file"
    )
    inputs = {
        "url": {
            "type": "string",
            "description": "The url of the webpage to visit.",
        },
    }
    output_type = "string"

    def __init__(
        self,
        file_name: str = "web_content.md",
        user_agent: str = "agent-course",
    ):
        """Initialize the tool.

        Args:
            file_name: Path the fetched markdown content is written to.
            user_agent: Value sent in the ``User-Agent`` request header.
        """
        super().__init__()
        self.file_name = file_name
        self.headers = {"User-Agent": user_agent}

    def _inspect(self, doc: str) -> str:
        """Return an indented table of contents built from the headings of *doc*.

        Each heading is indented by its level minus one (h1 flush left, h2 one
        space, ...).
        """
        tokens = MarkdownIt().parse(doc)
        content_table = ""
        # Use the enumeration index rather than tokens.index(token): index()
        # returns the FIRST equal token, which is wrong for repeated headings.
        for i, token in enumerate(tokens):
            if token.type == "heading_open":
                level = int(token.tag[-1]) - 1  # tag is "h1".."h6"
                # The inline token carrying the heading text directly follows
                # the heading_open token; guard against a malformed tail.
                if i + 1 < len(tokens):
                    content_table += " " * level + tokens[i + 1].content + "\n"
        return content_table

    def forward(self, url: str) -> str:
        """Fetch *url*, save its markdown rendering to ``self.file_name``.

        Returns:
            A confirmation message including the document's section tree, or a
            human-readable error message (network failures are reported, not
            raised).

        Raises:
            ImportError: If `markdownify` or `requests` is not installed.
        """
        try:
            import re

            import requests
            from markdownify import markdownify
            from requests.exceptions import RequestException
        except ImportError as e:
            raise ImportError(
                "You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
            ) from e
        try:
            # Send a GET request to the URL with a 20-second timeout
            response = requests.get(url, timeout=20, headers=self.headers)
            response.raise_for_status()  # Raise an exception for bad status codes
            # Convert the HTML content to Markdown
            markdown_content = markdownify(response.text).strip()
            # Remove multiple line breaks
            markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)
            # Explicit encoding so the file round-trips identically on every platform.
            with open(self.file_name, "w", encoding="utf-8") as f:
                f.write(markdown_content)
            try:
                content_summary = self._inspect(markdown_content)
                return f"Web page content saved in '{self.file_name}'. The content has the following section tree:\n {content_summary}. To read the full website content you can call 'read_mddoc('web_content.md')'"
            except Exception:
                # Building the section tree is best-effort; fall back to a
                # plain confirmation rather than failing the whole fetch.
                return f"Web page content saved in {self.file_name}."
        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"
class SpeechToTextTool(Tool):
    # NOTE(review): a second class with this exact name is defined later in
    # this file; at import time the later definition shadows this one, so this
    # bytes-based variant is effectively dead code. Confirm which variant is
    # intended and delete or rename the other.
    name = "transcriber"
    description = "This is a tool that transcribes an audio into text. It returns the transcribed text."
    inputs = {
        "audio": {
            "type": "audio",
            "description": "The audio to transcribe it should be bytes.",
        },
        "sample_rate": {
            "type": "integer",
            "description": "The sampling rate to use to decode the audio, defaults to 16000",
            "nullable": True
        }
    }
    output_type = "string"
    def __init__(self, model: str = "openai/whisper-small"):
        """Load the automatic-speech-recognition pipeline for *model*."""
        super().__init__()
        self.pipe = pipeline("automatic-speech-recognition", model=model)
    def forward(self, audio: bytes, sample_rate: int=16000) -> str:
        """Transcribe raw audio bytes and return the recognized text."""
        # Callers may pass None explicitly (the input is marked nullable);
        # normalize it back to the 16 kHz default.
        sample_rate = sample_rate if sample_rate is not None else 16000
        # NOTE(review): assumes torchcodec's AudioDecoder accepts a raw bytes
        # source — confirm against the torchcodec API.
        decoder = torchcodec.decoders.AudioDecoder(audio, sample_rate=sample_rate)
        out = self.pipe(decoder)
        return out["text"]
class SpeechToTextTool(Tool):
    """Transcribe an audio file to text with a Whisper ASR pipeline."""

    name = "transcriber"
    description = "This is a tool that transcribes an audio into text. It returns the transcribed text."
    inputs = {
        "audio_file": {
            "type": "string",
            "description": "The path to the audio file to transcribe.",
        },
        "sample_rate": {
            "type": "integer",
            "description": "The sampling rate to use to decode the audio, defaults to 16000",
            "nullable": True
        }
    }
    output_type = "string"

    def __init__(self, model: str = "openai/whisper-small"):
        """Load the automatic-speech-recognition pipeline for *model*."""
        super().__init__()
        self.pipe = pipeline("automatic-speech-recognition", model=model)

    def forward(self, audio_file: str, sample_rate: int = 16000) -> str:
        """Transcribe *audio_file*, decoded at *sample_rate*, and return the text.

        On a ``ValueError`` raised after decoding (presumably the model
        rejecting an over-long input — confirm), returns an advisory message
        suggesting a lower sample rate instead of raising.
        """
        # Callers may pass None explicitly (the input is marked nullable).
        sample_rate = sample_rate if sample_rate is not None else 16000
        # Initialized up front so the except clause below never hits an
        # unbound name when decoding itself fails (was a latent NameError).
        audio_length = None
        try:
            with open(audio_file, "rb") as f:
                decoder = torchcodec.decoders.AudioDecoder(f, sample_rate=sample_rate)
                audio_length = decoder.get_all_samples().data.shape[1]
                # Keep the pipeline call inside the `with` block: the decoder
                # may read from the (still open) file handle lazily.
                out = self.pipe(decoder)
                return out["text"]
        except ValueError:
            if audio_length is None:
                # Decoding failed before we measured the clip; there is no
                # meaningful sample-rate suggestion to make — propagate.
                raise
            max_length = 300000  # sample budget the pipeline tolerates
            suggest_sample_rate = int(sample_rate * max_length/audio_length)
            return f"The audio file to transcribe is too long, number of samples {audio_length}. You used a sample_rate of {sample_rate}, try using a smaller sample rate, like {suggest_sample_rate}"
class ReadMdDoc(Tool):
    """Read a markdown file in full or one named section of it."""

    name = "read_mddoc"
    description = (
        "Read an entire markdown file or a specific section of it."
    )
    inputs = {
        "file_name": {
            "type": "string",
            "description": "The file to read it should have 'md' extension.",
        },
        "section": {
            "type": "string",
            "nullable": True,
            "description": "If you want to read the entire file set this to 'all'. Otherwise you can look for a specific section title."
        },
        "max_length":{
            "type": "integer",
            "nullable": True,
            "description": "The maximum number of characters to return if the content has more characters it will be truncated. Use 40000 as a default."
        }
    }
    output_type = "string"

    def __init__(self):
        super().__init__()

    def _truncate_content(self, content: str, max_length: int) -> str:
        """Return *content*, cut at *max_length* chars with a truncation notice."""
        if len(content) <= max_length:
            return content
        return (
            content[:max_length] + f"\n..._This content has been truncated to stay below {max_length} characters_...\n Does it have the information you need otherwise increase the max_length."
        )

    def get_token_map(self, tokens):
        """Map each heading title to the token index range of its section.

        For every heading, the title is mapped to ``[start, end]`` token
        indices: ``start`` is the heading_open token, ``end`` is the next
        heading of the same or shallower level (or the last token index).
        """
        token_map = defaultdict(list)
        stack = []  # (title, level) of sections still open at this point
        for i, token in enumerate(tokens):
            if token.type == "heading_open":
                # The inline token right after heading_open carries the title.
                # Use the enumeration index: tokens.index(token) returns the
                # FIRST equal token, which is wrong for repeated headings.
                text = tokens[i + 1].content if i + 1 < len(tokens) else None
                token_map[text].append(i)
                level = int(token.tag[-1])  # tag is "h1".."h6"
                # A new heading ends every open section of equal/deeper level.
                while stack and level <= stack[-1][-1]:
                    key, _ = stack.pop()
                    token_map[key].append(i)
                stack.append((text, level))
        # Sections still open at EOF end at the last token index.
        while stack:
            text, _ = stack.pop()
            token_map[text].append(i)
        return token_map

    def forward(
        self,
        file_name: str,
        section: str = "all",
        max_length: int = 40000):
        """Return the requested section of *file_name* (or the whole file).

        Errors (missing file, non-markdown content, unknown section) are
        reported as human-readable strings rather than raised.
        """
        try:
            with open(file_name, "r", encoding="utf-8") as f:
                doc = f.read()
        except FileNotFoundError:
            return f"Can't find {file_name}, are you sure the file exists and that you have spelled it correctly?"
        try:
            tokens = MarkdownIt().parse(doc)
        except Exception:
            return "Error using the markdown parser, are you sure the file is in markdown format?"
        token_map = self.get_token_map(tokens)
        token_map["all"] = [0, len(tokens)]
        if section in token_map:
            # A title that occurs more than once accumulates extra indices;
            # take the first start/end pair so unpacking cannot fail
            # (the original 2-tuple unpack raised ValueError on duplicates).
            start, end = token_map[section][0], token_map[section][1]
            content = "\n".join(t.content for t in tokens[start:end])
            return self._truncate_content(content, max_length)
        else:
            return f"The required Section is not found in the document. The available sections are:\n {list(token_map.keys())}. If you don't see what you are looking for here, you can try returning all the document using setting argument section to 'all'"