File size: 756 Bytes
7107f0b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package rpc

import "sync"

type ResponseProcFn func(resp clientResponse) error

type ResponseProcessor struct {
	cbs map[uint64]ResponseProcFn
	mu  *sync.RWMutex
}

func NewResponseProcessor() *ResponseProcessor {
	return &ResponseProcessor{
		make(map[uint64]ResponseProcFn),
		&sync.RWMutex{},
	}
}

func (r *ResponseProcessor) Add(id uint64, fn ResponseProcFn) {
	r.mu.Lock()
	r.cbs[id] = fn
	r.mu.Unlock()
}

func (r *ResponseProcessor) remove(id uint64) {
	r.mu.Lock()
	delete(r.cbs, id)
	r.mu.Unlock()
}

// Process called by recv routine
func (r *ResponseProcessor) Process(resp clientResponse) error {
	id := *resp.Id
	r.mu.RLock()
	fn, ok := r.cbs[id]
	r.mu.RUnlock()
	if ok && fn != nil {
		defer r.remove(id)
		return fn(resp)
	}
	return nil
}