package storage import ( "context" "errors" "fmt" "os" "encoding/json" "path/filepath" "time" "sync" "strings" "github.com/ipfs/go-datastore/query" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/control" "github.com/libp2p/go-libp2p/core/network" //"github.com/libp2p/go-libp2p/core/peerstore" discovery "github.com/libp2p/go-libp2p/p2p/discovery/routing" ds "github.com/ipfs/go-datastore" ipfslite "github.com/hsanjuan/ipfs-lite" "github.com/google/uuid" multiaddr "github.com/multiformats/go-multiaddr" badger "github.com/ipfs/go-ds-badger2" dsq "github.com/ipfs/go-datastore/query" crdt "github.com/ipfs/go-ds-crdt" crypto "github.com/libp2p/go-libp2p/core/crypto" routed "github.com/libp2p/go-libp2p/p2p/host/routed" logging "github.com/ipfs/go-log/v2" agelib "filippo.io/age" password "github.com/k4lipso/pentapass/crypto" "github.com/k4lipso/pentapass/crypto/age" ) var ( topicNameFlag = "pentapass" logger = logging.Logger("globaldb") Listen = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0") ) func SetupLibp2pHost(ctx context.Context, dbPath string) (host host.Host, dht *dht.IpfsDHT, err error) { data := dbPath keyPath := filepath.Join(data, "key") var priv crypto.PrivKey _, err = os.Stat(keyPath) if os.IsNotExist(err) { priv, _, err = crypto.GenerateKeyPair(crypto.Ed25519, 1) if err != nil { logger.Fatal(err) } data, err := crypto.MarshalPrivateKey(priv) if err != nil { logger.Fatal(err) } err = os.WriteFile(keyPath, data, 0400) if err != nil { logger.Fatal(err) } } else if err != nil { logger.Fatal(err) } else { key, err := os.ReadFile(keyPath) if err != nil { logger.Fatal(err) } priv, err = crypto.UnmarshalPrivateKey(key) if err != nil { logger.Fatal(err) } } if err != nil { logger.Fatal(err) } //whitelistedPeers := map[peer.ID]struct{} { // "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ": {}, // "12D3KooWBRvtW83QYnPgJCyVyAgMXtg71wjkGefVB2fBnm1A36kS": {}, // "12D3KooWMmc4kYy78vSumqWtPkUNAoPeCpJ66ysFv1U8S554B7e2": {}, //} //connectionGater := &WhitelistConnectionGater{whitelistedPeers: whitelistedPeers} //host, err = libp2p.New(libp2p.Identity(priv), libp2p.ConnectionGater(connectionGater), Listen) host, err = libp2p.New(libp2p.Identity(priv), Listen) if err != nil { return nil, nil, err } dht = initDHT(ctx, host) host = routed.Wrap(host, dht) return host, dht, nil } type Peer struct { Id string `json:"Id"` Key string `json:"Key"` } type NamespaceConfig struct { Name string `json:"Name"` Id string `json:"Id"` Peers []Peer `json:"Peers"` } type Config []NamespaceConfig type WhitelistConnectionGater struct { whitelistedPeers map[peer.ID]struct{} } func (wg *WhitelistConnectionGater) InterceptPeerDial(p peer.ID) (allowed bool) { //fmt.Println("PeerDial") //_, allowed = wg.whitelistedPeers[p] return true } func (wg *WhitelistConnectionGater) InterceptAddrDial(p peer.ID, addr multiaddr.Multiaddr) (bool) { //fmt.Println("AddrDial") return wg.InterceptPeerDial(p) } func (wg *WhitelistConnectionGater) InterceptAccept(conn network.ConnMultiaddrs) (bool) { //fmt.Println("InterceptAccept") addr, err := peer.AddrInfoFromP2pAddr(conn.RemoteMultiaddr()) if err != nil { fmt.Printf("Error InterceptAccept: %s\n", err) return false } return wg.InterceptPeerDial(addr.ID) } func (wg *WhitelistConnectionGater) InterceptSecured(direction network.Direction, p peer.ID, conn network.ConnMultiaddrs) (allow bool) { //fmt.Println("InterceptSecured") return wg.InterceptPeerDial(p) } func (wg *WhitelistConnectionGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) { //fmt.Println("InterceptUpgraded") return wg.InterceptPeerDial(conn.RemotePeer()), 0 } func GetTrustedPeers(config []NamespaceConfig) map[string][]Peer { result := make(map[string][]Peer) for _, c := range config { result[c.Id] = c.Peers } return result } func InitRootNs() { //TODO: check if "SharedKeyRegistry" key exists, if not create } type Namespace struct { ID string Datastore *crdt.Datastore //Registry *sharedKeyRegistry CancelFunc context.CancelFunc ctx context.Context Key *agelib.X25519Identity TrustedPeers []Peer } func PeerFromString(str string) (Peer, error) { parts := strings.Split(str, "/") fmt.Println(str) fmt.Println(parts) if len(parts) != 2 { return Peer{}, fmt.Errorf("Invalid Peer String") } //TODO: validate each part return Peer{ Id: parts[0], Key: parts[1] }, nil } func (n *Namespace) AddPeer(peer Peer) { for _, CurrentPeer := range n.TrustedPeers { if CurrentPeer.Id == peer.Id && CurrentPeer.Key == peer.Key { return } } n.TrustedPeers = append(n.TrustedPeers, peer) } func (n *Namespace) RemovePeer(peer Peer) { var Peers []Peer for _, CurrentPeer := range n.TrustedPeers { if CurrentPeer.Id == peer.Id && CurrentPeer.Key == peer.Key { continue } Peers = append(Peers, CurrentPeer) } n.TrustedPeers = Peers } func (n *Namespace) GetRecipients() []string { var result []string for _, peer := range n.TrustedPeers { result = append(result, peer.Key) } return result } func (n *Namespace) Put(k string, v string) error { key := ds.NewKey(k) err := n.Datastore.Put(n.ctx, key, []byte(v)) if err != nil { printErr(err) } return err } func (n *Namespace) Delete(k string) error { key := ds.NewKey(k) err := n.Datastore.Delete(n.ctx, key) if err != nil { printErr(err) } return err } func (n *Namespace) GetPassword(k string) (password.Password, error) { v, err := n.Datastore.Get(n.ctx, ds.NewKey(k)) if err != nil { printErr(err) return password.Password{}, err } val, err := age.Decrypt(v, n.Key) if err != nil { printErr(err) return password.Password{}, err } pw, err := password.GetPasswordFromJson(val) if err != nil { printErr(err) return password.Password{}, err } return pw, nil } func (n *Namespace) Get(k string) (string, error) { v, err := n.Datastore.Get(n.ctx, ds.NewKey(k)) if err != nil { printErr(err) return "", err } return string(v), nil } func (n *Namespace) GetAllNames() []string { q := query.Query{} results, err := n.Datastore.Query(n.ctx, q) if err != nil { printErr(err) } var result []string for r := range results.Next() { if r.Error != nil { printErr(err) continue } result = append(result, r.Key) } return result } func (n *Namespace) List() { q := query.Query{} results, err := n.Datastore.Query(n.ctx, q) if err != nil { printErr(err) } for r := range results.Next() { if r.Error != nil { printErr(err) continue } val, err := age.Decrypt(r.Value, n.Key) if err != nil { printErr(err) continue } fmt.Printf("[%s] -> %s\n", r.Key, string(val)) } } func (n *Namespace) GetAllPasswords() ([]password.Password, error) { q := query.Query{} results, err := n.Datastore.Query(n.ctx, q) if err != nil { return nil, fmt.Errorf("Error during GetAllPasswords: %s", err) } var result []password.Password for r := range results.Next() { if r.Error != nil { printErr(err) continue } val, err := age.Decrypt(r.Value, n.Key) if err != nil { printErr(err) continue } pw, err := password.GetPasswordFromJson(val) if err != nil { printErr(err) continue } result = append(result, pw) } return result, nil } func (n *Namespace) Close() { n.CancelFunc() n.Datastore.Close() } type StorageHandler struct { Ctx context.Context Store *badger.Datastore Host host.Host Ipfs *ipfslite.Peer PubSub *pubsub.PubSub Key *agelib.X25519Identity Config []NamespaceConfig Namespaces map[string]*Namespace ConfigPath string } func (s *StorageHandler) GetSelfPeer() Peer { return Peer { Id: s.Host.ID().String(), Key: s.Key.Recipient().String(), } } func (s *StorageHandler) UpdateConfig() { fmt.Println("Updating Config...") s.recreateConfig() s.writeConfig(s.ConfigPath, s.Config) } func (s *StorageHandler) recreateConfig() { var newCfg []NamespaceConfig for key, val := range s.Namespaces { newCfg = append(newCfg, NamespaceConfig{ Name: key, Id: val.ID, Peers: val.TrustedPeers, }) } s.Config = newCfg //for idx, namespaceConfig := range s.Config { // s.Config[idx].Peers = s.Namespaces[namespaceConfig.Name].TrustedPeers //} } func (s *StorageHandler) writeConfig(filename string, config []NamespaceConfig) error { jsonData, err := json.Marshal(config) if err != nil { fmt.Printf("Error during config initialization") return err } err = os.WriteFile(filename, jsonData, 0644) if err != nil { fmt.Printf("Error during config initialization") return err } return nil } func (s *StorageHandler) NewConfig(filename string) ([]NamespaceConfig, error) { if _, err := os.Stat(filename); errors.Is(err, os.ErrNotExist) { err := s.writeConfig(filename, []NamespaceConfig{ { Name: "root", Id: uuid.New().String(), Peers: []Peer{ { Id: s.Host.ID().String(), Key: s.Key.Recipient().String(), }, }, }, }, ) if err != nil { return nil, fmt.Errorf("Could not create config file: %s", err) } } content, err := os.ReadFile(filename) if err != nil { return nil, fmt.Errorf("Could not read config file: %s", err) } var result []NamespaceConfig err = json.Unmarshal(content, &result) if err != nil { return nil, fmt.Errorf("Could not parse config file: %s", err) } return result, nil } func (s *StorageHandler) GetDefaultNamespace(Name string) *Namespace { return s.Namespaces["root"] } func (s *StorageHandler) InitNamespaces() { NamespaceMap := make(map[string]*Namespace) for _, nsCfg := range s.Config { ns1, err := CreateNamespace(nsCfg.Id, s) if err != nil { logger.Fatal(err) } NamespaceMap[nsCfg.Name] = ns1 } s.Namespaces = NamespaceMap } func IsTrustedPeer(ctx context.Context, id peer.ID, namespace string, config []NamespaceConfig) bool { peerMap := GetTrustedPeers(config) val, ok := peerMap[namespace] if ok { for _, v := range val { if v.Id == id.String() { return true } } } return false } func PrintDBContent(ctx context.Context, store *badger.Datastore) { q := dsq.Query{} result, _ := store.Query(ctx, q) entries, _ := result.Rest() for _, entry := range entries { fmt.Printf("Key: %s, Value: %s\n", entry.Key, string(entry.Value)) } } func (s *StorageHandler) ListNamespaces() []string { var result []string for k := range s.Namespaces { result = append(result, k) } return result } func (s *StorageHandler) DeleteNamespace(ID string) error { ns, ok := s.Namespaces[ID] if !ok { fmt.Print("DeleteNamespace that does not exists") return nil } delete(s.Namespaces, ID) ns.Close() s.UpdateConfig() return nil } func (s *StorageHandler) AddNamespace(Name string) (*Namespace, error) { ns, ok := s.Namespaces[Name] if ok { return ns, nil } result, err := CreateNamespace(uuid.New().String(), s) if err != nil { return nil, err } result.TrustedPeers = append(result.TrustedPeers, s.GetSelfPeer()) s.Namespaces[Name] = result s.UpdateConfig() return result, nil } func CreateNamespace(ID string, storageHandler *StorageHandler) (*Namespace, error) { fmt.Printf("Creating Namespace %s\n", ID) err := storageHandler.PubSub.RegisterTopicValidator( ID, //== topicName func(ctx context.Context, id peer.ID, msg *pubsub.Message) bool { fmt.Printf("PubSubmsg TOPIC: %s, PEER: %s\n", ID, id) signer := msg.GetFrom() trusted := IsTrustedPeer(ctx, signer, ID, storageHandler.Config) if !trusted { fmt.Printf("discarded pubsub message from non trusted source %s\n", signer) } return trusted }, ) if err != nil { logger.Errorf("error registering topic validator: %s", err) } psubCtx, psubCancel := context.WithCancel(storageHandler.Ctx) pubsubBC, err := crdt.NewPubSubBroadcaster(psubCtx, storageHandler.PubSub, ID) if err != nil { logger.Fatal(err) } opts := crdt.DefaultOptions() opts.Logger = logger opts.RebroadcastInterval = 5 * time.Second opts.PutHook = func(k ds.Key, v []byte) { fmt.Printf("Added: [%s]\n", k) } opts.DeleteHook = func(k ds.Key) { fmt.Printf("Removed: [%s]\n", k) } crdt, err := crdt.New(storageHandler.Store, ds.NewKey(ID), storageHandler.Ipfs, pubsubBC, opts) if err != nil { logger.Fatal(err) psubCancel() return nil, err } PeerMap := GetTrustedPeers(storageHandler.Config) val, ok := PeerMap[ID] if !ok { fmt.Println("namespace config does not contain any peers") } return &Namespace{ID: ID, Datastore: crdt, CancelFunc: psubCancel, ctx: storageHandler.Ctx, Key: storageHandler.Key, TrustedPeers: val}, nil } func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT { // Start a DHT, for use in peer discovery. We can't just make a new DHT // client because we want each peer to maintain its own local copy of the // DHT, so that the bootstrapping node of the DHT can go down without // inhibiting future peer discovery. kademliaDHT, err := dht.New(ctx, h) if err != nil { panic(err) } if err = kademliaDHT.Bootstrap(ctx); err != nil { panic(err) } var wg sync.WaitGroup for _, peerAddr := range dht.DefaultBootstrapPeers { peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr) wg.Add(1) go func() { defer wg.Done() if err := h.Connect(ctx, *peerinfo); err != nil { fmt.Println("Bootstrap warning:", err) } else { fmt.Printf("Connection established with bootstrap node: %q\n", *peerinfo) } }() } wg.Wait() return kademliaDHT } func DiscoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) { routingDiscovery := discovery.NewRoutingDiscovery(dht) routingDiscovery.Advertise(ctx, topicNameFlag) // Look for others who have announced and attempt to connect to them anyConnected := false fmt.Printf("Own Id: %s\n", h.ID()) for !anyConnected { time.Sleep(2 * time.Second) fmt.Println("Searching for peers...") peerChan, err := routingDiscovery.FindPeers(ctx, topicNameFlag) if err != nil { panic(err) } for peer := range peerChan { if peer.ID == h.ID() || len(peer.Addrs) == 0 { continue // No self connection } if h.Network().Connectedness(peer.ID) == network.Connected { continue } fmt.Printf("Connecting to peer with id %s", peer.ID.String()) err := h.Connect(ctx, peer) if err != nil { fmt.Printf("Failed connecting to %s, error: %s\n", peer.ID, err) } else { fmt.Println("Connected to:", peer.ID) } } } fmt.Println("Peer discovery complete") } func printErr(err error) { fmt.Println("error:", err) } func ConnectedPeers(h host.Host) []*peer.AddrInfo { var pinfos []*peer.AddrInfo for _, c := range h.Network().Conns() { pinfos = append(pinfos, &peer.AddrInfo{ ID: c.RemotePeer(), Addrs: []multiaddr.Multiaddr{c.RemoteMultiaddr()}, }) } return pinfos }