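"""Custom smolagents tools: a webpage visitor that saves page content as a
markdown file, Whisper-based speech-to-text transcribers, and a markdown
reader that can return either a whole file or a single section of it."""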
from collections import defaultdict
from transformers import pipeline
from markdown_it import MarkdownIt
from smolagents.tools import Tool
import torchcodec
class VisitWebpageTool(Tool):
name = "visit_webpage"
    description = (
        "Visits a web page at the given url, reads its content as a markdown string and stores it to a file."
    )
inputs = {
"url": {
"type": "string",
"description": "The url of the webpage to visit.",
},
}
output_type = "string"
def __init__(
self,
file_name: str = "web_content.md",
user_agent: str = "agent-course"
):
super().__init__()
self.file_name = file_name
self.headers = {"User-Agent": user_agent}
    def _inspect(self, doc: str) -> str:
        mdit = MarkdownIt()
        tokens = mdit.parse(doc)
        content_table = ""
        for i, token in enumerate(tokens):
            if token.type == "heading_open":
                # The inline token right after heading_open carries the heading text.
                level = int(token.tag[-1]) - 1
                text = tokens[i + 1].content
                content_table += " " * level + text + "\n"
        return content_table
def forward(self, url: str) -> str:
try:
import re
import requests
from markdownify import markdownify
from requests.exceptions import RequestException
except ImportError as e:
raise ImportError(
"You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
) from e
try:
# Send a GET request to the URL with a 20-second timeout
response = requests.get(url, timeout=20, headers=self.headers)
response.raise_for_status() # Raise an exception for bad status codes
# Convert the HTML content to Markdown
markdown_content = markdownify(response.text).strip()
# Remove multiple line breaks
markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)
            with open(self.file_name, "w", encoding="utf-8") as f:
f.write(markdown_content)
try:
content_summary = self._inspect(markdown_content)
return f"Web page content saved in '{self.file_name}'. The content has the following section tree:\n {content_summary}. To read the full website content you can call 'read_mddoc('web_content.md')'"
except Exception:
return f"Web page content saved in {self.file_name}."
except requests.exceptions.Timeout:
return "The request timed out. Please try again later or check the URL."
except RequestException as e:
return f"Error fetching the webpage: {str(e)}"
except Exception as e:
return f"An unexpected error occurred: {str(e)}"
class SpeechToTextFromBytesTool(Tool):
    # Transcribes audio passed as in-memory bytes; the file-path based variant is SpeechToTextTool below.
    name = "transcriber_from_bytes"
    description = "This is a tool that transcribes audio into text. It returns the transcribed text."
inputs = {
"audio": {
"type": "audio",
"description": "The audio to transcribe it should be bytes.",
},
"sample_rate": {
"type": "integer",
"description": "The sampling rate to use to decode the audio, defaults to 16000",
"nullable": True
}
}
output_type = "string"
def __init__(self, model: str = "openai/whisper-small"):
super().__init__()
self.pipe = pipeline("automatic-speech-recognition", model=model)
    def forward(self, audio: bytes, sample_rate: int = 16000) -> str:
sample_rate = sample_rate if sample_rate is not None else 16000
decoder = torchcodec.decoders.AudioDecoder(audio, sample_rate=sample_rate)
out = self.pipe(decoder)
return out["text"]
class SpeechToTextTool(Tool):
name = "transcriber"
description = "This is a tool that transcribes an audio into text. It returns the transcribed text."
inputs = {
"audio_file": {
"type": "string",
"description": "The path to the audio file to transcribe.",
},
"sample_rate": {
"type": "integer",
"description": "The sampling rate to use to decode the audio, defaults to 16000",
"nullable": True
}
}
output_type = "string"
def __init__(self, model: str = "openai/whisper-small"):
super().__init__()
self.pipe = pipeline("automatic-speech-recognition", model=model)
    def forward(self, audio_file: str, sample_rate: int = 16000) -> str:
        sample_rate = sample_rate if sample_rate is not None else 16000
        audio_length = None
        try:
            with open(audio_file, "rb") as f:
                decoder = torchcodec.decoders.AudioDecoder(f, sample_rate=sample_rate)
                # Number of decoded samples, used to suggest a smaller sample rate if the clip is too long.
                audio_length = decoder.get_all_samples().data.shape[1]
                out = self.pipe(decoder)
                return out["text"]
        except ValueError:
            if audio_length is None:
                return f"Could not decode '{audio_file}'. Are you sure it is a valid audio file?"
            max_length = 300000
            suggest_sample_rate = int(sample_rate * max_length / audio_length)
            return (
                f"The audio file to transcribe is too long ({audio_length} samples). "
                f"You used a sample_rate of {sample_rate}; try a smaller sample rate, like {suggest_sample_rate}."
            )
class ReadMdDoc(Tool):
name = "read_mddoc"
description = (
"Read an entire markdown file or a specific section of it."
)
inputs = {
"file_name": {
"type": "string",
"description": "The file to read it should have 'md' extension.",
},
"section": {
"type": "string",
"nullable": True,
"description": "If you want to read the entire file set this to 'all'. Otherwise you can look for a specific section title."
},
"max_length":{
"type": "integer",
"nullable": True,
"description": "The maximum number of characters to return if the content has more characters it will be truncated. Use 40000 as a default."
}
}
output_type = "string"
def __init__(self):
super().__init__()
    def _truncate_content(self, content: str, max_length: int) -> str:
        if len(content) <= max_length:
            return content
        return (
            content[:max_length]
            + f"\n..._This content has been truncated to stay below {max_length} characters_...\n"
            + "If it does not contain the information you need, increase max_length."
        )
    def get_token_map(self, tokens):
        # Maps each section title to [start, end) token indices: a section runs from its
        # heading_open token to the next heading of the same or a higher level,
        # or to the end of the document.
        token_map = defaultdict(list)
        stack = []
        for i, token in enumerate(tokens):
            if token.type == "heading_open":
                # The inline token right after heading_open carries the heading text.
                text = tokens[i + 1].content
                token_map[text].append(i)
                level = int(token.tag[-1])
                # Close every open section that is at the same or a deeper level.
                while stack and level <= stack[-1][-1]:
                    key, _ = stack.pop()
                    token_map[key].append(i)
                stack.append((text, level))
        # Any section still open extends to the end of the token list.
        while stack:
            text, _ = stack.pop()
            token_map[text].append(len(tokens))
        return token_map
def forward(
self,
file_name: str,
section: str = "all",
max_length: int = 40000):
try:
            with open(file_name, "r", encoding="utf-8") as f:
doc = f.read()
except FileNotFoundError:
return f"Can't find {file_name}, are you sure the file exists and that you have spelled it crrectly?"
try:
mdit = MarkdownIt()
tokens = mdit.parse(doc)
except Exception:
return "Error using the markdown parser, are you sure the file is in markdown format?"
token_map = self.get_token_map(tokens)
token_map["all"] = [0, len(tokens)]
if section in token_map:
start, end = tuple(token_map[section])
content = "\n".join([t.content for t in tokens[start:end]])
return self._truncate_content(content, max_length)
else:
return f"The required Section is not found in the document. The available sections are:\n {list(token_map.keys())}. If you don't see what you are looking for here, you can try returning all the document using setting argument section to 'all'"