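// globaldb demonstrates a replicated key-value store: a Merkle-CRDT
// datastore (go-ds-crdt) persisted in Badger and synchronized between
// peers over libp2p gossipsub, with peer discovery via the Kademlia DHT.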
package main

import (
	"bufio"
	"context"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	ipfslite "github.com/hsanjuan/ipfs-lite"
	ds "github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/query"
	badger "github.com/ipfs/go-ds-badger"
	crdt "github.com/ipfs/go-ds-crdt"
	logging "github.com/ipfs/go-log/v2"
	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	crypto "github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
	dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
	multiaddr "github.com/multiformats/go-multiaddr"
)

var (
	topicNameFlag = flag.String("topicName", "applesauce", "name of topic to join")
	dbPath        = flag.String("db", "./db", "db file path")
	logger        = logging.Logger("globaldb")
	listen        = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")
)

func main() {
	flag.Parse()
	ctx := context.Background()

	data := *dbPath

	store, err := badger.NewDatastore(data, &badger.DefaultOptions)
	if err != nil {
		logger.Fatal(err)
	}
	defer store.Close()

	// Load this node's private key from disk, or generate and persist a
	// new one, so the peer keeps a stable identity across restarts.
	keyPath := filepath.Join(data, "key")
	var priv crypto.PrivKey
	_, err = os.Stat(keyPath)
	if os.IsNotExist(err) {
		priv, _, err = crypto.GenerateKeyPair(crypto.Ed25519, -1)
		if err != nil {
			logger.Fatal(err)
		}
		keyBytes, err := crypto.MarshalPrivateKey(priv)
		if err != nil {
			logger.Fatal(err)
		}
		err = os.WriteFile(keyPath, keyBytes, 0400)
		if err != nil {
			logger.Fatal(err)
		}
	} else if err != nil {
		logger.Fatal(err)
	} else {
		key, err := os.ReadFile(keyPath)
		if err != nil {
			logger.Fatal(err)
		}
		priv, err = crypto.UnmarshalPrivateKey(key)
		if err != nil {
			logger.Fatal(err)
		}
	}

	// Create the host with the persisted identity, so the peer ID printed
	// below is the one other peers actually see.
	h, err := libp2p.New(listen, libp2p.Identity(priv))
	if err != nil {
		panic(err)
	}
	ps, err := pubsub.NewGossipSub(ctx, h)
	if err != nil {
		panic(err)
	}
	topic, err := ps.Join(*topicNameFlag)
	if err != nil {
		panic(err)
	}

	kademliaDHT := initDHT(ctx, h)
	go discoverPeers(ctx, h, kademliaDHT)

	pid, err := peer.IDFromPublicKey(priv.GetPublic())
	if err != nil {
		logger.Fatal(err)
	}

	netSubs, err := topic.Subscribe()
	if err != nil {
		logger.Fatal(err)
	}

	// Tag any peer we hear from on the topic so the connection manager
	// keeps us connected to other globaldb peers.
	go func() {
		for {
			msg, err := netSubs.Next(ctx)
			if err != nil {
				fmt.Println(err)
				break
			}
			h.ConnManager().TagPeer(msg.ReceivedFrom, "keep", 100)
		}
	}()

	// Periodically announce ourselves so other subscribers hear from us
	// and keep our connection tagged.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			default:
				if err := topic.Publish(ctx, []byte("hi!")); err != nil {
					logger.Warn(err)
				}
				time.Sleep(20 * time.Second)
			}
		}
	}()
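
	// ipfs-lite supplies the IPLD DAG service that the CRDT store uses to
	// fetch and replicate delta blocks from other peers.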
	ipfs, err := ipfslite.New(ctx, store, nil, h, kademliaDHT, nil)
	if err != nil {
		logger.Fatal(err)
	}
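
	// The CRDT broadcaster gets its own topic (the chat topic name plus
	// "BC") so head broadcasts do not mix with the keep-alive messages.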
	psubCtx, psubCancel := context.WithCancel(ctx)
	pubsubBC, err := crdt.NewPubSubBroadcaster(psubCtx, ps, *topicNameFlag+"BC")
	if err != nil {
		logger.Fatal(err)
	}
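
	// PutHook and DeleteHook fire for every key added or removed, whether
	// the change was made locally or received from a remote peer.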
	opts := crdt.DefaultOptions()
	opts.Logger = logger
	opts.RebroadcastInterval = 5 * time.Second
	opts.PutHook = func(k ds.Key, v []byte) {
		fmt.Printf("Added: [%s] -> %s\n", k, string(v))
	}
	opts.DeleteHook = func(k ds.Key) {
		fmt.Printf("Removed: [%s]\n", k)
	}

	crdtStore, err := crdt.New(store, ds.NewKey("crdt"), ipfs, pubsubBC, opts)
	if err != nil {
		logger.Fatal(err)
	}
	defer psubCancel()
	defer crdtStore.Close()

	fmt.Println("Bootstrapping...")

	// Connect to a known globaldb bootstrap peer in addition to the
	// default ipfs-lite bootstrappers, and tag it so we stay connected.
	bstr, _ := multiaddr.NewMultiaddr("/ip4/94.130.135.167/tcp/33123/ipfs/12D3KooWFta2AE7oiK1ioqjVAKajUJauZWfeM7R413K7ARtHRDAu")
	inf, _ := peer.AddrInfoFromP2pAddr(bstr)
	list := append(ipfslite.DefaultBootstrapPeers(), *inf)
	ipfs.Bootstrap(list)
	h.ConnManager().TagPeer(inf.ID, "keep", 100)

	fmt.Printf(`
Peer ID: %s
Listen addresses: %s
Topic: %s
Data Folder: %s

Ready!

Commands:

> list -> list items in the store
> get <key> -> get value for a key
> put <key> <value> -> store value on a key
> debug <on/off/peers> -> set log level or list connected peers
> exit -> quit

`,
		pid, h.Addrs(), *topicNameFlag, data,
	)
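
	// With "daemon" as the first argument, run headless and just report
	// the number of connected peers until a signal arrives.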
	if flag.Arg(0) == "daemon" {
		fmt.Println("Running in daemon mode")
		go func() {
			for {
				fmt.Printf("%s - %d connected peers\n", time.Now().Format(time.Stamp), len(connectedPeers(h)))
				time.Sleep(10 * time.Second)
			}
		}()
		signalChan := make(chan os.Signal, 20)
		signal.Notify(
			signalChan,
			syscall.SIGINT,
			syscall.SIGTERM,
			syscall.SIGHUP,
		)
		<-signalChan
		return
	}
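
	// Interactive mode: a tiny shell over the CRDT store.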
	fmt.Printf("> ")
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		text := scanner.Text()
		fields := strings.Fields(text)
		if len(fields) == 0 {
			fmt.Printf("> ")
			continue
		}

		cmd := fields[0]

		switch cmd {
		case "exit", "quit":
			return
case "debug":
|
|
if len(fields) < 2 {
|
|
fmt.Println("debug <on/off/peers>")
|
|
}
|
|
st := fields[1]
|
|
switch st {
|
|
case "on":
|
|
logging.SetLogLevel("globaldb", "debug")
|
|
case "off":
|
|
logging.SetLogLevel("globaldb", "error")
|
|
case "peers":
|
|
for _, p := range connectedPeers(h) {
|
|
addrs, err := peer.AddrInfoToP2pAddrs(p)
|
|
if err != nil {
|
|
logger.Warn(err)
|
|
continue
|
|
}
|
|
for _, a := range addrs {
|
|
fmt.Println(a)
|
|
}
|
|
}
|
|
}
|
|
case "list":
|
|
q := query.Query{}
|
|
results, err := crdt.Query(ctx, q)
|
|
|
|
if err != nil {
|
|
printErr(err)
|
|
}
|
|
|
|
testKey := "0123912-09adskldj123-adlskj192-ajdl2k1"
|
|
testValue := "{ \"hello\": \"world\" }"
|
|
|
|
err = crdt.Put(ctx, ds.NewKey(testKey), []byte(testValue))
|
|
|
|
if err != nil {
|
|
printErr(err)
|
|
}
|
|
|
|
for r := range results.Next() {
|
|
if r.Error != nil {
|
|
printErr(err)
|
|
continue
|
|
}
|
|
fmt.Printf("[%s] -> %s\n", r.Key, string(r.Value))
|
|
}
|
|
case "get":
|
|
if len(fields) < 2 {
|
|
fmt.Println("get <key>")
|
|
fmt.Println("> ")
|
|
continue
|
|
}
|
|
k := ds.NewKey(fields[1])
|
|
v, err := crdt.Get(ctx, k)
|
|
if err != nil {
|
|
printErr(err)
|
|
continue
|
|
}
|
|
fmt.Printf("[%s] -> %s\n", k, string(v))
|
|
case "put":
|
|
if len(fields) < 3 {
|
|
fmt.Println("put <key> <value>")
|
|
fmt.Println("> ")
|
|
continue
|
|
}
|
|
k := ds.NewKey(fields[1])
|
|
v := strings.Join(fields[2:], " ")
|
|
err := crdt.Put(ctx, k, []byte(v))
|
|
if err != nil {
|
|
printErr(err)
|
|
continue
|
|
}
|
|
}
|
|
fmt.Printf("> ")
|
|
}

	// Alternative plumbing, kept for reference: stream the console to the
	// topic and print raw pubsub messages from peers.
	// go streamConsoleTo(ctx, topic)
	// sub, err := topic.Subscribe()
	// if err != nil {
	// 	panic(err)
	// }
	// printMessagesFrom(ctx, sub)
}

func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
	// Start a DHT, for use in peer discovery. We can't just make a new DHT
	// client because we want each peer to maintain its own local copy of the
	// DHT, so that the bootstrapping node of the DHT can go down without
	// inhibiting future peer discovery.
	kademliaDHT, err := dht.New(ctx, h)
	if err != nil {
		panic(err)
	}
	if err = kademliaDHT.Bootstrap(ctx); err != nil {
		panic(err)
	}
	var wg sync.WaitGroup
	for _, peerAddr := range dht.DefaultBootstrapPeers {
		peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := h.Connect(ctx, *peerinfo); err != nil {
				fmt.Println("Bootstrap warning:", err)
			}
		}()
	}
	wg.Wait()

	return kademliaDHT
}
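
// discoverPeers advertises this host under the topic name and keeps
// searching the DHT until at least one other peer has been connected.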
func discoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
	routingDiscovery := drouting.NewRoutingDiscovery(dht)
	dutil.Advertise(ctx, routingDiscovery, *topicNameFlag)

	// Look for others who have announced and attempt to connect to them
	anyConnected := false
	fmt.Printf("Own Id: %s\n", h.ID())
	for !anyConnected {
		fmt.Println("Searching for peers...")
		peerChan, err := routingDiscovery.FindPeers(ctx, *topicNameFlag)
		if err != nil {
			panic(err)
		}
		for p := range peerChan {
			if p.ID == h.ID() {
				continue // No self connection
			}
			err := h.Connect(ctx, p)
			if err != nil {
				fmt.Printf("Failed connecting to %s, error: %s\n", p.ID, err)
			} else {
				fmt.Println("Connected to:", p.ID)
				anyConnected = true
			}
		}
	}
	fmt.Println("Peer discovery complete")
}
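
// streamConsoleTo reads lines from stdin and publishes each one to the
// given pubsub topic.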
func streamConsoleTo(ctx context.Context, topic *pubsub.Topic) {
	reader := bufio.NewReader(os.Stdin)
	for {
		s, err := reader.ReadString('\n')
		if err != nil {
			panic(err)
		}
		if err := topic.Publish(ctx, []byte(fmt.Sprintf("NEWMSG: %s", s))); err != nil {
			fmt.Println("### Publish error:", err)
		}
	}
}
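
// printMessagesFrom prints every message received on the subscription,
// prefixed with the sender's peer ID.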
func printMessagesFrom(ctx context.Context, sub *pubsub.Subscription) {
	for {
		m, err := sub.Next(ctx)
		if err != nil {
			panic(err)
		}
		fmt.Println(m.ReceivedFrom, ": ", string(m.Message.Data))
	}
}

func printErr(err error) {
	fmt.Println("error:", err)
	fmt.Printf("> ")
}
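
// connectedPeers returns an AddrInfo for every live connection on the host.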
func connectedPeers(h host.Host) []*peer.AddrInfo {
	var pinfos []*peer.AddrInfo
	for _, c := range h.Network().Conns() {
		pinfos = append(pinfos, &peer.AddrInfo{
			ID:    c.RemotePeer(),
			Addrs: []multiaddr.Multiaddr{c.RemoteMultiaddr()},
		})
	}
	return pinfos
}