File size: 5,446 Bytes
87337b1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
/**
*
* Agora Real Time Engagement
* Created by Wei Hu in 2022-10.
* Copyright (c) 2024 Agora IO. All rights reserved.
*
*/
// Note that this is just an example extension written in the GO programming
// language, so the package name does not equal to the containing directory
// name. However, it is not common in Go.
package extension
import (
"encoding/json"
"fmt"
"strconv"
"ten_framework/ten"
)
// Message colllector represents the text output result
// @Description 输出结果
type ColllectorMessage struct {
Text string `json:"text"` // 识别出的文本
IsFinal bool `json:"is_final"` // 是否为最终结果
StreamID int32 `json:"stream_id"` // 流ID
Type string `json:"data_type"` // 数据类型
Ts uint64 `json:"text_ts"` // 时间戳
}
// Message represents the text output result
// @Description 输出结果
type Message struct {
Text string `json:"text"` // 识别出的文本
IsFinal bool `json:"is_final"` // 是否为最终结果
StreamID string `json:"stream_id"` // 流ID
Type string `json:"type"` // 数据类型
Ts uint64 `json:"ts"` // 时间戳
}
// RtcUserSate represents the rtc user state
// @Description RTC用户状态
type RtcUserSate struct {
RemoteUserID string `json:"remote_user_id"` // 远程用户ID
State string `json:"state"` // 状态
Reason string `json:"reason"` // 原因
}
type agoraRtmWrapperExtension struct {
ten.DefaultExtension
}
func newExtension(name string) ten.Extension {
return &agoraRtmWrapperExtension{}
}
// OnData receives data from ten graph.
func (p *agoraRtmWrapperExtension) OnData(
tenEnv ten.TenEnv,
data ten.Data,
) {
buf, err := data.GetPropertyBytes("data")
if err != nil {
tenEnv.LogError("OnData GetProperty data error: " + err.Error())
return
}
tenEnv.LogInfo("AGORA_RTM_WRAPPER_EXTENSION OnData: " + string(buf))
colllectorMessage := ColllectorMessage{}
err = json.Unmarshal(buf, &colllectorMessage)
if err != nil {
tenEnv.LogError("OnData Unmarshal data error: " + err.Error())
return
}
message := Message{
Text: colllectorMessage.Text,
IsFinal: colllectorMessage.IsFinal,
StreamID: strconv.Itoa(int(colllectorMessage.StreamID)),
Type: colllectorMessage.Type,
Ts: colllectorMessage.Ts,
}
jsonBytes, err := json.Marshal(message)
if err != nil {
tenEnv.LogError("failed to marshal JSON: " + err.Error())
return
}
tenEnv.LogInfo("AGORA_RTM_WRAPPER_EXTENSION OnData: " + string(jsonBytes))
cmd, _ := ten.NewCmd("publish")
err = cmd.SetPropertyBytes("message", jsonBytes)
if err != nil {
tenEnv.LogError("failed to set property message: " + err.Error())
return
}
if err := tenEnv.SendCmd(cmd, func(_ ten.TenEnv, result ten.CmdResult, _ error) {
status, err := result.GetStatusCode()
tenEnv.LogInfo(fmt.Sprintf("AGORA_RTM_WRAPPER_EXTENSION publish result %d", status))
if status != ten.StatusCodeOk || err != nil {
tenEnv.LogError("failed to subscribe")
}
}); err != nil {
tenEnv.LogError("failed to send command " + err.Error())
}
}
func (p *agoraRtmWrapperExtension) OnCmd(tenEnv ten.TenEnv, cmd ten.Cmd) {
defer func() {
if r := recover(); r != nil {
tenEnv.LogError(fmt.Sprintf("OnCmd panic: %v", r))
}
cmdResult, err := ten.NewCmdResult(ten.StatusCodeOk)
if err != nil {
tenEnv.LogError(fmt.Sprintf("failed to create cmd result: %v", err))
return
}
tenEnv.ReturnResult(cmdResult, cmd, nil)
}()
cmdName, err := cmd.GetName()
if err != nil {
tenEnv.LogError(fmt.Sprintf("failed to get cmd name: %v", err))
return
}
tenEnv.LogInfo(fmt.Sprintf("received command: %s", cmdName))
switch cmdName {
case "on_user_audio_track_state_changed":
// on_user_audio_track_state_changed
p.handleUserStateChanged(tenEnv, cmd)
default:
tenEnv.LogWarn(fmt.Sprintf("unsupported cmd: %s", cmdName))
}
}
func (p *agoraRtmWrapperExtension) handleUserStateChanged(tenEnv ten.TenEnv, cmd ten.Cmd) {
remoteUserID, err := cmd.GetPropertyString("remote_user_id")
if err != nil {
tenEnv.LogError(fmt.Sprintf("failed to get remote_user_id: %v", err))
return
}
state, err := cmd.GetPropertyInt32("state")
if err != nil {
tenEnv.LogError(fmt.Sprintf("failed to get state: %v", err))
return
}
reason, err := cmd.GetPropertyInt32("reason")
if err != nil {
tenEnv.LogError(fmt.Sprintf("failed to get reason: %v", err))
return
}
userState := RtcUserSate{
RemoteUserID: remoteUserID,
State: strconv.Itoa(int(state)),
Reason: strconv.Itoa(int(reason)),
}
jsonBytes, err := json.Marshal(userState)
if err != nil {
tenEnv.LogError("failed to marshal JSON: " + err.Error())
return
}
sendCmd, _ := ten.NewCmd("set_presence_state")
sendCmd.SetPropertyString("states", string(jsonBytes))
tenEnv.LogInfo("AGORA_RTM_WRAPPER_EXTENSION SetRtmPresenceState " + string(jsonBytes))
if err := tenEnv.SendCmd(sendCmd, func(_ ten.TenEnv, result ten.CmdResult, _ error) {
status, err := result.GetStatusCode()
tenEnv.LogInfo(fmt.Sprintf("AGORA_RTM_WRAPPER_EXTENSION SetRtmPresenceState result %d", status))
if status != ten.StatusCodeOk || err != nil {
panic("failed to SetRtmPresenceState")
}
}); err != nil {
tenEnv.LogError("failed to send command " + err.Error())
}
}
func init() {
// Register addon
ten.RegisterAddonAsExtension(
"agora_rtm_wrapper",
ten.NewDefaultExtensionAddon(newExtension),
)
}
|