mod_xxxx/verse/bosh.lua

210 lines
6.1 KiB
Lua

local new_xmpp_stream = require "util.xmppstream".new;
local st = require "util.stanza";
require "net.httpclient_listener"; -- Required for net.http to work
local http = require "net.http";
-- Metatable for BOSH streams. Methods defined on stream_mt are found first;
-- any other lookup falls through to the generic verse.stream_mt.
local stream_mt = setmetatable({}, { __index = verse.stream_mt });
stream_mt.__index = stream_mt;
-- Namespace of stream-level elements; children of a BOSH <body/> in this
-- namespace are dispatched as "stream-<name>" events.
local xmlns_stream = "http://etherx.jabber.org/streams";
-- Namespace placed on every outgoing BOSH <body/> wrapper.
local xmlns_bosh = "http://jabber.org/protocol/httpbind";
-- Seconds to wait before retrying a request that failed at the connection level.
local reconnect_timeout = 5;
--- Create a new stream that talks XMPP over BOSH.
-- @param logger passed through unchanged to verse.new()
-- @param url    URL of the BOSH endpoint; every request is sent to it
-- @return the object produced by verse.new(), with stream_mt as its metatable
function verse.new_bosh(logger, url)
	local base = {
		bosh_url = url,
		-- Request ids start at a random value and are incremented per body.
		bosh_rid = math.random(1, 999999),
		bosh_conn_pool = {},
		bosh_waiting_requests = {},
		bosh_outgoing_buffer = {},
		conn = {},
		-- Ask for a stream restart; the flag is consumed by the next body built.
		reopen = function (self)
			self.bosh_need_restart = true;
			self:flush();
		end,
	};
	return setmetatable(verse.new(logger, base), stream_mt);
end
--- Begin connecting by sending the BOSH session creation request.
-- The outcome arrives asynchronously as a "connected" or "disconnected" event.
function stream_mt:connect()
	self:_send_session_request();
end
--- Queue a stanza for delivery and attempt to flush the queue immediately.
-- A clone of `data` (via st.clone) is stored rather than `data` itself.
-- @param data the stanza to send
function stream_mt:send(data)
	self:debug("Putting into BOSH send buffer: %s", tostring(data));
	local outgoing = self.bosh_outgoing_buffer;
	outgoing[#outgoing + 1] = st.clone(data);
	self:flush(); --TODO: Optimize by doing this on next tick (give a chance for data to buffer)
end
--- Send the buffered stanzas to the server if a request may be made now.
-- A request is made only when the stream is connected, fewer than
-- bosh_max_requests requests are outstanding, and at least one of these holds:
-- no request is outstanding, there is buffered data, or a restart is pending.
function stream_mt:flush()
	local should_flush = false;
	if self.connected then
		local in_flight = #self.bosh_waiting_requests;
		if in_flight < self.bosh_max_requests then
			should_flush = in_flight == 0
				or #self.bosh_outgoing_buffer > 0
				or self.bosh_need_restart;
		end
	end
	if not should_flush then
		self:debug("Decided not to flush.");
		return;
	end

	self:debug("Flushing...");
	local payload = self:_make_body();
	-- Move every queued stanza into the body, emptying the queue as we go.
	local queue = self.bosh_outgoing_buffer;
	for index, stanza in ipairs(queue) do
		payload:add_child(stanza);
		queue[index] = nil;
	end
	self:_make_request(payload);
end
--- POST a BOSH body to the server and track the request until it completes.
-- A response code of 0 is treated as a connection problem: the same payload is
-- re-sent after reconnect_timeout seconds, until the session has been inactive
-- for longer than bosh_max_inactivity, at which point the stream is
-- declared disconnected.
-- @param payload the <body/> stanza to send
function stream_mt:_make_request(payload)
	local function on_response(response, code, req)
		if code ~= 0 then
			self.inactive_since = nil;
			return self:_handle_response(response, code, req);
		end

		-- Connection issues, we need to retry this request
		local now = os.time();
		local inactive_since = self.inactive_since;
		if inactive_since then
			local elapsed = now - inactive_since;
			if elapsed > self.bosh_max_inactivity then
				return self:_disconnected();
			end
			self:debug("%d seconds left to reconnect, retrying in %d seconds...",
				self.bosh_max_inactivity - elapsed, reconnect_timeout);
		else
			-- First failure: remember when it happened so we know when to give up
			self.inactive_since = now;
		end

		-- Set up reconnect timer
		timer.add_task(reconnect_timeout, function ()
			self:debug("Retrying request...");
			-- Drop the failed request from the waiting list before re-sending
			local waiting = self.bosh_waiting_requests;
			for position, waiting_request in ipairs(waiting) do
				if waiting_request == req then
					table.remove(waiting, position);
					break;
				end
			end
			self:_make_request(payload);
		end);
	end

	local request, err = http.request(self.bosh_url, { body = tostring(payload) }, on_response);
	if request then
		table.insert(self.bosh_waiting_requests, request);
	else
		self:warn("Request failed instantly: %s", err);
	end
end
--- Mark the stream as no longer connected and fire the "disconnected" event.
-- @param reason optional event data describing why the session ended. The
--   payload handler in this file passes `{ reason = <condition> }` when the
--   server sends a "terminate" body; previously that argument was silently
--   discarded. Callers that pass nothing produce an event without data, as
--   before.
function stream_mt:_disconnected(reason)
	self.connected = nil;
	self:event("disconnected", reason);
end
--- Send the BOSH session creation request and set up the session from the reply.
-- On success the session parameters returned by the server are stored on the
-- stream, `connected` is set, the "connected" event is fired and any stanzas
-- contained in the reply are dispatched. On a connection failure (code 0) or
-- an unparseable reply the stream is reported as disconnected.
function stream_mt:_send_session_request()
	-- Number of requests we ask the server to hold. Also used as the last-resort
	-- limit on our own simultaneous requests when the reply states neither
	-- `requests` nor `hold`; without it bosh_max_requests would be nil and
	-- flush() would raise when comparing against it.
	local requested_hold = 1;

	local body = self:_make_body();
	-- XEP-0124
	body.attr.hold = tostring(requested_hold);
	body.attr.wait = "60";
	body.attr["xml:lang"] = "en";
	body.attr.ver = "1.6";
	-- XEP-0206
	body.attr.from = self.jid;
	body.attr.to = self.host;
	body.attr.secure = 'true';

	http.request(self.bosh_url, { body = tostring(body) }, function (response, code)
		if code == 0 then
			-- Failed to connect
			return self:_disconnected();
		end
		-- Handle session creation response
		local payload = self:_parse_response(response)
		if not payload then
			self:warn("Invalid session creation response");
			self:_disconnected();
			return;
		end
		local attr = payload.attr;
		self.bosh_sid = attr.sid; -- Session id
		self.bosh_wait = tonumber(attr.wait); -- How long the server may hold connections for
		self.bosh_hold = tonumber(attr.hold); -- How many connections the server may hold
		self.bosh_max_inactivity = tonumber(attr.inactivity); -- Max amount of time with no connections
		-- Max simultaneous requests we can make
		self.bosh_max_requests = tonumber(attr.requests) or self.bosh_hold or requested_hold;
		self.connected = true;
		self:event("connected");
		self:_handle_response_payload(payload);
	end);
end
--- Handle the HTTP response to a tracked BOSH request.
-- Removes the request from the waiting list, dispatches whatever the response
-- body contains, then tries to flush so a new request can be opened.
-- @param response the response body (passed to _parse_response)
-- @param code     the HTTP status code (unused here)
-- @param request  the request object this response belongs to
function stream_mt:_handle_response(response, code, request)
	local waiting = self.bosh_waiting_requests;
	if waiting[1] == request then
		table.remove(waiting, 1);
	else
		self:warn("Server replied to request that wasn't the oldest");
		for i, waiting_request in ipairs(waiting) do
			if waiting_request == request then
				-- Use table.remove rather than assigning nil: a nil in the
				-- middle of the list would leave a hole, making `#waiting`
				-- unreliable and hiding later requests from ipairs().
				table.remove(waiting, i);
				break;
			end
		end
	end
	local payload = self:_parse_response(response);
	if payload then
		self:_handle_response_payload(payload);
	end
	self:flush();
end
--- Dispatch the contents of a parsed BOSH <body/>.
-- Each child element fires one event, chosen by its xmlns attribute:
--   stream namespace     -> "stream-<element name>"
--   any other namespace  -> "stream/<namespace>"
--   no xmlns attribute   -> "stanza"
-- A body of type "terminate" finally triggers _disconnected(), called with the
-- body's `condition` wrapped as `{ reason = ... }`.
-- @param payload the parsed <body/> stanza
function stream_mt:_handle_response_payload(payload)
	local children = payload.tags;
	for index = 1, #children do
		local child = children[index];
		local ns = child.attr.xmlns;
		local event_name;
		if ns == xmlns_stream then
			event_name = "stream-"..child.name;
		elseif ns then
			event_name = "stream/"..ns;
		else
			event_name = "stanza";
		end
		self:event(event_name, child);
	end

	if payload.attr.type == "terminate" then
		self:_disconnected({reason = payload.attr.condition});
	end
end
-- Callbacks handed to new_xmpp_stream() when parsing a BOSH response.
-- The parsed result is collected on the session table as `session.payload`.
local stream_callbacks = {
	stream_ns = "http://jabber.org/protocol/httpbind",
	stream_tag = "body",
	default_ns = "jabber:client",

	-- Start a fresh <body/> stanza carrying the received attributes.
	streamopened = function (session, attr)
		session.notopen = nil;
		session.payload = verse.stanza("body", attr);
		return true;
	end,

	-- Attach each received stanza as a child of the collected <body/>.
	handlestanza = function (session, stanza)
		session.payload:add_child(stanza);
	end,
};
--- Parse a BOSH response body into a <body/> stanza.
-- @param response the raw response text; nil is treated as a lost connection
-- @return the parsed <body/> stanza (with the received stanzas as children),
--   or nothing if `response` was nil (in which case the stream is reported as
--   disconnected) or if the parser never produced a body
function stream_mt:_parse_response(response)
	-- tostring() because `response` may still be nil here; a raw nil passed to
	-- a "%s" format raises in Lua 5.1's string.format. This matches how send()
	-- logs its argument.
	self:debug("Parsing response: %s", tostring(response));
	if response == nil then
		self:debug("%s", debug.traceback());
		self:_disconnected();
		return;
	end
	-- The callbacks store the parsed body on this throwaway session table.
	local session = { notopen = true, stream = self };
	local parser = new_xmpp_stream(session, stream_callbacks);
	parser:feed(response);
	return session.payload;
end
--- Build an empty BOSH <body/> wrapper for the next request.
-- Increments the stream's request id and stamps it, together with the current
-- session id, on the body. If a restart was requested via bosh_need_restart,
-- the flag is cleared and restart='true' is set on the body.
-- @return the new <body/> stanza
function stream_mt:_make_body()
	local next_rid = self.bosh_rid + 1;
	self.bosh_rid = next_rid;

	local body = verse.stanza("body", {
		xmlns = xmlns_bosh,
		content = "text/xml; charset=utf-8",
		sid = self.bosh_sid,
		rid = next_rid,
	});

	local restart_requested = self.bosh_need_restart;
	if restart_requested then
		self.bosh_need_restart = nil;
		body.attr.restart = 'true';
	end
	return body;
end