Files
pentapass/storage/storage.go
2024-08-14 19:43:39 +02:00

503 lines
11 KiB
Go

package storage
import (
"context"
"fmt"
"os"
"encoding/json"
"path/filepath"
"time"
"sync"
"github.com/ipfs/go-datastore/query"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/control"
//"github.com/libp2p/go-libp2p/core/peerstore"
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
ds "github.com/ipfs/go-datastore"
ipfslite "github.com/hsanjuan/ipfs-lite"
"github.com/libp2p/go-libp2p/core/network"
multiaddr "github.com/multiformats/go-multiaddr"
badger "github.com/ipfs/go-ds-badger2"
dsq "github.com/ipfs/go-datastore/query"
crdt "github.com/ipfs/go-ds-crdt"
crypto "github.com/libp2p/go-libp2p/core/crypto"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
logging "github.com/ipfs/go-log/v2"
agelib "filippo.io/age"
password "github.com/k4lipso/pentapass/crypto"
"github.com/k4lipso/pentapass/crypto/age"
)
var (
topicNameFlag = "afbjlask-23klaj2idalj2-ajl2kjd3i-2ldakjd2"
logger = logging.Logger("globaldb")
Listen = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")
)
func SetupLibp2pHost(ctx context.Context, dbPath string) (host host.Host, dht *dht.IpfsDHT, err error) {
data := dbPath
keyPath := filepath.Join(data, "key")
var priv crypto.PrivKey
_, err = os.Stat(keyPath)
if os.IsNotExist(err) {
priv, _, err = crypto.GenerateKeyPair(crypto.Ed25519, 1)
if err != nil {
logger.Fatal(err)
}
data, err := crypto.MarshalPrivateKey(priv)
if err != nil {
logger.Fatal(err)
}
err = os.WriteFile(keyPath, data, 0400)
if err != nil {
logger.Fatal(err)
}
} else if err != nil {
logger.Fatal(err)
} else {
key, err := os.ReadFile(keyPath)
if err != nil {
logger.Fatal(err)
}
priv, err = crypto.UnmarshalPrivateKey(key)
if err != nil {
logger.Fatal(err)
}
}
if err != nil {
logger.Fatal(err)
}
//whitelistedPeers := map[peer.ID]struct{} {
// "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ": {},
// "12D3KooWBRvtW83QYnPgJCyVyAgMXtg71wjkGefVB2fBnm1A36kS": {},
// "12D3KooWMmc4kYy78vSumqWtPkUNAoPeCpJ66ysFv1U8S554B7e2": {},
//}
//connectionGater := &WhitelistConnectionGater{whitelistedPeers: whitelistedPeers}
//host, err = libp2p.New(libp2p.Identity(priv), libp2p.ConnectionGater(connectionGater), Listen)
host, err = libp2p.New(libp2p.Identity(priv), Listen)
if err != nil {
return nil, nil, err
}
dht = initDHT(ctx, host)
host = routed.Wrap(host, dht)
return host, dht, nil
}
type Peer struct {
Id string `json:"Id"`
Key string `json:"Key"`
}
type NamespaceConfig struct {
Name string `json:"Name"`
Id string `json:"Id"`
Peers []Peer `json:"Peers"`
}
type Config []NamespaceConfig
func NewConfig(filename string) ([]NamespaceConfig, error) {
//fmt.Println("NewConfig Path not implemented yet")
content, err := os.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("Could not read config file: %s", err)
}
var result []NamespaceConfig
err = json.Unmarshal(content, &result)
if err != nil {
return nil, fmt.Errorf("Could not parse config file: %s", err)
}
return result, nil
}
type WhitelistConnectionGater struct {
whitelistedPeers map[peer.ID]struct{}
}
func (wg *WhitelistConnectionGater) InterceptPeerDial(p peer.ID) (allowed bool) {
//fmt.Println("PeerDial")
//_, allowed = wg.whitelistedPeers[p]
return true
}
func (wg *WhitelistConnectionGater) InterceptAddrDial(p peer.ID, addr multiaddr.Multiaddr) (bool) {
//fmt.Println("AddrDial")
return wg.InterceptPeerDial(p)
}
func (wg *WhitelistConnectionGater) InterceptAccept(conn network.ConnMultiaddrs) (bool) {
//fmt.Println("InterceptAccept")
addr, err := peer.AddrInfoFromP2pAddr(conn.RemoteMultiaddr())
if err != nil {
fmt.Printf("Error InterceptAccept: %s\n", err)
return false
}
return wg.InterceptPeerDial(addr.ID)
}
func (wg *WhitelistConnectionGater) InterceptSecured(direction network.Direction, p peer.ID, conn network.ConnMultiaddrs) (allow bool) {
//fmt.Println("InterceptSecured")
return wg.InterceptPeerDial(p)
}
func (wg *WhitelistConnectionGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) {
//fmt.Println("InterceptUpgraded")
return wg.InterceptPeerDial(conn.RemotePeer()), 0
}
func GetTrustedPeers(config []NamespaceConfig) map[string][]Peer {
result := make(map[string][]Peer)
for _, c := range config {
result[c.Id] = c.Peers
}
return result
}
func InitRootNs() {
//TODO: check if "SharedKeyRegistry" key exists, if not create
}
type Namespace struct {
ID string
Datastore *crdt.Datastore
//Registry *sharedKeyRegistry
CancelFunc context.CancelFunc
ctx context.Context
Key *agelib.X25519Identity
TrustedPeers []Peer
}
func (n *Namespace) GetRecipients() []string {
var result []string
for _, peer := range n.TrustedPeers {
result = append(result, peer.Key)
}
return result
}
func (n *Namespace) Put(k string, v string) error {
key := ds.NewKey(k)
err := n.Datastore.Put(n.ctx, key, []byte(v))
if err != nil {
printErr(err)
}
return err
}
func (n *Namespace) Get(k string) (string, error) {
v, err := n.Datastore.Get(n.ctx, ds.NewKey(k))
if err != nil {
printErr(err)
return "", err
}
return string(v), nil
}
func (n *Namespace) List() {
q := query.Query{}
results, err := n.Datastore.Query(n.ctx, q)
if err != nil {
printErr(err)
}
for r := range results.Next() {
if r.Error != nil {
printErr(err)
continue
}
val, err := age.Decrypt(r.Value, n.Key)
if err != nil {
printErr(err)
continue
}
fmt.Printf("[%s] -> %s\n", r.Key, string(val))
}
}
func (n *Namespace) GetAllPasswords() ([]password.Password, error) {
q := query.Query{}
results, err := n.Datastore.Query(n.ctx, q)
if err != nil {
return nil, fmt.Errorf("Error during GetAllPasswords: %s", err)
}
var result []password.Password
for r := range results.Next() {
if r.Error != nil {
printErr(err)
continue
}
val, err := age.Decrypt(r.Value, n.Key)
if err != nil {
printErr(err)
continue
}
pw, err := password.GetPasswordFromJson(val)
if err != nil {
printErr(err)
continue
}
result = append(result, pw)
}
return result, nil
}
func (n *Namespace) Close() {
n.Datastore.Close()
n.CancelFunc()
}
type StorageHandler struct {
Ctx context.Context
Store *badger.Datastore
Host host.Host
Ipfs *ipfslite.Peer
PubSub *pubsub.PubSub
Key *agelib.X25519Identity
Config []NamespaceConfig
Namespaces map[string]*Namespace
}
func (s *StorageHandler) GetDefaultNamespace(Name string) *Namespace {
return s.Namespaces["root"]
}
func (s *StorageHandler) InitNamespaces() {
NamespaceMap := make(map[string]*Namespace)
for _, nsCfg := range s.Config {
ns1, err := CreateNamespace(nsCfg.Id, *s)
if err != nil {
logger.Fatal(err)
}
NamespaceMap[nsCfg.Name] = ns1
}
s.Namespaces = NamespaceMap
}
func IsTrustedPeer(ctx context.Context, id peer.ID, namespace string, config []NamespaceConfig) bool {
peerMap := GetTrustedPeers(config)
val, ok := peerMap[namespace]
if ok {
for _, v := range val {
if v.Id == id.String() {
return true
}
}
}
return false
}
func PrintDBContent(ctx context.Context, store *badger.Datastore) {
q := dsq.Query{}
result, _ := store.Query(ctx, q)
entries, _ := result.Rest()
for _, entry := range entries {
fmt.Printf("Key: %s, Value: %s\n", entry.Key, string(entry.Value))
}
}
func CreateNamespace(ID string, storageHandler StorageHandler) (*Namespace, error) {
fmt.Printf("Creating Namespace %s\n", ID)
err := storageHandler.PubSub.RegisterTopicValidator(
ID, //== topicName
func(ctx context.Context, _ peer.ID, msg *pubsub.Message) bool {
signer := msg.GetFrom()
trusted := IsTrustedPeer(ctx, signer, ID, storageHandler.Config)
if !trusted {
logger.Debug("discarded pubsub message from non trusted source %s ", signer)
}
return trusted
},
)
if err != nil {
logger.Errorf("error registering topic validator: %s", err)
}
psubCtx, psubCancel := context.WithCancel(storageHandler.Ctx)
pubsubBC, err := crdt.NewPubSubBroadcaster(psubCtx, storageHandler.PubSub, ID)
if err != nil {
logger.Fatal(err)
}
opts := crdt.DefaultOptions()
opts.Logger = logger
opts.RebroadcastInterval = 5 * time.Second
opts.PutHook = func(k ds.Key, v []byte) {
fmt.Printf("Added: [%s] -> %s\n", k, string(v))
}
opts.DeleteHook = func(k ds.Key) {
fmt.Printf("Removed: [%s]\n", k)
}
crdt, err := crdt.New(storageHandler.Store, ds.NewKey(ID), storageHandler.Ipfs, pubsubBC, opts)
if err != nil {
logger.Fatal(err)
psubCancel()
return nil, err
}
PeerMap := GetTrustedPeers(storageHandler.Config)
val, ok := PeerMap[ID]
if !ok {
logger.Fatal("namespace config does not contain any peers")
}
return &Namespace{ID: ID, Datastore: crdt, CancelFunc: psubCancel, ctx: storageHandler.Ctx, Key: storageHandler.Key, TrustedPeers: val}, nil
}
func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
// Start a DHT, for use in peer discovery. We can't just make a new DHT
// client because we want each peer to maintain its own local copy of the
// DHT, so that the bootstrapping node of the DHT can go down without
// inhibiting future peer discovery.
kademliaDHT, err := dht.New(ctx, h)
if err != nil {
panic(err)
}
if err = kademliaDHT.Bootstrap(ctx); err != nil {
panic(err)
}
var wg sync.WaitGroup
for _, peerAddr := range dht.DefaultBootstrapPeers {
peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
wg.Add(1)
go func() {
defer wg.Done()
if err := h.Connect(ctx, *peerinfo); err != nil {
fmt.Println("Bootstrap warning:", err)
}
}()
}
wg.Wait()
return kademliaDHT
}
func DiscoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
//cfg := NewConfig()
//for _, v := range cfg {
// for _, p := range v.Peers {
// peerID, err := peer.Decode(p.Id)
// if err != nil {
// logger.Fatal(err)
// }
// peerInfo, err := dht.FindPeer(ctx, peerID)
// if err != nil {
// fmt.Println(err)
// }
// h.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.PermanentAddrTTL)
// err = h.Connect(ctx, peerInfo)
// if err != nil {
// fmt.Printf("Failed connecting to %s, error: %s\n", peerInfo.ID, err)
// } else {
// fmt.Println("Connected to:", peerInfo.ID)
// }
// }
//}
routingDiscovery := drouting.NewRoutingDiscovery(dht)
dutil.Advertise(ctx, routingDiscovery, topicNameFlag)
// Look for others who have announced and attempt to connect to them
anyConnected := false
fmt.Printf("Own Id: %s\n", h.ID())
for !anyConnected {
time.Sleep(2 * time.Second)
//debug fmt.Println("Searching for peers...")
peerChan, err := routingDiscovery.FindPeers(ctx, topicNameFlag)
if err != nil {
panic(err)
}
for peer := range peerChan {
if peer.ID == h.ID() {
continue // No self connection
}
err := h.Connect(ctx, peer)
if err != nil {
//debug fmt.Printf("Failed connecting to %s, error: %s\n", peer.ID, err)
} else {
fmt.Println("Connected to:", peer.ID)
anyConnected = true
}
}
}
fmt.Println("Peer discovery complete")
}
func printErr(err error) {
fmt.Println("error:", err)
}
func ConnectedPeers(h host.Host) []*peer.AddrInfo {
var pinfos []*peer.AddrInfo
for _, c := range h.Network().Conns() {
pinfos = append(pinfos, &peer.AddrInfo{
ID: c.RemotePeer(),
Addrs: []multiaddr.Multiaddr{c.RemoteMultiaddr()},
})
}
return pinfos
}