split files
This commit is contained in:
246
cmd/ppass/ppass.go
Normal file
246
cmd/ppass/ppass.go
Normal file
@@ -0,0 +1,246 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
"os/signal"
|
||||||
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
ipfslite "github.com/hsanjuan/ipfs-lite"
|
||||||
|
badger "github.com/ipfs/go-ds-badger2"
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
|
||||||
|
"github.com/k4lipso/pentapass/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
topicNameFlag = flag.String("topicName", "akdjlask-23klaj2idalj2-ajl2kjd3i-2ldakjd2", "name of topic to join")
|
||||||
|
dbPath = flag.String("db", "./db", "db file path")
|
||||||
|
nameSpace = flag.String("namespace", "crdt", "namespace")
|
||||||
|
logger = logging.Logger("globaldb")
|
||||||
|
// topicName = "globaldb-example"
|
||||||
|
// netTopic = "globaldb-example-net"
|
||||||
|
// config = "globaldb-example"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.Parse()
|
||||||
|
ctx := context.Background()
|
||||||
|
data := *dbPath
|
||||||
|
|
||||||
|
h, dht, err := storage.SetupLibp2pHost(ctx, *dbPath)
|
||||||
|
|
||||||
|
pid := h.ID().String()
|
||||||
|
fmt.Println(h.ID().String())
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ps, err := pubsub.NewGossipSub(ctx, h)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
//topic, err := ps.Join(*topicNameFlag)
|
||||||
|
//if err != nil {
|
||||||
|
// panic(err)
|
||||||
|
//}
|
||||||
|
|
||||||
|
go storage.DiscoverPeers(ctx, h, dht)
|
||||||
|
|
||||||
|
store, err := badger.NewDatastore(data, &badger.DefaultOptions)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal(err)
|
||||||
|
}
|
||||||
|
defer store.Close()
|
||||||
|
|
||||||
|
ipfs, err := ipfslite.New(ctx, store, nil, h, dht, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
storageHandler := storage.StorageHandler{
|
||||||
|
Ctx: ctx ,
|
||||||
|
Store: store,
|
||||||
|
Host: h,
|
||||||
|
Ipfs: ipfs,
|
||||||
|
PubSub: ps,
|
||||||
|
}
|
||||||
|
|
||||||
|
Cfg := storage.NewConfig()
|
||||||
|
|
||||||
|
Namespaces := make(map[string]*storage.Namespace)
|
||||||
|
for _, nsCfg := range Cfg {
|
||||||
|
ns1, err := storage.CreateNamespace(nsCfg.Id, storageHandler)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
Namespaces[nsCfg.Name] = ns1
|
||||||
|
defer ns1.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf(`
|
||||||
|
Peer ID: %s
|
||||||
|
Listen address: %s
|
||||||
|
Topic: %s
|
||||||
|
Data Folder: %s
|
||||||
|
|
||||||
|
Ready!
|
||||||
|
|
||||||
|
Commands:
|
||||||
|
|
||||||
|
> list -> list items in the store
|
||||||
|
> get <key> -> get value for a key
|
||||||
|
> put <key> <value> -> store value on a key
|
||||||
|
> exit -> quit
|
||||||
|
|
||||||
|
|
||||||
|
`,
|
||||||
|
pid, storage.Listen, *topicNameFlag, data,
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(os.Args) > 1 && os.Args[1] == "daemon" {
|
||||||
|
fmt.Println("Running in daemon mode")
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
fmt.Printf("%s - %d connected peers\n", time.Now().Format(time.Stamp), len(storage.ConnectedPeers(h)))
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
signalChan := make(chan os.Signal, 20)
|
||||||
|
signal.Notify(
|
||||||
|
signalChan,
|
||||||
|
syscall.SIGINT,
|
||||||
|
syscall.SIGTERM,
|
||||||
|
syscall.SIGHUP,
|
||||||
|
)
|
||||||
|
<-signalChan
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("> ")
|
||||||
|
scanner := bufio.NewScanner(os.Stdin)
|
||||||
|
for scanner.Scan() {
|
||||||
|
text := scanner.Text()
|
||||||
|
fields := strings.Fields(text)
|
||||||
|
if len(fields) == 0 {
|
||||||
|
fmt.Printf("> ")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := fields[0]
|
||||||
|
|
||||||
|
switch cmd {
|
||||||
|
case "exit", "quit":
|
||||||
|
return
|
||||||
|
case "debug":
|
||||||
|
if len(fields) < 2 {
|
||||||
|
fmt.Println("debug <on/off/peers>")
|
||||||
|
}
|
||||||
|
st := fields[1]
|
||||||
|
switch st {
|
||||||
|
case "on":
|
||||||
|
logging.SetLogLevel("globaldb", "debug")
|
||||||
|
case "off":
|
||||||
|
logging.SetLogLevel("globaldb", "error")
|
||||||
|
case "peers":
|
||||||
|
for _, p := range storage.ConnectedPeers(h) {
|
||||||
|
addrs, err := peer.AddrInfoToP2pAddrs(p)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, a := range addrs {
|
||||||
|
fmt.Println(a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case "list":
|
||||||
|
if len(fields) < 2 {
|
||||||
|
fmt.Printf("Available Namespaces:\n")
|
||||||
|
for k := range Namespaces {
|
||||||
|
fmt.Printf("%s\n", k)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace := fields[1]
|
||||||
|
|
||||||
|
fmt.Printf("Listing content of %s", namespace)
|
||||||
|
|
||||||
|
val, ok := Namespaces[namespace]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
fmt.Println("Namespace does not exist")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
val.List()
|
||||||
|
case "get":
|
||||||
|
if len(fields) < 3 {
|
||||||
|
fmt.Println("get <namespace> <key>")
|
||||||
|
fmt.Println("> ")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace := fields[1]
|
||||||
|
|
||||||
|
val, ok := Namespaces[namespace]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
fmt.Println("Namespace does not exist")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
k := fields[2]
|
||||||
|
v, err := val.Get(k)
|
||||||
|
if err != nil {
|
||||||
|
printErr(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("[%s] -> %s\n", k, string(v))
|
||||||
|
case "put":
|
||||||
|
if len(fields) < 4 {
|
||||||
|
fmt.Println("put <namespace> <key> <value>")
|
||||||
|
fmt.Println("> ")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace := fields[1]
|
||||||
|
|
||||||
|
val, ok := Namespaces[namespace]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
fmt.Println("Namespace does not exist")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
k := fields[2]
|
||||||
|
v := strings.Join(fields[3:], " ")
|
||||||
|
err := val.Put(k, v)
|
||||||
|
if err != nil {
|
||||||
|
printErr(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fmt.Printf("> ")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func printErr(err error) {
|
||||||
|
fmt.Println("error:", err)
|
||||||
|
fmt.Println("> ")
|
||||||
|
}
|
||||||
@@ -1,18 +1,13 @@
|
|||||||
package main
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"context"
|
"context"
|
||||||
"flag"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
"os/signal"
|
"sync"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"strings"
|
|
||||||
"syscall"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-datastore/query"
|
"github.com/ipfs/go-datastore/query"
|
||||||
"github.com/libp2p/go-libp2p"
|
"github.com/libp2p/go-libp2p"
|
||||||
@@ -20,35 +15,32 @@ import (
|
|||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
"github.com/libp2p/go-libp2p/core/host"
|
"github.com/libp2p/go-libp2p/core/host"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
"github.com/libp2p/go-libp2p/core/control"
|
||||||
//"github.com/libp2p/go-libp2p/core/peerstore"
|
//"github.com/libp2p/go-libp2p/core/peerstore"
|
||||||
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
|
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
|
||||||
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
|
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
|
||||||
ds "github.com/ipfs/go-datastore"
|
ds "github.com/ipfs/go-datastore"
|
||||||
ipfslite "github.com/hsanjuan/ipfs-lite"
|
ipfslite "github.com/hsanjuan/ipfs-lite"
|
||||||
|
"github.com/libp2p/go-libp2p/core/network"
|
||||||
|
|
||||||
multiaddr "github.com/multiformats/go-multiaddr"
|
multiaddr "github.com/multiformats/go-multiaddr"
|
||||||
|
|
||||||
badger "github.com/ipfs/go-ds-badger2"
|
badger "github.com/ipfs/go-ds-badger2"
|
||||||
dsq "github.com/ipfs/go-datastore/query"
|
dsq "github.com/ipfs/go-datastore/query"
|
||||||
crdt "github.com/ipfs/go-ds-crdt"
|
crdt "github.com/ipfs/go-ds-crdt"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
|
||||||
crypto "github.com/libp2p/go-libp2p/core/crypto"
|
crypto "github.com/libp2p/go-libp2p/core/crypto"
|
||||||
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
|
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
topicNameFlag = flag.String("topicName", "applesauce1234", "name of topic to join")
|
topicNameFlag = "akdjlask-23klaj2idalj2-ajl2kjd3i-2ldakjd2"
|
||||||
dbPath = flag.String("db", "./db", "db file path")
|
|
||||||
nameSpace = flag.String("namespace", "crdt", "namespace")
|
|
||||||
logger = logging.Logger("globaldb")
|
logger = logging.Logger("globaldb")
|
||||||
listen = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")
|
Listen = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")
|
||||||
// topicName = "globaldb-example"
|
|
||||||
// netTopic = "globaldb-example-net"
|
|
||||||
// config = "globaldb-example"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func SetupLibp2pHost(ctx context.Context) (host host.Host, dht *dht.IpfsDHT, err error) {
|
func SetupLibp2pHost(ctx context.Context, dbPath string) (host host.Host, dht *dht.IpfsDHT, err error) {
|
||||||
data := *dbPath
|
data := dbPath
|
||||||
keyPath := filepath.Join(data, "key")
|
keyPath := filepath.Join(data, "key")
|
||||||
var priv crypto.PrivKey
|
var priv crypto.PrivKey
|
||||||
_, err = os.Stat(keyPath)
|
_, err = os.Stat(keyPath)
|
||||||
@@ -83,7 +75,16 @@ func SetupLibp2pHost(ctx context.Context) (host host.Host, dht *dht.IpfsDHT, err
|
|||||||
logger.Fatal(err)
|
logger.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
host, err = libp2p.New(libp2p.Identity(priv), listen)
|
//whitelistedPeers := map[peer.ID]struct{} {
|
||||||
|
// "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ": {},
|
||||||
|
// "12D3KooWBRvtW83QYnPgJCyVyAgMXtg71wjkGefVB2fBnm1A36kS": {},
|
||||||
|
// "12D3KooWMmc4kYy78vSumqWtPkUNAoPeCpJ66ysFv1U8S554B7e2": {},
|
||||||
|
//}
|
||||||
|
|
||||||
|
//connectionGater := &WhitelistConnectionGater{whitelistedPeers: whitelistedPeers}
|
||||||
|
|
||||||
|
//host, err = libp2p.New(libp2p.Identity(priv), libp2p.ConnectionGater(connectionGater), Listen)
|
||||||
|
host, err = libp2p.New(libp2p.Identity(priv), Listen)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
@@ -118,19 +119,66 @@ func NewConfig() []NamespaceConfig {
|
|||||||
Peers: []Peer{
|
Peers: []Peer{
|
||||||
{ Id: "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ" },
|
{ Id: "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ" },
|
||||||
{ Id: "12D3KooWBRvtW83QYnPgJCyVyAgMXtg71wjkGefVB2fBnm1A36kS" },
|
{ Id: "12D3KooWBRvtW83QYnPgJCyVyAgMXtg71wjkGefVB2fBnm1A36kS" },
|
||||||
|
{ Id: "12D3KooWMmc4kYy78vSumqWtPkUNAoPeCpJ66ysFv1U8S554B7e2" },
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "TestNamespace",
|
Name: "foo",
|
||||||
Id: "2-903djl1290djl1-21jdl1kjd2-1d1jdl1k2jd11",
|
Id: "2-903djl1290djl1-21jdl1kjd2-1d1jdl1k2jd11",
|
||||||
Peers: []Peer{
|
Peers: []Peer{
|
||||||
{ Id: "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ" },
|
{ Id: "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ" },
|
||||||
{ Id: "12D3KooWBRvtW83QYnPgJCyVyAgMXtg71wjkGefVB2fBnm1A36kS" },
|
{ Id: "12D3KooWMmc4kYy78vSumqWtPkUNAoPeCpJ66ysFv1U8S554B7e2" },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "bar",
|
||||||
|
Id: "2-90ssssssssdjl1-21jdl1kjd2-1d1jdl1k2jd11",
|
||||||
|
Peers: []Peer{
|
||||||
|
//{ Id: "12D3KooWLF7BU5VgpqWdS1XwSTFCLphENozhYQAj6i5LqU8BPZZZ" },
|
||||||
|
//{ Id: "12D3KooWBRvtW83QYnPgJCyVyAgMXtg71wjkGefVB2fBnm1A36kS" },
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
type WhitelistConnectionGater struct {
|
||||||
|
whitelistedPeers map[peer.ID]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wg *WhitelistConnectionGater) InterceptPeerDial(p peer.ID) (allowed bool) {
|
||||||
|
//fmt.Println("PeerDial")
|
||||||
|
//_, allowed = wg.whitelistedPeers[p]
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wg *WhitelistConnectionGater) InterceptAddrDial(p peer.ID, addr multiaddr.Multiaddr) (bool) {
|
||||||
|
//fmt.Println("AddrDial")
|
||||||
|
return wg.InterceptPeerDial(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wg *WhitelistConnectionGater) InterceptAccept(conn network.ConnMultiaddrs) (bool) {
|
||||||
|
//fmt.Println("InterceptAccept")
|
||||||
|
addr, err := peer.AddrInfoFromP2pAddr(conn.RemoteMultiaddr())
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Error InterceptAccept: %s\n", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return wg.InterceptPeerDial(addr.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wg *WhitelistConnectionGater) InterceptSecured(direction network.Direction, p peer.ID, conn network.ConnMultiaddrs) (allow bool) {
|
||||||
|
//fmt.Println("InterceptSecured")
|
||||||
|
return wg.InterceptPeerDial(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wg *WhitelistConnectionGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) {
|
||||||
|
//fmt.Println("InterceptUpgraded")
|
||||||
|
return wg.InterceptPeerDial(conn.RemotePeer()), 0
|
||||||
|
}
|
||||||
|
|
||||||
func GetTrustedPeers() map[string][]Peer {
|
func GetTrustedPeers() map[string][]Peer {
|
||||||
cfg := NewConfig()
|
cfg := NewConfig()
|
||||||
|
|
||||||
@@ -151,6 +199,46 @@ type Namespace struct {
|
|||||||
Datastore *crdt.Datastore
|
Datastore *crdt.Datastore
|
||||||
//Registry *sharedKeyRegistry
|
//Registry *sharedKeyRegistry
|
||||||
CancelFunc context.CancelFunc
|
CancelFunc context.CancelFunc
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func (n *Namespace) Put(k string, v string) error {
|
||||||
|
key := ds.NewKey(k)
|
||||||
|
err := n.Datastore.Put(n.ctx, key, []byte(v))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
printErr(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Namespace) Get(k string) (string, error) {
|
||||||
|
v, err := n.Datastore.Get(n.ctx, ds.NewKey(k))
|
||||||
|
if err != nil {
|
||||||
|
printErr(err)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(v), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Namespace) List() {
|
||||||
|
q := query.Query{}
|
||||||
|
results, err := n.Datastore.Query(n.ctx, q)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
printErr(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for r := range results.Next() {
|
||||||
|
if r.Error != nil {
|
||||||
|
printErr(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fmt.Printf("[%s] -> %s\n", r.Key, string(r.Value))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Namespace) Close() {
|
func (n *Namespace) Close() {
|
||||||
@@ -174,13 +262,11 @@ func IsTrustedPeer(ctx context.Context, id peer.ID, namespace string) bool {
|
|||||||
if ok {
|
if ok {
|
||||||
for _, v := range val {
|
for _, v := range val {
|
||||||
if v.Id == id.String() {
|
if v.Id == id.String() {
|
||||||
fmt.Printf("Trusting %s in Namspace %s\n", id.String(), namespace)
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("NOT! trusting %s in Namspace %s\n", id.String(), namespace)
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -199,7 +285,6 @@ func CreateNamespace(ID string, storageHandler StorageHandler) (*Namespace, erro
|
|||||||
err := storageHandler.PubSub.RegisterTopicValidator(
|
err := storageHandler.PubSub.RegisterTopicValidator(
|
||||||
ID, //== topicName
|
ID, //== topicName
|
||||||
func(ctx context.Context, _ peer.ID, msg *pubsub.Message) bool {
|
func(ctx context.Context, _ peer.ID, msg *pubsub.Message) bool {
|
||||||
fmt.Println("Running Validator")
|
|
||||||
signer := msg.GetFrom()
|
signer := msg.GetFrom()
|
||||||
trusted := IsTrustedPeer(ctx, signer, ID)
|
trusted := IsTrustedPeer(ctx, signer, ID)
|
||||||
if !trusted {
|
if !trusted {
|
||||||
@@ -229,254 +314,16 @@ func CreateNamespace(ID string, storageHandler StorageHandler) (*Namespace, erro
|
|||||||
fmt.Printf("Removed: [%s]\n", k)
|
fmt.Printf("Removed: [%s]\n", k)
|
||||||
}
|
}
|
||||||
|
|
||||||
crdt, err := crdt.New(storageHandler.Store, ds.NewKey(*nameSpace), storageHandler.Ipfs, pubsubBC, opts)
|
crdt, err := crdt.New(storageHandler.Store, ds.NewKey(ID), storageHandler.Ipfs, pubsubBC, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatal(err)
|
logger.Fatal(err)
|
||||||
psubCancel()
|
psubCancel()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Namespace{ID: ID, Datastore: crdt, CancelFunc: psubCancel}, nil
|
return &Namespace{ID: ID, Datastore: crdt, CancelFunc: psubCancel, ctx: storageHandler.Ctx}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
|
||||||
flag.Parse()
|
|
||||||
ctx := context.Background()
|
|
||||||
data := *dbPath
|
|
||||||
|
|
||||||
h, dht, err := SetupLibp2pHost(ctx)
|
|
||||||
|
|
||||||
pid := h.ID().String()
|
|
||||||
fmt.Println(h.ID().String())
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ps, err := pubsub.NewGossipSub(ctx, h)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
topic, err := ps.Join(*topicNameFlag)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go discoverPeers(ctx, h, dht)
|
|
||||||
|
|
||||||
store, err := badger.NewDatastore(data, &badger.DefaultOptions)
|
|
||||||
if err != nil {
|
|
||||||
logger.Fatal(err)
|
|
||||||
}
|
|
||||||
defer store.Close()
|
|
||||||
|
|
||||||
PrintDBContent(ctx, store)
|
|
||||||
|
|
||||||
netSubs, err := topic.Subscribe()
|
|
||||||
if err != nil {
|
|
||||||
logger.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use a special pubsub topic to avoid disconnecting
|
|
||||||
// from globaldb peers.
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
msg, err := netSubs.Next(ctx)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
h.ConnManager().TagPeer(msg.ReceivedFrom, "keep", 100)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
topic.Publish(ctx, []byte("hi!"))
|
|
||||||
time.Sleep(20 * time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
ipfs, err := ipfslite.New(ctx, store, nil, h, dht, nil)
|
|
||||||
if err != nil {
|
|
||||||
logger.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
storageHandler := StorageHandler{
|
|
||||||
Ctx: ctx ,
|
|
||||||
Store: store,
|
|
||||||
Host: h,
|
|
||||||
Ipfs: ipfs,
|
|
||||||
PubSub: ps,
|
|
||||||
}
|
|
||||||
|
|
||||||
Cfg := NewConfig()
|
|
||||||
|
|
||||||
var Namespaces []*Namespace
|
|
||||||
for _, nsCfg := range Cfg {
|
|
||||||
ns1, err := CreateNamespace(nsCfg.Id, storageHandler)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
logger.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
Namespaces = append(Namespaces, ns1)
|
|
||||||
defer ns1.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
//fmt.Println("Bootstrapping...")
|
|
||||||
|
|
||||||
// bstr, _ := multiaddr.NewMultiaddr("/ip4/94.130.135.167/tcp/33123/ipfs/12D3KooWFta2AE7oiK1ioqjVAKajUJauZWfeM7R413K7ARtHRDAu")
|
|
||||||
// inf, _ := peer.AddrInfoFromP2pAddr(bstr)
|
|
||||||
// list := append(ipfslite.DefaultBootstrapPeers(), *inf)
|
|
||||||
// ipfs.Bootstrap(list)
|
|
||||||
// h.ConnManager().TagPeer(inf.ID, "keep", 100)
|
|
||||||
|
|
||||||
fmt.Printf(`
|
|
||||||
Peer ID: %s
|
|
||||||
Listen address: %s
|
|
||||||
Topic: %s
|
|
||||||
Data Folder: %s
|
|
||||||
|
|
||||||
Ready!
|
|
||||||
|
|
||||||
Commands:
|
|
||||||
|
|
||||||
> list -> list items in the store
|
|
||||||
> get <key> -> get value for a key
|
|
||||||
> put <key> <value> -> store value on a key
|
|
||||||
> exit -> quit
|
|
||||||
|
|
||||||
|
|
||||||
`,
|
|
||||||
pid, listen, *topicNameFlag, data,
|
|
||||||
)
|
|
||||||
|
|
||||||
if len(os.Args) > 1 && os.Args[1] == "daemon" {
|
|
||||||
fmt.Println("Running in daemon mode")
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
fmt.Printf("%s - %d connected peers\n", time.Now().Format(time.Stamp), len(connectedPeers(h)))
|
|
||||||
time.Sleep(10 * time.Second)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
signalChan := make(chan os.Signal, 20)
|
|
||||||
signal.Notify(
|
|
||||||
signalChan,
|
|
||||||
syscall.SIGINT,
|
|
||||||
syscall.SIGTERM,
|
|
||||||
syscall.SIGHUP,
|
|
||||||
)
|
|
||||||
<-signalChan
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
crdt := Namespaces[0].Datastore
|
|
||||||
fmt.Printf("Setting default namespace %s", Namespaces[0].ID)
|
|
||||||
fmt.Printf("> ")
|
|
||||||
scanner := bufio.NewScanner(os.Stdin)
|
|
||||||
for scanner.Scan() {
|
|
||||||
text := scanner.Text()
|
|
||||||
fields := strings.Fields(text)
|
|
||||||
if len(fields) == 0 {
|
|
||||||
fmt.Printf("> ")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
cmd := fields[0]
|
|
||||||
|
|
||||||
switch cmd {
|
|
||||||
case "exit", "quit":
|
|
||||||
return
|
|
||||||
case "debug":
|
|
||||||
if len(fields) < 2 {
|
|
||||||
fmt.Println("debug <on/off/peers>")
|
|
||||||
}
|
|
||||||
st := fields[1]
|
|
||||||
switch st {
|
|
||||||
case "on":
|
|
||||||
logging.SetLogLevel("globaldb", "debug")
|
|
||||||
case "off":
|
|
||||||
logging.SetLogLevel("globaldb", "error")
|
|
||||||
case "peers":
|
|
||||||
for _, p := range connectedPeers(h) {
|
|
||||||
addrs, err := peer.AddrInfoToP2pAddrs(p)
|
|
||||||
if err != nil {
|
|
||||||
logger.Warn(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for _, a := range addrs {
|
|
||||||
fmt.Println(a)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case "list":
|
|
||||||
q := query.Query{}
|
|
||||||
results, err := crdt.Query(ctx, q)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
printErr(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
//testKey := "0123912-09adskldj123-adlskj192-ajdl2k1"
|
|
||||||
//testValue := "{ \"hello\": \"world\" }"
|
|
||||||
|
|
||||||
//err = crdt.Put(ctx, ds.NewKey(testKey), []byte(testValue))
|
|
||||||
|
|
||||||
// if err != nil {
|
|
||||||
// printErr(err)
|
|
||||||
// }
|
|
||||||
|
|
||||||
for r := range results.Next() {
|
|
||||||
if r.Error != nil {
|
|
||||||
printErr(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fmt.Printf("[%s] -> %s\n", r.Key, string(r.Value))
|
|
||||||
}
|
|
||||||
case "get":
|
|
||||||
if len(fields) < 2 {
|
|
||||||
fmt.Println("get <key>")
|
|
||||||
fmt.Println("> ")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
k := ds.NewKey(fields[1])
|
|
||||||
v, err := crdt.Get(ctx, k)
|
|
||||||
if err != nil {
|
|
||||||
printErr(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fmt.Printf("[%s] -> %s\n", k, string(v))
|
|
||||||
case "put":
|
|
||||||
if len(fields) < 3 {
|
|
||||||
fmt.Println("put <key> <value>")
|
|
||||||
fmt.Println("> ")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
k := ds.NewKey(fields[1])
|
|
||||||
v := strings.Join(fields[2:], " ")
|
|
||||||
err := crdt.Put(ctx, k, []byte(v))
|
|
||||||
if err != nil {
|
|
||||||
printErr(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fmt.Printf("> ")
|
|
||||||
}
|
|
||||||
//go streamConsoleTo(ctx, topic)
|
|
||||||
|
|
||||||
//sub, err := topic.Subscribe()
|
|
||||||
//if err != nil {
|
|
||||||
// panic(err)
|
|
||||||
//}
|
|
||||||
//printMessagesFrom(ctx, sub)
|
|
||||||
}
|
|
||||||
|
|
||||||
func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
|
func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
|
||||||
// Start a DHT, for use in peer discovery. We can't just make a new DHT
|
// Start a DHT, for use in peer discovery. We can't just make a new DHT
|
||||||
@@ -506,7 +353,7 @@ func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
|
|||||||
return kademliaDHT
|
return kademliaDHT
|
||||||
}
|
}
|
||||||
|
|
||||||
func discoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
|
func DiscoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
|
||||||
//cfg := NewConfig()
|
//cfg := NewConfig()
|
||||||
|
|
||||||
//for _, v := range cfg {
|
//for _, v := range cfg {
|
||||||
@@ -535,14 +382,15 @@ func discoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
|
|||||||
// }
|
// }
|
||||||
//}
|
//}
|
||||||
routingDiscovery := drouting.NewRoutingDiscovery(dht)
|
routingDiscovery := drouting.NewRoutingDiscovery(dht)
|
||||||
dutil.Advertise(ctx, routingDiscovery, *topicNameFlag)
|
dutil.Advertise(ctx, routingDiscovery, topicNameFlag)
|
||||||
|
|
||||||
// Look for others who have announced and attempt to connect to them
|
// Look for others who have announced and attempt to connect to them
|
||||||
anyConnected := false
|
anyConnected := false
|
||||||
fmt.Printf("Own Id: %s\n", h.ID())
|
fmt.Printf("Own Id: %s\n", h.ID())
|
||||||
for !anyConnected {
|
for !anyConnected {
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
fmt.Println("Searching for peers...")
|
fmt.Println("Searching for peers...")
|
||||||
peerChan, err := routingDiscovery.FindPeers(ctx, *topicNameFlag)
|
peerChan, err := routingDiscovery.FindPeers(ctx, topicNameFlag)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@@ -562,36 +410,12 @@ func discoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
|
|||||||
fmt.Println("Peer discovery complete")
|
fmt.Println("Peer discovery complete")
|
||||||
}
|
}
|
||||||
|
|
||||||
func streamConsoleTo(ctx context.Context, topic *pubsub.Topic) {
|
|
||||||
reader := bufio.NewReader(os.Stdin)
|
|
||||||
for {
|
|
||||||
s, err := reader.ReadString('\n')
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
if err := topic.Publish(ctx, []byte(fmt.Sprintf("NEWMSG: %s", s))); err != nil {
|
|
||||||
fmt.Println("### Publish error:", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func printMessagesFrom(ctx context.Context, sub *pubsub.Subscription) {
|
|
||||||
for {
|
|
||||||
m, err := sub.Next(ctx)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
fmt.Println(m.ReceivedFrom, ": ", string(m.Message.Data))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
func printErr(err error) {
|
func printErr(err error) {
|
||||||
fmt.Println("error:", err)
|
fmt.Println("error:", err)
|
||||||
fmt.Println("> ")
|
fmt.Println("> ")
|
||||||
}
|
}
|
||||||
|
|
||||||
func connectedPeers(h host.Host) []*peer.AddrInfo {
|
func ConnectedPeers(h host.Host) []*peer.AddrInfo {
|
||||||
var pinfos []*peer.AddrInfo
|
var pinfos []*peer.AddrInfo
|
||||||
for _, c := range h.Network().Conns() {
|
for _, c := range h.Network().Conns() {
|
||||||
pinfos = append(pinfos, &peer.AddrInfo{
|
pinfos = append(pinfos, &peer.AddrInfo{
|
||||||
Reference in New Issue
Block a user