RAPP Platform  v0.6.0
RAPP Platform is a collection of ROS nodes and back-end processes that aim to deliver ready-to-use generic services to robots
 All Classes Namespaces Files Functions Variables Macros
helper_functions.py
Go to the documentation of this file.
1 import rospy
2 import sys
3 import calendar
4 import time
5 from datetime import datetime
6 from os.path import expanduser
7 from collections import OrderedDict
8 from app_error_exception import AppError
9 import traceback
10 
11 from rapp_platform_ros_communications.srv import (
12  ontologySubSuperClassesOfSrv,
13  ontologySubSuperClassesOfSrvRequest,
14  ontologySubSuperClassesOfSrvResponse,
15  createOntologyAliasSrv,
16  createOntologyAliasSrvRequest,
17  createOntologyAliasSrvResponse,
18  userPerformanceCognitveTestsSrv,
19  userPerformanceCognitveTestsSrvRequest,
20  userScoreHistoryForAllCategoriesSrv,
21  userScoreHistoryForAllCategoriesSrvResponse,
22  getUserLanguageSrv,
23  getUserLanguageSrvRequest,
24  getUserLanguageSrvResponse,
25  cognitiveTestsOfTypeSrv,
26  cognitiveTestsOfTypeSrvRequest,
27  cognitiveTestsOfTypeSrvResponse
28  )
29 
30 from rapp_platform_ros_communications.msg import (
31  StringArrayMsg
32  )
33 
35 
@staticmethod
## @brief Gets the user's cognitive test performance records for a given test type
# @param testType [string] The cognitive test type to query
# @param userOntologyAlias [string] The user's ontology alias
#
# @return The response of the userPerformanceCognitveTestsSrv service call, returned exactly as received (its success flag is not checked here)
def getUserPerformanceRecordsForTestType(testType,userOntologyAlias):
    # The service name is looked up on the ROS parameter server
    topicName = rospy.get_param('rapp_knowrob_wrapper_user_performance_cognitve_tests')
    request = userPerformanceCognitveTestsSrvRequest()
    request.test_type = testType
    request.ontology_alias = userOntologyAlias
    callService = rospy.ServiceProxy(topicName, userPerformanceCognitveTestsSrv)
    response = callService(request)
    return response
49 
@staticmethod
## @brief Requests the user's ontology alias through the createOntologyAlias service
# NOTE(review): judging by the service name, the alias is presumably created on the service side when it does not exist yet - confirm against the knowrob wrapper
# @param username [string] The user's username
#
# @return ontologyAlias [string] The ontology alias contained in the service response
# @exception Exception AppError carrying the service's error and trace when the call reports failure
def getUserOntologyAlias(username):
    # The service name is looked up on the ROS parameter server
    topicName = rospy.get_param('rapp_knowrob_wrapper_create_ontology_alias')
    callService = rospy.ServiceProxy(topicName, createOntologyAliasSrv)
    request = createOntologyAliasSrvRequest()
    request.username = username
    response = callService(request)
    if response.success == True:
        return response.ontology_alias
    # Failure: forward the service's own error message and trace to the caller
    raise AppError(response.error, response.trace)
65 
@staticmethod
## @brief Queries the ontology and returns the cognitive test types available
#
# @return testTypesList [list] The test type names, i.e. the part following the first '#' of every subclass of "CognitiveTests" reported by the ontology
# @exception Exception AppError when the ontology service reports failure
def getTestTypesFromOntology():
    # The service name is looked up on the ROS parameter server
    serv_topic = rospy.get_param('rapp_knowrob_wrapper_subclasses_of_topic')
    knowrob_service = rospy.ServiceProxy(serv_topic, ontologySubSuperClassesOfSrv)
    testTypesReq = ontologySubSuperClassesOfSrvRequest()
    testTypesReq.ontology_class = "CognitiveTests"
    testTypesResponse = knowrob_service(testTypesReq)
    if(testTypesResponse.success != True):
        # Record the failure in the service trace before handing it to the caller
        testTypesResponse.trace.append("cannot load test categories from ontology")
        raise AppError(testTypesResponse.error+"cannot load test categories from ontology",testTypesResponse.trace)
    # Each result is split on '#' and only the second piece (the class name) is kept
    return [s.split('#')[1] for s in testTypesResponse.results]
85 
@staticmethod
## @brief Queries the ontology and returns the cognitive test languages available
#
# @return languagesList [list] The language names, i.e. the part following the first '#' of every subclass of "HumanLanguage" reported by the ontology
# @exception Exception AppError when the ontology service reports failure
def getTestLanguagesFromOntology():
    # The service name is looked up on the ROS parameter server
    serv_topic = rospy.get_param('rapp_knowrob_wrapper_subclasses_of_topic')
    knowrob_service = rospy.ServiceProxy(serv_topic, ontologySubSuperClassesOfSrv)
    languagesReq = ontologySubSuperClassesOfSrvRequest()
    languagesReq.ontology_class = "HumanLanguage"
    languagesResponse = knowrob_service(languagesReq)
    if(languagesResponse.success != True):
        # NOTE(review): the message says "test categories" although languages are being loaded;
        # the text is kept unchanged because callers may match on it - confirm before rewording
        languagesResponse.trace.append("cannot load test categories from ontology")
        raise AppError(languagesResponse.error+"cannot load test categories from ontology",languagesResponse.trace)
    # Each result is split on '#' and only the second piece (the class name) is kept
    return [s.split('#')[1] for s in languagesResponse.results]
105 
@staticmethod
## @brief Gets the cognitive tests of the given type and language available in the ontology
# @param testType [string] The test type (category)
# @param userLanguage [string] The user's language
#
# @return cognitiveTestsOfTypeResponse [cognitiveTestsOfTypeResponse] The unfiltered response of the cognitive tests of type service
# @exception Exception AppError carrying the service's error and trace when the call reports failure
def getCognitiveTestsOfType(testType,userLanguage):
    # The service name is looked up on the ROS parameter server
    topicName = rospy.get_param('rapp_knowrob_wrapper_cognitive_tests_of_type')
    request = cognitiveTestsOfTypeSrvRequest()
    request.test_type = testType
    request.test_language = userLanguage
    callService = rospy.ServiceProxy(topicName, cognitiveTestsOfTypeSrv)
    response = callService(request)
    if response.success == True:
        # No filtering by difficulty or subtype happens here; the raw response is returned
        return response
    raise AppError(response.error, response.trace)
124 
@staticmethod
## @brief Filters the tests by difficulty and test subtype
# @param testsOfType [object] Object exposing the parallel lists tests, difficulty, subtype, file_paths and test_id
# @param chosenDif [string] The test difficulty to keep; the empty string keeps every difficulty
# @param testSubType [string] The test sub type (without the knowrob prefix) to keep; the empty string keeps every sub type
#
# @return d [dictionary] Maps each selected test name to a one-element list holding [file_path, difficulty, subtype, test_id]
def filterTestsbyDifficultyAndSubtype(testsOfType,chosenDif,testSubType):
    testSubTypePrefix = "http://knowrob.org/kb/knowrob.owl#"
    d = dict()
    for i, testName in enumerate(testsOfType.tests):
        difficultyMatches = (chosenDif == "" or chosenDif == testsOfType.difficulty[i])
        # Stored subtypes carry the full knowrob prefix, the requested one does not
        if(difficultyMatches and (testSubType == "" or testSubTypePrefix+testSubType == testsOfType.subtype[i])):
            tlist = [testsOfType.file_paths[i], testsOfType.difficulty[i], testsOfType.subtype[i], testsOfType.test_id[i]]
            # The record is wrapped in an outer list; a repeated test name overwrites the earlier entry
            d[testName] = [tlist]
    return d
141 
@staticmethod
## @brief Asks the get-user-language service of the MySQL wrapper for the user's language
# @param username [string] The username sent in the service request
#
# @return userLanguage [string] The user_language field of the service response
# @exception Exception AppError carrying the service's error and trace when the call reports failure
def getUserLanguage(username):
    # The service name is looked up on the ROS parameter server
    topicName = rospy.get_param('rapp_mysql_wrapper_get_user_language_service_topic')
    callService = rospy.ServiceProxy(topicName, getUserLanguageSrv)
    request = getUserLanguageSrvRequest()
    request.username = username
    response = callService(request)
    if response.success == True:
        return response.user_language
    raise AppError(response.error, response.trace)
158 
@staticmethod
## @brief Validates the provided test type, or falls back to all valid test types when none is provided
# @param testType [string] The provided test type; the empty string means "all types"
# @param validtestTypesList [list] The list of valid test types
#
# @return testTypesList [list] A new single-element list with testType, or validtestTypesList itself when testType is empty
# @exception Exception AppError when testType is non-empty and not contained in validtestTypesList
def determineTestTypeListForReturningScoresOrHistory(testType,validtestTypesList):
    if testType == "":
        # Nothing requested explicitly: hand back the caller's list as is (not a copy)
        return validtestTypesList
    if testType not in validtestTypesList:
        message = "invalid test type, not contained in ontology subclasses of cognitive test types"
        # The same text is used for both the error and the trace argument
        raise AppError(message, message)
    return [testType]
176 
@staticmethod
## @brief Organizes the user's performance entries by timestamp
# @param userPerf [object] Object exposing the parallel lists tests, scores, difficulty, subtypes and timestamps
#
# @return userPerfOrganizedByTimestamp [OrderedDict] Maps each timestamp (converted to int) to a one-element list holding [test, score, difficulty, subtype], ordered from the highest timestamp to the lowest
def organizeUserPerformanceByTimestamp(userPerf):
    entriesByTimestamp = {}
    for i, testName in enumerate(userPerf.tests):
        tlist = [testName, userPerf.scores[i], userPerf.difficulty[i], userPerf.subtypes[i]]
        # Entries sharing a timestamp overwrite each other; the last one wins
        entriesByTimestamp[int(userPerf.timestamps[i])] = [tlist]
    # Order is descending
    return OrderedDict((timestamp, entriesByTimestamp[timestamp]) for timestamp in sorted(entriesByTimestamp, reverse=True))
189 
@staticmethod
## @brief Reports the file and line of every frame of the exception currently being handled
# @param error [String] The error message so far
# @param trace [list] The execution messages trace list; one "Error in <file> on line <n>" entry is appended per traceback frame (modified in place)
#
# @return error [String] The error message with the same per-frame entries appended; the caller's string is not modified, so use the return value
def traceError(error,trace):
    # sys.exc_info()[2] is the traceback of the exception being handled at call time
    for frame in traceback.extract_tb(sys.exc_info()[2]):
        fname, lineno = frame[0], frame[1]
        location = "Error in %s on line %d" % (fname, lineno)
        # Parenthesised single-argument form prints the same text on Python 2 and Python 3
        print(location)
        error = error + location
        trace.append(location)
    return error
def traceError
Traces and returns the code line where an error occurred.
def determineTestTypeListForReturningScoresOrHistory
Validates the provided test type or selects all test types from the ontology if none is provided.
def getTestTypesFromOntology
Queries the ontology and returns the cognitive test types available.
Exception compliant with the ros error and trace srvs.
def filterTestsbyDifficultyAndSubtype
Filters the tests by Difficulty and test subtype.
def getUserOntologyAlias
Gets the user's ontology alias and, if it doesn't exist, creates it.
def getTestLanguagesFromOntology
Queries the ontology and returns the cognitive test languages available.
def organizeUserPerformanceByTimestamp
Organizes the user's performance entries by timestamp.
def getUserLanguage
Queries the MySQL database through the MySQL wrapper and returns the user's language.
def getUserPerformanceRecordsForTestType
Gets the user cognitive test performance records for given test type.
def getCognitiveTestsOfType
Gets the cognitive tests of the given type and difficulty available in the ontology.