Source: serverCore.js

/**
 *  MIT License (MIT)
 *
 *  Copyright (c) <2014> <Rapp Project EU>
 *
 *  Permission is hereby granted, free of charge, to any person obtaining a copy
 *  of this software and associated documentation files (the "Software"), to deal
 *  in the Software without restriction, including without limitation the rights
 *  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 *  copies of the Software, and to permit persons to whom the Software is
 *  furnished to do so, subject to the following conditions:
 *
 *  The above copyright notice and this permission notice shall be included in
 *  all copies or substantial portions of the Software.
 *
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 *  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 *  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 *  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 *  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 *  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 *  THE SOFTWARE.
 *
 *
 *  Authors: Konstantinos Panayiotou
 *  Contact: klpanagi@gmail.com
 */

/**
 * @file
 * @description Server Core implementation. Register workers/services,
 * handle communication between web services and worker threads, etc.
 *
 * @author Konstantinos Panayiotou <klpanagi@gmail.com>
 * @copyright Rapp Project EU 2015
 */



var path = require('path');
var hop = require('hop');
var util = require('util');

const ENV = require(path.join(__dirname, '../', 'env.js'));
const PKG_DIR = ENV.PATHS.PKG_DIR;
const INCLUDE_DIR = ENV.PATHS.INCLUDE_DIR;

var Rsg = require (path.join(INCLUDE_DIR, 'common', 'randStringGen.js'));
var Fs = require(path.join(INCLUDE_DIR, 'common', 'fileUtils.js'));


/**
 *  Server Core implementation. Register workers/services, handle communication
 *  between web services and worker threads, etc.
 *
 * @class ServerCore
 *
 * @param {Object} args - Arguments.
 * @param {Object} args.logger - Pass a logger to the Prototype constructor.
 */
function ServerCore(opts) {
  opts = opts || {};
  this.logger = opts.logger || console;

  this.services = {};
  this.workers = {};
  this.workers.main_thread = {
    services: []
  };
  this.randStrLength = parseInt(opts.rand_str_lenth) || 5;
  this.strGen = new Rsg(this.randStrLength);

  /** Load server parameters.
   * ------------------------ */
  this.serverParams = {
    hostname: hop.hostname,
    port: hop.port,
    protocol: (require(hop.config).HTTPSPort) ? "https" : "http"
  };
}


/**
 * Add a custom logger for the Server core to use for logging
 *
 * @function applyLogger
 * @memberOf ServerCore
 */
ServerCore.prototype.applyLogger = function(logger) {
  this.logger = logger || console;
};


/**
 * Register a Web Service.
 *
 * @function registerService
 * @memberOf ServerCore
 *
 * @param {string} svcName - The name of the web service
 * @param {Object} frame - Hop Web service frame.
 * @param {string} path - The Web Service relative url path
 * e.g. /face_detection/detect_faces
 * @param {string} wName - The name of the worker to register this web service
 */
ServerCore.prototype.registerService = function(svcName, frame, path, wName) {
  if (!(svcName && frame && path && wName)) {
    this.logger.error("Service registration failed.");
    return false;
  }

  var svcObj = {
    worker: wName,
    path: path,
    url: this.serviceUrl(path),
    frame: frame
  };

  if( this.workerExists(wName) && (! this.serviceExists(svcName)) ) {
    // Append to worker services array
    this.workers[ wName ].services.push(svcName);
    // Append service object to services array.
    this.services[ svcName ] = svcObj;
    this.logger.log(
      util.format("Registered worker service {%s} under worker thread {%s}",
        this.services[ svcName ].url, wName)
    );
    this.logger.log(svcObj);
    return true;
  } else {
    this.logger.error(
      util.format("{%s} service registration failed. Either non existed " +
        " worker {%s} , or service with same name already registered", svcName,
        wName)
    );
    return false;
  }
};


/**
 * Parse worker messages.
 *
 * @function parseWorkerMsg
 * @memberOf ServerCore
 *
 * @param {Object} msg - Communication message
 * @param {Object} msg.data - Message data.
 * @param {String} msg.data.svc_name - Caller service name.
 * @param {String} msg.data.worker_name - Caller worker name.
 * @param {String} msg.data.svc_path - Caller service path (url).
 * @param {Function} msg.data.svc_frame - Caller service frame.
 */
ServerCore.prototype.parseWorkerMsg = function(msg){
  var wName = msg.data.worker_name || "";
  var request = msg.data.request || "";
  var response = {};

  if( (! wName) || (! this.workerExists(wName)) ){
    this.logger.error("Received message from non registered worker");
  }

  switch (request) {
    case "svc_registration":
      // Request to register service
      let svcName = msg.data.svc_name || '';
      let svcFrame = msg.data.svc_frame || undefined;
      let svcPath = msg.data.svc_path || '';

      this.registerService(svcName, svcFrame, svcPath, wName);
      break;

    case "get_svc_url":
      let svcName = msg.data.svc_name || "";
      if ( ! ( svcName && serviceExists(svcName) ) ) {
        response.error = util.format(
          "Service %s does not exist in registed services", svcName);
        response.svc_name = svcName;
        response.svc_url = '';
      }
      else{
        response.error = '';
        response.svc_url = this.getSvcUrl(svcName);
      }

      this.workers[wName].postMessage(response);
      break;

    case "active_services":
      let activeServices = [];
      for (var k in this.services) {
        let obj = {
          name: k,
          url: this.services[k].url
        };
        activeServices.push(obj);
      }
      this.workers[wName].postMessage(activeServices);
      break;

    default:
      break;
  }
};


/**
 * Send message to worker thread.
 *
 * @function callWorker
 * @memberOf ServerCore
 *
 * @param {String} wName - Worker's name to call.
 * @param {Object} msg - Message object.
 */
ServerCore.prototype.callWorker = function( wName, msg ){
  if (this.workerExists(wName)) {
    this.workers[ wName ].postMessage(msg);
  } else {
    this.logger.log(
      util.format("Attempt to call not existed worder service %s", wName)
    );
  }
};


/**
 * Kill/Terminate/Close worker service.
 *
 * @function killWorker
 * @memberOf ServerCore
 *
 * @param {String} wName - Worker name.
 */
ServerCore.prototype.killWorker = function( wName ){
  if ( this.workerExists(wName) ){
    this.workers[ wName ].terminate();
    this.logger.warn("Terminated worker: %s", wName);
  }
};


/**
 * Creates and instantiates a worker thread.
 *
 * @function createWorker
 * @memberOf ServerCore
 *
 * @param {String} wName - Worker name.
 * @param {String} wFile - JS file to feed to worker thread.
 */
ServerCore.prototype.createWorker = function(wName, wFile) {
  wName = wName || '';
  wFile = wFile || '';

  // Check if worker with given name already exists in list of registered
  // workers.
  if (this.workerExists(wName)) {
    this.logger.error(util.format("Worker with name {%s} " +
      "exists in registered workers", wName));
    return false;
  }

  var this_ = this;

  try {
    this.workers[ wName ] = new Worker ( wFile );
    // Each worker holds his own Array of registered services.
    this.workers[ wName ].services = [];
    //  Register 'this' worker onmessage callback
    this.workers[ wName ].onmessage = function(msg){
      this_.parseWorkerMsg(msg);
    };
    // Register worker onexit event callback.
    this.workers[ wName ].onexit = function(){
      this_.logger.error(util.format("Worker {%s} terminated"));
    };
    // Register worker onerror event callback.
    this.workers[ wName ].onerror = function( e ){
      this_.logger.warn(util.format("Worker send error message -> %s", e));
    };
  }
  catch(e){
    this.logger.error(util.format("Failed to launch worker {%s}: %s",
        wName, e));
    return false;
  }

  return true;
};


/**
 * Register worker service.
 * A worker can handle multiple services;
 *
 * @function registerWorker
 * @memberOf ServerCore
 *
 * @param {Object} worker - Service information.
 * @param {String} worker.file - Path to worker file.
 * @param {String} worker.name - Service name.
 */
ServerCore.prototype.registerWorker = function(worker) {
  worker = worker || {};
  /** Worker registration requires the following properties:
   *  name - worker.name.
   *  file - worker.file.
   */
  if( (! worker) || (! worker.name) ){
    this.logger.error("Could not register worker. " +
      "Not a worker name was provided");
    return false;
  }
  if (! worker.file) {
    this.logger.error("Worker registration failed. Empty worker.file property");
    return false;
  }
  // Check if worker with given name already exists in list of registered
  // workers.
  if (this.workerExists(worker.name)) {
    this.logger.error(util.format("Worker named {%s} already registered"),
      worker.name);
    return false;
  }

  // Instantiate a new worker thread.
  this.createWorker(worker.name, worker.file);
};


/**
 * Create web service full url path by given service path.
 *
 * @function serviceUrl
 * @memberOf ServerCore
 *
 * @param {String} path - Service's path (e.g. /hop/face_detection).
 */
ServerCore.prototype.serviceUrl = function(path) {
  return util.format('%s://%s:%s%s', this.serverParams.protocol,
    this.serverParams.hostname, this.serverParams.port, path);
};


/**
 * Set the url path of given web service..
 *
 * @function setSvcUrl
 * @memberOf ServerCore
 *
 * @param {String} svcName - Service name.
 */
ServerCore.prototype.setSvcUrl = function(svcName, path) {
  path = path || '';
  svcName = svcName || '';
  if (! path) {
    this.logger.error("Non service path provided!");
    return false;
  }
  if (! svcName) {
    this.logger.error("Non service name provided!");
    return false;
  }
  if( ! serviceExists(svcName) ){
    this.logger.error(util.format("Cannot set %s service url path. " +
      "Service is not registerd!", svcName)
    );
    return false;
  }
  this.services[svcName].url = this.serviceUrl(path);
  this.services[svcName].path = path;
  return true;
};


/**
 * Get the url path of given web service..
 *
 * @function getSvcUrl
 * @memberOf ServerCore
 *
 * @param {String} svcName - Service name.
 */
ServerCore.prototype.getSvcUrl = function(svcName) {
  svcName = svcName || '';
  if (! svcName) {
    this.logger.error("Non service name provided!");
    return '';
  }
  if (! serviceExists(svcName)) {
    this.logger.error(util.format("Cannot get %s service url path. " +
      "Service is not registerd!", svcName)
    );
    return '';
  }
  return this.services[ svcName ].url;
};


/**
 * Returns true if a service has been registered.
 *
 * @function serviceExists
 * @memberOf ServerCore
 *
 * @param {String} svcName - Service name.
 */
ServerCore.prototype.serviceExists = function(svcName) {
  if (this.services[svcName]) {
    return true;
  } else {
    return false;
  }
};


/**
 *  Returns true if a worker has been registered.
 *
 * @function workerExists
 * @memberOf ServerCore
 *
 * @param {String} wName - Worker name.
 */
ServerCore.prototype.workerExists = function(wName) {
  if (this.workers[wName]) {
    return true;
  }
  return false;
};

var serverCore = new ServerCore();


// Export the se r.
module.exports = serverCore;