Files
pentapass/internal/storage/storage.go
kalipso a3f2a5ee6d Rework discovery
Peer discovery now happens based on vault IDs instead of the general token
"pentapass". It also happens periodically instead of just once.
2024-10-10 13:12:26 +02:00

470 lines
10 KiB
Go

package storage
import (
"context"
"errors"
"os"
"fmt"
"encoding/json"
"path/filepath"
"time"
"sync"
"strings"
"github.com/mudler/edgevpn/pkg/utils"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/control"
"github.com/libp2p/go-libp2p/core/network"
//"github.com/libp2p/go-libp2p/core/peerstore"
discovery "github.com/libp2p/go-libp2p/p2p/discovery/routing"
ipfslite "github.com/hsanjuan/ipfs-lite"
"github.com/google/uuid"
multiaddr "github.com/multiformats/go-multiaddr"
badger "github.com/ipfs/go-ds-badger2"
dsq "github.com/ipfs/go-datastore/query"
crypto "github.com/libp2p/go-libp2p/core/crypto"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
agelib "filippo.io/age"
. "github.com/k4lipso/pentapass/internal/log"
)
// Listen configures the libp2p host to accept TCP connections on all
// IPv4 interfaces, with the OS choosing a free port (port 0).
var (
	Listen = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")
)
// SetupLibp2pHost loads (or creates on first run) this node's long-term
// Ed25519 identity key from <dbPath>/key, builds a libp2p host listening
// on Listen, and wraps it in a DHT-routed host.
//
// Unlike the earlier version, failures are returned to the caller instead
// of terminating the process with Logger.Fatal — the signature already
// carried an error return, so callers can now actually handle it.
func SetupLibp2pHost(ctx context.Context, dbPath string) (host.Host, *dht.IpfsDHT, error) {
	keyPath := filepath.Join(dbPath, "key")

	priv, err := loadOrCreateIdentity(keyPath)
	if err != nil {
		return nil, nil, err
	}

	h, err := libp2p.New(libp2p.Identity(priv), Listen)
	if err != nil {
		return nil, nil, err
	}

	kdht := initDHT(ctx, h)
	return routed.Wrap(h, kdht), kdht, nil
}

// loadOrCreateIdentity returns the node's private identity key, generating
// and persisting a fresh Ed25519 key if keyPath does not exist yet. Keeping
// the key on disk gives the node a stable peer ID across restarts.
func loadOrCreateIdentity(keyPath string) (crypto.PrivKey, error) {
	switch _, err := os.Stat(keyPath); {
	case os.IsNotExist(err):
		priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, 1)
		if err != nil {
			return nil, fmt.Errorf("generating identity key: %w", err)
		}
		raw, err := crypto.MarshalPrivateKey(priv)
		if err != nil {
			return nil, fmt.Errorf("marshalling identity key: %w", err)
		}
		// 0400: the identity key is a long-term secret; owner-read-only.
		if err := os.WriteFile(keyPath, raw, 0400); err != nil {
			return nil, fmt.Errorf("persisting identity key to %s: %w", keyPath, err)
		}
		return priv, nil
	case err != nil:
		return nil, fmt.Errorf("stat %s: %w", keyPath, err)
	default:
		raw, err := os.ReadFile(keyPath)
		if err != nil {
			return nil, fmt.Errorf("reading identity key: %w", err)
		}
		priv, err := crypto.UnmarshalPrivateKey(raw)
		if err != nil {
			return nil, fmt.Errorf("parsing identity key: %w", err)
		}
		return priv, nil
	}
}
// Peer identifies a remote node participating in a vault: its libp2p peer
// ID and an age public key (recipient string) used to encrypt data for it.
type Peer struct {
	// Id is the string form of the peer's libp2p peer ID.
	Id string `json:"Id"`
	// Key is the peer's age X25519 recipient (public) key.
	Key string `json:"Key"`
}
// VaultConfig is the persisted description of one vault: a human-readable
// name, the vault id used for DHT announcement/discovery, and the list of
// peers the vault trusts.
type VaultConfig struct {
	Name  string `json:"Name"`
	Id    string `json:"Id"`
	Peers []Peer `json:"Peers"`
}

// Config is the full on-disk configuration: one entry per vault.
type Config []VaultConfig
// WhitelistConnectionGater is a libp2p connection gater intended to restrict
// connections to a set of whitelisted peers.
// NOTE(review): the whitelist is currently not enforced — see
// InterceptPeerDial, which always allows — so whitelistedPeers is unused.
type WhitelistConnectionGater struct {
	whitelistedPeers map[peer.ID]struct{}
}
// InterceptPeerDial gates outbound dials to peer p.
// NOTE(review): the whitelist lookup is commented out, so every dial is
// currently allowed regardless of whitelistedPeers — confirm this bypass
// is intentional before shipping.
func (wg *WhitelistConnectionGater) InterceptPeerDial(p peer.ID) (allowed bool) {
	//_, allowed = wg.whitelistedPeers[p]
	return true
}
// InterceptAddrDial gates dialing a specific multiaddr of peer p; the
// decision is delegated to InterceptPeerDial (the address is not inspected).
func (wg *WhitelistConnectionGater) InterceptAddrDial(p peer.ID, addr multiaddr.Multiaddr) (bool) {
	return wg.InterceptPeerDial(p)
}
// InterceptAccept gates inbound connections. The remote multiaddr is
// resolved to a peer and the decision is delegated to InterceptPeerDial;
// connections whose address cannot be parsed are rejected.
func (wg *WhitelistConnectionGater) InterceptAccept(conn network.ConnMultiaddrs) bool {
	remote, err := peer.AddrInfoFromP2pAddr(conn.RemoteMultiaddr())
	if err != nil {
		Logger.Debugf("Error InterceptAccept: %s\n", err)
		return false
	}
	return wg.InterceptPeerDial(remote.ID)
}
// InterceptSecured gates a connection after the security handshake has
// identified the remote peer; delegated to InterceptPeerDial.
func (wg *WhitelistConnectionGater) InterceptSecured(direction network.Direction, p peer.ID, conn network.ConnMultiaddrs) (allow bool) {
	return wg.InterceptPeerDial(p)
}
// InterceptUpgraded gates a fully upgraded connection; delegated to
// InterceptPeerDial, with no specific disconnect reason (0).
func (wg *WhitelistConnectionGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) {
	return wg.InterceptPeerDial(conn.RemotePeer()), 0
}
// GetTrustedPeers builds a lookup table mapping each vault id to the peers
// that vault trusts, according to the given configuration.
func GetTrustedPeers(config []VaultConfig) map[string][]Peer {
	peersByVault := make(map[string][]Peer, len(config))
	for _, vaultCfg := range config {
		peersByVault[vaultCfg.Id] = vaultCfg.Peers
	}
	return peersByVault
}
// InitRootNs is a placeholder for initializing the root namespace.
// Currently a no-op.
func InitRootNs() {
	//TODO: check if "SharedKeyRegistry" key exists, if not create
}
// PeerFromString parses a peer description of the form "<id>/<key>" into a
// Peer. Exactly one '/' separator is required and, closing the previous
// TODO, both components must now be non-empty.
func PeerFromString(str string) (Peer, error) {
	id, key, ok := strings.Cut(str, "/")
	// Reject: no separator, more than one separator, or an empty component.
	if !ok || id == "" || key == "" || strings.Contains(key, "/") {
		return Peer{}, fmt.Errorf("Invalid Peer String")
	}
	return Peer{Id: id, Key: key}, nil
}
// StorageHandler bundles everything a running node needs to serve vaults:
// the libp2p host and pubsub, the local badger datastore, the ipfs-lite
// peer, the node's age identity key, and the vault configuration.
type StorageHandler struct {
	Ctx context.Context
	// Store is the local key/value datastore backing the vaults.
	Store *badger.Datastore
	Host host.Host
	Ipfs *ipfslite.Peer
	PubSub *pubsub.PubSub
	// Key is this node's age identity; its Recipient() is shared with peers.
	Key *agelib.X25519Identity
	// Config mirrors the on-disk configuration at ConfigPath.
	Config []VaultConfig
	// Vaults maps vault name -> open vault instance.
	Vaults map[string]*Vault
	ConfigPath string
}
// GetSelfPeer returns this node's own Peer record: its libp2p peer ID and
// its age recipient (public) key.
func (s *StorageHandler) GetSelfPeer() Peer {
	return Peer {
		Id: s.Host.ID().String(),
		Key: s.Key.Recipient().String(),
	}
}
// UpdateConfig rebuilds the in-memory configuration from the live vault
// map and persists it to ConfigPath. A write failure is logged (previously
// the error from writeConfig was silently discarded).
func (s *StorageHandler) UpdateConfig() {
	Logger.Debug("Updating Config...")
	s.recreateConfig()
	if err := s.writeConfig(s.ConfigPath, s.Config); err != nil {
		Logger.Errorf("could not persist config to %s: %s", s.ConfigPath, err)
	}
}
// recreateConfig rebuilds s.Config from the live vault map so the
// persisted configuration mirrors the current in-memory state.
func (s *StorageHandler) recreateConfig() {
	var rebuilt []VaultConfig
	for name, vault := range s.Vaults {
		entry := VaultConfig{
			Name:  name,
			Id:    vault.ID,
			Peers: vault.TrustedPeers,
		}
		rebuilt = append(rebuilt, entry)
	}
	s.Config = rebuilt
}
// writeConfig serializes config as JSON and writes it to filename
// (world-readable, 0644). Returns any marshalling or write error.
func (s *StorageHandler) writeConfig(filename string, config []VaultConfig) error {
	jsonData, err := json.Marshal(config)
	if err != nil {
		return err
	}
	return os.WriteFile(filename, jsonData, 0644)
}
// NewConfig loads the vault configuration from filename, first creating a
// default one if the file does not exist. The default contains a single
// "root" vault with a fresh id whose only trusted peer is this node itself.
//
// Errors are now wrapped with %w (instead of flattened via %s) so callers
// can inspect the underlying cause with errors.Is/As.
func (s *StorageHandler) NewConfig(filename string) ([]VaultConfig, error) {
	if _, err := os.Stat(filename); errors.Is(err, os.ErrNotExist) {
		defaultCfg := []VaultConfig{
			{
				Name: "root",
				Id:   uuid.New().String(),
				// Reuse GetSelfPeer so the self-entry is built in one place.
				Peers: []Peer{s.GetSelfPeer()},
			},
		}
		if err := s.writeConfig(filename, defaultCfg); err != nil {
			return nil, fmt.Errorf("could not create config file: %w", err)
		}
	}

	content, err := os.ReadFile(filename)
	if err != nil {
		return nil, fmt.Errorf("could not read config file: %w", err)
	}

	var result []VaultConfig
	if err := json.Unmarshal(content, &result); err != nil {
		return nil, fmt.Errorf("could not parse config file: %w", err)
	}
	return result, nil
}
// GetDefaultVault returns the default ("root") vault.
// NOTE(review): the Name parameter is ignored — every call returns
// s.Vaults["root"]. Confirm whether lookup-by-name was intended here.
func (s *StorageHandler) GetDefaultVault(Name string) *Vault {
	return s.Vaults["root"]
}
// InitVaults opens one Vault per configured entry and indexes them by
// name on the handler. A vault that fails to open is fatal.
func (s *StorageHandler) InitVaults() {
	vaults := make(map[string]*Vault, len(s.Config))
	for _, vaultCfg := range s.Config {
		vault, err := CreateVault(vaultCfg.Id, s)
		if err != nil {
			Logger.Fatal(err)
		}
		vaults[vaultCfg.Name] = vault
	}
	s.Vaults = vaults
}
// IsTrustedPeer reports whether the peer with the given id is listed as a
// trusted peer of the named vault in config.
func IsTrustedPeer(ctx context.Context, id peer.ID, vault string, config []VaultConfig) bool {
	want := id.String()
	// Indexing a missing vault yields a nil slice; ranging it is a no-op.
	for _, trusted := range GetTrustedPeers(config)[vault] {
		if trusted.Id == want {
			return true
		}
	}
	return false
}
// PrintDBContent logs every key/value pair in the datastore at Info level.
// Query/Rest errors are now reported via printErr instead of being
// silently discarded (which previously risked ranging a nil result).
func PrintDBContent(ctx context.Context, store *badger.Datastore) {
	result, err := store.Query(ctx, dsq.Query{})
	if err != nil {
		printErr(err)
		return
	}
	entries, err := result.Rest()
	if err != nil {
		printErr(err)
		return
	}
	for _, entry := range entries {
		Logger.Infof("Key: %s, Value: %s\n", entry.Key, string(entry.Value))
	}
}
// ListVaults returns the names of all currently open vaults. Order is
// unspecified (map iteration); returns nil when there are none.
func (s *StorageHandler) ListVaults() []string {
	var names []string
	for name := range s.Vaults {
		names = append(names, name)
	}
	return names
}
// DeleteVault removes the vault identified by ID from the handler, closes
// it, and persists the updated configuration. Deleting an unknown vault is
// a silent no-op.
func (s *StorageHandler) DeleteVault(ID string) error {
	vault, found := s.Vaults[ID]
	if !found {
		Logger.Debug("DeleteVault that does not exists")
		return nil
	}
	delete(s.Vaults, ID)
	vault.Close()
	s.UpdateConfig()
	return nil
}
// AddVault returns the vault registered under Name, creating it first if
// it does not exist. A newly created vault gets a fresh uuid, trusts this
// node itself, and is persisted to the configuration.
func (s *StorageHandler) AddVault(Name string) (*Vault, error) {
	if existing, found := s.Vaults[Name]; found {
		return existing, nil
	}
	vault, err := CreateVault(uuid.New().String(), s)
	if err != nil {
		return nil, err
	}
	vault.TrustedPeers = append(vault.TrustedPeers, s.GetSelfPeer())
	s.Vaults[Name] = vault
	s.UpdateConfig()
	return vault, nil
}
// initDHT creates this node's own Kademlia DHT instance on host h.
// A construction failure is treated as unrecoverable and panics.
func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
	// Start a DHT, for use in peer discovery. We can't just make a new DHT
	// client because we want each peer to maintain its own local copy of the
	// DHT, so that the bootstrapping node of the DHT can go down without
	// inhibiting future peer discovery.
	kademliaDHT, err := dht.New(ctx, h)
	if err != nil {
		panic(err)
	}
	// NOTE(review): Bootstrap is intentionally left commented out here;
	// bootstrapping is done explicitly via StorageHandler.bootstrapPeers.
	// if err = kademliaDHT.Bootstrap(ctx); err != nil {
	//	panic(err)
	// }
	return kademliaDHT
}
// bootstrapPeers connects h to the default libp2p bootstrap nodes in
// parallel and waits for all connection attempts to finish. Addresses
// that fail to parse are skipped (previously the parse error was ignored,
// which would have dereferenced a nil *AddrInfo in the goroutine).
func (s *StorageHandler) bootstrapPeers(ctx context.Context, h host.Host) {
	Logger.Info("Bootstrapping DHT")
	var wg sync.WaitGroup
	for _, peerAddr := range dht.DefaultBootstrapPeers {
		peerinfo, err := peer.AddrInfoFromP2pAddr(peerAddr)
		if err != nil {
			Logger.Debugf("Skipping invalid bootstrap address %s: %s", peerAddr, err)
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := h.Connect(ctx, *peerinfo); err != nil {
				Logger.Debugf("Bootstrap warning: %s", err)
			} else {
				Logger.Debugf("Connection established with bootstrap node: %q\n", *peerinfo)
			}
		}()
	}
	wg.Wait()
}
// RunBackground runs the periodic peer-discovery loop until ctx is
// cancelled: one immediate discovery pass, then further passes on a
// backoff ticker (2m initial interval, capped at 6m).
func (s *StorageHandler) RunBackground(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
	s.discoverPeers(ctx, h, dht)
	t := utils.NewBackoffTicker(utils.BackoffInitialInterval(2*time.Minute),
		utils.BackoffMaxInterval(6*time.Minute))
	defer t.Stop()
	for {
		select {
		case <-t.C:
			// We announce ourselves to the rendezvous point for all the peers.
			// We have a safeguard of 1 hour to avoid blocking the main loop
			// in case of network issues.
			// The TTL of DHT is by default no longer than 3 hours, so we should
			// be safe by having an entry less than that.
			safeTimeout, cancel := context.WithTimeout(ctx, time.Hour)
			// Buffered so the worker can always deliver its completion signal:
			// with an unbuffered channel, if the timeout branch below wins,
			// no one ever receives and the goroutine leaks forever on the send.
			endChan := make(chan struct{}, 1)
			go func() {
				s.discoverPeers(safeTimeout, h, dht)
				endChan <- struct{}{}
			}()
			select {
			case <-endChan:
			case <-safeTimeout.Done():
				Logger.Error("Timeout while peer discovery")
			}
			cancel()
		case <-ctx.Done():
			return
		}
	}
}
// discoverPeers performs one discovery pass: it (re-)connects to the DHT
// bootstrap nodes, then for every open vault announces the vault id as a
// rendezvous string and dials any peers found under it.
//
// Fixes over the previous version: the per-vault and per-peer timeout
// contexts are now cancelled promptly instead of via defer-in-loop (which
// postponed every cancel until function return), and the loop variable no
// longer shadows the peer package.
func (s *StorageHandler) discoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) error {
	s.bootstrapPeers(ctx, h)
	// Give the freshly established bootstrap connections a moment to settle.
	time.Sleep(2 * time.Second)
	routingDiscovery := discovery.NewRoutingDiscovery(dht)
	for vaultName, v := range s.Vaults {
		Logger.Debugf("Announcing vault \"%s\" with id: %s", vaultName, v.ID)
		routingDiscovery.Advertise(ctx, v.ID)
		Logger.Debugf("Start peer discovery...")
		if err := s.connectVaultPeers(ctx, h, routingDiscovery, v.ID); err != nil {
			return err
		}
	}
	Logger.Debug("Peer discovery complete")
	return nil
}

// connectVaultPeers finds peers advertising vaultID and dials each one
// that is not ourselves, has addresses, and is not already connected.
// Dial failures are logged, not returned.
func (s *StorageHandler) connectVaultPeers(ctx context.Context, h host.Host, rd *discovery.RoutingDiscovery, vaultID string) error {
	findCtx, cancelFind := context.WithTimeout(ctx, time.Second*120)
	defer cancelFind()
	peerChan, err := rd.FindPeers(findCtx, vaultID)
	if err != nil {
		return err
	}
	for p := range peerChan {
		if p.ID == h.ID() || len(p.Addrs) == 0 {
			continue // No self connection
		}
		Logger.Debugf("Found peer with id %s", p.ID.String())
		if h.Network().Connectedness(p.ID) == network.Connected {
			Logger.Debugf("Already connected to %s", p.ID.String())
			continue
		}
		connCtx, cancelConn := context.WithTimeout(ctx, time.Second*120)
		err := h.Connect(connCtx, p)
		// Cancel immediately so each dial's timer is released per iteration.
		cancelConn()
		if err != nil {
			Logger.Debugf("Failed connecting to %s, error: %s\n", p.ID, err)
		} else {
			Logger.Debugf("Connected to: %s", p.ID)
		}
	}
	return nil
}
// printErr logs err at Error level with a uniform "error:" prefix.
func printErr(err error) {
	Logger.Errorf("error: %s", err)
}
// ConnectedPeers returns one AddrInfo per live connection of h, each
// carrying the remote peer's ID and the single remote multiaddr of that
// connection. Returns nil when there are no connections.
func ConnectedPeers(h host.Host) []*peer.AddrInfo {
	var infos []*peer.AddrInfo
	for _, conn := range h.Network().Conns() {
		info := &peer.AddrInfo{
			ID:    conn.RemotePeer(),
			Addrs: []multiaddr.Multiaddr{conn.RemoteMultiaddr()},
		}
		infos = append(infos, info)
	}
	return infos
}