]> Pileus Git - ~andy/gtk/commitdiff
[broadway] Stream data over websocket
authorAlexander Larsson <alexl@redhat.com>
Mon, 18 Apr 2011 17:52:05 +0000 (19:52 +0200)
committerAlexander Larsson <alexl@redhat.com>
Mon, 18 Apr 2011 18:51:53 +0000 (20:51 +0200)
The zlib compressed xmlhttprequest thing was a nice hack, but it doesn't
really work in production. Its not portable, doesn't have enought API
(missing notification for closed sockets) and having to synchronize
between two different connections in a reliable way is a pain.

So, we're going everything over the websocket. This is a pure switch,
but after this we want to modify the protocol to work better over
the uncompressed utf8 transport of websockets.

gdk/broadway/broadway.c
gdk/broadway/broadway.h
gdk/broadway/broadway.js
gdk/broadway/gdkdisplay-broadway.c

index 3917145bbcbf338fb97c51244dada8315dad66a0..710cb3ae5133f2b4683ce18f850a3879b8ae4713 100644 (file)
@@ -4,10 +4,6 @@
 #include <assert.h>
 #include <errno.h>
 #include <zlib.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
 
 #include "broadway.h"
 
@@ -450,126 +446,50 @@ to_png_a (int w, int h, int byte_stride, guint8 *data)
  ************************************************************************/
 
 struct BroadwayOutput {
-  int fd;
-  gzFile *zfd;
+  GOutputStream *out;
   int error;
   guint32 serial;
 };
 
 static void
-broadway_output_write_raw (BroadwayOutput *output,
-                          const void *buf, gsize count)
+broadway_output_write_header (BroadwayOutput *output)
 {
-  gssize res;
-  int errsave;
-  const char *ptr = (const char *)buf;
-
-  if (output->error)
-    return;
-
-  while (count > 0)
-    {
-      res = write(output->fd, ptr, count);
-      if (res == -1)
-       {
-         errsave = errno;
-         if (errsave == EINTR)
-           continue;
-         output->error = TRUE;
-         return;
-       }
-      if (res == 0)
-       {
-         output->error = TRUE;
-         return;
-       }
-      count -= res;
-      ptr += res;
-    }
+  g_output_stream_write (output->out, "\0", 1, NULL, NULL);
 }
 
 static void
 broadway_output_write (BroadwayOutput *output,
                       const void *buf, gsize count)
 {
-  gssize res;
-  const char *ptr = (const char *)buf;
-
-  if (output->error)
-    return;
-
-  while (count > 0)
-    {
-      res = gzwrite(output->zfd, ptr, count);
-      if (res == -1)
-       {
-         output->error = TRUE;
-         return;
-       }
-      if (res == 0)
-       {
-         output->error = TRUE;
-         return;
-       }
-      count -= res;
-      ptr += res;
-    }
-}
-
-static void
-broadway_output_write_header (BroadwayOutput *output)
-{
-  char *header;
-
-  header =
-    "HTTP/1.1 200 OK\r\n"
-    "Content-type: multipart/x-mixed-replace;boundary=x\r\n"
-    "Content-Encoding: gzip\r\n"
-    "\r\n";
-  broadway_output_write_raw (output,
-                            header, strlen (header));
+  g_output_stream_write_all (output->out, buf, count, NULL, NULL, NULL);
 }
 
 static void
 send_boundary (BroadwayOutput *output)
 {
-  char *boundary =
-    "--x\r\n"
-    "\r\n";
-
-  broadway_output_write (output, boundary, strlen (boundary));
+  broadway_output_write (output, "\xff", 1);
+  broadway_output_write (output, "\0", 1);
 }
 
 BroadwayOutput *
-broadway_output_new(int fd, guint32 serial)
+broadway_output_new(GOutputStream *out, guint32 serial)
 {
   BroadwayOutput *output;
-  int flag = 1;
 
   output = g_new0 (BroadwayOutput, 1);
 
-  output->fd = fd;
+  output->out = g_object_ref (out);
   output->serial = serial;
 
-  setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
-
   broadway_output_write_header (output);
 
-  output->zfd = gzdopen(fd, "wb");
-
-  /* Need an initial multipart boundary */
-  send_boundary (output);
-
   return output;
 }
 
 void
 broadway_output_free (BroadwayOutput *output)
 {
-  if (output->zfd)
-    gzclose (output->zfd);
-  else
-    close (output->fd);
+  g_object_unref (output->out);
   free (output);
 }
 
@@ -583,7 +503,6 @@ int
 broadway_output_flush (BroadwayOutput *output)
 {
   send_boundary (output);
-  gzflush (output->zfd, Z_SYNC_FLUSH);
   return !output->error;
 }
 
index fce53a30a6a9eeaf317e9047c46e70e88cba7180..12582654c45c73c10a341fc9b0203e2edc7c77b2 100644 (file)
@@ -1,4 +1,5 @@
 #include <glib.h>
+#include <gio/gio.h>
 
 typedef struct BroadwayOutput BroadwayOutput;
 
@@ -7,7 +8,7 @@ typedef struct  {
     int width, height;
 } BroadwayRect;
 
-BroadwayOutput *broadway_output_new             (int             fd,
+BroadwayOutput *broadway_output_new             (GOutputStream  *out,
                                                 guint32         serial);
 void            broadway_output_free            (BroadwayOutput *output);
 int             broadway_output_flush           (BroadwayOutput *output);
index 7870eeff6fb62e3b76949e208bc41033e17241bb..ba5b2fa810241e32b9be0e038f3c597d5d9fe3a0 100644 (file)
@@ -888,10 +888,10 @@ function handleOutstanding()
     }
 }
 
-function handleLoad(event)
+function handleMessage(message)
 {
     var cmdObj = {};
-    cmdObj.data = event.target.responseText;
+    cmdObj.data = message;
     cmdObj.pos = 0;
 
     outstandingCommands.push(cmdObj);
@@ -2768,22 +2768,10 @@ function connect()
        if (params[0].indexOf("toplevel") != -1)
            useToplevelWindows = true;
     }
-    var xhr = createXHR();
-    if (xhr) {
-       if (typeof xhr.multipart == 'undefined') {
-           alert("Sorry, this example only works in browsers that support multipart.");
-           return;
-       }
-
-       xhr.multipart = true;
-       xhr.open("GET", "/output", true);
-       xhr.onload = handleLoad;
-       xhr.send(null);
-    }
 
     if ("WebSocket" in window) {
        var loc = window.location.toString().replace("http:", "ws:");
-       loc = loc.substr(0, loc.lastIndexOf('/')) + "/input";
+       loc = loc.substr(0, loc.lastIndexOf('/')) + "/socket";
        var ws = new WebSocket(loc, "broadway");
        ws.onopen = function() {
            inputSocket = ws;
@@ -2806,6 +2794,9 @@ function connect()
        ws.onclose = function() {
            inputSocket = null;
        };
+       ws.onmessage = function(event) {
+           handleMessage(event.data);
+       };
     } else {
        alert("WebSocket not supported, input will not work!");
     }
index cdab101ab5136eb88cfc71e6d4baeb885c024308..6a8ca210a3ecb24d04ad8fe982091c371a7ece9e 100644 (file)
 #include <string.h>
 #include <errno.h>
 #include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
 
 static void   gdk_broadway_display_dispose            (GObject            *object);
 static void   gdk_broadway_display_finalize           (GObject            *object);
@@ -124,6 +128,8 @@ typedef struct HttpRequest {
   GString *request;
 }  HttpRequest;
 
+static void start_output (HttpRequest *request);
+
 static void
 http_request_free (HttpRequest *request)
 {
@@ -495,21 +501,6 @@ _gdk_broadway_display_block_for_input (GdkDisplay *display, char op,
   }
 }
 
-#include <unistd.h>
-#include <fcntl.h>
-static void
-set_fd_blocking (int fd)
-{
-  glong arg;
-
-  if ((arg = fcntl (fd, F_GETFL, NULL)) < 0)
-    arg = 0;
-
-  arg = arg & ~O_NONBLOCK;
-
-  fcntl (fd, F_SETFL, arg);
-}
-
 static char *
 parse_line (char *line, char *key)
 {
@@ -657,7 +648,7 @@ start_input (HttpRequest *request)
                         "Upgrade: WebSocket\r\n"
                         "Connection: Upgrade\r\n"
                         "Sec-WebSocket-Origin: %s\r\n"
-                        "Sec-WebSocket-Location: ws://%s/input\r\n"
+                        "Sec-WebSocket-Location: ws://%s/socket\r\n"
                         "Sec-WebSocket-Protocol: broadway\r\n"
                         "\r\n",
                         origin, host);
@@ -679,6 +670,8 @@ start_input (HttpRequest *request)
 
   broadway_display->input = input;
 
+  start_output (request);
+
   /* This will free and close the data input stream, but we got all the buffered content already */
   http_request_free (request);
 
@@ -699,14 +692,13 @@ start_output (HttpRequest *request)
 {
   GSocket *socket;
   GdkBroadwayDisplay *broadway_display;
-  int fd;
+  int flag = 1;
 
   socket = g_socket_connection_get_socket (request->connection);
+  setsockopt(g_socket_get_fd (socket), IPPROTO_TCP,
+            TCP_NODELAY, (char *) &flag, sizeof(int));
 
   broadway_display = GDK_BROADWAY_DISPLAY (request->display);
-  fd = g_socket_get_fd (socket);
-  set_fd_blocking (fd);
-  /* We dup this because otherwise it'll be closed with the request SocketConnection */
 
   if (broadway_display->output)
     {
@@ -714,15 +706,16 @@ start_output (HttpRequest *request)
       broadway_output_free (broadway_display->output);
     }
 
-  broadway_display->output = broadway_output_new (dup(fd), broadway_display->saved_serial);
+  broadway_display->output =
+    broadway_output_new (g_io_stream_get_output_stream (G_IO_STREAM (request->connection)),
+                        broadway_display->saved_serial);
+
   _gdk_broadway_resync_windows ();
 
   if (broadway_display->pointer_grab_window)
     broadway_output_grab_pointer (broadway_display->output,
                                  GDK_WINDOW_IMPL_BROADWAY (broadway_display->pointer_grab_window->impl)->id,
                                  broadway_display->pointer_grab_owner_events);
-
-  http_request_free (request);
 }
 
 static void
@@ -787,9 +780,7 @@ got_request (HttpRequest *request)
     send_data (request, "text/html", client_html, G_N_ELEMENTS(client_html) - 1);
   else if (strcmp (escaped, "/broadway.js") == 0)
     send_data (request, "text/javascript", broadway_js, G_N_ELEMENTS(broadway_js) - 1);
-  else if (strcmp (escaped, "/output") == 0)
-    start_output (request);
-  else if (strcmp (escaped, "/input") == 0)
+  else if (strcmp (escaped, "/socket") == 0)
     start_input (request);
   else
     send_error (request, 404, "File not found");