diff --git a/main.go b/main.go new file mode 100644 index 0000000..52d3d79 --- /dev/null +++ b/main.go @@ -0,0 +1,409 @@ +package main + +import ( + "bufio" + "context" + "flag" + "fmt" + "os" + "sync" + "path/filepath" + "time" + "os/signal" + "io/ioutil" + "strings" + "syscall" + + "github.com/ipfs/go-datastore/query" + "github.com/libp2p/go-libp2p" + dht "github.com/libp2p/go-libp2p-kad-dht" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing" + dutil "github.com/libp2p/go-libp2p/p2p/discovery/util" + ds "github.com/ipfs/go-datastore" + ipfslite "github.com/hsanjuan/ipfs-lite" + + multiaddr "github.com/multiformats/go-multiaddr" + + badger "github.com/ipfs/go-ds-badger" + crdt "github.com/ipfs/go-ds-crdt" + logging "github.com/ipfs/go-log/v2" + crypto "github.com/libp2p/go-libp2p/core/crypto" +) + +var ( + topicNameFlag = flag.String("topicName", "applesauce", "name of topic to join") + dbPath = flag.String("db", "./db", "db file path") + logger = logging.Logger("globaldb") + listen = libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0") +// topicName = "globaldb-example" +// netTopic = "globaldb-example-net" +// config = "globaldb-example" +) + +func main() { + flag.Parse() + ctx := context.Background() + + h, err := libp2p.New(listen) + if err != nil { + panic(err) + } + ps, err := pubsub.NewGossipSub(ctx, h) + if err != nil { + panic(err) + } + topic, err := ps.Join(*topicNameFlag) + if err != nil { + panic(err) + } + + kademliaDHT := initDHT(ctx, h) + go discoverPeers(ctx, h, kademliaDHT) + + data := *dbPath + + store, err := badger.NewDatastore(data, &badger.DefaultOptions) + if err != nil { + logger.Fatal(err) + } + defer store.Close() + + keyPath := filepath.Join(data, "key") + var priv crypto.PrivKey + _, err = os.Stat(keyPath) + if os.IsNotExist(err) { + priv, _, err = crypto.GenerateKeyPair(crypto.Ed25519, 1) + if err != nil { + logger.Fatal(err) + } + data, err := crypto.MarshalPrivateKey(priv) + if err != nil { + logger.Fatal(err) + } + err = ioutil.WriteFile(keyPath, data, 0400) + if err != nil { + logger.Fatal(err) + } + } else if err != nil { + logger.Fatal(err) + } else { + key, err := ioutil.ReadFile(keyPath) + if err != nil { + logger.Fatal(err) + } + priv, err = crypto.UnmarshalPrivateKey(key) + if err != nil { + logger.Fatal(err) + } + + } + pid, err := peer.IDFromPublicKey(priv.GetPublic()) + if err != nil { + logger.Fatal(err) + } + + netSubs, err := topic.Subscribe() + if err != nil { + logger.Fatal(err) + } + + // Use a special pubsub topic to avoid disconnecting + // from globaldb peers. + go func() { + for { + msg, err := netSubs.Next(ctx) + if err != nil { + fmt.Println(err) + break + } + h.ConnManager().TagPeer(msg.ReceivedFrom, "keep", 100) + } + }() + + go func() { + for { + select { + case <-ctx.Done(): + return + default: + topic.Publish(ctx, []byte("hi!")) + time.Sleep(20 * time.Second) + } + } + }() + + + ipfs, err := ipfslite.New(ctx, store, nil, h, kademliaDHT, nil) + if err != nil { + logger.Fatal(err) + } + + psubCtx, psubCancel := context.WithCancel(ctx) + pubsubBC, err := crdt.NewPubSubBroadcaster(psubCtx, ps, *topicNameFlag + "BC") + if err != nil { + logger.Fatal(err) + } + + opts := crdt.DefaultOptions() + opts.Logger = logger + opts.RebroadcastInterval = 5 * time.Second + opts.PutHook = func(k ds.Key, v []byte) { + fmt.Printf("Added: [%s] -> %s\n", k, string(v)) + + } + opts.DeleteHook = func(k ds.Key) { + fmt.Printf("Removed: [%s]\n", k) + } + + crdt, err := crdt.New(store, ds.NewKey("crdt"), ipfs, pubsubBC, opts) + if err != nil { + logger.Fatal(err) + } + defer crdt.Close() + defer psubCancel() + + fmt.Println("Bootstrapping...") + + bstr, _ := multiaddr.NewMultiaddr("/ip4/94.130.135.167/tcp/33123/ipfs/12D3KooWFta2AE7oiK1ioqjVAKajUJauZWfeM7R413K7ARtHRDAu") + inf, _ := peer.AddrInfoFromP2pAddr(bstr) + list := append(ipfslite.DefaultBootstrapPeers(), *inf) + ipfs.Bootstrap(list) + h.ConnManager().TagPeer(inf.ID, "keep", 100) + + fmt.Printf(` +Peer ID: %s +Listen address: %s +Topic: %s +Data Folder: %s + +Ready! + +Commands: + +> list -> list items in the store +> get -> get value for a key +> put -> store value on a key +> exit -> quit + + +`, + pid, listen, *topicNameFlag, data, + ) + + if len(os.Args) > 1 && os.Args[1] == "daemon" { + fmt.Println("Running in daemon mode") + go func() { + for { + fmt.Printf("%s - %d connected peers\n", time.Now().Format(time.Stamp), len(connectedPeers(h))) + time.Sleep(10 * time.Second) + } + }() + signalChan := make(chan os.Signal, 20) + signal.Notify( + signalChan, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGHUP, + ) + <-signalChan + return + } + + fmt.Printf("> ") + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + text := scanner.Text() + fields := strings.Fields(text) + if len(fields) == 0 { + fmt.Printf("> ") + continue + } + + cmd := fields[0] + + switch cmd { + case "exit", "quit": + return + case "debug": + if len(fields) < 2 { + fmt.Println("debug ") + } + st := fields[1] + switch st { + case "on": + logging.SetLogLevel("globaldb", "debug") + case "off": + logging.SetLogLevel("globaldb", "error") + case "peers": + for _, p := range connectedPeers(h) { + addrs, err := peer.AddrInfoToP2pAddrs(p) + if err != nil { + logger.Warn(err) + continue + } + for _, a := range addrs { + fmt.Println(a) + } + } + } + case "list": + q := query.Query{} + results, err := crdt.Query(ctx, q) + + if err != nil { + printErr(err) + } + + testKey := "0123912-09adskldj123-adlskj192-ajdl2k1" + testValue := "{ \"hello\": \"world\" }" + + err = crdt.Put(ctx, ds.NewKey(testKey), []byte(testValue)) + + if err != nil { + printErr(err) + } + + for r := range results.Next() { + if r.Error != nil { + printErr(err) + continue + } + fmt.Printf("[%s] -> %s\n", r.Key, string(r.Value)) + } + case "get": + if len(fields) < 2 { + fmt.Println("get ") + fmt.Println("> ") + continue + } + k := ds.NewKey(fields[1]) + v, err := crdt.Get(ctx, k) + if err != nil { + printErr(err) + continue + } + fmt.Printf("[%s] -> %s\n", k, string(v)) + case "put": + if len(fields) < 3 { + fmt.Println("put ") + fmt.Println("> ") + continue + } + k := ds.NewKey(fields[1]) + v := strings.Join(fields[2:], " ") + err := crdt.Put(ctx, k, []byte(v)) + if err != nil { + printErr(err) + continue + } + } + fmt.Printf("> ") + } + //go streamConsoleTo(ctx, topic) + + //sub, err := topic.Subscribe() + //if err != nil { + // panic(err) + //} + //printMessagesFrom(ctx, sub) +} + +func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT { + // Start a DHT, for use in peer discovery. We can't just make a new DHT + // client because we want each peer to maintain its own local copy of the + // DHT, so that the bootstrapping node of the DHT can go down without + // inhibiting future peer discovery. + kademliaDHT, err := dht.New(ctx, h) + if err != nil { + panic(err) + } + if err = kademliaDHT.Bootstrap(ctx); err != nil { + panic(err) + } + var wg sync.WaitGroup + for _, peerAddr := range dht.DefaultBootstrapPeers { + peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr) + wg.Add(1) + go func() { + defer wg.Done() + if err := h.Connect(ctx, *peerinfo); err != nil { + fmt.Println("Bootstrap warning:", err) + } + }() + } + wg.Wait() + + return kademliaDHT +} + +func discoverPeers(ctx context.Context, h host.Host, dht *dht.IpfsDHT) { + routingDiscovery := drouting.NewRoutingDiscovery(dht) + dutil.Advertise(ctx, routingDiscovery, *topicNameFlag) + + // Look for others who have announced and attempt to connect to them + anyConnected := false + fmt.Printf("Own Id: %s\n", h.ID()) + for !anyConnected { + fmt.Println("Searching for peers...") + peerChan, err := routingDiscovery.FindPeers(ctx, *topicNameFlag) + if err != nil { + panic(err) + } + for peer := range peerChan { + if peer.ID == h.ID() { + continue // No self connection + } + err := h.Connect(ctx, peer) + if err != nil { + fmt.Printf("Failed connecting to %s, error: %s\n", peer.ID, err) + } else { + fmt.Println("Connected to:", peer.ID) + anyConnected = true + } + } + } + fmt.Println("Peer discovery complete") +} + +func streamConsoleTo(ctx context.Context, topic *pubsub.Topic) { + reader := bufio.NewReader(os.Stdin) + for { + s, err := reader.ReadString('\n') + if err != nil { + panic(err) + } + if err := topic.Publish(ctx, []byte(fmt.Sprintf("NEWMSG: %s", s))); err != nil { + fmt.Println("### Publish error:", err) + } + } +} + +func printMessagesFrom(ctx context.Context, sub *pubsub.Subscription) { + for { + m, err := sub.Next(ctx) + if err != nil { + panic(err) + } + fmt.Println(m.ReceivedFrom, ": ", string(m.Message.Data)) + } +} + + +func printErr(err error) { + fmt.Println("error:", err) + fmt.Println("> ") +} + +func connectedPeers(h host.Host) []*peer.AddrInfo { + var pinfos []*peer.AddrInfo + for _, c := range h.Network().Conns() { + pinfos = append(pinfos, &peer.AddrInfo{ + ID: c.RemotePeer(), + Addrs: []multiaddr.Multiaddr{c.RemoteMultiaddr()}, + }) + } + return pinfos +}