peer discovery happens now based on vault ids not the general token "pentapass". It also happens periodically instead of just once
470 lines
10 KiB
Go
470 lines
10 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"os"
|
|
"fmt"
|
|
"encoding/json"
|
|
"path/filepath"
|
|
"time"
|
|
"sync"
|
|
"strings"
|
|
|
|
"github.com/mudler/edgevpn/pkg/utils"
|
|
"github.com/libp2p/go-libp2p"
|
|
dht "github.com/libp2p/go-libp2p-kad-dht"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/core/control"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
//"github.com/libp2p/go-libp2p/core/peerstore"
|
|
discovery "github.com/libp2p/go-libp2p/p2p/discovery/routing"
|
|
ipfslite "github.com/hsanjuan/ipfs-lite"
|
|
"github.com/google/uuid"
|
|
|
|
multiaddr "github.com/multiformats/go-multiaddr"
|
|
|
|
badger "github.com/ipfs/go-ds-badger2"
|
|
dsq "github.com/ipfs/go-datastore/query"
|
|
crypto "github.com/libp2p/go-libp2p/core/crypto"
|
|
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
|
|
|
|
agelib "filippo.io/age"
|
|
. "github.com/k4lipso/pentapass/internal/log"
|
|
)
|
|
|
|
var (
|
|
Listen = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")
|
|
)
|
|
|
|
func SetupLibp2pHost(ctx context.Context, dbPath string) (host host.Host, dht *dht.IpfsDHT, err error) {
|
|
data := dbPath
|
|
keyPath := filepath.Join(data, "key")
|
|
var priv crypto.PrivKey
|
|
_, err = os.Stat(keyPath)
|
|
if os.IsNotExist(err) {
|
|
priv, _, err = crypto.GenerateKeyPair(crypto.Ed25519, 1)
|
|
if err != nil {
|
|
Logger.Fatal(err)
|
|
}
|
|
data, err := crypto.MarshalPrivateKey(priv)
|
|
if err != nil {
|
|
Logger.Fatal(err)
|
|
}
|
|
err = os.WriteFile(keyPath, data, 0400)
|
|
if err != nil {
|
|
Logger.Fatal(err)
|
|
}
|
|
} else if err != nil {
|
|
Logger.Fatal(err)
|
|
} else {
|
|
key, err := os.ReadFile(keyPath)
|
|
if err != nil {
|
|
Logger.Fatal(err)
|
|
}
|
|
priv, err = crypto.UnmarshalPrivateKey(key)
|
|
if err != nil {
|
|
Logger.Fatal(err)
|
|
}
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
Logger.Fatal(err)
|
|
}
|
|
|
|
host, err = libp2p.New(libp2p.Identity(priv), Listen)
|
|
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
dht = initDHT(ctx, host)
|
|
host = routed.Wrap(host, dht)
|
|
|
|
return host, dht, nil
|
|
}
|
|
|
|
type Peer struct {
|
|
Id string `json:"Id"`
|
|
Key string `json:"Key"`
|
|
}
|
|
|
|
type VaultConfig struct {
|
|
Name string `json:"Name"`
|
|
Id string `json:"Id"`
|
|
Peers []Peer `json:"Peers"`
|
|
}
|
|
|
|
type Config []VaultConfig
|
|
|
|
|
|
type WhitelistConnectionGater struct {
|
|
whitelistedPeers map[peer.ID]struct{}
|
|
}
|
|
|
|
func (wg *WhitelistConnectionGater) InterceptPeerDial(p peer.ID) (allowed bool) {
|
|
//_, allowed = wg.whitelistedPeers[p]
|
|
return true
|
|
}
|
|
|
|
func (wg *WhitelistConnectionGater) InterceptAddrDial(p peer.ID, addr multiaddr.Multiaddr) (bool) {
|
|
return wg.InterceptPeerDial(p)
|
|
}
|
|
|
|
func (wg *WhitelistConnectionGater) InterceptAccept(conn network.ConnMultiaddrs) (bool) {
|
|
addr, err := peer.AddrInfoFromP2pAddr(conn.RemoteMultiaddr())
|
|
|
|
if err != nil {
|
|
Logger.Debugf("Error InterceptAccept: %s\n", err)
|
|
return false
|
|
}
|
|
|
|
return wg.InterceptPeerDial(addr.ID)
|
|
}
|
|
|
|
func (wg *WhitelistConnectionGater) InterceptSecured(direction network.Direction, p peer.ID, conn network.ConnMultiaddrs) (allow bool) {
|
|
return wg.InterceptPeerDial(p)
|
|
}
|
|
|
|
func (wg *WhitelistConnectionGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) {
|
|
return wg.InterceptPeerDial(conn.RemotePeer()), 0
|
|
}
|
|
|
|
func GetTrustedPeers(config []VaultConfig) map[string][]Peer {
|
|
result := make(map[string][]Peer)
|
|
for _, c := range config {
|
|
result[c.Id] = c.Peers
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func InitRootNs() {
|
|
//TODO: check if "SharedKeyRegistry" key exists, if not create
|
|
}
|
|
|
|
func PeerFromString(str string) (Peer, error) {
|
|
parts := strings.Split(str, "/")
|
|
|
|
if len(parts) != 2 {
|
|
return Peer{}, fmt.Errorf("Invalid Peer String")
|
|
}
|
|
//TODO: validate each part
|
|
|
|
return Peer{ Id: parts[0], Key: parts[1] }, nil
|
|
}
|
|
|
|
type StorageHandler struct {
|
|
Ctx context.Context
|
|
Store *badger.Datastore
|
|
Host host.Host
|
|
Ipfs *ipfslite.Peer
|
|
PubSub *pubsub.PubSub
|
|
Key *agelib.X25519Identity
|
|
Config []VaultConfig
|
|
Vaults map[string]*Vault
|
|
ConfigPath string
|
|
}
|
|
|
|
func (s *StorageHandler) GetSelfPeer() Peer {
|
|
return Peer {
|
|
Id: s.Host.ID().String(),
|
|
Key: s.Key.Recipient().String(),
|
|
}
|
|
}
|
|
|
|
func (s *StorageHandler) UpdateConfig() {
|
|
Logger.Debug("Updating Config...")
|
|
s.recreateConfig()
|
|
s.writeConfig(s.ConfigPath, s.Config)
|
|
}
|
|
|
|
func (s *StorageHandler) recreateConfig() {
|
|
var newCfg []VaultConfig
|
|
for key, val := range s.Vaults {
|
|
newCfg = append(newCfg, VaultConfig{
|
|
Name: key,
|
|
Id: val.ID,
|
|
Peers: val.TrustedPeers,
|
|
})
|
|
}
|
|
s.Config = newCfg
|
|
//for idx, vaultConfig := range s.Config {
|
|
// s.Config[idx].Peers = s.Vaults[vaultConfig.Name].TrustedPeers
|
|
//}
|
|
}
|
|
|
|
func (s *StorageHandler) writeConfig(filename string, config []VaultConfig) error {
|
|
jsonData, err := json.Marshal(config)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
|
|
err = os.WriteFile(filename, jsonData, 0644)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *StorageHandler) NewConfig(filename string) ([]VaultConfig, error) {
|
|
if _, err := os.Stat(filename); errors.Is(err, os.ErrNotExist) {
|
|
err := s.writeConfig(filename, []VaultConfig{
|
|
{
|
|
Name: "root",
|
|
Id: uuid.New().String(),
|
|
Peers: []Peer{
|
|
{
|
|
Id: s.Host.ID().String(),
|
|
Key: s.Key.Recipient().String(),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Could not create config file: %s", err)
|
|
}
|
|
}
|
|
|
|
content, err := os.ReadFile(filename)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Could not read config file: %s", err)
|
|
}
|
|
|
|
var result []VaultConfig
|
|
err = json.Unmarshal(content, &result)
|
|
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Could not parse config file: %s", err)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (s *StorageHandler) GetDefaultVault(Name string) *Vault {
|
|
return s.Vaults["root"]
|
|
}
|
|
|
|
func (s *StorageHandler) InitVaults() {
|
|
VaultMap := make(map[string]*Vault)
|
|
for _, nsCfg := range s.Config {
|
|
ns1, err := CreateVault(nsCfg.Id, s)
|
|
|
|
if err != nil {
|
|
Logger.Fatal(err)
|
|
}
|
|
|
|
VaultMap[nsCfg.Name] = ns1
|
|
}
|
|
|
|
s.Vaults = VaultMap
|
|
}
|
|
|
|
func IsTrustedPeer(ctx context.Context, id peer.ID, vault string, config []VaultConfig) bool {
|
|
peerMap := GetTrustedPeers(config)
|
|
|
|
val, ok := peerMap[vault]
|
|
|
|
if ok {
|
|
for _, v := range val {
|
|
if v.Id == id.String() {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func PrintDBContent(ctx context.Context, store *badger.Datastore) {
|
|
q := dsq.Query{}
|
|
result, _ := store.Query(ctx, q)
|
|
|
|
entries, _ := result.Rest()
|
|
for _, entry := range entries {
|
|
Logger.Infof("Key: %s, Value: %s\n", entry.Key, string(entry.Value))
|
|
}
|
|
}
|
|
|
|
func (s *StorageHandler) ListVaults() []string {
|
|
var result []string
|
|
for k := range s.Vaults {
|
|
result = append(result, k)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func (s *StorageHandler) DeleteVault(ID string) error {
|
|
ns, ok := s.Vaults[ID]
|
|
|
|
if !ok {
|
|
Logger.Debug("DeleteVault that does not exists")
|
|
return nil
|
|
}
|
|
|
|
delete(s.Vaults, ID)
|
|
ns.Close()
|
|
s.UpdateConfig()
|
|
return nil
|
|
}
|
|
|
|
func (s *StorageHandler) AddVault(Name string) (*Vault, error) {
|
|
ns, ok := s.Vaults[Name]
|
|
|
|
if ok {
|
|
return ns, nil
|
|
}
|
|
|
|
result, err := CreateVault(uuid.New().String(), s)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result.TrustedPeers = append(result.TrustedPeers, s.GetSelfPeer())
|
|
s.Vaults[Name] = result
|
|
s.UpdateConfig()
|
|
return result, nil
|
|
}
|
|
|
|
|
|
func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
|
|
// Start a DHT, for use in peer discovery. We can't just make a new DHT
|
|
// client because we want each peer to maintain its own local copy of the
|
|
// DHT, so that the bootstrapping node of the DHT can go down without
|
|
// inhibiting future peer discovery.
|
|
kademliaDHT, err := dht.New(ctx, h)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
// if err = kademliaDHT.Bootstrap(ctx); err != nil {
|
|
// panic(err)
|
|
// }
|
|
|
|
return kademliaDHT
|
|
}
|
|
|
|
func (s *StorageHandler) bootstrapPeers(ctx context.Context, h host.Host) {
|
|
Logger.Info("Bootstrapping DHT")
|
|
|
|
var wg sync.WaitGroup
|
|
for _, peerAddr := range dht.DefaultBootstrapPeers {
|
|
peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
if err := h.Connect(ctx, *peerinfo); err != nil {
|
|
Logger.Debugf("Bootstrap warning: %s", err)
|
|
} else {
|
|
Logger.Debugf("Connection established with bootstrap node: %q\n", *peerinfo)
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (s *StorageHandler) RunBackground(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
|
|
s.discoverPeers(ctx, h, dht)
|
|
t := utils.NewBackoffTicker(utils.BackoffInitialInterval(2 * time.Minute),
|
|
utils.BackoffMaxInterval(6 * time.Minute))
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
// We announce ourselves to the rendezvous point for all the peers.
|
|
// We have a safeguard of 1 hour to avoid blocking the main loop
|
|
// in case of network issues.
|
|
// The TTL of DHT is by default no longer than 3 hours, so we should
|
|
// be safe by having an entry less than that.
|
|
safeTimeout, cancel := context.WithTimeout(ctx, time.Hour)
|
|
|
|
endChan := make(chan struct{})
|
|
go func() {
|
|
s.discoverPeers(safeTimeout, h, dht)
|
|
endChan <- struct{}{}
|
|
}()
|
|
|
|
select {
|
|
case <-endChan:
|
|
cancel()
|
|
case <-safeTimeout.Done():
|
|
Logger.Error("Timeout while peer discovery")
|
|
cancel()
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *StorageHandler) discoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) error {
|
|
s.bootstrapPeers(ctx, h)
|
|
time.Sleep(2 * time.Second)
|
|
|
|
for vaultName, v := range s.Vaults {
|
|
Logger.Debugf("Announcing vault \"%s\" with id: %s", vaultName, v.ID)
|
|
routingDiscovery := discovery.NewRoutingDiscovery(dht)
|
|
routingDiscovery.Advertise(ctx, v.ID)
|
|
|
|
Logger.Debugf("Start peer discovery...")
|
|
|
|
timedCtx, cf := context.WithTimeout(ctx, time.Second*120)
|
|
defer cf()
|
|
|
|
peerChan, err := routingDiscovery.FindPeers(timedCtx, v.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for peer := range peerChan {
|
|
if peer.ID == h.ID() || len(peer.Addrs) == 0 {
|
|
continue // No self connection
|
|
}
|
|
|
|
Logger.Debugf("Found peer with id %s", peer.ID.String())
|
|
if h.Network().Connectedness(peer.ID) == network.Connected {
|
|
Logger.Debugf("Already connected to %s", peer.ID.String())
|
|
continue
|
|
}
|
|
|
|
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*120)
|
|
defer cancel()
|
|
err := h.Connect(timeoutCtx, peer)
|
|
if err != nil {
|
|
Logger.Debugf("Failed connecting to %s, error: %s\n", peer.ID, err)
|
|
} else {
|
|
Logger.Debugf("Connected to: %s", peer.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
Logger.Debug("Peer discovery complete")
|
|
return nil
|
|
}
|
|
|
|
func printErr(err error) {
|
|
Logger.Errorf("error: %s", err)
|
|
}
|
|
|
|
func ConnectedPeers(h host.Host) []*peer.AddrInfo {
|
|
var pinfos []*peer.AddrInfo
|
|
for _, c := range h.Network().Conns() {
|
|
pinfos = append(pinfos, &peer.AddrInfo{
|
|
ID: c.RemotePeer(),
|
|
Addrs: []multiaddr.Multiaddr{c.RemoteMultiaddr()},
|
|
})
|
|
}
|
|
return pinfos
|
|
}
|