storage use new logger

This commit is contained in:
2024-10-10 10:15:09 +02:00
parent 873f653b86
commit 0419af16b0

View File

@@ -3,8 +3,8 @@ package storage
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"os" "os"
"fmt"
"encoding/json" "encoding/json"
"path/filepath" "path/filepath"
"time" "time"
@@ -32,16 +32,15 @@ import (
crdt "github.com/ipfs/go-ds-crdt" crdt "github.com/ipfs/go-ds-crdt"
crypto "github.com/libp2p/go-libp2p/core/crypto" crypto "github.com/libp2p/go-libp2p/core/crypto"
routed "github.com/libp2p/go-libp2p/p2p/host/routed" routed "github.com/libp2p/go-libp2p/p2p/host/routed"
logging "github.com/ipfs/go-log/v2"
agelib "filippo.io/age" agelib "filippo.io/age"
password "github.com/k4lipso/pentapass/crypto" password "github.com/k4lipso/pentapass/crypto"
"github.com/k4lipso/pentapass/crypto/age" "github.com/k4lipso/pentapass/crypto/age"
. "github.com/k4lipso/pentapass/internal/log"
) )
var ( var (
topicNameFlag = "pentapass" topicNameFlag = "pentapass"
logger = logging.Logger("globaldb")
Listen = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0") Listen = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")
) )
@@ -53,32 +52,32 @@ func SetupLibp2pHost(ctx context.Context, dbPath string) (host host.Host, dht *d
if os.IsNotExist(err) { if os.IsNotExist(err) {
priv, _, err = crypto.GenerateKeyPair(crypto.Ed25519, 1) priv, _, err = crypto.GenerateKeyPair(crypto.Ed25519, 1)
if err != nil { if err != nil {
logger.Fatal(err) Logger.Fatal(err)
} }
data, err := crypto.MarshalPrivateKey(priv) data, err := crypto.MarshalPrivateKey(priv)
if err != nil { if err != nil {
logger.Fatal(err) Logger.Fatal(err)
} }
err = os.WriteFile(keyPath, data, 0400) err = os.WriteFile(keyPath, data, 0400)
if err != nil { if err != nil {
logger.Fatal(err) Logger.Fatal(err)
} }
} else if err != nil { } else if err != nil {
logger.Fatal(err) Logger.Fatal(err)
} else { } else {
key, err := os.ReadFile(keyPath) key, err := os.ReadFile(keyPath)
if err != nil { if err != nil {
logger.Fatal(err) Logger.Fatal(err)
} }
priv, err = crypto.UnmarshalPrivateKey(key) priv, err = crypto.UnmarshalPrivateKey(key)
if err != nil { if err != nil {
logger.Fatal(err) Logger.Fatal(err)
} }
} }
if err != nil { if err != nil {
logger.Fatal(err) Logger.Fatal(err)
} }
//whitelistedPeers := map[peer.ID]struct{} { //whitelistedPeers := map[peer.ID]struct{} {
@@ -122,22 +121,19 @@ type WhitelistConnectionGater struct {
} }
func (wg *WhitelistConnectionGater) InterceptPeerDial(p peer.ID) (allowed bool) { func (wg *WhitelistConnectionGater) InterceptPeerDial(p peer.ID) (allowed bool) {
//fmt.Println("PeerDial")
//_, allowed = wg.whitelistedPeers[p] //_, allowed = wg.whitelistedPeers[p]
return true return true
} }
func (wg *WhitelistConnectionGater) InterceptAddrDial(p peer.ID, addr multiaddr.Multiaddr) (bool) { func (wg *WhitelistConnectionGater) InterceptAddrDial(p peer.ID, addr multiaddr.Multiaddr) (bool) {
//fmt.Println("AddrDial")
return wg.InterceptPeerDial(p) return wg.InterceptPeerDial(p)
} }
func (wg *WhitelistConnectionGater) InterceptAccept(conn network.ConnMultiaddrs) (bool) { func (wg *WhitelistConnectionGater) InterceptAccept(conn network.ConnMultiaddrs) (bool) {
//fmt.Println("InterceptAccept")
addr, err := peer.AddrInfoFromP2pAddr(conn.RemoteMultiaddr()) addr, err := peer.AddrInfoFromP2pAddr(conn.RemoteMultiaddr())
if err != nil { if err != nil {
fmt.Printf("Error InterceptAccept: %s\n", err) Logger.Debugf("Error InterceptAccept: %s\n", err)
return false return false
} }
@@ -145,12 +141,10 @@ func (wg *WhitelistConnectionGater) InterceptAccept(conn network.ConnMultiaddrs)
} }
func (wg *WhitelistConnectionGater) InterceptSecured(direction network.Direction, p peer.ID, conn network.ConnMultiaddrs) (allow bool) { func (wg *WhitelistConnectionGater) InterceptSecured(direction network.Direction, p peer.ID, conn network.ConnMultiaddrs) (allow bool) {
//fmt.Println("InterceptSecured")
return wg.InterceptPeerDial(p) return wg.InterceptPeerDial(p)
} }
func (wg *WhitelistConnectionGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) { func (wg *WhitelistConnectionGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) {
//fmt.Println("InterceptUpgraded")
return wg.InterceptPeerDial(conn.RemotePeer()), 0 return wg.InterceptPeerDial(conn.RemotePeer()), 0
} }
@@ -180,8 +174,6 @@ type Namespace struct {
func PeerFromString(str string) (Peer, error) { func PeerFromString(str string) (Peer, error) {
parts := strings.Split(str, "/") parts := strings.Split(str, "/")
fmt.Println(str)
fmt.Println(parts)
if len(parts) != 2 { if len(parts) != 2 {
return Peer{}, fmt.Errorf("Invalid Peer String") return Peer{}, fmt.Errorf("Invalid Peer String")
} }
@@ -322,7 +314,7 @@ func (n *Namespace) List() {
continue continue
} }
fmt.Printf("[%s] -> %s\n", r.Key, string(val)) Logger.Infof("[%s] -> %s\n", r.Key, string(val))
} }
} }
@@ -387,7 +379,7 @@ func (s *StorageHandler) GetSelfPeer() Peer {
} }
func (s *StorageHandler) UpdateConfig() { func (s *StorageHandler) UpdateConfig() {
fmt.Println("Updating Config...") Logger.Debug("Updating Config...")
s.recreateConfig() s.recreateConfig()
s.writeConfig(s.ConfigPath, s.Config) s.writeConfig(s.ConfigPath, s.Config)
} }
@@ -411,7 +403,6 @@ func (s *StorageHandler) writeConfig(filename string, config []NamespaceConfig)
jsonData, err := json.Marshal(config) jsonData, err := json.Marshal(config)
if err != nil { if err != nil {
fmt.Printf("Error during config initialization")
return err return err
} }
@@ -419,7 +410,6 @@ func (s *StorageHandler) writeConfig(filename string, config []NamespaceConfig)
err = os.WriteFile(filename, jsonData, 0644) err = os.WriteFile(filename, jsonData, 0644)
if err != nil { if err != nil {
fmt.Printf("Error during config initialization")
return err return err
} }
@@ -473,7 +463,7 @@ func (s *StorageHandler) InitNamespaces() {
ns1, err := CreateNamespace(nsCfg.Id, s) ns1, err := CreateNamespace(nsCfg.Id, s)
if err != nil { if err != nil {
logger.Fatal(err) Logger.Fatal(err)
} }
NamespaceMap[nsCfg.Name] = ns1 NamespaceMap[nsCfg.Name] = ns1
@@ -504,7 +494,7 @@ func PrintDBContent(ctx context.Context, store *badger.Datastore) {
entries, _ := result.Rest() entries, _ := result.Rest()
for _, entry := range entries { for _, entry := range entries {
fmt.Printf("Key: %s, Value: %s\n", entry.Key, string(entry.Value)) Logger.Infof("Key: %s, Value: %s\n", entry.Key, string(entry.Value))
} }
} }
@@ -521,7 +511,7 @@ func (s *StorageHandler) DeleteNamespace(ID string) error {
ns, ok := s.Namespaces[ID] ns, ok := s.Namespaces[ID]
if !ok { if !ok {
fmt.Print("DeleteNamespace that does not exists") Logger.Debug("DeleteNamespace that does not exists")
return nil return nil
} }
@@ -551,42 +541,42 @@ func (s *StorageHandler) AddNamespace(Name string) (*Namespace, error) {
} }
func CreateNamespace(ID string, storageHandler *StorageHandler) (*Namespace, error) { func CreateNamespace(ID string, storageHandler *StorageHandler) (*Namespace, error) {
fmt.Printf("Creating Namespace %s\n", ID) Logger.Debugf("Creating Namespace %s\n", ID)
err := storageHandler.PubSub.RegisterTopicValidator( err := storageHandler.PubSub.RegisterTopicValidator(
ID, //== topicName ID, //== topicName
func(ctx context.Context, id peer.ID, msg *pubsub.Message) bool { func(ctx context.Context, id peer.ID, msg *pubsub.Message) bool {
fmt.Printf("PubSubmsg TOPIC: %s, PEER: %s\n", ID, id) Logger.Debugf("PubSubmsg TOPIC: %s, PEER: %s\n", ID, id)
signer := msg.GetFrom() signer := msg.GetFrom()
trusted := IsTrustedPeer(ctx, signer, ID, storageHandler.Config) trusted := IsTrustedPeer(ctx, signer, ID, storageHandler.Config)
if !trusted { if !trusted {
fmt.Printf("discarded pubsub message from non trusted source %s\n", signer) Logger.Debugf("discarded pubsub message from non trusted source %s\n", signer)
} }
return trusted return trusted
}, },
) )
if err != nil { if err != nil {
logger.Errorf("error registering topic validator: %s", err) Logger.Errorf("error registering topic validator: %s", err)
} }
psubCtx, psubCancel := context.WithCancel(storageHandler.Ctx) psubCtx, psubCancel := context.WithCancel(storageHandler.Ctx)
pubsubBC, err := crdt.NewPubSubBroadcaster(psubCtx, storageHandler.PubSub, ID) pubsubBC, err := crdt.NewPubSubBroadcaster(psubCtx, storageHandler.PubSub, ID)
if err != nil { if err != nil {
logger.Fatal(err) Logger.Fatal(err)
} }
opts := crdt.DefaultOptions() opts := crdt.DefaultOptions()
opts.Logger = logger //opts.Logger = Logger
opts.RebroadcastInterval = 5 * time.Second opts.RebroadcastInterval = 5 * time.Second
opts.PutHook = func(k ds.Key, v []byte) { opts.PutHook = func(k ds.Key, v []byte) {
fmt.Printf("Added: [%s]\n", k) Logger.Infof("Added: [%s]\n", k)
} }
opts.DeleteHook = func(k ds.Key) { opts.DeleteHook = func(k ds.Key) {
fmt.Printf("Removed: [%s]\n", k) Logger.Infof("Removed: [%s]\n", k)
} }
crdt, err := crdt.New(storageHandler.Store, ds.NewKey(ID), storageHandler.Ipfs, pubsubBC, opts) crdt, err := crdt.New(storageHandler.Store, ds.NewKey(ID), storageHandler.Ipfs, pubsubBC, opts)
if err != nil { if err != nil {
logger.Fatal(err) Logger.Fatal(err)
psubCancel() psubCancel()
return nil, err return nil, err
} }
@@ -596,7 +586,7 @@ func CreateNamespace(ID string, storageHandler *StorageHandler) (*Namespace, err
val, ok := PeerMap[ID] val, ok := PeerMap[ID]
if !ok { if !ok {
fmt.Println("namespace config does not contain any peers") Logger.Debug("namespace config does not contain any peers")
} }
return &Namespace{ID: ID, Datastore: crdt, CancelFunc: psubCancel, ctx: storageHandler.Ctx, Key: storageHandler.Key, TrustedPeers: val}, nil return &Namespace{ID: ID, Datastore: crdt, CancelFunc: psubCancel, ctx: storageHandler.Ctx, Key: storageHandler.Key, TrustedPeers: val}, nil
@@ -622,9 +612,9 @@ func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
go func() { go func() {
defer wg.Done() defer wg.Done()
if err := h.Connect(ctx, *peerinfo); err != nil { if err := h.Connect(ctx, *peerinfo); err != nil {
fmt.Println("Bootstrap warning:", err) Logger.Debugf("Bootstrap warning: %s", err)
} else { } else {
fmt.Printf("Connection established with bootstrap node: %q\n", *peerinfo) Logger.Debugf("Connection established with bootstrap node: %q\n", *peerinfo)
} }
}() }()
} }
@@ -639,10 +629,10 @@ func DiscoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
// Look for others who have announced and attempt to connect to them // Look for others who have announced and attempt to connect to them
anyConnected := false anyConnected := false
fmt.Printf("Own Id: %s\n", h.ID()) Logger.Debugf("Own Id: %s\n", h.ID())
for !anyConnected { for !anyConnected {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
fmt.Println("Searching for peers...") Logger.Debug("Searching for peers...")
peerChan, err := routingDiscovery.FindPeers(ctx, topicNameFlag) peerChan, err := routingDiscovery.FindPeers(ctx, topicNameFlag)
if err != nil { if err != nil {
panic(err) panic(err)
@@ -656,20 +646,20 @@ func DiscoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
continue continue
} }
fmt.Printf("Connecting to peer with id %s", peer.ID.String()) Logger.Debugf("Connecting to peer with id %s", peer.ID.String())
err := h.Connect(ctx, peer) err := h.Connect(ctx, peer)
if err != nil { if err != nil {
fmt.Printf("Failed connecting to %s, error: %s\n", peer.ID, err) Logger.Debugf("Failed connecting to %s, error: %s\n", peer.ID, err)
} else { } else {
fmt.Println("Connected to:", peer.ID) Logger.Debugf("Connected to: %s", peer.ID)
} }
} }
} }
fmt.Println("Peer discovery complete") Logger.Debugf("Peer discovery complete")
} }
func printErr(err error) { func printErr(err error) {
fmt.Println("error:", err) Logger.Errorf("error: %s", err)
} }
func ConnectedPeers(h host.Host) []*peer.AddrInfo { func ConnectedPeers(h host.Host) []*peer.AddrInfo {