mod_xxxx/verse/init.lua

251 lines
6.0 KiB
Lua

-- Use LuaRocks if available
pcall(require, "luarocks.require");
local socket = require"socket";
-- Load LuaSec if available
pcall(require, "ssl");
local server = require "net.server";
local events = require "util.events";
local logger = require "util.logger";
module("verse", package.seeall);
local verse = _M;
_M.server = server;
local stream = {};
stream.__index = stream;
stream_mt = stream;
verse.plugins = {};
function verse.init(...)
for i=1,select("#", ...) do
local ok = pcall(require, "verse."..select(i,...));
if not ok then
error("Verse connection module not found: verse."..select(i,...));
end
end
return verse;
end
local max_id = 0;
function verse.new(logger, base)
local t = setmetatable(base or {}, stream);
max_id = max_id + 1;
t.id = tostring(max_id);
t.logger = logger or verse.new_logger("stream"..t.id);
t.events = events.new();
t.plugins = {};
t.verse = verse;
return t;
end
verse.add_task = require "util.timer".add_task;
verse.logger = logger.init; -- COMPAT: Deprecated
verse.new_logger = logger.init;
verse.log = verse.logger("verse");
local function format(format, ...)
local n, arg, maxn = 0, { ... }, select('#', ...);
return (format:gsub("%%(.)", function (c) if n <= maxn then n = n + 1; return tostring(arg[n]); end end));
end
function verse.set_log_handler(log_handler, levels)
levels = levels or { "debug", "info", "warn", "error" };
logger.reset();
if io.type(log_handler) == "file" then
local f = log_handler;
function log_handler(name, level, message)
f:write(name, "\t", level, "\t", message, "\n");
end
end
if log_handler then
local function _log_handler(name, level, message, ...)
return log_handler(name, level, format(message, ...));
end
for i, level in ipairs(levels) do
logger.add_level_sink(level, _log_handler);
end
end
end
function _default_log_handler(name, level, message)
return io.stderr:write(name, "\t", level, "\t", message, "\n");
end
verse.set_log_handler(_default_log_handler, { "error" });
local function error_handler(err)
verse.log("error", "Error: %s", err);
verse.log("error", "Traceback: %s", debug.traceback());
end
function verse.set_error_handler(new_error_handler)
error_handler = new_error_handler;
end
function verse.loop()
return xpcall(server.loop, error_handler);
end
function verse.step()
return xpcall(server.step, error_handler);
end
function verse.quit()
return server.setquitting(true);
end
function stream:listen(host, port)
host = host or "localhost";
port = port or 0;
local conn, err = server.addserver(host, port, new_listener(self, "server"), "*a");
if conn then
self:debug("Bound to %s:%s", host, port);
self.server = conn;
end
return conn, err;
end
function stream:connect(connect_host, connect_port)
connect_host = connect_host or "localhost";
connect_port = tonumber(connect_port) or 5222;
-- Create and initiate connection
local conn = socket.tcp()
conn:settimeout(0);
local success, err = conn:connect(connect_host, connect_port);
if not success and err ~= "timeout" then
self:warn("connect() to %s:%d failed: %s", connect_host, connect_port, err);
return self:event("disconnected", { reason = err }) or false, err;
end
local conn = server.wrapclient(conn, connect_host, connect_port, new_listener(self), "*a");
if not conn then
self:warn("connection initialisation failed: %s", err);
return self:event("disconnected", { reason = err }) or false, err;
end
self:set_conn(conn);
return true;
end
function stream:set_conn(conn)
self.conn = conn;
self.send = function (stream, data)
self:event("outgoing", data);
data = tostring(data);
self:event("outgoing-raw", data);
return conn:write(data);
end;
end
function stream:close(reason)
if not self.conn then
verse.log("error", "Attempt to close disconnected connection - possibly a bug");
return;
end
local on_disconnect = self.conn.disconnect();
self.conn:close();
on_disconnect(self.conn, reason);
end
-- Logging functions
function stream:debug(...)
return self.logger("debug", ...);
end
function stream:info(...)
return self.logger("info", ...);
end
function stream:warn(...)
return self.logger("warn", ...);
end
function stream:error(...)
return self.logger("error", ...);
end
-- Event handling
function stream:event(name, ...)
self:debug("Firing event: "..tostring(name));
return self.events.fire_event(name, ...);
end
function stream:hook(name, ...)
return self.events.add_handler(name, ...);
end
function stream:unhook(name, handler)
return self.events.remove_handler(name, handler);
end
function verse.eventable(object)
object.events = events.new();
object.hook, object.unhook = stream.hook, stream.unhook;
local fire_event = object.events.fire_event;
function object:event(name, ...)
return fire_event(name, ...);
end
return object;
end
function stream:add_plugin(name)
if self.plugins[name] then return true; end
if require("verse.plugins."..name) then
local ok, err = verse.plugins[name](self);
if ok ~= false then
self:debug("Loaded %s plugin", name);
self.plugins[name] = true;
else
self:warn("Failed to load %s plugin: %s", name, err);
end
end
return self;
end
-- Listener factory
function new_listener(stream)
local conn_listener = {};
function conn_listener.onconnect(conn)
if stream.server then
local client = verse.new();
conn:setlistener(new_listener(client));
client:set_conn(conn);
stream:event("connected", { client = client });
else
stream.connected = true;
stream:event("connected");
end
end
function conn_listener.onincoming(conn, data)
stream:event("incoming-raw", data);
end
function conn_listener.ondisconnect(conn, err)
if conn ~= stream.conn then return end
stream.connected = false;
stream:event("disconnected", { reason = err });
end
function conn_listener.ondrain(conn)
stream:event("drained");
end
function conn_listener.onstatus(conn, new_status)
stream:event("status", new_status);
end
return conn_listener;
end
return verse;