from collections import defaultdict

import torchcodec
from markdown_it import MarkdownIt
from transformers import pipeline

from smolagents.tools import Tool


class VisitWebpageTool(Tool):
    name = "visit_webpage"
    description = (
        "Visits a web page at the given url, reads its content as a markdown string and stores it to a file."
    )
    inputs = {
        "url": {
            "type": "string",
            "description": "The url of the webpage to visit.",
        },
    }
    output_type = "string"

    def __init__(
        self,
        file_name: str = "web_content.md",
        user_agent: str = "agent-course",
    ):
        super().__init__()
        self.file_name = file_name
        self.headers = {"User-Agent": user_agent}

    def _inspect(self, doc: str) -> str:
        mdit = MarkdownIt()
        tokens = mdit.parse(doc)
        content_table = ""
        for i, token in enumerate(tokens):
            if token.type == "heading_open":
                # The heading text lives in the inline token right after the heading_open token.
                level = int(token.tag[-1]) - 1
                text = tokens[i + 1].content if i + 1 < len(tokens) else ""
                content_table += " " * level + text + "\n"
        return content_table
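
    # Illustrative sketch (assumed input, not from the original code): for a document like
    # "# Intro\n## Setup\n# Usage\n", _inspect is expected to return an outline such as
    # "Intro\n Setup\nUsage\n" -- one heading per line, indented by heading level.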

    def forward(self, url: str) -> str:
        try:
            import re

            import requests
            from markdownify import markdownify
            from requests.exceptions import RequestException
        except ImportError as e:
            raise ImportError(
                "You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
            ) from e
        try:
            # Fetch the page and fail early on HTTP errors.
            response = requests.get(url, timeout=20, headers=self.headers)
            response.raise_for_status()

            # Convert the HTML to markdown and collapse runs of blank lines.
            markdown_content = markdownify(response.text).strip()
            markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)

            with open(self.file_name, "w") as f:
                f.write(markdown_content)

            try:
                content_summary = self._inspect(markdown_content)
                return (
                    f"Web page content saved in '{self.file_name}'. The content has the following section tree:\n"
                    f"{content_summary}\n"
                    f"To read the full website content you can call read_mddoc('{self.file_name}')."
                )
            except Exception:
                return f"Web page content saved in '{self.file_name}'."
        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"


# Variant of the transcriber that accepts raw audio bytes instead of a file path.
class SpeechToTextFromBytesTool(Tool):
    name = "transcriber_from_bytes"
    description = "This is a tool that transcribes audio into text. It returns the transcribed text."
    inputs = {
        "audio": {
            "type": "audio",
            "description": "The audio to transcribe; it should be raw bytes.",
        },
        "sample_rate": {
            "type": "integer",
            "description": "The sampling rate to use to decode the audio, defaults to 16000.",
            "nullable": True,
        },
    }
    output_type = "string"

    def __init__(self, model: str = "openai/whisper-small"):
        super().__init__()
        self.pipe = pipeline("automatic-speech-recognition", model=model)

    def forward(self, audio: bytes, sample_rate: int = 16000) -> str:
        sample_rate = sample_rate if sample_rate is not None else 16000
        # Decode the raw bytes at the requested sampling rate before transcription.
        decoder = torchcodec.decoders.AudioDecoder(audio, sample_rate=sample_rate)
        out = self.pipe(decoder)
        return out["text"]
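
# Usage sketch (assumption: `audio_bytes` holds the raw contents of an audio file, e.g. read
# with open("sample.wav", "rb").read()):
#
#   transcriber = SpeechToTextFromBytesTool()
#   text = transcriber.forward(audio=audio_bytes, sample_rate=16000)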


class SpeechToTextTool(Tool):
    name = "transcriber"
    description = "This is a tool that transcribes audio into text. It returns the transcribed text."
    inputs = {
        "audio_file": {
            "type": "string",
            "description": "The path to the audio file to transcribe.",
        },
        "sample_rate": {
            "type": "integer",
            "description": "The sampling rate to use to decode the audio, defaults to 16000.",
            "nullable": True,
        },
    }
    output_type = "string"

    def __init__(self, model: str = "openai/whisper-small"):
        super().__init__()
        self.pipe = pipeline("automatic-speech-recognition", model=model)

    def forward(self, audio_file: str, sample_rate: int = 16000) -> str:
        sample_rate = sample_rate if sample_rate is not None else 16000
        audio_length = None
        try:
            with open(audio_file, "rb") as f:
                # Decode the audio at the requested sampling rate and count the decoded samples.
                decoder = torchcodec.decoders.AudioDecoder(f, sample_rate=sample_rate)
                audio_length = decoder.get_all_samples().data.shape[1]
                out = self.pipe(decoder)
            return out["text"]
        except ValueError:
            # The model cannot handle arbitrarily long inputs; suggest a lower sampling rate.
            if audio_length is None:
                raise
            max_length = 300000
            suggest_sample_rate = int(sample_rate * max_length / audio_length)
            return (
                f"The audio file to transcribe is too long: it has {audio_length} samples. "
                f"You used a sample_rate of {sample_rate}; try a smaller sample rate, for instance {suggest_sample_rate}."
            )
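
# Usage sketch (assumption: "sample.wav" is the path to an existing local audio file):
#
#   transcriber = SpeechToTextTool()
#   text = transcriber.forward(audio_file="sample.wav", sample_rate=16000)
#
# If the decoded audio is too long for the model, the tool returns a message suggesting
# a lower sampling rate instead of raising.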


class ReadMdDoc(Tool):
    name = "read_mddoc"
    description = (
        "Read an entire markdown file or a specific section of it."
    )
    inputs = {
        "file_name": {
            "type": "string",
            "description": "The file to read; it should have an 'md' extension.",
        },
        "section": {
            "type": "string",
            "nullable": True,
            "description": "If you want to read the entire file set this to 'all'. Otherwise you can look for a specific section title.",
        },
        "max_length": {
            "type": "integer",
            "nullable": True,
            "description": "The maximum number of characters to return; if the content has more characters it will be truncated. Use 40000 as a default.",
        },
    }
    output_type = "string"

    def __init__(self):
        super().__init__()

    def _truncate_content(self, content: str, max_length: int) -> str:
        if len(content) <= max_length:
            return content
        return (
            content[:max_length]
            + f"\n..._This content has been truncated to stay below {max_length} characters_...\n"
            "If this does not contain the information you need, increase max_length."
        )

    def get_token_map(self, tokens):
        # Map each heading title to the [start, end) token indices of the section it opens.
        token_map = defaultdict(list)
        stack = []
        for i, token in enumerate(tokens):
            if token.type == "heading_open":
                # The heading text lives in the inline token right after heading_open.
                text = tokens[i + 1].content if i + 1 < len(tokens) else ""
                token_map[text].append(i)
                level = int(token.tag[-1])
                # A new heading closes every open section of equal or deeper level.
                while stack and level <= stack[-1][-1]:
                    key, _ = stack.pop()
                    token_map[key].append(i)
                stack.append((text, level))
        # Any section still open at the end of the document closes at the last token.
        while stack:
            text, _ = stack.pop()
            token_map[text].append(len(tokens))
        return token_map
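
    # Illustrative sketch (assumed input, not from the original code): for the tokens of
    # "# Intro\n## Setup\n# Usage\n", get_token_map is expected to return something like
    #   {"Intro": [0, 6], "Setup": [3, 6], "Usage": [6, 9]}
    # i.e. each heading title maps to the [start, end) token indices of its section.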

    def forward(
        self,
        file_name: str,
        section: str = "all",
        max_length: int = 40000,
    ):
        try:
            with open(file_name, "r") as f:
                doc = f.read()
        except FileNotFoundError:
            return f"Can't find {file_name}, are you sure the file exists and that you have spelled it correctly?"
        try:
            mdit = MarkdownIt()
            tokens = mdit.parse(doc)
        except Exception:
            return "Error using the markdown parser, are you sure the file is in markdown format?"
        token_map = self.get_token_map(tokens)
        token_map["all"] = [0, len(tokens)]
        if section in token_map:
            # Use the first and last recorded indices so repeated section titles still yield a valid span.
            start, end = token_map[section][0], token_map[section][-1]
            content = "\n".join([t.content for t in tokens[start:end]])
            return self._truncate_content(content, max_length)
        else:
            return (
                "The requested section was not found in the document. The available sections are:\n"
                f"{list(token_map.keys())}.\n"
                "If you don't see what you are looking for, you can read the whole document by setting the argument section to 'all'."
            )