from collections import defaultdict

from transformers import pipeline
from markdown_it import MarkdownIt
from smolagents.tools import Tool
import torchcodec


class VisitWebpageTool(Tool):
    name = "visit_webpage"
    description = (
        "Visits a web page at the given url and reads its content as a markdown string and store it to a file"
    )
    inputs = {
        "url": {
            "type": "string",
            "description": "The url of the webpage to visit.",
        },
    }
    output_type = "string"

    def __init__(
        self,
        file_name: str = "web_content.md",
        user_agent: str = "agent-course"
        ):
        super().__init__()
        self.file_name = file_name
        self.headers = {"User-Agent": user_agent}

    #def _truncate_content(self, content: str, max_length: int) -> str:
    #    if len(content) <= max_length:
    #        return content
    #    return (
    #        content[:max_length] + f"\n..._This content has been truncated to stay below {max_length} characters_...\n"
    #    )

    def _inspect(self, doc: str) -> str:
        # Build an indented table of contents from the markdown headings.
        mdit = MarkdownIt()
        tokens = mdit.parse(doc)
        content_table = ""
        for i, token in enumerate(tokens):
            if token.type == "heading_open":
                # The inline token right after heading_open holds the heading text.
                level = int(token.tag[-1]) - 1
                text = tokens[i + 1].content
                content_table += "   " * level + text + "\n"
        return content_table

    def forward(self, url: str) -> str:
        try:
            import re
            import requests
            from markdownify import markdownify
            from requests.exceptions import RequestException
        except ImportError as e:
            raise ImportError(
                "You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
            ) from e
        try:
            # Send a GET request to the URL with a 20-second timeout
            response = requests.get(url, timeout=20, headers=self.headers)
            response.raise_for_status()  # Raise an exception for bad status codes

            # Convert the HTML content to Markdown
            markdown_content = markdownify(response.text).strip()

            # Remove multiple line breaks
            markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)
            with open(self.file_name, "w") as f:
                f.write(markdown_content)
            try:
                content_summary = self._inspect(markdown_content)
                return (
                    f"Web page content saved in '{self.file_name}'. "
                    f"The content has the following section tree:\n{content_summary}\n"
                    f"To read the full website content you can call read_mddoc('{self.file_name}')."
                )
            except Exception:
                return f"Web page content saved in '{self.file_name}'."
        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"
        
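# A minimal usage sketch for VisitWebpageTool, assuming network access; the URL
# below is only an illustrative placeholder:
#
#     webpage_tool = VisitWebpageTool(file_name="web_content.md")
#     print(webpage_tool.forward(url="https://example.com"))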
        
class SpeechToTextTool(Tool):
  name = "transcriber"
  description = "This is a tool that transcribes an audio into text. It returns the transcribed text."
  inputs = {
      "audio": {
          "type": "audio",
          "description": "The audio to transcribe it should be bytes.",
      },
      "sample_rate": {
          "type": "integer",
          "description": "The sampling rate to use to decode the audio, defaults to 16000",
          "nullable": True
      }
  }
  output_type = "string"
  def __init__(self, model: str = "openai/whisper-small"):
    super().__init__()
    self.pipe = pipeline("automatic-speech-recognition", model=model)

  def forward(self, audio: bytes, sample_rate: int = 16000) -> str:
    sample_rate = sample_rate if sample_rate is not None else 16000
    # Decode the raw bytes, resampling to the requested rate, then run the ASR pipeline.
    decoder = torchcodec.decoders.AudioDecoder(audio, sample_rate=sample_rate)
    out = self.pipe(decoder)
    return out["text"]
  
class SpeechToTextTool(Tool):
  name = "transcriber"
  description = "This is a tool that transcribes an audio into text. It returns the transcribed text."
  inputs = {
      "audio_file": {
          "type": "string",
          "description": "The path to the audio file to transcribe.",
      },
      "sample_rate": {
          "type": "integer",
          "description": "The sampling rate to use to decode the audio, defaults to 16000",
          "nullable": True
      }
  }
  output_type = "string"
  def __init__(self, model: str = "openai/whisper-small"):
    super().__init__()
    self.pipe = pipeline("automatic-speech-recognition", model=model)

  def forward(self, audio_file: str, sample_rate: int = 16000) -> str:
    try:
      sample_rate = sample_rate if sample_rate is not None else 16000
      with open(audio_file, "rb") as f:
        # Decode the file, resampling to the requested rate, then run the ASR pipeline.
        decoder = torchcodec.decoders.AudioDecoder(f, sample_rate=sample_rate)
        audio_length = decoder.get_all_samples().data.shape[1]
        out = self.pipe(decoder)
      return out["text"]
    except ValueError:
      # Treat a ValueError as the input being too long and suggest a lower sample rate.
      max_length = 300000
      suggest_sample_rate = int(sample_rate * max_length / audio_length)
      return (
        f"The audio file to transcribe is too long ({audio_length} samples). "
        f"You used a sample_rate of {sample_rate}; try a smaller sample rate, such as {suggest_sample_rate}."
      )
    except Exception:
      raise

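# A minimal usage sketch for the file-based SpeechToTextTool, assuming an audio
# file exists at the given path; the file name below is only an illustrative
# placeholder:
#
#     transcriber = SpeechToTextTool()
#     print(transcriber.forward(audio_file="recording.mp3"))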

class ReadMdDoc(Tool):
  name = "read_mddoc"
  description = (
        "Read an entire markdown file or a specific section of it."
    )
  inputs = {
        "file_name": {
            "type": "string",
            "description": "The file to read it should have 'md' extension.",
        },
        "section": {
            "type": "string",
            "nullable": True,
            "description": "If you want to read the entire file set this to 'all'. Otherwise you can look for a specific section title."
        },
        "max_length":{
            "type": "integer",
            "nullable": True,
            "description": "The maximum number of characters to return if the content has more characters it will be truncated. Use 40000 as a default."
        }
    }
  output_type = "string"

  def __init__(self):
    super().__init__()

  def _truncate_content(self, content: str, max_length: int) -> str:
      if len(content) <= max_length:
          return content
      return (
          content[:max_length]
          + f"\n..._This content has been truncated to stay below {max_length} characters_...\n"
          + "If this does not contain the information you need, increase max_length."
      )

  def get_token_map(self, tokens):
    # Map each heading title to a [start, end) token index range for its section.
    token_map = defaultdict(list)
    stack = []
    for i, token in enumerate(tokens):
      if token.type == "heading_open":
        # The inline token right after heading_open holds the heading text.
        text = tokens[i + 1].content
        token_map[text].append(i)
        level = int(token.tag[-1])
        # A new heading at the same or a higher level closes every open deeper section.
        while stack and level <= stack[-1][-1]:
          key, _ = stack.pop()
          token_map[key].append(i)
        stack.append((text, level))
    # Sections still open at the end of the document extend to the last token.
    while stack:
      text, _ = stack.pop()
      token_map[text].append(len(tokens))
    return token_map

  def forward(
      self,
      file_name: str,
      section: str = "all",
      max_length: int = 40000):
    try:
      with open(file_name, "r") as f:
        doc = f.read()
    except FileNotFoundError:
      return f"Can't find {file_name}, are you sure the file exists and that you have spelled it crrectly?"
    try:
      mdit = MarkdownIt()
      tokens = mdit.parse(doc)
    except Exception:
      return "Error using the markdown parser, are you sure the file is in markdown format?"
    token_map = self.get_token_map(tokens)
    token_map["all"] = [0, len(tokens)]
    if section in token_map:
      start, end = token_map[section][:2]
      content = "\n".join([t.content for t in tokens[start:end]])
      return self._truncate_content(content, max_length)
    else:
      return (
        "The requested section was not found in the document. The available sections are:\n"
        f"{list(token_map.keys())}.\n"
        "If you don't see what you are looking for, you can return the whole document by setting the argument section to 'all'."
      )