diff --git a/cmd/ppass/ppass.go b/cmd/ppass/ppass.go new file mode 100644 index 0000000..d750a6f --- /dev/null +++ b/cmd/ppass/ppass.go @@ -0,0 +1,246 @@ +package main + +import ( + "bufio" + "context" + "flag" + "fmt" + "os" + "time" + "os/signal" + "strings" + "syscall" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" + ipfslite "github.com/hsanjuan/ipfs-lite" + badger "github.com/ipfs/go-ds-badger2" + logging "github.com/ipfs/go-log/v2" + + "github.com/k4lipso/pentapass/storage" +) + +var ( + topicNameFlag = flag.String("topicName", "akdjlask-23klaj2idalj2-ajl2kjd3i-2ldakjd2", "name of topic to join") + dbPath = flag.String("db", "./db", "db file path") + nameSpace = flag.String("namespace", "crdt", "namespace") + logger = logging.Logger("globaldb") +// topicName = "globaldb-example" +// netTopic = "globaldb-example-net" +// config = "globaldb-example" +) + + +func main() { + flag.Parse() + ctx := context.Background() + data := *dbPath + + h, dht, err := storage.SetupLibp2pHost(ctx, *dbPath) + + pid := h.ID().String() + fmt.Println(h.ID().String()) + + if err != nil { + panic(err) + } + + ps, err := pubsub.NewGossipSub(ctx, h) + if err != nil { + panic(err) + } + + //topic, err := ps.Join(*topicNameFlag) + //if err != nil { + // panic(err) + //} + + go storage.DiscoverPeers(ctx, h, dht) + + store, err := badger.NewDatastore(data, &badger.DefaultOptions) + if err != nil { + logger.Fatal(err) + } + defer store.Close() + + ipfs, err := ipfslite.New(ctx, store, nil, h, dht, nil) + if err != nil { + logger.Fatal(err) + } + + storageHandler := storage.StorageHandler{ + Ctx: ctx , + Store: store, + Host: h, + Ipfs: ipfs, + PubSub: ps, + } + + Cfg := storage.NewConfig() + + Namespaces := make(map[string]*storage.Namespace) + for _, nsCfg := range Cfg { + ns1, err := storage.CreateNamespace(nsCfg.Id, storageHandler) + + if err != nil { + logger.Fatal(err) + } + + Namespaces[nsCfg.Name] = ns1 + defer ns1.Close() + } + + fmt.Printf(` +Peer ID: %s +Listen address: %s +Topic: %s +Data Folder: %s + +Ready! + +Commands: + +> list -> list items in the store +> get -> get value for a key +> put -> store value on a key +> exit -> quit + + +`, + pid, storage.Listen, *topicNameFlag, data, + ) + + if len(os.Args) > 1 && os.Args[1] == "daemon" { + fmt.Println("Running in daemon mode") + go func() { + for { + fmt.Printf("%s - %d connected peers\n", time.Now().Format(time.Stamp), len(storage.ConnectedPeers(h))) + time.Sleep(10 * time.Second) + } + }() + signalChan := make(chan os.Signal, 20) + signal.Notify( + signalChan, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGHUP, + ) + <-signalChan + return + } + + fmt.Printf("> ") + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + text := scanner.Text() + fields := strings.Fields(text) + if len(fields) == 0 { + fmt.Printf("> ") + continue + } + + cmd := fields[0] + + switch cmd { + case "exit", "quit": + return + case "debug": + if len(fields) < 2 { + fmt.Println("debug ") + } + st := fields[1] + switch st { + case "on": + logging.SetLogLevel("globaldb", "debug") + case "off": + logging.SetLogLevel("globaldb", "error") + case "peers": + for _, p := range storage.ConnectedPeers(h) { + addrs, err := peer.AddrInfoToP2pAddrs(p) + if err != nil { + logger.Warn(err) + continue + } + for _, a := range addrs { + fmt.Println(a) + } + } + } + case "list": + if len(fields) < 2 { + fmt.Printf("Available Namespaces:\n") + for k := range Namespaces { + fmt.Printf("%s\n", k) + } + continue + } + + namespace := fields[1] + + fmt.Printf("Listing content of %s", namespace) + + val, ok := Namespaces[namespace] + + if !ok { + fmt.Println("Namespace does not exist") + continue + } + + val.List() + case "get": + if len(fields) < 3 { + fmt.Println("get ") + fmt.Println("> ") + continue + } + + namespace := fields[1] + + val, ok := Namespaces[namespace] + + if !ok { + fmt.Println("Namespace does not exist") + continue + } + + k := fields[2] + v, err := val.Get(k) + if err != nil { + printErr(err) + continue + } + + fmt.Printf("[%s] -> %s\n", k, string(v)) + case "put": + if len(fields) < 4 { + fmt.Println("put ") + fmt.Println("> ") + continue + } + + namespace := fields[1] + + val, ok := Namespaces[namespace] + + if !ok { + fmt.Println("Namespace does not exist") + continue + } + + + k := fields[2] + v := strings.Join(fields[3:], " ") + err := val.Put(k, v) + if err != nil { + printErr(err) + continue + } + } + fmt.Printf("> ") + } +} + +func printErr(err error) { + fmt.Println("error:", err) + fmt.Println("> ") +} diff --git a/main.go b/storage/storage.go similarity index 53% rename from main.go rename to storage/storage.go index d247e6b..461d403 100644 --- a/main.go +++ b/storage/storage.go @@ -1,18 +1,13 @@ -package main +package storage import ( - "bufio" "context" - "flag" "fmt" "os" - "sync" "path/filepath" "time" - "os/signal" + "sync" "io/ioutil" - "strings" - "syscall" "github.com/ipfs/go-datastore/query" "github.com/libp2p/go-libp2p" @@ -20,35 +15,32 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/control" //"github.com/libp2p/go-libp2p/core/peerstore" drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing" dutil "github.com/libp2p/go-libp2p/p2p/discovery/util" ds "github.com/ipfs/go-datastore" ipfslite "github.com/hsanjuan/ipfs-lite" + "github.com/libp2p/go-libp2p/core/network" multiaddr "github.com/multiformats/go-multiaddr" badger "github.com/ipfs/go-ds-badger2" dsq "github.com/ipfs/go-datastore/query" crdt "github.com/ipfs/go-ds-crdt" - logging "github.com/ipfs/go-log/v2" crypto "github.com/libp2p/go-libp2p/core/crypto" routed "github.com/libp2p/go-libp2p/p2p/host/routed" + logging "github.com/ipfs/go-log/v2" ) var ( - topicNameFlag = flag.String("topicName", "applesauce1234", "name of topic to join") - dbPath = flag.String("db", "./db", "db file path") - nameSpace = flag.String("namespace", "crdt", "namespace") + topicNameFlag = "akdjlask-23klaj2idalj2-ajl2kjd3i-2ldakjd2" logger = logging.Logger("globaldb") - listen = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0") -// topicName = "globaldb-example" -// netTopic = "globaldb-example-net" -// config = "globaldb-example" + Listen = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0") ) -func SetupLibp2pHost(ctx context.Context) (host host.Host, dht *dht.IpfsDHT, err error) { - data := *dbPath +func SetupLibp2pHost(ctx context.Context, dbPath string) (host host.Host, dht *dht.IpfsDHT, err error) { + data := dbPath keyPath := filepath.Join(data, "key") var priv crypto.PrivKey _, err = os.Stat(keyPath) @@ -83,7 +75,16 @@ func SetupLibp2pHost(ctx context.Context) (host host.Host, dht *dht.IpfsDHT, err logger.Fatal(err) } - host, err = libp2p.New(libp2p.Identity(priv), listen) + //whitelistedPeers := map[peer.ID]struct{} { + // "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ": {}, + // "12D3KooWBRvtW83QYnPgJCyVyAgMXtg71wjkGefVB2fBnm1A36kS": {}, + // "12D3KooWMmc4kYy78vSumqWtPkUNAoPeCpJ66ysFv1U8S554B7e2": {}, + //} + + //connectionGater := &WhitelistConnectionGater{whitelistedPeers: whitelistedPeers} + + //host, err = libp2p.New(libp2p.Identity(priv), libp2p.ConnectionGater(connectionGater), Listen) + host, err = libp2p.New(libp2p.Identity(priv), Listen) if err != nil { return nil, nil, err @@ -118,19 +119,66 @@ func NewConfig() []NamespaceConfig { Peers: []Peer{ { Id: "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ" }, { Id: "12D3KooWBRvtW83QYnPgJCyVyAgMXtg71wjkGefVB2fBnm1A36kS" }, + { Id: "12D3KooWMmc4kYy78vSumqWtPkUNAoPeCpJ66ysFv1U8S554B7e2" }, }, }, { - Name: "TestNamespace", + Name: "foo", Id: "2-903djl1290djl1-21jdl1kjd2-1d1jdl1k2jd11", Peers: []Peer{ { Id: "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ" }, - { Id: "12D3KooWBRvtW83QYnPgJCyVyAgMXtg71wjkGefVB2fBnm1A36kS" }, + { Id: "12D3KooWMmc4kYy78vSumqWtPkUNAoPeCpJ66ysFv1U8S554B7e2" }, + }, + }, + { + Name: "bar", + Id: "2-90ssssssssdjl1-21jdl1kjd2-1d1jdl1k2jd11", + Peers: []Peer{ + //{ Id: "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ" }, + //{ Id: "12D3KooWBRvtW83QYnPgJCyVyAgMXtg71wjkGefVB2fBnm1A36kS" }, }, }, } } + +type WhitelistConnectionGater struct { + whitelistedPeers map[peer.ID]struct{} +} + +func (wg *WhitelistConnectionGater) InterceptPeerDial(p peer.ID) (allowed bool) { + //fmt.Println("PeerDial") + //_, allowed = wg.whitelistedPeers[p] + return true +} + +func (wg *WhitelistConnectionGater) InterceptAddrDial(p peer.ID, addr multiaddr.Multiaddr) (bool) { + //fmt.Println("AddrDial") + return wg.InterceptPeerDial(p) +} + +func (wg *WhitelistConnectionGater) InterceptAccept(conn network.ConnMultiaddrs) (bool) { + //fmt.Println("InterceptAccept") + addr, err := peer.AddrInfoFromP2pAddr(conn.RemoteMultiaddr()) + + if err != nil { + fmt.Printf("Error InterceptAccept: %s\n", err) + return false + } + + return wg.InterceptPeerDial(addr.ID) +} + +func (wg *WhitelistConnectionGater) InterceptSecured(direction network.Direction, p peer.ID, conn network.ConnMultiaddrs) (allow bool) { + //fmt.Println("InterceptSecured") + return wg.InterceptPeerDial(p) +} + +func (wg *WhitelistConnectionGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) { + //fmt.Println("InterceptUpgraded") + return wg.InterceptPeerDial(conn.RemotePeer()), 0 +} + func GetTrustedPeers() map[string][]Peer { cfg := NewConfig() @@ -151,6 +199,46 @@ type Namespace struct { Datastore *crdt.Datastore //Registry *sharedKeyRegistry CancelFunc context.CancelFunc + ctx context.Context +} + + +func (n *Namespace) Put(k string, v string) error { + key := ds.NewKey(k) + err := n.Datastore.Put(n.ctx, key, []byte(v)) + + if err != nil { + printErr(err) + } + + return err +} + +func (n *Namespace) Get(k string) (string, error) { + v, err := n.Datastore.Get(n.ctx, ds.NewKey(k)) + if err != nil { + printErr(err) + return "", err + } + + return string(v), nil +} + +func (n *Namespace) List() { + q := query.Query{} + results, err := n.Datastore.Query(n.ctx, q) + + if err != nil { + printErr(err) + } + + for r := range results.Next() { + if r.Error != nil { + printErr(err) + continue + } + fmt.Printf("[%s] -> %s\n", r.Key, string(r.Value)) + } } func (n *Namespace) Close() { @@ -174,13 +262,11 @@ func IsTrustedPeer(ctx context.Context, id peer.ID, namespace string) bool { if ok { for _, v := range val { if v.Id == id.String() { - fmt.Printf("Trusting %s in Namspace %s\n", id.String(), namespace) return true } } } - fmt.Printf("NOT! trusting %s in Namspace %s\n", id.String(), namespace) return false } @@ -199,7 +285,6 @@ func CreateNamespace(ID string, storageHandler StorageHandler) (*Namespace, erro err := storageHandler.PubSub.RegisterTopicValidator( ID, //== topicName func(ctx context.Context, _ peer.ID, msg *pubsub.Message) bool { - fmt.Println("Running Validator") signer := msg.GetFrom() trusted := IsTrustedPeer(ctx, signer, ID) if !trusted { @@ -229,254 +314,16 @@ func CreateNamespace(ID string, storageHandler StorageHandler) (*Namespace, erro fmt.Printf("Removed: [%s]\n", k) } - crdt, err := crdt.New(storageHandler.Store, ds.NewKey(*nameSpace), storageHandler.Ipfs, pubsubBC, opts) + crdt, err := crdt.New(storageHandler.Store, ds.NewKey(ID), storageHandler.Ipfs, pubsubBC, opts) if err != nil { logger.Fatal(err) psubCancel() return nil, err } - return &Namespace{ID: ID, Datastore: crdt, CancelFunc: psubCancel}, nil + return &Namespace{ID: ID, Datastore: crdt, CancelFunc: psubCancel, ctx: storageHandler.Ctx}, nil } -func main() { - flag.Parse() - ctx := context.Background() - data := *dbPath - - h, dht, err := SetupLibp2pHost(ctx) - - pid := h.ID().String() - fmt.Println(h.ID().String()) - - if err != nil { - panic(err) - } - - ps, err := pubsub.NewGossipSub(ctx, h) - if err != nil { - panic(err) - } - - topic, err := ps.Join(*topicNameFlag) - if err != nil { - panic(err) - } - - go discoverPeers(ctx, h, dht) - - store, err := badger.NewDatastore(data, &badger.DefaultOptions) - if err != nil { - logger.Fatal(err) - } - defer store.Close() - - PrintDBContent(ctx, store) - - netSubs, err := topic.Subscribe() - if err != nil { - logger.Fatal(err) - } - - // Use a special pubsub topic to avoid disconnecting - // from globaldb peers. - go func() { - for { - msg, err := netSubs.Next(ctx) - if err != nil { - fmt.Println(err) - break - } - h.ConnManager().TagPeer(msg.ReceivedFrom, "keep", 100) - } - }() - - go func() { - for { - select { - case <-ctx.Done(): - return - default: - topic.Publish(ctx, []byte("hi!")) - time.Sleep(20 * time.Second) - } - } - }() - - ipfs, err := ipfslite.New(ctx, store, nil, h, dht, nil) - if err != nil { - logger.Fatal(err) - } - - storageHandler := StorageHandler{ - Ctx: ctx , - Store: store, - Host: h, - Ipfs: ipfs, - PubSub: ps, - } - - Cfg := NewConfig() - - var Namespaces []*Namespace - for _, nsCfg := range Cfg { - ns1, err := CreateNamespace(nsCfg.Id, storageHandler) - - if err != nil { - logger.Fatal(err) - } - - Namespaces = append(Namespaces, ns1) - defer ns1.Close() - } - - //fmt.Println("Bootstrapping...") - -// bstr, _ := multiaddr.NewMultiaddr("/ip4/94.130.135.167/tcp/33123/ipfs/12D3KooWFta2AE7oiK1ioqjVAKajUJauZWfeM7R413K7ARtHRDAu") -// inf, _ := peer.AddrInfoFromP2pAddr(bstr) -// list := append(ipfslite.DefaultBootstrapPeers(), *inf) -// ipfs.Bootstrap(list) -// h.ConnManager().TagPeer(inf.ID, "keep", 100) - - fmt.Printf(` -Peer ID: %s -Listen address: %s -Topic: %s -Data Folder: %s - -Ready! - -Commands: - -> list -> list items in the store -> get -> get value for a key -> put -> store value on a key -> exit -> quit - - -`, - pid, listen, *topicNameFlag, data, - ) - - if len(os.Args) > 1 && os.Args[1] == "daemon" { - fmt.Println("Running in daemon mode") - go func() { - for { - fmt.Printf("%s - %d connected peers\n", time.Now().Format(time.Stamp), len(connectedPeers(h))) - time.Sleep(10 * time.Second) - } - }() - signalChan := make(chan os.Signal, 20) - signal.Notify( - signalChan, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGHUP, - ) - <-signalChan - return - } - - crdt := Namespaces[0].Datastore - fmt.Printf("Setting default namespace %s", Namespaces[0].ID) - fmt.Printf("> ") - scanner := bufio.NewScanner(os.Stdin) - for scanner.Scan() { - text := scanner.Text() - fields := strings.Fields(text) - if len(fields) == 0 { - fmt.Printf("> ") - continue - } - - cmd := fields[0] - - switch cmd { - case "exit", "quit": - return - case "debug": - if len(fields) < 2 { - fmt.Println("debug ") - } - st := fields[1] - switch st { - case "on": - logging.SetLogLevel("globaldb", "debug") - case "off": - logging.SetLogLevel("globaldb", "error") - case "peers": - for _, p := range connectedPeers(h) { - addrs, err := peer.AddrInfoToP2pAddrs(p) - if err != nil { - logger.Warn(err) - continue - } - for _, a := range addrs { - fmt.Println(a) - } - } - } - case "list": - q := query.Query{} - results, err := crdt.Query(ctx, q) - - if err != nil { - printErr(err) - } - - //testKey := "0123912-09adskldj123-adlskj192-ajdl2k1" - //testValue := "{ \"hello\": \"world\" }" - - //err = crdt.Put(ctx, ds.NewKey(testKey), []byte(testValue)) - -// if err != nil { -// printErr(err) -// } - - for r := range results.Next() { - if r.Error != nil { - printErr(err) - continue - } - fmt.Printf("[%s] -> %s\n", r.Key, string(r.Value)) - } - case "get": - if len(fields) < 2 { - fmt.Println("get ") - fmt.Println("> ") - continue - } - k := ds.NewKey(fields[1]) - v, err := crdt.Get(ctx, k) - if err != nil { - printErr(err) - continue - } - fmt.Printf("[%s] -> %s\n", k, string(v)) - case "put": - if len(fields) < 3 { - fmt.Println("put ") - fmt.Println("> ") - continue - } - k := ds.NewKey(fields[1]) - v := strings.Join(fields[2:], " ") - err := crdt.Put(ctx, k, []byte(v)) - if err != nil { - printErr(err) - continue - } - } - fmt.Printf("> ") - } - //go streamConsoleTo(ctx, topic) - - //sub, err := topic.Subscribe() - //if err != nil { - // panic(err) - //} - //printMessagesFrom(ctx, sub) -} func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT { // Start a DHT, for use in peer discovery. We can't just make a new DHT @@ -506,7 +353,7 @@ func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT { return kademliaDHT } -func discoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) { +func DiscoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) { //cfg := NewConfig() //for _, v := range cfg { @@ -535,14 +382,15 @@ func discoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) { // } //} routingDiscovery := drouting.NewRoutingDiscovery(dht) - dutil.Advertise(ctx, routingDiscovery, *topicNameFlag) + dutil.Advertise(ctx, routingDiscovery, topicNameFlag) // Look for others who have announced and attempt to connect to them anyConnected := false fmt.Printf("Own Id: %s\n", h.ID()) for !anyConnected { + time.Sleep(2 * time.Second) fmt.Println("Searching for peers...") - peerChan, err := routingDiscovery.FindPeers(ctx, *topicNameFlag) + peerChan, err := routingDiscovery.FindPeers(ctx, topicNameFlag) if err != nil { panic(err) } @@ -562,36 +410,12 @@ func discoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) { fmt.Println("Peer discovery complete") } -func streamConsoleTo(ctx context.Context, topic *pubsub.Topic) { - reader := bufio.NewReader(os.Stdin) - for { - s, err := reader.ReadString('\n') - if err != nil { - panic(err) - } - if err := topic.Publish(ctx, []byte(fmt.Sprintf("NEWMSG: %s", s))); err != nil { - fmt.Println("### Publish error:", err) - } - } -} - -func printMessagesFrom(ctx context.Context, sub *pubsub.Subscription) { - for { - m, err := sub.Next(ctx) - if err != nil { - panic(err) - } - fmt.Println(m.ReceivedFrom, ": ", string(m.Message.Data)) - } -} - - func printErr(err error) { fmt.Println("error:", err) fmt.Println("> ") } -func connectedPeers(h host.Host) []*peer.AddrInfo { +func ConnectedPeers(h host.Host) []*peer.AddrInfo { var pinfos []*peer.AddrInfo for _, c := range h.Network().Conns() { pinfos = append(pinfos, &peer.AddrInfo{