Spaces:
Configuration error
Configuration error
package explorer | |
import ( | |
"context" | |
"fmt" | |
"strings" | |
"sync" | |
"time" | |
"github.com/rs/zerolog/log" | |
"github.com/mudler/LocalAI/core/p2p" | |
"github.com/mudler/edgevpn/pkg/blockchain" | |
) | |
type DiscoveryServer struct { | |
sync.Mutex | |
database *Database | |
connectionTime time.Duration | |
errorThreshold int | |
} | |
// NewDiscoveryServer creates a new DiscoveryServer with the given Database. | |
// it keeps the db state in sync with the network state | |
func NewDiscoveryServer(db *Database, dur time.Duration, failureThreshold int) *DiscoveryServer { | |
if dur == 0 { | |
dur = 50 * time.Second | |
} | |
if failureThreshold == 0 { | |
failureThreshold = 3 | |
} | |
return &DiscoveryServer{ | |
database: db, | |
connectionTime: dur, | |
errorThreshold: failureThreshold, | |
} | |
} | |
type Network struct { | |
Clusters []ClusterData | |
} | |
func (s *DiscoveryServer) runBackground() { | |
if len(s.database.TokenList()) == 0 { | |
time.Sleep(5 * time.Second) // avoid busy loop | |
return | |
} | |
for _, token := range s.database.TokenList() { | |
c, cancel := context.WithTimeout(context.Background(), s.connectionTime) | |
defer cancel() | |
// Connect to the network | |
// Get the number of nodes | |
// save it in the current state (mutex) | |
// do not do in parallel | |
n, err := p2p.NewNode(token) | |
if err != nil { | |
log.Err(err).Msg("Failed to create node") | |
s.failedToken(token) | |
continue | |
} | |
err = n.Start(c) | |
if err != nil { | |
log.Err(err).Msg("Failed to start node") | |
s.failedToken(token) | |
continue | |
} | |
ledger, err := n.Ledger() | |
if err != nil { | |
log.Err(err).Msg("Failed to start ledger") | |
s.failedToken(token) | |
continue | |
} | |
networkData := make(chan ClusterData) | |
// get the network data - it takes the whole timeout | |
// as we might not be connected to the network yet, | |
// and few attempts would have to be made before bailing out | |
go s.retrieveNetworkData(c, ledger, networkData) | |
hasWorkers := false | |
ledgerK := []ClusterData{} | |
for key := range networkData { | |
ledgerK = append(ledgerK, key) | |
if len(key.Workers) > 0 { | |
hasWorkers = true | |
} | |
} | |
log.Debug().Any("network", token).Msgf("Network has %d clusters", len(ledgerK)) | |
if len(ledgerK) != 0 { | |
for _, k := range ledgerK { | |
log.Debug().Any("network", token).Msgf("Clusterdata %+v", k) | |
} | |
} | |
if hasWorkers { | |
s.Lock() | |
data, _ := s.database.Get(token) | |
(&data).Clusters = ledgerK | |
(&data).Failures = 0 | |
s.database.Set(token, data) | |
s.Unlock() | |
} else { | |
s.failedToken(token) | |
} | |
} | |
s.deleteFailedConnections() | |
} | |
func (s *DiscoveryServer) failedToken(token string) { | |
s.Lock() | |
defer s.Unlock() | |
data, _ := s.database.Get(token) | |
(&data).Failures++ | |
s.database.Set(token, data) | |
} | |
func (s *DiscoveryServer) deleteFailedConnections() { | |
s.Lock() | |
defer s.Unlock() | |
for _, t := range s.database.TokenList() { | |
data, _ := s.database.Get(t) | |
if data.Failures > s.errorThreshold { | |
log.Info().Any("token", t).Msg("Token has been removed from the database") | |
s.database.Delete(t) | |
} | |
} | |
} | |
func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockchain.Ledger, networkData chan ClusterData) { | |
clusters := map[string]ClusterData{} | |
defer func() { | |
for _, n := range clusters { | |
networkData <- n | |
} | |
close(networkData) | |
}() | |
for { | |
select { | |
case <-c.Done(): | |
return | |
default: | |
time.Sleep(5 * time.Second) | |
data := ledger.LastBlock().Storage | |
LEDGER: | |
for d := range data { | |
toScanForWorkers := false | |
cd := ClusterData{} | |
isWorkerCluster := d == p2p.WorkerID || (strings.Contains(d, "_") && strings.Contains(d, p2p.WorkerID)) | |
isFederatedCluster := d == p2p.FederatedID || (strings.Contains(d, "_") && strings.Contains(d, p2p.FederatedID)) | |
switch { | |
case isWorkerCluster: | |
toScanForWorkers = true | |
cd.Type = "worker" | |
case isFederatedCluster: | |
toScanForWorkers = true | |
cd.Type = "federated" | |
} | |
if strings.Contains(d, "_") { | |
cd.NetworkID = strings.Split(d, "_")[0] | |
} | |
if !toScanForWorkers { | |
continue LEDGER | |
} | |
atLeastOneWorker := false | |
DATA: | |
for _, v := range data[d] { | |
nd := &p2p.NodeData{} | |
if err := v.Unmarshal(nd); err != nil { | |
continue DATA | |
} | |
if nd.IsOnline() { | |
atLeastOneWorker = true | |
(&cd).Workers = append(cd.Workers, nd.ID) | |
} | |
} | |
if atLeastOneWorker { | |
clusters[d] = cd | |
} | |
} | |
} | |
} | |
} | |
// Start the discovery server. This is meant to be run in to a goroutine. | |
func (s *DiscoveryServer) Start(ctx context.Context, keepRunning bool) error { | |
for { | |
select { | |
case <-ctx.Done(): | |
return fmt.Errorf("context cancelled") | |
default: | |
// Collect data | |
s.runBackground() | |
if !keepRunning { | |
return nil | |
} | |
} | |
} | |
} | |