From 0abd5e2767c83758d7305a3854428e715cf7af02 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Mon, 18 Apr 2011 19:52:05 +0200 Subject: [PATCH] [broadway] Stream data over websocket The zlib compressed xmlhttprequest thing was a nice hack, but it doesn't really work in production. Its not portable, doesn't have enought API (missing notification for closed sockets) and having to synchronize between two different connections in a reliable way is a pain. So, we're going everything over the websocket. This is a pure switch, but after this we want to modify the protocol to work better over the uncompressed utf8 transport of websockets. --- gdk/broadway/broadway.c | 99 +++--------------------------- gdk/broadway/broadway.h | 3 +- gdk/broadway/broadway.js | 21 ++----- gdk/broadway/gdkdisplay-broadway.c | 43 +++++-------- 4 files changed, 34 insertions(+), 132 deletions(-) diff --git a/gdk/broadway/broadway.c b/gdk/broadway/broadway.c index 3917145bb..710cb3ae5 100644 --- a/gdk/broadway/broadway.c +++ b/gdk/broadway/broadway.c @@ -4,10 +4,6 @@ #include #include #include -#include -#include -#include -#include #include "broadway.h" @@ -450,126 +446,50 @@ to_png_a (int w, int h, int byte_stride, guint8 *data) ************************************************************************/ struct BroadwayOutput { - int fd; - gzFile *zfd; + GOutputStream *out; int error; guint32 serial; }; static void -broadway_output_write_raw (BroadwayOutput *output, - const void *buf, gsize count) +broadway_output_write_header (BroadwayOutput *output) { - gssize res; - int errsave; - const char *ptr = (const char *)buf; - - if (output->error) - return; - - while (count > 0) - { - res = write(output->fd, ptr, count); - if (res == -1) - { - errsave = errno; - if (errsave == EINTR) - continue; - output->error = TRUE; - return; - } - if (res == 0) - { - output->error = TRUE; - return; - } - count -= res; - ptr += res; - } + g_output_stream_write (output->out, "\0", 1, NULL, NULL); } static void broadway_output_write (BroadwayOutput *output, const void *buf, gsize count) { - gssize res; - const char *ptr = (const char *)buf; - - if (output->error) - return; - - while (count > 0) - { - res = gzwrite(output->zfd, ptr, count); - if (res == -1) - { - output->error = TRUE; - return; - } - if (res == 0) - { - output->error = TRUE; - return; - } - count -= res; - ptr += res; - } -} - -static void -broadway_output_write_header (BroadwayOutput *output) -{ - char *header; - - header = - "HTTP/1.1 200 OK\r\n" - "Content-type: multipart/x-mixed-replace;boundary=x\r\n" - "Content-Encoding: gzip\r\n" - "\r\n"; - broadway_output_write_raw (output, - header, strlen (header)); + g_output_stream_write_all (output->out, buf, count, NULL, NULL, NULL); } static void send_boundary (BroadwayOutput *output) { - char *boundary = - "--x\r\n" - "\r\n"; - - broadway_output_write (output, boundary, strlen (boundary)); + broadway_output_write (output, "\xff", 1); + broadway_output_write (output, "\0", 1); } BroadwayOutput * -broadway_output_new(int fd, guint32 serial) +broadway_output_new(GOutputStream *out, guint32 serial) { BroadwayOutput *output; - int flag = 1; output = g_new0 (BroadwayOutput, 1); - output->fd = fd; + output->out = g_object_ref (out); output->serial = serial; - setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); - broadway_output_write_header (output); - output->zfd = gzdopen(fd, "wb"); - - /* Need an initial multipart boundary */ - send_boundary (output); - return output; } void broadway_output_free (BroadwayOutput *output) { - if (output->zfd) - gzclose (output->zfd); - else - close (output->fd); + g_object_unref (output->out); free (output); } @@ -583,7 +503,6 @@ int broadway_output_flush (BroadwayOutput *output) { send_boundary (output); - gzflush (output->zfd, Z_SYNC_FLUSH); return !output->error; } diff --git a/gdk/broadway/broadway.h b/gdk/broadway/broadway.h index fce53a30a..12582654c 100644 --- a/gdk/broadway/broadway.h +++ b/gdk/broadway/broadway.h @@ -1,4 +1,5 @@ #include +#include typedef struct BroadwayOutput BroadwayOutput; @@ -7,7 +8,7 @@ typedef struct { int width, height; } BroadwayRect; -BroadwayOutput *broadway_output_new (int fd, +BroadwayOutput *broadway_output_new (GOutputStream *out, guint32 serial); void broadway_output_free (BroadwayOutput *output); int broadway_output_flush (BroadwayOutput *output); diff --git a/gdk/broadway/broadway.js b/gdk/broadway/broadway.js index 7870eeff6..ba5b2fa81 100644 --- a/gdk/broadway/broadway.js +++ b/gdk/broadway/broadway.js @@ -888,10 +888,10 @@ function handleOutstanding() } } -function handleLoad(event) +function handleMessage(message) { var cmdObj = {}; - cmdObj.data = event.target.responseText; + cmdObj.data = message; cmdObj.pos = 0; outstandingCommands.push(cmdObj); @@ -2768,22 +2768,10 @@ function connect() if (params[0].indexOf("toplevel") != -1) useToplevelWindows = true; } - var xhr = createXHR(); - if (xhr) { - if (typeof xhr.multipart == 'undefined') { - alert("Sorry, this example only works in browsers that support multipart."); - return; - } - - xhr.multipart = true; - xhr.open("GET", "/output", true); - xhr.onload = handleLoad; - xhr.send(null); - } if ("WebSocket" in window) { var loc = window.location.toString().replace("http:", "ws:"); - loc = loc.substr(0, loc.lastIndexOf('/')) + "/input"; + loc = loc.substr(0, loc.lastIndexOf('/')) + "/socket"; var ws = new WebSocket(loc, "broadway"); ws.onopen = function() { inputSocket = ws; @@ -2806,6 +2794,9 @@ function connect() ws.onclose = function() { inputSocket = null; }; + ws.onmessage = function(event) { + handleMessage(event.data); + }; } else { alert("WebSocket not supported, input will not work!"); } diff --git a/gdk/broadway/gdkdisplay-broadway.c b/gdk/broadway/gdkdisplay-broadway.c index cdab101ab..6a8ca210a 100644 --- a/gdk/broadway/gdkdisplay-broadway.c +++ b/gdk/broadway/gdkdisplay-broadway.c @@ -40,6 +40,10 @@ #include #include #include +#include +#include +#include +#include static void gdk_broadway_display_dispose (GObject *object); static void gdk_broadway_display_finalize (GObject *object); @@ -124,6 +128,8 @@ typedef struct HttpRequest { GString *request; } HttpRequest; +static void start_output (HttpRequest *request); + static void http_request_free (HttpRequest *request) { @@ -495,21 +501,6 @@ _gdk_broadway_display_block_for_input (GdkDisplay *display, char op, } } -#include -#include -static void -set_fd_blocking (int fd) -{ - glong arg; - - if ((arg = fcntl (fd, F_GETFL, NULL)) < 0) - arg = 0; - - arg = arg & ~O_NONBLOCK; - - fcntl (fd, F_SETFL, arg); -} - static char * parse_line (char *line, char *key) { @@ -657,7 +648,7 @@ start_input (HttpRequest *request) "Upgrade: WebSocket\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Origin: %s\r\n" - "Sec-WebSocket-Location: ws://%s/input\r\n" + "Sec-WebSocket-Location: ws://%s/socket\r\n" "Sec-WebSocket-Protocol: broadway\r\n" "\r\n", origin, host); @@ -679,6 +670,8 @@ start_input (HttpRequest *request) broadway_display->input = input; + start_output (request); + /* This will free and close the data input stream, but we got all the buffered content already */ http_request_free (request); @@ -699,14 +692,13 @@ start_output (HttpRequest *request) { GSocket *socket; GdkBroadwayDisplay *broadway_display; - int fd; + int flag = 1; socket = g_socket_connection_get_socket (request->connection); + setsockopt(g_socket_get_fd (socket), IPPROTO_TCP, + TCP_NODELAY, (char *) &flag, sizeof(int)); broadway_display = GDK_BROADWAY_DISPLAY (request->display); - fd = g_socket_get_fd (socket); - set_fd_blocking (fd); - /* We dup this because otherwise it'll be closed with the request SocketConnection */ if (broadway_display->output) { @@ -714,15 +706,16 @@ start_output (HttpRequest *request) broadway_output_free (broadway_display->output); } - broadway_display->output = broadway_output_new (dup(fd), broadway_display->saved_serial); + broadway_display->output = + broadway_output_new (g_io_stream_get_output_stream (G_IO_STREAM (request->connection)), + broadway_display->saved_serial); + _gdk_broadway_resync_windows (); if (broadway_display->pointer_grab_window) broadway_output_grab_pointer (broadway_display->output, GDK_WINDOW_IMPL_BROADWAY (broadway_display->pointer_grab_window->impl)->id, broadway_display->pointer_grab_owner_events); - - http_request_free (request); } static void @@ -787,9 +780,7 @@ got_request (HttpRequest *request) send_data (request, "text/html", client_html, G_N_ELEMENTS(client_html) - 1); else if (strcmp (escaped, "/broadway.js") == 0) send_data (request, "text/javascript", broadway_js, G_N_ELEMENTS(broadway_js) - 1); - else if (strcmp (escaped, "/output") == 0) - start_output (request); - else if (strcmp (escaped, "/input") == 0) + else if (strcmp (escaped, "/socket") == 0) start_input (request); else send_error (request, 404, "File not found"); -- 2.43.2