Rework discovery

peer discovery happens now based on vault ids not the general token
"pentapass". It also happens periodically instead of just once
This commit is contained in:
2024-10-10 13:12:26 +02:00
parent 069fdf8215
commit a3f2a5ee6d
4 changed files with 114 additions and 64 deletions

View File

@@ -11,6 +11,7 @@ import (
"sync"
"strings"
"github.com/mudler/edgevpn/pkg/utils"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -35,7 +36,6 @@ import (
)
var (
topicNameFlag = "pentapass"
Listen = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")
)
@@ -75,15 +75,6 @@ func SetupLibp2pHost(ctx context.Context, dbPath string) (host host.Host, dht *d
Logger.Fatal(err)
}
//whitelistedPeers := map[peer.ID]struct{} {
// "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ": {},
// "12D3KooWBRvtW83QYnPgJCyVyAgMXtg71wjkGefVB2fBnm1A36kS": {},
// "12D3KooWMmc4kYy78vSumqWtPkUNAoPeCpJ66ysFv1U8S554B7e2": {},
//}
//connectionGater := &WhitelistConnectionGater{whitelistedPeers: whitelistedPeers}
//host, err = libp2p.New(libp2p.Identity(priv), libp2p.ConnectionGater(connectionGater), Listen)
host, err = libp2p.New(libp2p.Identity(priv), Listen)
if err != nil {
@@ -93,7 +84,6 @@ func SetupLibp2pHost(ctx context.Context, dbPath string) (host host.Host, dht *d
dht = initDHT(ctx, host)
host = routed.Wrap(host, dht)
return host, dht, nil
}
@@ -358,9 +348,16 @@ func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
if err != nil {
panic(err)
}
if err = kademliaDHT.Bootstrap(ctx); err != nil {
panic(err)
}
// if err = kademliaDHT.Bootstrap(ctx); err != nil {
// panic(err)
// }
return kademliaDHT
}
func (s *StorageHandler) bootstrapPeers(ctx context.Context, h host.Host) {
Logger.Info("Bootstrapping DHT")
var wg sync.WaitGroup
for _, peerAddr := range dht.DefaultBootstrapPeers {
peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
@@ -375,35 +372,75 @@ func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
}()
}
wg.Wait()
return kademliaDHT
}
func DiscoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
routingDiscovery := discovery.NewRoutingDiscovery(dht)
routingDiscovery.Advertise(ctx, topicNameFlag)
func (s *StorageHandler) RunBackground(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
s.discoverPeers(ctx, h, dht)
t := utils.NewBackoffTicker(utils.BackoffInitialInterval(2 * time.Minute),
utils.BackoffMaxInterval(6 * time.Minute))
defer t.Stop()
for {
select {
case <-t.C:
// We announce ourselves to the rendezvous point for all the peers.
// We have a safeguard of 1 hour to avoid blocking the main loop
// in case of network issues.
// The TTL of DHT is by default no longer than 3 hours, so we should
// be safe by having an entry less than that.
safeTimeout, cancel := context.WithTimeout(ctx, time.Hour)
// Look for others who have announced and attempt to connect to them
anyConnected := false
Logger.Debugf("Own Id: %s\n", h.ID())
for !anyConnected {
time.Sleep(2 * time.Second)
Logger.Debug("Searching for peers...")
peerChan, err := routingDiscovery.FindPeers(ctx, topicNameFlag)
if err != nil {
panic(err)
endChan := make(chan struct{})
go func() {
s.discoverPeers(safeTimeout, h, dht)
endChan <- struct{}{}
}()
select {
case <-endChan:
cancel()
case <-safeTimeout.Done():
Logger.Error("Timeout while peer discovery")
cancel()
}
case <-ctx.Done():
return
}
}
}
func (s *StorageHandler) discoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) error {
s.bootstrapPeers(ctx, h)
time.Sleep(2 * time.Second)
for vaultName, v := range s.Vaults {
Logger.Debugf("Announcing vault \"%s\" with id: %s", vaultName, v.ID)
routingDiscovery := discovery.NewRoutingDiscovery(dht)
routingDiscovery.Advertise(ctx, v.ID)
Logger.Debugf("Start peer discovery...")
timedCtx, cf := context.WithTimeout(ctx, time.Second*120)
defer cf()
peerChan, err := routingDiscovery.FindPeers(timedCtx, v.ID)
if err != nil {
return err
}
for peer := range peerChan {
if peer.ID == h.ID() || len(peer.Addrs) == 0 {
continue // No self connection
}
Logger.Debugf("Found peer with id %s", peer.ID.String())
if h.Network().Connectedness(peer.ID) == network.Connected {
Logger.Debugf("Already connected to %s", peer.ID.String())
continue
}
Logger.Debugf("Connecting to peer with id %s", peer.ID.String())
err := h.Connect(ctx, peer)
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*120)
defer cancel()
err := h.Connect(timeoutCtx, peer)
if err != nil {
Logger.Debugf("Failed connecting to %s, error: %s\n", peer.ID, err)
} else {
@@ -411,7 +448,9 @@ func DiscoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
}
}
}
Logger.Debugf("Peer discovery complete")
Logger.Debug("Peer discovery complete")
return nil
}
func printErr(err error) {