From a66bd6dad74ebd873740d7f2f8a295b4d6a8aeb5 Mon Sep 17 00:00:00 2001
From: Philipp Heckel <pheckel@datto.com>
Date: Sat, 23 Oct 2021 15:22:17 -0400
Subject: [PATCH] Remove websockets, readme, better UI

---
 README.md         |  46 ++++++++++++++++++---
 server/index.html |  81 +++++++++++++++++++++++++------------
 server/server.go  | 100 ++++++++++++++++------------------------------
 server/topic.go   |   8 ++--
 4 files changed, 134 insertions(+), 101 deletions(-)

diff --git a/README.md b/README.md
index 93232bc..af79562 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,44 @@
+# ntfy
 
+ntfy is a super simple pub-sub notification service. It allows you to send desktop and (soon) phone notifications
+via scripts. I run a free version of it on *[ntfy.sh](https://ntfy.sh)*. No signups or cost.
 
-echo "mychan:long process is done" | nc -N ntfy.sh 9999
-curl -d "long process is done" ntfy.sh/mychan
-    publish on channel
+## Usage
 
-curl ntfy.sh/mychan
-    subscribe to channel
+### Subscribe to a topic
 
-ntfy.sh/mychan/ws
+You can subscribe to a topic either in a web UI, or in your own app by subscribing to an SSE/EventSource
+or JSON feed. 
+
+Here's how to do it via curl see the SSE stream in `curl`:
+
+```
+curl -s localhost:9997/mytopic/sse
+```
+
+You can easily script it to execute any command when a message arrives:
+```
+while read json; do 
+  msg="$(echo "$json" | jq -r .message)"
+  notify-send "$msg"
+done < <(stdbuf -i0 -o0 curl -s localhost:9997/mytopic/json)
+```
+
+### Publish messages
+
+Publishing messages can be done via PUT or POST using. Here's an example using `curl`:
+```
+curl -d "long process is done" ntfy.sh/mytopic
+```
+
+## TODO
+- /raw endpoint
+- netcat usage
+- rate limiting / abuse protection
+- release/packaging
+
+## Contributing
+I welcome any and all contributions. Just create a PR or an issue.
+
+## License
+Made with ❤️ by [Philipp C. Heckel](https://heckel.io), distributed under the [Apache License 2.0](LICENSE).
diff --git a/server/index.html b/server/index.html
index eaa4d5f..50d9994 100644
--- a/server/index.html
+++ b/server/index.html
@@ -10,22 +10,22 @@
 
 <p>
     ntfy.sh is a super simple pub-sub notification service. It allows you to send desktop and (soon) phone notifications
-    via scripts, without signup or cost. It's entirely free and open source.
+    via scripts, without signup or cost. It's entirely free and open source. You can find the source code <a href="https://github.com/binwiederhier/ntfy">on GitHub</a>.
 </p>
 
 <p>
-    <b>Usage:</b> You can subscribe to a topic either in this web UI, or in your own app by subscribing to an SSE/EventSource
+    You can subscribe to a topic either in this web UI, or in your own app by subscribing to an SSE/EventSource
     or JSON feed. Once subscribed, you can publish messages via PUT or POST.
 </p>
 
-<div id="error"></div>
+<p id="error"></p>
 
 <form id="subscribeForm">
     <input type="text" id="topicField" size="64" autofocus />
     <input type="submit" id="subscribeButton" value="Subscribe topic" />
 </form>
 
-Topics:
+<p>Topics:</p>
 <ul id="topicsList">
 </ul>
 
@@ -38,50 +38,68 @@ Topics:
     const subscribeForm = document.getElementById("subscribeForm");
     const errorField = document.getElementById("error");
 
-    const subscribe = function (topic) {
+    const subscribe = (topic) => {
         if (Notification.permission !== "granted") {
-            Notification.requestPermission().then(function (permission) {
+            Notification.requestPermission().then((permission) => {
                 if (permission === "granted") {
-                    subscribeInternal(topic);
+                    subscribeInternal(topic, 0);
                 }
             });
         } else {
-            subscribeInternal(topic);
+            subscribeInternal(topic, 0);
         }
     };
 
-    const subscribeInternal = function (topic) {
-        let eventSource = new EventSource(`${topic}/sse`);
-        eventSource.onerror = function (e) {
-            console.log(e);
-            errorField.innerHTML = "Error " + e;
-        };
-        eventSource.onmessage = function (e) {
-            const event = JSON.parse(e.data);
-            new Notification(event.message);
-        };
-        topics[topic] = eventSource;
-
-        let topicEntry = document.createElement('li');
-        topicEntry.id = `topic-${topic}`;
-        topicEntry.innerHTML = `${topic} <button onclick="unsubscribe('${topic}')">Unsubscribe</button>`;
-        topicsList.appendChild(topicEntry);
+    const subscribeInternal = (topic, delaySec) => {
+        setTimeout(() => {
+            // Render list entry
+            let topicEntry = document.getElementById(`topic-${topic}`);
+            if (!topicEntry) {
+                topicEntry = document.createElement('li');
+                topicEntry.id = `topic-${topic}`;
+                topicEntry.innerHTML = `${topic} <button onclick="unsubscribe('${topic}')">Unsubscribe</button>`;
+                topicsList.appendChild(topicEntry);
+            }
+
+            // Open event source
+            let eventSource = new EventSource(`${topic}/sse`);
+            eventSource.onopen = () => {
+                topicEntry.innerHTML = `${topic} <button onclick="unsubscribe('${topic}')">Unsubscribe</button>`;
+                delaySec = 0; // Reset on successful connection
+            };
+            eventSource.onerror = (e) => {
+                console.log("onerror")
+                const newDelaySec = (delaySec + 5 <= 30) ? delaySec + 5 : 30;
+                topicEntry.innerHTML = `${topic} <i>(Reconnecting in ${newDelaySec}s ...)</i> <button onclick="unsubscribe('${topic}')">Unsubscribe</button>`;
+                eventSource.close()
+                subscribeInternal(topic, newDelaySec);
+            };
+            eventSource.onmessage = (e) => {
+                const event = JSON.parse(e.data);
+                new Notification(event.message);
+            };
+            topics[topic] = eventSource;
+            localStorage.setItem('topics', JSON.stringify(Object.keys(topics)));
+        }, delaySec * 1000);
     };
 
-        const unsubscribe = function(topic) {
+    const unsubscribe = (topic) => {
         topics[topic].close();
+        delete topics[topic];
+        localStorage.setItem('topics', JSON.stringify(Object.keys(topics)));
         document.getElementById(`topic-${topic}`).remove();
     };
 
     subscribeForm.onsubmit = function () {
-        alert("hi")
         if (!topicField.value) {
             return false;
         }
         subscribe(topicField.value);
+        topicField.value = "";
         return false;
     };
 
+    // Disable Web UI if notifications of EventSource are not available
     if (!window["Notification"] || !window["EventSource"]) {
         errorField.innerHTML = "Your browser is not compatible to use the web-based desktop notifications.";
         topicField.disabled = true;
@@ -91,6 +109,17 @@ Topics:
         topicField.disabled = true;
         subscribeButton.disabled = true;
     }
+
+    // Reset UI
+    topicField.value = "";
+
+    // Restore topics
+    const storedTopics = localStorage.getItem('topics');
+    if (storedTopics) {
+        JSON.parse(storedTopics).forEach((topic) => {
+            subscribeInternal(topic, 0);
+        });
+    }
 </script>
 
 </body>
diff --git a/server/server.go b/server/server.go
index a78019c..f620114 100644
--- a/server/server.go
+++ b/server/server.go
@@ -6,7 +6,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/gorilla/websocket"
 	"io"
 	"log"
 	"net/http"
@@ -18,11 +17,11 @@ import (
 
 type Server struct {
 	topics map[string]*topic
-	mu sync.Mutex
+	mu     sync.Mutex
 }
 
 type message struct {
-	Time int64 `json:"time"`
+	Time    int64  `json:"time"`
 	Message string `json:"message"`
 }
 
@@ -31,14 +30,9 @@ const (
 )
 
 var (
-	topicRegex    = regexp.MustCompile(`^/[^/]+$`)
-	jsonRegex    = regexp.MustCompile(`^/[^/]+/json$`)
-	sseRegex    = regexp.MustCompile(`^/[^/]+/sse$`)
-	wsRegex    = regexp.MustCompile(`^/[^/]+/ws$`)
-	wsUpgrader = websocket.Upgrader{
-		ReadBufferSize:  messageLimit,
-		WriteBufferSize: messageLimit,
-	}
+	topicRegex = regexp.MustCompile(`^/[^/]+$`)
+	jsonRegex  = regexp.MustCompile(`^/[^/]+/json$`)
+	sseRegex   = regexp.MustCompile(`^/[^/]+/sse$`)
 
 	//go:embed "index.html"
 	indexSource string
@@ -51,26 +45,32 @@ func New() *Server {
 }
 
 func (s *Server) Run() error {
-	go func() {
-		for {
-			time.Sleep(5 * time.Second)
-			s.mu.Lock()
-			log.Printf("topics: %d", len(s.topics))
-			for _, t := range s.topics {
-				t.mu.Lock()
-				log.Printf("- %s: %d subscriber(s), %d message(s) sent, last active = %s",
-					t.id, len(t.subscribers), t.messages, t.last.String())
-				t.mu.Unlock()
-			}
-			// TODO kill dead topics
-			s.mu.Unlock()
-		}
-	}()
+	go s.runMonitor()
+	return s.listenAndServe()
+}
+
+func (s *Server) listenAndServe() error {
 	log.Printf("Listening on :9997")
 	http.HandleFunc("/", s.handle)
 	return http.ListenAndServe(":9997", nil)
 }
 
+func (s *Server) runMonitor() {
+	for {
+		time.Sleep(5 * time.Second)
+		s.mu.Lock()
+		log.Printf("topics: %d", len(s.topics))
+		for _, t := range s.topics {
+			t.mu.Lock()
+			log.Printf("- %s: %d subscriber(s), %d message(s) sent, last active = %s",
+				t.id, len(t.subscribers), t.messages, t.last.String())
+			t.mu.Unlock()
+		}
+		// TODO kill dead topics
+		s.mu.Unlock()
+	}
+}
+
 func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
 	if err := s.handleInternal(w, r); err != nil {
 		w.WriteHeader(http.StatusInternalServerError)
@@ -81,8 +81,6 @@ func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
 func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error {
 	if r.Method == http.MethodGet && r.URL.Path == "/" {
 		return s.handleHome(w, r)
-	} else if r.Method == http.MethodGet && wsRegex.MatchString(r.URL.Path) {
-		return s.handleSubscribeWS(w, r)
 	} else if r.Method == http.MethodGet && jsonRegex.MatchString(r.URL.Path) {
 		return s.handleSubscribeJSON(w, r)
 	} else if r.Method == http.MethodGet && sseRegex.MatchString(r.URL.Path) {
@@ -118,7 +116,7 @@ func (s *Server) handlePublishHTTP(w http.ResponseWriter, r *http.Request) error
 
 func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request) error {
 	t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/json")) // Hack
-	subscriberID := t.Subscribe(func (msg *message) error {
+	subscriberID := t.Subscribe(func(msg *message) error {
 		if err := json.NewEncoder(w).Encode(&msg); err != nil {
 			return err
 		}
@@ -137,12 +135,12 @@ func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request) err
 
 func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) error {
 	t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/sse")) // Hack
-	subscriberID := t.Subscribe(func (msg *message) error {
+	subscriberID := t.Subscribe(func(msg *message) error {
 		var buf bytes.Buffer
 		if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
 			return err
 		}
-		m := fmt.Sprintf("data: %s\n\n", buf.String())
+		m := fmt.Sprintf("data: %s\n", buf.String())
 		if _, err := io.WriteString(w, m); err != nil {
 			return err
 		}
@@ -154,6 +152,12 @@ func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) erro
 	defer t.Unsubscribe(subscriberID)
 	w.Header().Set("Content-Type", "text/event-stream")
 	w.WriteHeader(http.StatusOK)
+	if _, err := io.WriteString(w, "event: open\n\n"); err != nil {
+		return err
+	}
+	if fl, ok := w.(http.Flusher); ok {
+		fl.Flush()
+	}
 	select {
 	case <-t.ctx.Done():
 	case <-r.Context().Done():
@@ -161,40 +165,6 @@ func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) erro
 	return nil
 }
 
-func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request) error {
-	conn, err := wsUpgrader.Upgrade(w, r, nil)
-	if err != nil {
-		return err
-	}
-	t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/ws")) // Hack
-	t.Subscribe(func (msg *message) error {
-		var buf bytes.Buffer
-		if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
-			return err
-		}
-		defer conn.Close()
-		/*conn.SetWriteDeadline(time.Now().Add(writeWait))
-		if !ok {
-			// The hub closed the channel.
-			c.conn.WriteMessage(websocket.CloseMessage, []byte{})
-			return
-		}*/
-
-		w, err := conn.NextWriter(websocket.TextMessage)
-		if err != nil {
-			return err
-		}
-		if _, err := w.Write([]byte(msg.Message)); err != nil {
-			return err
-		}
-		if err := w.Close(); err != nil {
-			return err
-		}
-		return nil
-	})
-	return nil
-}
-
 func (s *Server) createTopic(id string) *topic {
 	s.mu.Lock()
 	defer s.mu.Unlock()
diff --git a/server/topic.go b/server/topic.go
index 65dd741..283b6da 100644
--- a/server/topic.go
+++ b/server/topic.go
@@ -12,11 +12,11 @@ import (
 type topic struct {
 	id          string
 	subscribers map[int]subscriber
-	messages int
+	messages    int
 	last        time.Time
-	ctx context.Context
-	cancel context.CancelFunc
-	mu sync.Mutex
+	ctx         context.Context
+	cancel      context.CancelFunc
+	mu          sync.Mutex
 }
 
 type subscriber func(msg *message) error
-- 
GitLab