/**
 * MIT License (MIT)
 *
 * Copyright (c) <2014> <Rapp Project EU>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 *
 * Authors: Konstantinos Panayiotou
 * Contact: klpanagi@gmail.com
 */

/**
 * @file
 * @description Server Core implementation. Register workers/services,
 * handle communication between web services and worker threads, etc.
 *
 * @author Konstantinos Panayiotou <klpanagi@gmail.com>
 * @copyright Rapp Project EU 2015
 */

var path = require('path');
var hop = require('hop');
var util = require('util');

const ENV = require(path.join(__dirname, '../', 'env.js'));
const PKG_DIR = ENV.PATHS.PKG_DIR;
const INCLUDE_DIR = ENV.PATHS.INCLUDE_DIR;

var Rsg = require(path.join(INCLUDE_DIR, 'common', 'randStringGen.js'));
var Fs = require(path.join(INCLUDE_DIR, 'common', 'fileUtils.js'));


/**
 * Server Core implementation. Register workers/services, handle communication
 * between web services and worker threads, etc.
 *
 * @class ServerCore
 *
 * @param {Object} [opts] - Options.
 * @param {Object} [opts.logger] - Logger used by the core. Falls back to
 *   `console` when omitted.
 * @param {number|string} [opts.rand_str_length] - Length handed to the random
 *   string generator. The historical misspelling `rand_str_lenth` is still
 *   honoured. Defaults to 5 when missing or not a positive integer.
 */
function ServerCore(opts) {
  opts = opts || {};

  this.logger = opts.logger || console;
  // svcName -> { worker, path, url, frame }
  this.services = {};
  // wName -> worker handle; every entry carries a `services` array.
  this.workers = {};
  // The main thread is pre-registered so services can be attached to it.
  // It is a plain record: it has neither postMessage() nor terminate().
  this.workers.main_thread = { services: [] };

  // Accept the correctly spelled option first, then the legacy typo.
  var requestedLength = opts.rand_str_length || opts.rand_str_lenth;
  this.randStrLength = parseInt(requestedLength, 10) || 5;
  this.strGen = new Rsg(this.randStrLength);

  /** Load server parameters.
   *  ------------------------
   */
  this.serverParams = {
    hostname: hop.hostname,
    port: hop.port,
    protocol: (require(hop.config).HTTPSPort) ? "https" : "http"
  };
}


/**
 * Add a custom logger for the Server core to use for logging.
 *
 * @function applyLogger
 * @memberOf ServerCore
 *
 * @param {Object} [logger] - Logger object. Falls back to `console`.
 */
ServerCore.prototype.applyLogger = function(logger) {
  this.logger = logger || console;
};


/**
 * Register a Web Service.
 *
 * @function registerService
 * @memberOf ServerCore
 *
 * @param {string} svcName - The name of the web service
 * @param {Object} frame - Hop Web service frame.
 * @param {string} path - The Web Service relative url path
 *   e.g. /face_detection/detect_faces
 * @param {string} wName - The name of the worker to register this web service
 *
 * @returns {boolean} true when the service was registered; false when an
 *   argument is missing, the worker is unknown, or the service name is taken.
 */
ServerCore.prototype.registerService = function(svcName, frame, path, wName) {
  if (!(svcName && frame && path && wName)) {
    this.logger.error("Service registration failed.");
    return false;
  }

  if (!this.workerExists(wName) || this.serviceExists(svcName)) {
    this.logger.error(
      util.format("{%s} service registration failed. Either non existed " +
        " worker {%s} , or service with same name already registered",
        svcName, wName)
    );
    return false;
  }

  var svcObj = {
    worker: wName,
    path: path,
    url: this.serviceUrl(path),
    frame: frame
  };

  // Append to worker services array
  this.workers[wName].services.push(svcName);
  // Append service object to services array.
  this.services[svcName] = svcObj;

  this.logger.log(
    util.format("Registered worker service {%s} under worker thread {%s}",
      svcObj.url, wName)
  );
  this.logger.log(svcObj);
  return true;
};


/**
 * Parse worker messages and dispatch on `msg.data.request`.
 *
 * Supported requests:
 *   - "svc_registration": register the service described by the message.
 *   - "get_svc_url": reply to the worker with the url of a service.
 *   - "active_services": reply to the worker with all registered services.
 * Any other request value is ignored. Messages from workers that are not
 * registered are logged and dropped.
 *
 * @function parseWorkerMsg
 * @memberOf ServerCore
 *
 * @param {Object} msg - Communication message
 * @param {Object} msg.data - Message data.
 * @param {String} msg.data.request - Request type (see above).
 * @param {String} msg.data.svc_name - Caller service name.
 * @param {String} msg.data.worker_name - Caller worker name.
 * @param {String} msg.data.svc_path - Caller service path (url).
 * @param {Function} msg.data.svc_frame - Caller service frame.
 */
ServerCore.prototype.parseWorkerMsg = function(msg) {
  var data = (msg && msg.data) || {};
  var wName = data.worker_name || "";
  var request = data.request || "";

  if ((!wName) || (!this.workerExists(wName))) {
    this.logger.error("Received message from non registered worker");
    // Nothing below can succeed without a registered sender: registration
    // would be rejected and there is no worker handle to reply to.
    return;
  }

  // Each case gets its own block so its declarations stay local to it.
  switch (request) {
    case "svc_registration": {
      // Request to register service
      var regName = data.svc_name || '';
      var regFrame = data.svc_frame || undefined;
      var regPath = data.svc_path || '';
      this.registerService(regName, regFrame, regPath, wName);
      break;
    }
    case "get_svc_url": {
      var svcName = data.svc_name || "";
      var response = { svc_name: svcName };
      if (!(svcName && this.serviceExists(svcName))) {
        response.error = util.format(
          "Service %s does not exist in registed services", svcName);
        response.svc_url = '';
      } else {
        response.error = '';
        response.svc_url = this.getSvcUrl(svcName);
      }
      this.callWorker(wName, response);
      break;
    }
    case "active_services": {
      var services = this.services;
      var activeServices = Object.keys(services).map(function(name) {
        return { name: name, url: services[name].url };
      });
      this.callWorker(wName, activeServices);
      break;
    }
    default:
      break;
  }
};


/**
 * Send message to worker thread.
 *
 * Logs and does nothing when the worker is unknown or when the registered
 * entry cannot receive messages (e.g. the built-in `main_thread` record).
 *
 * @function callWorker
 * @memberOf ServerCore
 *
 * @param {String} wName - Worker's name to call.
 * @param {Object} msg - Message object.
 */
ServerCore.prototype.callWorker = function(wName, msg) {
  if (!this.workerExists(wName)) {
    this.logger.log(
      util.format("Attempt to call not existed worder service %s", wName)
    );
    return;
  }

  var worker = this.workers[wName];
  if (typeof worker.postMessage !== 'function') {
    this.logger.error(
      util.format("Worker {%s} cannot receive messages", wName)
    );
    return;
  }
  worker.postMessage(msg);
};


/**
 * Kill/Terminate/Close worker service.
 *
 * NOTE(review): the worker entry and its services stay in the registry after
 * termination, so workerExists() keeps returning true for it — confirm
 * whether callers rely on that before changing it.
 *
 * @function killWorker
 * @memberOf ServerCore
 *
 * @param {String} wName - Worker name.
 */
ServerCore.prototype.killWorker = function(wName) {
  if (!this.workerExists(wName)) {
    return;
  }

  var worker = this.workers[wName];
  if (typeof worker.terminate !== 'function') {
    // e.g. the built-in `main_thread` record, which is not a real worker.
    this.logger.error(
      util.format("Worker {%s} cannot be terminated", wName)
    );
    return;
  }

  worker.terminate();
  // Format explicitly; custom loggers may not do printf-style substitution.
  this.logger.warn(util.format("Terminated worker: %s", wName));
};


/**
 * Creates and instantiates a worker thread, wires its message/exit/error
 * callbacks to this core and stores it in the worker registry.
 *
 * @function createWorker
 * @memberOf ServerCore
 *
 * @param {String} wName - Worker name.
 * @param {String} wFile - JS file to feed to worker thread.
 *
 * @returns {boolean} true when the worker was launched and registered; false
 *   when an argument is empty, the name is taken, or the launch threw.
 */
ServerCore.prototype.createWorker = function(wName, wFile) {
  wName = wName || '';
  wFile = wFile || '';

  if (!wName || !wFile) {
    this.logger.error("Worker creation failed. Empty worker name or file");
    return false;
  }

  // Check if worker with given name already exists in list of registered
  // workers.
  if (this.workerExists(wName)) {
    this.logger.error(util.format("Worker with name {%s} " +
      "exists in registered workers", wName));
    return false;
  }

  var self = this;

  try {
    var worker = new Worker(wFile);

    // Each worker holds his own Array of registered services.
    worker.services = [];

    // Register 'this' worker onmessage callback
    worker.onmessage = function(msg) {
      self.parseWorkerMsg(msg);
    };

    // Register worker onexit event callback.
    worker.onexit = function() {
      self.logger.error(util.format("Worker {%s} terminated", wName));
    };

    // Register worker onerror event callback.
    worker.onerror = function(e) {
      self.logger.warn(util.format("Worker send error message -> %s", e));
    };

    this.workers[wName] = worker;
  } catch (e) {
    this.logger.error(util.format("Failed to launch worker {%s}: %s",
      wName, e));
    return false;
  }

  return true;
};


/**
 * Register worker service.
 * A worker can handle multiple services;
 *
 * @function registerWorker
 * @memberOf ServerCore
 *
 * @param {Object} worker - Service information.
 * @param {String} worker.file - Path to worker file.
 * @param {String} worker.name - Service name.
 *
 * @returns {boolean} true when the worker thread was created; false when a
 *   required property is missing, the name is taken, or creation failed.
 */
ServerCore.prototype.registerWorker = function(worker) {
  worker = worker || {};

  /** Worker registration requires the following properties:
   *    name - worker.name.
   *    file - worker.file.
   */
  if (!worker.name) {
    this.logger.error("Could not register worker. " +
      "Not a worker name was provided");
    return false;
  }
  if (!worker.file) {
    this.logger.error("Worker registration failed. Empty worker.file property");
    return false;
  }

  // Check if worker with given name already exists in list of registered
  // workers.
  if (this.workerExists(worker.name)) {
    this.logger.error(
      util.format("Worker named {%s} already registered", worker.name)
    );
    return false;
  }

  // Instantiate a new worker thread and report its outcome to the caller.
  return this.createWorker(worker.name, worker.file);
};


/**
 * Create web service full url path by given service path.
 *
 * @function serviceUrl
 * @memberOf ServerCore
 *
 * @param {String} path - Service's path (e.g. /hop/face_detection).
 *
 * @returns {String} `<protocol>://<hostname>:<port><path>` built from the
 *   server parameters captured at construction time.
 */
ServerCore.prototype.serviceUrl = function(path) {
  var params = this.serverParams;
  return util.format('%s://%s:%s%s',
    params.protocol, params.hostname, params.port, path);
};


/**
 * Set the url path of given web service.
 *
 * @function setSvcUrl
 * @memberOf ServerCore
 *
 * @param {String} svcName - Service name.
 * @param {String} path - New relative url path of the service.
 *
 * @returns {boolean} true when updated; false when an argument is empty or
 *   the service is not registered.
 */
ServerCore.prototype.setSvcUrl = function(svcName, path) {
  path = path || '';
  svcName = svcName || '';

  if (!path) {
    this.logger.error("Non service path provided!");
    return false;
  }
  if (!svcName) {
    this.logger.error("Non service name provided!");
    return false;
  }
  if (!this.serviceExists(svcName)) {
    this.logger.error(util.format("Cannot set %s service url path. " +
      "Service is not registerd!", svcName)
    );
    return false;
  }

  var svc = this.services[svcName];
  svc.url = this.serviceUrl(path);
  svc.path = path;
  return true;
};


/**
 * Get the url path of given web service.
 *
 * @function getSvcUrl
 * @memberOf ServerCore
 *
 * @param {String} svcName - Service name.
 *
 * @returns {String} The service url, or '' when the name is empty or the
 *   service is not registered.
 */
ServerCore.prototype.getSvcUrl = function(svcName) {
  svcName = svcName || '';

  if (!svcName) {
    this.logger.error("Non service name provided!");
    return '';
  }
  if (!this.serviceExists(svcName)) {
    this.logger.error(util.format("Cannot get %s service url path. " +
      "Service is not registerd!", svcName)
    );
    return '';
  }

  return this.services[svcName].url;
};


/**
 * Returns true if a service has been registered.
 *
 * Only own entries of the registry count, so names that collide with
 * Object.prototype members (e.g. "constructor") are not reported as
 * registered.
 *
 * @function serviceExists
 * @memberOf ServerCore
 *
 * @param {String} svcName - Service name.
 *
 * @returns {boolean}
 */
ServerCore.prototype.serviceExists = function(svcName) {
  return Object.prototype.hasOwnProperty.call(this.services, svcName) &&
    Boolean(this.services[svcName]);
};


/**
 * Returns true if a worker has been registered.
 *
 * Only own entries of the registry count, so names that collide with
 * Object.prototype members (e.g. "constructor") are not reported as
 * registered.
 *
 * @function workerExists
 * @memberOf ServerCore
 *
 * @param {String} wName - Worker name.
 *
 * @returns {boolean}
 */
ServerCore.prototype.workerExists = function(wName) {
  return Object.prototype.hasOwnProperty.call(this.workers, wName) &&
    Boolean(this.workers[wName]);
};


var serverCore = new ServerCore();

// Export the server core singleton.
module.exports = serverCore;