File size: 11,625 Bytes
02cd084
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1adc43a
 
 
02cd084
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b258c4d
1adc43a
02cd084
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b258c4d
02cd084
 
 
 
 
 
 
 
 
 
 
b258c4d
 
 
02cd084
b258c4d
 
 
02cd084
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b258c4d
02cd084
 
b258c4d
02cd084
 
 
 
b258c4d
02cd084
 
 
 
b258c4d
02cd084
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b258c4d
02cd084
 
 
 
 
 
b258c4d
02cd084
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# Import required modules
import json
import linecache
import os
import random
import re
import subprocess
import sys

from dotenv import load_dotenv
from langchain.chains import ConversationChain
from langchain.chains import LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from langchain.prompts.chat import (
    ChatPromptTemplate,
    HumanMessagePromptTemplate,
)
from langchain.vectorstores import Chroma
from PIL import Image
from pymavlink import mavutil

class PlotCreator:
    """
    PlotCreator is a class that generates Python scripts to plot MAVLink data
    provided by a user, leveraging OpenAI's models for conversational agents.

    Typical flow: ``set_logfile_name`` -> ``parse_mavlink_log`` ->
    ``find_relevant_data_types`` -> ``create_plot`` -> ``run_script``.
    ``run_script`` asks the model to repair the script once if it fails.
    """

    last_code = ""  # stores the last code generated (or repaired)
    logfile_name = ""  # path of the MAVLink log handed to set_logfile_name
    script_path = ""  # "plot.py" next to the log; the generated script lives here
    plot_path = ""  # "plot.png" next to the log; passed to the model as the output file
    message_types = None  # dict built by parse_mavlink_log; None until a log was parsed
    db = None  # Chroma store built by create_embedding; None until texts were indexed

    def __init__(self):
        """
        Initialize an instance of PlotCreator.

        Loads environment variables from a .env file, reads the model name
        from ``OPENAI_MODEL`` and builds the LLM chain used by ``create_plot``.
        """

        load_dotenv()  # load environment variables from a .env file

        self.model = os.getenv("OPENAI_MODEL")  # get the name of the OpenAI model to use

        # create an instance of ChatOpenAI with the specified model, maximum tokens, and temperature
        llm = ChatOpenAI(model_name=self.model, max_tokens=2000, temperature=0)

        # define the input variables and template for the prompt to generate Python scripts
        mavlink_data_prompt = PromptTemplate(
            input_variables=["data_types", "history", "human_input", "file", "output_file"],
            template="You are an AI conversation agent that will be used for generating python scripts to plot mavlink data provided by the user. Please create a python script using matplotlib and pymavlink's mavutil to plot the data provided by the user. Please do not explain the code just return the script. Please plot each independent variable over time in seconds. Please save the plot to file named {output_file} with at least 400 dpi and do not call plt.show(). please use blocking=false in your call to recv_match and be sure to break the loop if a msg in None. here are the relevant data types in the log:\n\n{data_types} \n\nChat History:\n{history} \n\nHUMAN: {human_input} \n\nplease read this data from the file {file}.",
        )

        # create an instance of LLMChain with the defined prompt and verbosity
        self.chain = LLMChain(verbose=True, llm=llm, prompt=mavlink_data_prompt)

    @staticmethod
    def extract_code_snippets(text):
        """
        Extracts code snippets from a text.

        This function searches the text for substrings enclosed in '```', which are assumed to be code snippets.
        The remainder of the opening fence line (e.g. a language tag) is not part of the snippet.

        Args:
            text (str): The text to search for code snippets.

        Returns:
            list: A list of code snippets found in the text. If no snippets are found, returns a list containing the original text.
        """

        pattern = r'```.*?\n(.*?)```'  # pattern to match code snippets enclosed in '```'
        # use regex to find all matches of the pattern in the text
        snippets = re.findall(pattern, text, re.DOTALL | re.MULTILINE)
        if not snippets:  # if no snippets were found
            snippets = [text]  # treat the entire text as a single snippet
        return snippets

    @staticmethod
    def write_plot_script(filename, text):
        """
        Writes a script to a file.

        Args:
            filename (str): The name of the file to write the script to.
            text (str): The script to write to the file.
        """

        # UTF-8 is Python's default source encoding; being explicit keeps
        # non-ASCII model output from failing on platforms with another default.
        with open(filename, 'w', encoding="utf-8") as file:
            file.write(text)  # write the script to the file

    @staticmethod
    def _execute_script(path):
        """Run the script at *path* with the current interpreter; raise on a non-zero exit."""
        # sys.executable guarantees the same interpreter/environment as this
        # process instead of whatever "python" happens to resolve to on PATH.
        subprocess.check_output([sys.executable, path], stderr=subprocess.STDOUT)

    def _request_script_fix(self, script, error_message):
        """Ask the model for a corrected version of *script* and return its raw response text."""
        # create an instance of ChatOpenAI with the specified model, maximum tokens, and temperature
        llm = ChatOpenAI(model_name=self.model, max_tokens=8000, temperature=0)

        # define the input variables and template for the prompt to repair Python scripts
        fix_plot_script_template = PromptTemplate(
            input_variables=["data_types", "error", "script"],
            template="You are an AI agent that is designed to debug scripts created to plot mavlink data using matplotlib and pymavlink's mavutil. the following script produced this error: \n\n{script}\n\nThe error is: \n\n{error}\n\n Here are message definitions that are possibly relevant for the script:\n\n {data_types}\n\n. Please fix the script so that it produces the correct plot. please return the fixed script in a markdown code block.",
        )

        # create an instance of LLMChain with the defined prompt and verbosity
        chain = LLMChain(verbose=True, llm=llm, prompt=fix_plot_script_template)

        # message_types stays None until parse_mavlink_log has run
        data_types = self.message_types or {}
        return chain.run({"data_types": data_types, "error": error_message, "script": script})

    def attempt_to_fix_sctript(self, filename, error_message):
        """
        Attempts to fix a script that caused an error.

        The script is read from ``filename``, sent to the model together with
        the error, and the first code snippet of the answer is written back to
        ``filename`` and executed once.

        Args:
            filename (str): The name of the file containing the script.
            error_message (str): The error message produced by the script.

        Returns:
            list: A list whose first element is the fixed script. If the model
            could not be reached the element is an apology followed by the
            original script; if the fixed script still fails it is an apology
            followed by the attempted fix.
        """

        # read script from file
        with open(filename, 'r', encoding="utf-8") as file:
            script = file.read()

        try:
            response = self._request_script_fix(script, error_message)
        except Exception as exc:
            print(exc)
            # Always return a list: callers index the result with [0].
            return ["Sorry I couldn't fix the script. Here is the original script I tried:\n\n" + script]
        print(response)
        code = PlotCreator.extract_code_snippets(response)  # extract the fixed script from the response

        # Write the fix back to the file it came from so that the file executed
        # below (and on later runs) is the repaired script.
        PlotCreator.write_plot_script(filename, code[0])

        # run the fixed script
        try:
            self._execute_script(filename)
        except Exception:
            code[0] = "Sorry I was unable to fix the script.\nThis is my attempt to fix it:\n\n" + code[0]
        return code

    # Correctly spelled alias; the original (misspelled) name is kept for existing callers.
    attempt_to_fix_script = attempt_to_fix_sctript

    def set_logfile_name(self, filename):
        """
        Set the name of the log file.

        Also derives ``script_path`` ("plot.py") and ``plot_path`` ("plot.png")
        in the same directory as the log file.

        :param filename: The name of the log file.
        :type filename: str
        """
        # extract the path to the log file
        path = os.path.dirname(filename)
        self.logfile_name = filename
        self.script_path = os.path.join(path, "plot.py")
        self.plot_path = os.path.join(path, "plot.png")

    def find_relevant_data_types(self, human_input):
        """
        Look up message definitions that are similar to the user's request.

        :param human_input: Input provided by the human.
        :type human_input: str
        :return: The contents of the matching documents, each followed by a
            blank line; an empty string when no embeddings have been created.
        :rtype: str
        """
        if self.db is None:
            # No log parsed yet (or the log was empty): nothing to search.
            return ""

        # Search the database for documents that are similar to the human input
        docs = self.db.similarity_search(human_input)

        # Concatenate the content of the documents into a string
        return "".join(doc.page_content + "\n\n" for doc in docs)

    def run_script(self):
        """
        Run the generated script and try one repair if it fails.

        On failure the captured output (or the exception text) is printed and
        handed to ``attempt_to_fix_sctript``; its result becomes ``last_code``.

        :return: ``[[(None, (plot_path,))], last_code]`` -- the plot file wrapped
            in a one-entry list of ``(None, (path,))`` tuples, and the script
            text.  NOTE(review): presumably shaped for a chat UI; confirm with
            the caller.
        :rtype: list
        """
        error_text = None
        try:
            self._execute_script(self.script_path)
        except subprocess.CalledProcessError as e:
            # stderr is merged into stdout; never let odd bytes mask the real error
            error_text = e.output.decode(errors="replace")
        except Exception as e:
            error_text = str(e)

        if error_text is not None:
            print(error_text)
            code = self.attempt_to_fix_sctript(self.script_path, error_text)
            self.last_code = code[0]

        # Return a list containing the filename of the plot and the code used to generate it
        return [[(None, (self.plot_path,))], self.last_code]

    def create_plot(self, human_input, data_type_info_text):
        """
        Create a plot script based on the input provided by the human.

        The script is written to ``script_path`` and remembered in
        ``last_code``; it is not executed here.

        :param human_input: Input provided by the human.
        :type human_input: str
        :param data_type_info_text: Message definitions to include in the prompt.
        :type data_type_info_text: str
        :return: The generated script.
        :rtype: str
        """

        # Create a history of generated scripts if one exists
        if self.last_code != "":
            history = "\n\nLast script generated:\n\n" + self.last_code
        else:
            history = ""

        # Generate a response by running the chain with the relevant data types, history, file name and human input
        response = self.chain.run({"data_types": data_type_info_text, "history": history, "file": self.logfile_name, "human_input": human_input, "output_file": self.plot_path})
        print(response)

        # Parse the code from the response
        code = self.extract_code_snippets(response)

        # Write the code to the script file next to the log
        self.write_plot_script(self.script_path, code[0])

        # Store the code for the next iteration
        self.last_code = code[0]

        return code[0]

    def parse_mavlink_log(self):
        """
        Parse the MAVLink log to extract unique message types and their fields.

        For every message type the number of occurrences and the Python type
        name of each field (taken from the first message of that type) are
        recorded in ``self.message_types``; embeddings are then created for them.

        :return: A JSON string representation of the unique message types and their fields.
        :rtype: str
        """
        # Initialize a dictionary to store unique message types and their fields
        self.message_types = {}

        # Establish a MAVLink connection
        mav_log = mavutil.mavlink_connection(self.logfile_name)

        # Loop through the log file and extract all unique message types
        while True:
            try:
                # Receive a message
                msg = mav_log.recv_match(blocking=False, type=None)
                # Check if we received a message
                if msg is None:
                    break

                msg_type = msg.get_type()
                entry = self.message_types.get(msg_type)
                if entry is None:
                    # Add the message type and its fields to the dictionary
                    self.message_types[msg_type] = {
                        "count": 1,
                        "fields": {field: type(getattr(msg, field)).__name__ for field in msg.get_fieldnames()}
                    }
                else:
                    # Increment the count for this message type
                    entry["count"] += 1

            except KeyboardInterrupt:
                break
            except Exception as exc:
                # Keep what was collected so far, but say what went wrong.
                print(f"Unknown error: {exc}")
                break

        # Create embeddings for the message types
        self.create_embeddings(self.message_types)

        # Return a JSON string of the message types
        return json.dumps(self.message_types, indent=4)

    def create_embedding(self, texts):
        """
        Create OpenAI embeddings for a list of texts and store them in ``self.db``.

        :param texts: A list of texts to create embeddings for.
        :type texts: list of str
        """
        if not texts:
            # Nothing to index; leave the store unset so lookups return no context.
            self.db = None
            return

        # Embed the texts and index them in a Chroma vector store
        embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
        self.db = Chroma.from_texts(texts, embeddings)

    def create_embeddings(self, message_types):
        """
        Create OpenAI embeddings for a dictionary of message types.

        Each message type becomes one JSON document of the form
        ``{"<type>": {...}}``.

        :param message_types: A dictionary of message types to create embeddings for.
        :type message_types: dict
        """
        print(message_types)

        # Convert the message types to a list of JSON strings
        texts = [json.dumps({name: info}) for name, info in message_types.items()]

        print(f"Texts: {texts}")
        # Create the embeddings
        self.create_embedding(texts)