File size: 4,372 Bytes
4a98f26
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""
Langfuse๋ฅผ ํ™œ์šฉํ•œ ๋ชจ๋‹ˆํ„ฐ๋ง ๊ตฌํ˜„ (์„ ํƒ์ )
"""
from typing import Dict, Any, Optional
import time
import os
from dotenv import load_dotenv

# ํ™˜๊ฒฝ ๋ณ€์ˆ˜ ๋กœ๋“œ
load_dotenv()

# ์„ค์ • ๊ฐ’ ๊ฐ€์ ธ์˜ค๊ธฐ
LANGFUSE_SECRET_KEY = os.getenv("LANGFUSE_SECRET_KEY", "")
LANGFUSE_PUBLIC_KEY = os.getenv("LANGFUSE_PUBLIC_KEY", "")
LANGFUSE_HOST = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")

class LangfuseMonitoring:
    def __init__(self):
        """
        Langfuse ๋ชจ๋‹ˆํ„ฐ๋ง ์ดˆ๊ธฐํ™” (์„ ํƒ์  ๊ธฐ๋Šฅ)
        """
        self.enabled = False
        print("๋ชจ๋‹ˆํ„ฐ๋ง ๊ธฐ๋Šฅ์„ ์ดˆ๊ธฐํ™”ํ•ฉ๋‹ˆ๋‹ค...")

        # Langfuse๊ฐ€ ์„ค์น˜๋˜์–ด ์žˆ๋Š”์ง€ ํ™•์ธ
        try:
            from langfuse import Langfuse

            if LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY:
                try:
                    self.langfuse = Langfuse(
                        public_key=LANGFUSE_PUBLIC_KEY,
                        secret_key=LANGFUSE_SECRET_KEY,
                        host=LANGFUSE_HOST,
                    )
                    self.enabled = True
                    print("Langfuse ๋ชจ๋‹ˆํ„ฐ๋ง์ด ํ™œ์„ฑํ™”๋˜์—ˆ์Šต๋‹ˆ๋‹ค.")
                except Exception as e:
                    print(f"Langfuse ์ดˆ๊ธฐํ™” ์‹คํŒจ: {e}")
            else:
                print("Langfuse API ํ‚ค๊ฐ€ ์„ค์ •๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค. ๋ชจ๋‹ˆํ„ฐ๋ง์€ ๋น„ํ™œ์„ฑํ™”๋ฉ๋‹ˆ๋‹ค.")
        except ImportError:
            print("langfuse ํŒจํ‚ค์ง€๊ฐ€ ์„ค์น˜๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค. ๋ชจ๋‹ˆํ„ฐ๋ง ๊ธฐ๋Šฅ์ด ๋น„ํ™œ์„ฑํ™”๋ฉ๋‹ˆ๋‹ค.")
            print("pip install langfuse ๋ช…๋ น์œผ๋กœ ์„ค์น˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.")

    def start_trace(self, name: str, user_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> Any:
        """
        ์ƒˆ ํŠธ๋ ˆ์ด์Šค ์‹œ์ž‘

        Args:
            name: ํŠธ๋ ˆ์ด์Šค ์ด๋ฆ„
            user_id: ์‚ฌ์šฉ์ž ID (์„ ํƒ์ )
            metadata: ์ถ”๊ฐ€ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ (์„ ํƒ์ )

        Returns:
            ํŠธ๋ ˆ์ด์Šค ๊ฐ์ฒด ๋˜๋Š” None
        """
        if not self.enabled:
            return None

        try:
            return self.langfuse.trace(
                name=name,
                user_id=user_id,
                metadata=metadata or {},
            )
        except Exception as e:
            print(f"ํŠธ๋ ˆ์ด์Šค ์ƒ์„ฑ ์‹คํŒจ: {e}")
            return None

    def log_generation(self, trace: Any, name: str, prompt: str, response: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        """
        LLM ์ƒ์„ฑ ๋กœ๊น…

        Args:
            trace: ํŠธ๋ ˆ์ด์Šค ๊ฐ์ฒด
            name: ์ƒ์„ฑ ์ด๋ฆ„
            prompt: ์ž…๋ ฅ ํ”„๋กฌํ”„ํŠธ
            response: ๋ชจ๋ธ ์‘๋‹ต
            metadata: ์ถ”๊ฐ€ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ (์„ ํƒ์ )
        """
        if not self.enabled or trace is None:
            return

        try:
            trace.generation(
                name=name,
                model="user-defined-model",
                prompt=prompt,
                completion=response,
                metadata=metadata or {},
            )
        except Exception as e:
            print(f"์ƒ์„ฑ ๋กœ๊น… ์‹คํŒจ: {e}")

    def log_span(self, trace: Any, name: str, input_data: Any, output_data: Any, start_time: float, end_time: float) -> None:
        """
        ์ฒ˜๋ฆฌ ๊ตฌ๊ฐ„ ๋กœ๊น…

        Args:
            trace: ํŠธ๋ ˆ์ด์Šค ๊ฐ์ฒด
            name: ๊ตฌ๊ฐ„ ์ด๋ฆ„
            input_data: ์ž…๋ ฅ ๋ฐ์ดํ„ฐ
            output_data: ์ถœ๋ ฅ ๋ฐ์ดํ„ฐ
            start_time: ์‹œ์ž‘ ์‹œ๊ฐ„
            end_time: ์ข…๋ฃŒ ์‹œ๊ฐ„
        """
        if not self.enabled or trace is None:
            return

        try:
            trace.span(
                name=name,
                start_time=start_time,
                end_time=end_time,
                input=input_data,
                output=output_data,
                metadata={"duration_ms": (end_time - start_time) * 1000},
            )
        except Exception as e:
            print(f"๊ตฌ๊ฐ„ ๋กœ๊น… ์‹คํŒจ: {e}")

    def end_trace(self, trace: Any) -> None:
        """
        ํŠธ๋ ˆ์ด์Šค ์ข…๋ฃŒ

        Args:
            trace: ์ข…๋ฃŒํ•  ํŠธ๋ ˆ์ด์Šค ๊ฐ์ฒด
        """
        if not self.enabled or trace is None:
            return

        try:
            trace.update(status="success")
        except Exception as e:
            print(f"ํŠธ๋ ˆ์ด์Šค ์ข…๋ฃŒ ์‹คํŒจ: {e}")