<?php

namespace SmartData;

if (!extension_loaded('cassandra'))
    die ("Unable to load cassandra extension\n");

require_once( __DIR__ . '/Config.php');
require_once( __DIR__ . '/Logger.php');
require_once( __DIR__ . '/Exception.php');
require_once( __DIR__ . '/Aggregators.php');
require_once( __DIR__ . '/Series.php');
require_once( __DIR__ . '/SmartData.php');
require_once( __DIR__ . '/Unit.php');
require_once( __DIR__ . '/Packer.php');
require_once( __DIR__ . '/Time.php');

use SmartData\Utility\{Pack,Unpack};
use SmartData\Exception\{AuthenticationException,BadRequestException,InternalException};

/**
 * Shared backend plumbing: authenticates the request in the constructor and
 * provides the MySQL / Cassandra connection helpers used by the versioned backends.
 */
class Backend_Common {

    protected $_domain;
    protected $_username;
    protected $_password;
    protected $_client_ip;
    protected $_client_name;
    protected $_gw_id;

    private $_session_id;
    private $_internal;

    /** Whether this backend was built in "internal" (developer) mode. */
    protected function internal(){
        return $this->_internal;
    }

    protected const READ  = 'r';
    protected const WRITE = 'w';

    /**
     * Authenticates the current request and stores the effective domain / user.
     *
     * The parameter "$internal" means that the Backend has been called without a requisition.
     * Please, be careful and use only for development tasks.
     *
     * @param Credentials|null $credentials Credentials sent by the client (empty ones are used when null).
     * @param bool             $internal    Developer mode, see above.
     *
     * @throws AuthenticationException When the credentials / certificate are rejected.
     * @throws BadRequestException     When the endpoint / protocol / option combination is not supported.
     * @throws \Exception              When the machine is not little-endian.
     */
    public function __construct(?Credentials $credentials = null, $internal=false) {
        $credentials = $credentials ?? new Credentials();

        // Ensure the machine is little-endian
        $testint = 0x00FF;
        if ($testint !== current(unpack('v', pack('S', $testint))))
            throw new \Exception("The machine is not little-endian");

        $this->_gw_id    = -1;        //NOTE: That means that the client isn't a known gateway
        $this->_internal = $internal; //NOTE: just for developers

        $USER   = ($credentials->username != NULL and $credentials->password != NULL);
        $DOMAIN = ($credentials->domain != NULL);

        // If it's not in an internal mode and the request is not secure (SSL), only the
        // domains listed in Config::config()::ACCESSIBLE_WITHOUT_SSL can be accessed.
        if (!$internal && !REQUEST_HTTPS && !in_array($credentials->domain, Config::config()::ACCESSIBLE_WITHOUT_SSL, true)){
            throw new AuthenticationException("Authentication failure [{$credentials}]");
        }

        switch (REQUEST_URI) {
            case 'get.php':
                $this->_authorize($credentials, self::READ, $DOMAIN, $USER, true);
                break;

            case 'create.php' :
            case 'attach.php' :
            case 'put.php' :
            case 'batch.php' :
            case 'put_batch.php' :
                $this->_authorize($credentials, self::WRITE, $DOMAIN, $USER, false);
                break;

            default:
                if($this->internal()){
                    $this->_domain   = $credentials->domain;
                    $this->_username = $credentials->username;
                    $this->_password = $credentials->password;
                } else
                    throw new BadRequestException(REQUEST_DBG.' crd:'.$credentials.' Bad protocol and options');
                break;
        }

        if($this->_domain == null || $this->_username == null || $this->_password == null){
            throw new BadRequestException("Incorrect use of protocol and options {$credentials}");
        }

        $this->_session_id = $this->_domain.'_'.substr(uniqid(), -5);

        // debug_X() expects a string; the cast makes the __toString() conversion explicit.
        if(REQUEST_URI != 'get.php')
            self::debug_X((string) $this);
    }

    /**
     * Applies the access rules shared by the read and write endpoints.
     *
     * ====== Summary of rules (first match wins) ======
     * HTTPS ^ CERT ^ domain                -> certificate checked against $domain
     * HTTPS ^ CERT                         -> certificate checked, domain comes from the DB row
     * HTTPS ^ domain ^ usr+password        -> user checked against $domain
     * HTTPS ^ usr+password                 -> user checked against the public domain
     * HTTPS ^ domain                       -> default user checked against $domain
     * HTTPS            (only if $allow_anonymous) -> public domain with the default user
     * HTTP  ^ domain ^ usr+password        -> user checked against $domain
     * otherwise                            -> BadRequestException
     *
     * Note: the "HTTPS ^ usr+password" rule overwrites $credentials->domain with the public domain.
     *
     * @param Credentials $credentials     Client credentials.
     * @param string      $level           self::READ or self::WRITE.
     * @param bool        $has_domain      Whether a domain was supplied.
     * @param bool        $has_user        Whether both username and password were supplied.
     * @param bool        $allow_anonymous Whether a bare HTTPS request falls back to the public domain.
     *
     * @throws AuthenticationException|BadRequestException
     */
    private function _authorize(Credentials $credentials, string $level, bool $has_domain, bool $has_user, bool $allow_anonymous) : void {
        if ( REQUEST_HTTPS && REQUEST_CERT && $has_domain ) {
            if(!$this->_checkCertificate(REQUEST_SERIAL, $level, $credentials->domain))
                throw new AuthenticationException(REQUEST_DBG.' crd:'.$credentials.'(CD) Unauthorized access');
        } else if ( REQUEST_HTTPS && REQUEST_CERT ) {
            if(!$this->_checkCertificate(REQUEST_SERIAL, $level))
                throw new AuthenticationException(REQUEST_DBG.' crd:'.$credentials.'(C) Unauthorized access');
        } else if ( REQUEST_HTTPS && $has_domain && $has_user ) {
            if(!$this->_checkUser($credentials->username, $credentials->password, $level, $credentials->domain))
                throw new AuthenticationException(REQUEST_DBG.' crd:'.$credentials.'(DU) Unauthorized access');
        } else if ( REQUEST_HTTPS && $has_user ) {
            //TODO: Check what is the main idea
            $credentials->domain = Config::config()::PUBLIC_DOMAIN;
            if(!$this->_checkUser($credentials->username, $credentials->password, $level, Config::config()::PUBLIC_DOMAIN))
                throw new AuthenticationException(REQUEST_DBG.' crd:'.$credentials.'(U) Unauthorized access');
        } else if ( REQUEST_HTTPS && $has_domain ) {
            // NOTE: Publicly readable / writable domains
            if(!$this->_checkUser(Config::config()::DEFAULT_USERNAME, Config::config()::DEFAULT_PASSWORD, $level, $credentials->domain))
                throw new AuthenticationException(REQUEST_DBG.' crd:'.$credentials.'(D) Unauthorized access');
        } else if ( $allow_anonymous && REQUEST_HTTPS ) {
            $this->_domain   = Config::config()::PUBLIC_DOMAIN;
            $this->_username = Config::config()::DEFAULT_USERNAME;
            $this->_password = Config::config()::DEFAULT_PASSWORD;
        } else if ( !REQUEST_HTTPS && $has_domain && $has_user ) {
            //NOTE: Domain that are accessible with HTTP (without SSL)
            if(!$this->_checkUser($credentials->username, $credentials->password, $level, $credentials->domain))
                throw new AuthenticationException(REQUEST_DBG.' crd:'.$credentials.'(DU) Unauthorized access');
        } else {
            throw new BadRequestException(REQUEST_DBG.' crd:'.$credentials.' Bad protocol and options');
        }
    }

    /**
     * Checks the health of databases.
     *
     * @return bool True when both a MySQL and a Cassandra connection could be opened.
     */
    public static function health_check() : bool {
        $success = true;
        try {
            $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD);
            $conn = null;
            $session = self::_cassandraConnect(Config::config()::CASSANDRA_SERVERNAME, Config::config()::CASSANDRA_PORT, Config::config()::DEFAULT_USERNAME, Config::config()::DEFAULT_PASSWORD, Config::config()::CASSANDRA_KEYSPACE);
        } catch (\Exception $e) {
            Logger::exception($e);
            $success = false;
        } finally {
            // The session is closed here only (it used to be closed a second time inside the try).
            if (isset($session))
                $session->close();
            $conn = null;
        }
        return $success;
    }

    /**
     * Check Certificate.
     *
     * Looks the certificate up in MySQL and, on success, switches this backend to the
     * Cassandra super user for the resolved domain and records the gateway/client id.
     *
     * @param string      $certificate Certificate serial.
     * @param string      $level       self::READ or self::WRITE.
     * @param string|null $domain      Requested domain; when null the domain of the matching row is used.
     *
     * @return bool False when no single matching row exists or on any error (which is logged).
     */
    private function _checkCertificate(string $certificate, string $level, ?string $domain = null) : bool {
        $success = true;
        $target_domain = "";
        $gw_id = -1;
        $parameters = [':certificate' => $certificate, ':level' => $level];

        try {
            $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD);

            if(Config::config()::api_version() == Config::config()::VERSION_1_1) // TODO: Remove from development_mode
                $table = 'clients';
            else
                $table = 'gateways'; // table that has to be changed to clients

            if($domain == null){
                $stmt = $conn->prepare("SELECT id, domain FROM {$table} WHERE enable = true AND certificate = :certificate AND INSTR(level, :level) > 0 ORDER BY id ASC LIMIT 1");
            } else {
                $parameters[':domain'] = $domain;
                $stmt = $conn->prepare("SELECT id, domain FROM {$table} WHERE enable = true AND certificate = :certificate AND (domain = :domain OR domain = '*') AND INSTR(level, :level) > 0");
            }
            $stmt->execute($parameters);

            // Count fetched rows: PDOStatement::rowCount() is not guaranteed for SELECT.
            $rows = $stmt->fetchAll(\PDO::FETCH_OBJ);
            if (count($rows) !== 1)
                throw new AuthenticationException("Permission not found or number of domains larger than one [{$certificate} / {$level}]");

            $target_domain = $domain ?? $rows[0]->domain;
            $gw_id = $rows[0]->id;
        } catch (\Exception $e) {
            Logger::exception($e);
            $success = false;
        } finally {
            $conn = null;
        }

        if (!$success)
            return false;

        $this->_domain   = $target_domain;
        $this->_username = Config::config()::CASSANDRA_SUPERUSER;
        $this->_password = Config::config()::CASSANDRA_SUPERPASS;
        $this->_gw_id    = $gw_id;
        return true;
    }

    /**
     * Check User Connection.
     *
     * @return bool True when a Cassandra session can be opened with the given credentials.
     */
    private function _checkUserConnection(string $username, string $password) : bool {
        try {
            $session = self::_cassandraConnect(Config::config()::CASSANDRA_SERVERNAME, Config::config()::CASSANDRA_PORT, $username, $password, Config::config()::CASSANDRA_KEYSPACE);
            return true;
        } catch (\Exception $e) {
            Logger::exception($e);
            return false;
        } finally {
            if (isset($session))
                $session->close();
        }
    }

    /**
     * Check if an user has permission to read/write on a domain.
     *
     * Permissions are listed through a super-user Cassandra session; when a suitable one is
     * found the user's own credentials are verified by opening a session with them. On
     * success the domain / username / password are stored on this backend.
     *
     * @param string      $username
     * @param string      $password
     * @param string      $level    self::READ or self::WRITE (anything else is refused).
     * @param string|null $domain   Target domain (null is refused).
     *
     * @return bool
     */
    private function _checkUser(string $username, string $password, string $level, ?string $domain = null) : bool {
        //TODO: Check what is the main idea
        if($domain == NULL)
            return false;

        // $username and $domain are interpolated into the CQL text below and that statement
        // runs on a super-user session, so anything but a plain identifier is refused.
        $identifier = '/\A[A-Za-z0-9_]+\z/';
        if (!preg_match($identifier, $username) || !preg_match($identifier, $domain))
            return false;

        // READ is satisfied by SELECT or MODIFY (this was an implicit switch fall-through).
        // NOTE(review): confirm that MODIFY is really meant to grant read access.
        if ($level === self::READ)
            $permission = ['SELECT', 'MODIFY'];
        else if ($level === self::WRITE)
            $permission = ['MODIFY'];
        else
            return false; // invalid option

        $success = false;
        try {
            $keyspace = Config::config()::CASSANDRA_KEYSPACE;
            $is_v1_1  = (Config::config()::api_version() == Config::config()::VERSION_1_1); // TODO: Remove from development_mode

            // API 1.1 lists every permission of the user and filters on the resource name;
            // older versions ask for the permissions on the domain table directly.
            $query = $is_v1_1
                ? "LIST ALL PERMISSIONS OF {$username};"
                : "LIST ALL PERMISSIONS ON table {$keyspace}.{$domain} OF {$username};";
            $pattern = "<table {$keyspace}.{$domain}_v";

            $session = self::_cassandraConnect(Config::config()::CASSANDRA_SERVERNAME, Config::config()::CASSANDRA_PORT, Config::config()::CASSANDRA_SUPERUSER, Config::config()::CASSANDRA_SUPERPASS, Config::config()::CASSANDRA_KEYSPACE);
            $result  = $session->execute(new \Cassandra\SimpleStatement($query));

            $granted = false;
            foreach ($result as $column) {
                if ($is_v1_1 && strpos($column['resource'], $pattern) === false)
                    continue;
                if (in_array($column['permission'], $permission, true)) {
                    $granted = true;
                    break;
                }
            }

            $success = $granted && $this->_checkUserConnection($username, $password);
        } catch (\Exception $e) {
            Logger::exception($e);
            $success = false;
        } finally {
            if (isset($session))
                $session->close();
        }

        if (!$success)
            return false;

        $this->_domain   = $domain;
        $this->_username = $username;
        $this->_password = $password;
        return true;
    }

    /**
     * Open a connection with MySQL database.
     *
     * @return \PDO Connection with the error mode set to exceptions.
     */
    protected static function _mysqlConnect(string $servername, int $port, string $dbname, string $username, string $password) : \PDO {
        $conn = new \PDO("mysql:host=$servername;port=$port;dbname=$dbname", $username, $password);
        // set the PDO error mode to exception
        $conn->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
        return $conn;
    }

    /**
     * Open a connection with Cassandra database.
     *
     * @return \Cassandra\DefaultSession Session connected to $keyspace.
     */
    protected static function _cassandraConnect(string $servername, int $port, string $username, string $password, string $keyspace) : \Cassandra\DefaultSession {
        $cluster = \Cassandra::cluster()->withContactPoints($servername)
            ->withPort($port)
            ->withCredentials($username, $password)
            //->withDefaultPageSize(200)
            ->build();
        return $cluster->connect($keyspace);
    }

    /**
     * Logs a debug message prefixed with the session id.
     *
     * @param string       $message
     * @param string|false $ctrl    Passed through to the logger; defaults to the called class name.
     */
    protected function debug_X(string $message, $ctrl=false) {
        Logger::debug_X('['.$this->_session_id.'] '.$message, ($ctrl === false) ? get_called_class() : $ctrl);
    }

    public function __toString() {
        return REQUEST_DBG.' crd:{'.$this->_username.'@'.$this->_domain.'}';
    }
}

/**
 * Plain value holder describing a tracker; $t0 must not be greater than $t1.
 */
class Tracker {
    public $version;
    public $unit;
    public $signature;
    public $t0;
    public $t1;

    /**
     * @throws BadRequestException When $t0 > $t1.
     */
    public function __construct($version, $unit, $signature, $t0, $t1) {
        // Conditions for a valid tracker.
        if (gmp_cmp(gmp_init($t0), gmp_init($t1)) > 0){
            throw new BadRequestException("Invalid tracker");
        }
        $this->version   = $version;
        $this->unit      = $unit;
        $this->signature = $signature;
        $this->t0        = $t0;
        $this->t1        = $t1;
    }
}

/**
 * Domain / username / password triple sent by a client.
 */
final class Credentials {
    public $domain;
    public $username;
    public $password;

    /**
     * @param mixed $d Domain.
     * @param mixed $u Username.
     * @param mixed $p Password.
     *
     * @throws Exception\BadCredentialsException When exactly one of username / password is given.
     */
    public function __construct($d=null, $u=null, $p=null) {
        if ($u == null xor $p == null) {
            // The message used to reference undefined $username / $domain variables.
            throw new Exception\BadCredentialsException("Invalid credentials fields [{$u}@{$d}]");
        }
        $this->domain   = $d;
        $this->username = $u;
        $this->password = $p;
    }

    /** Serialises domain, username and password, in that order. $suppress is unused here. */
    public function pack(array $suppress = array()) {
        return Pack::string($this->domain)
             . Pack::string($this->username)
             . Pack::string($this->password);
    }

    /** Reads domain, username and password, in that order, from $bin. $compl is unused here. */
    public static function unpack(& $bin, array $compl = array()) {
        $domain   = Unpack::string($bin, true);
        $username = Unpack::string($bin, true);
        $password = Unpack::string($bin, true);
        return new self($domain, $username, $password);
    }

    /** Builds credentials from a decoded JSON object; missing fields become null. */
    public static function fromJson(\stdClass $json) {
        return new self($json->domain ?? null, $json->username ?? null, $json->password ?? null);
    }

    /** Renders "{username@domain, <password length>}"; the password itself is never printed. */
    public function __toString() {
        // The cast keeps strlen() away from null, which is deprecated since PHP 8.1.
        return "{" . "{$this->username}@{$this->domain}, " . strlen((string) $this->password) . '}';
    }
}

## Backend of API version 1.0 ##############################################################################
class Backend_V1_0 extends Backend_Common {

    /**
     * Create a new series if not exist.
     *
     * A series "exists" when a row with the same version, unit, sphere, time range and
     * domain is present whose count is below Config CASSANDRA_MAX_ROW_SIZE.
     *
     * @param object $series Object exposing version, unit, x, y, z, r, t0 and t1.
     *
     * @return bool False when anything failed (the exception is logged).
     */
    //public function create(Series $series) : bool {
    public function create($series) {
        $success = true;

        // The INSERT gets its own parameter set: it has no :count marker, and every marker is
        // unique, so it does not depend on PDO's emulated prepares to be accepted.
        $insert_parameters = [
            ':version' => $series->version,
            ':unit'    => $series->unit,
            ':x'       => $series->x,
            ':y'       => $series->y,
            ':z'       => $series->z,
            ':r'       => $series->r,
            ':t0'      => $series->t0,
            ':t1'      => $series->t1,
            ':domain'  => $this->_domain,
            ':counter_domain' => $this->_domain,
        ];
        $select_parameters = $insert_parameters;
        unset($select_parameters[':counter_domain']);
        $select_parameters[':count'] = Config::config()::CASSANDRA_MAX_ROW_SIZE;

        try {
            $conn  = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD);
            $table = Config::config()::MySQL_Table($series->version);

            $stmt_select = $conn->prepare("SELECT * FROM {$table} WHERE version = :version AND unit = :unit AND x = :x AND y = :y AND z = :z AND r = :r AND t0 = :t0 AND t1 = :t1 AND domain = :domain AND count < :count ORDER BY id DESC LIMIT 1;");
            $stmt_select->execute($select_parameters);
            $existing = $stmt_select->fetch(\PDO::FETCH_OBJ);

            if ($existing == NULL) { // Not exists yet
                $stmt_insert = $conn->prepare("INSERT INTO {$table} VALUES (DEFAULT, :version, :unit, :x, :y, :z, :r, :t0, :t1, DEFAULT, :domain, (SELECT counter(:counter_domain)), DEFAULT);");
                $stmt_insert->execute($insert_parameters);
            }
        } catch (\Exception $e) {
            Logger::exception($e);
            $success = false;
        } finally {
            $conn = null;
        }
        return $success;
    }

    /**
     * Create a new series if there is not one that contains it.
     *
     * "Contains" means: same version/unit (and signature for the mobile version), a sphere
     * enclosing the requested one, a time range enclosing the requested one, same domain and
     * a count below Config CASSANDRA_MAX_ROW_SIZE.
     *
     * @param object $series Object exposing version, unit, x, y, z, r, t0, t1 (and signature for mobile).
     *
     * @return bool False when anything failed (the exception is logged).
     */
    //public function attach(Series $series) : bool {
    public function attach($series) {
        $success = true;
        $table   = Config::config()::MySQL_Table($series->version);
        $mobile  = ($series->version == SmartData::MOBILE_VERSION);

        // See create(): the INSERT has its own parameters and only unique markers.
        $insert_parameters = [
            ':version' => $series->version,
            ':unit'    => $series->unit,
            ':x'       => $series->x,
            ':y'       => $series->y,
            ':z'       => $series->z,
            ':r'       => $series->r,
            ':t0'      => $series->t0,
            ':t1'      => $series->t1,
            ':domain'  => $this->_domain,
            ':counter_domain' => $this->_domain,
        ];

        if ($mobile) {
            $insert_parameters[':signature'] = $series->signature;
            $signature_filter = ' AND signature = :signature';
            $signature_value  = ':signature';
            Logger::debug('Attaching MOBILE:'.$series, true);
        } else {
            $signature_filter = '';
            $signature_value  = 'DEFAULT';
            Logger::debug('Attaching STATIC'.$series, true);
        }

        $select_parameters = $insert_parameters;
        unset($select_parameters[':counter_domain']);
        $select_parameters[':count'] = Config::config()::CASSANDRA_MAX_ROW_SIZE;

        $query  = "SELECT * FROM {$table} WHERE version = :version AND unit = :unit{$signature_filter} AND r >= (SQRT(POW(x - :x, 2) + POW(y - :y, 2) + POW(z - :z, 2)) + :r) AND t0 <= :t0 AND t1 >= :t1 AND domain = :domain AND count < :count ORDER BY r ASC LIMIT 1;";
        $insert = "INSERT INTO {$table} VALUES (DEFAULT, :version, :unit, :x, :y, :z, :r, :t0, :t1, DEFAULT, :domain, (SELECT counter(:counter_domain)), DEFAULT, {$signature_value});";

        try {
            $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD);

            $stmt_select = $conn->prepare($query);
            $stmt_select->execute($select_parameters);
            $existing = $stmt_select->fetch(\PDO::FETCH_OBJ);

            if ($existing == NULL) {
                $stmt_insert = $conn->prepare($insert);
                $stmt_insert->execute($insert_parameters);
                Logger::debug(__FILE__."> Attach (new series)");
            }
        } catch (\Exception $e) {
            Logger::exception($e);
            $success = false;
        } finally {
            $conn = null;
        }
        return $success;
    }

    /**
     * Insert SmartData into series/tracker matched.
     *
     * @throws BadRequestException When $smart_data is neither static nor mobile.
     */
    public function insert(SmartData $smart_data) {
        if ($smart_data instanceof StaticSmartData)
            return $this->_insertStaticSmartData($smart_data);
        else if ($smart_data instanceof MobileSmartData)
            return $this->_insertMobileSmartData($smart_data);
        else
            throw new BadRequestException("Error processing JSON request: unsupported SmartData version");
    }

    /** Insert a static version of smartdata. */
    private function _insertStaticSmartData(StaticSmartData $smart_data) : bool {
        $parameters = [
            ':unit'   => $smart_data->unit,
            ':x'      => $smart_data->x,
            ':y'      => $smart_data->y,
            ':z'      => $smart_data->z,
            ':time'   => $smart_data->time,
            ':domain' => $this->_domain,
        ];
        return $this->_insertIntoMatchingSeries($smart_data, $parameters, '');
    }

    /** Insert a mobile version of smartdata (series are additionally matched on signature). */
    private function _insertMobileSmartData(MobileSmartData $smart_data) : bool {
        Logger::debug("_insertMobileSmartData".$smart_data, true);
        $parameters = [
            ':unit'      => $smart_data->unit,
            ':signature' => $smart_data->signature,
            ':x'         => $smart_data->x,
            ':y'         => $smart_data->y,
            ':z'         => $smart_data->z,
            ':time'      => $smart_data->time,
            ':domain'    => $this->_domain,
        ];
        return $this->_insertIntoMatchingSeries($smart_data, $parameters, ' AND signature = :signature');
    }

    /**
     * Shared body of the static and mobile inserts.
     *
     * Picks the smallest series containing the point and time, writes the entry to the
     * domain's Cassandra table ("_dig" suffix for digital units) and increments the series'
     * count in MySQL. When the picked series is full, create() is called once for the same
     * series definition and the lookup is repeated.
     *
     * @param SmartData $smart_data   Data point to store.
     * @param array     $parameters   Named parameters of the series lookup.
     * @param string    $extra_filter Extra SQL condition appended to the lookup ('' for none).
     *
     * @return bool False when no series matches or anything failed (the exception is logged).
     */
    private function _insertIntoMatchingSeries(SmartData $smart_data, array $parameters, string $extra_filter) : bool {
        $success    = true;
        $is_digital = $smart_data->unit()->is_digital();

        try {
            $conn  = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD);
            $table = Config::config()::MySQL_Table($smart_data->version);

            $series = NULL;
            for ($attempt = 0; $attempt < 2; $attempt++) {
                $stmt_select = $conn->prepare("SELECT * FROM {$table} WHERE unit = :unit AND (SQRT(POW(x - :x, 2) + POW(y - :y, 2) + POW(z - :z, 2))) <= r AND :time >= t0 AND :time <= t1 AND domain = :domain{$extra_filter} ORDER BY r ASC, count ASC LIMIT 1;");
                $stmt_select->execute($parameters);
                $series = $stmt_select->fetch(\PDO::FETCH_OBJ);

                // Only the first lookup may trigger the creation of a fresh slice.
                if ($attempt > 0 || $series == NULL || $series->count < Config::config()::CASSANDRA_MAX_ROW_SIZE)
                    break;

                $this->create(new Series($series->version, $series->unit, $series->x, $series->y, $series->z, $series->r, $series->t0, $series->t1));
            }

            if ($series == NULL)
                throw new Exception\NoMatchingSeriesException('No matching series');

            $entry_type = \Cassandra\Type::userType(
                // Just for attend projects requirements of SO2 -------------------------------------
                'value',      ($is_digital) ? \Cassandra\Type::blob() : \Cassandra\Type::double(),
                // ----------------------------------------------------------------------------------
                'error',      \Cassandra\Type::tinyint(),
                'confidence', \Cassandra\Type::tinyint(),
                'x',          \Cassandra\Type::int(),
                'y',          \Cassandra\Type::int(),
                'z',          \Cassandra\Type::int(),
                'd',          \Cassandra\Type::bigint(),
                'gw',         \Cassandra\Type::bigint()
            );

            $smart_data->confidence = $this->assign_confidence($smart_data);

            $arguments = array(
                new \Cassandra\Bigint($series->slice),
                new \Cassandra\Varint($smart_data->time),
                $entry_type->create(
                    'value',      ($is_digital) ? new \Cassandra\Blob($smart_data->value) : (double) $smart_data->value,
                    'error',      new \Cassandra\Tinyint($smart_data->error),
                    'confidence', new \Cassandra\Tinyint($smart_data->confidence),
                    'x',          (int)$smart_data->x,
                    'y',          (int)$smart_data->y,
                    'z',          (int)$smart_data->z,
                    'd',          new \Cassandra\Bigint($smart_data->dev??0), //TODO: remove the ??
                    //'gw', new \Cassandra\Bigint($smart_data->gw->id)
                    'gw',         new \Cassandra\Bigint($this->_gw_id) // TODO: CHECK THIS
                )
            );

            $cassandra_table = "{$this->_domain}" . (($is_digital) ? '_dig' : '');
            $session = self::_cassandraConnect(Config::config()::CASSANDRA_SERVERNAME, Config::config()::CASSANDRA_PORT, $this->_username, $this->_password, Config::config()::CASSANDRA_KEYSPACE);
            $insert_stmt = $session->prepare("INSERT INTO {$cassandra_table} (id, timestamp, entry) VALUES (?, ?, ?);");
            $session->execute($insert_stmt, array('arguments' => $arguments));

            $update_stmt = $conn->prepare("UPDATE {$table} SET count = count + 1 WHERE id = :id LIMIT 1;");
            $update_stmt->execute(array('id' => $series->id));
        } catch (\Exception $e) {
            Logger::exception($e);
            $success = false;
        } finally {
            if (isset($session))
                $session->close();
            $conn = null;
        }
        return $success;
    }

    /**
     * Query data from series.
     *
     * Selects the slices overlapping the requested time range (optionally filtered by unit
     * and sphere) from MySQL, reads their entries from Cassandra, optionally aggregates them
     * and returns at most Config POINTS_LIMIT points sorted by timestamp.
     *
     * @param Series      $series  Requested version, unit, sphere and time range.
     * @param mixed       $aggr    Aggregator description handed to Aggregator::new() (ignored in internal mode).
     * @param object|null $options Optional object whose "type" selects how digital values are decoded
     *                             when $_GET['type'] is absent.
     *
     * @return array|null array('series' => [...]) or null when anything failed (the exception is logged).
     */
    public function query(Series $series, $aggr=null, $options=null) {
        $response_json = NULL;
        try {
            $parameters = array(':version' => $series->version, ':t0' => $series->t0, ':t1' => $series->t1, ':domain' => $this->_domain);
            $table = Config::config()::MySQL_Table($series->version);
            $query = "SELECT slice, unit FROM {$table} WHERE version = :version AND ( (:t0 >= t0 AND :t0 <= t1) OR (:t0 <= t0 AND :t1 >= t1) OR (:t1 >= t0 AND :t1 <= t1 ) )";

            if($series->unit != null){
                $parameters = array_merge($parameters, array(':unit' => $series->unit));
                $query .= ' AND unit = :unit';
            }

            // The sphere must be given completely (x, y, z and r) or not at all.
            $invalid_sphere = (int)($series->x === null) + (int)($series->y === null) + (int)($series->z === null) + (int)($series->r === null);
            if ($invalid_sphere != 0 && $invalid_sphere != 4) {
                throw new BadRequestException("Invalid sphere parameters");
            } else if($invalid_sphere == 0) {
                $parameters = array_merge($parameters, array(':x' => $series->x, ':y' => $series->y, ':z' => $series->z, ':r' => $series->r));
                $query .= ' AND SQRT(POW(x - :x, 2) + POW(y - :y, 2) + POW(z - :z, 2)) <= :r + r';
                $filter_sphere = true;
            } else {
                $filter_sphere = false;
            }
            $query .= ' AND domain = :domain AND count > 0;';

            $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD);
            $select_stmt = $conn->prepare($query);
            $select_stmt->execute($parameters);

            $id_set = \Cassandra\Type::collection(\Cassandra\Type::bigint())->create();
            $selected_slices = array_column($select_stmt->fetchAll(), 'unit', 'slice');
            foreach ($selected_slices as $slice => $unit)
                $id_set->add(new \Cassandra\Bigint($slice));

            $session = self::_cassandraConnect(Config::config()::CASSANDRA_SERVERNAME, Config::config()::CASSANDRA_PORT, $this->_username, $this->_password, Config::config()::CASSANDRA_KEYSPACE);

            $is_digital = $series->unit()->is_digital();
            if($is_digital) {
                $type = $_GET['type'] ?? $options->type ?? null;
                // Cast first: shifting a non-numeric request value throws a TypeError on PHP 8,
                // which the catch (\Exception) below would not intercept.
                $type = ((int) $type) << 29;
            }

            $cassandra_table = "{$this->_domain}" . (($is_digital) ? '_dig' : '');
            $cass_select_stmt = $session->prepare("SELECT * FROM {$cassandra_table} WHERE (id IN ?) AND (timestamp >= ?) AND (timestamp <= ?);");
            $result_page = $session->execute($cass_select_stmt, array('arguments' => array($id_set, new \Cassandra\Varint($series->t0), new \Cassandra\Varint($series->t1))));

            //ini_set('memory_limit', '-1'); // check this
            //ini_set('max_execution_time', 300); // check this

            $index = 0;
            $response_json = array('series' => array());
            $aggregator = ($this->internal()) ? null : Aggregator::new($aggr);

            while($result_page) {
                foreach ($result_page as $column) {
                    $slice = (int)$column['id'];
                    $entry = $column['entry'];
                    $time  = (string)$column['timestamp']->value(); //bigint

                    if($is_digital){
                        $value_blob = $entry->get('value')->toBinaryString();

                        // Reset per row: the D64 case assigns nothing, so without this a row
                        // would inherit the value decoded for the previous row.
                        // NOTE(review): confirm whether D64 should be decoded with Unpack::d64() instead.
                        $value = null;
                        switch ($type) {
                            case Unit::I32:
                                $value = Unpack::uInt32($value_blob);
                                break;
                            case Unit::I64:
                                $value = Unpack::uInt64($value_blob);
                                break;
                            case Unit::F32:
                                $value = Unpack::f32($value_blob);
                                break;
                            case Unit::D64:
                                break;
                            default:
                                $value = Unpack::d64($value_blob);
                                break;
                        }

                        if($value === null){
                            if($value_blob != null)
                                $value = $value_blob; // TODO: remove this in the new API
                            else
                                $value = -400; // TODO: remove this once invalid values can no longer be inserted
                        }
                    } else {
                        $value = (double)$entry->get('value'); // double
                    }

                    $error = (int)$entry->get('error');      //tinyint
                    $conf  = (int)$entry->get('confidence'); //tinyint
                    $x     = (int)$entry->get('x');          //int
                    $y     = (int)$entry->get('y');          //int
                    $z     = (int)$entry->get('z');          //int
                    $d     = (int)$entry->get('d');          //int
                    $gw    = (int)$entry->get('gw');         //int (TO DO GET GATEWAY)

                    if ( $filter_sphere && !$series->contains_point($x, $y, $z) )
                        continue;

                    if ($series->unit == 2226272548 && $d != $series->dev) //Angle Rate Sensor (testing)
                        continue;

                    if ($this->_domain == 'tutorial' && $series->unit == 2224179556 && $d != $series->dev) //Debuging
                        continue;

                    $smartdata = null;
                    switch ($series->version) {
                        case SmartData::STATIC_VERSION:
                            $smartdata = new StaticSmartData($series->unit,$value,$error,$conf,$x,$y,$z,$time,$d,$gw);
                            break;
                        case SmartData::MOBILE_VERSION:
                            $smartdata = new MobileSmartData($series->unit,$value,$error,$conf,$x,$y,$z,$time,$d,$gw, null);
                            break;
                        default:
                            continue 2;
                            break;
                    }

                    if($aggregator != null) {
                        $smartdata = $aggregator->aggregate($smartdata);
                    }

                    if($smartdata != null) {
                        if(is_array($smartdata)){
                            foreach ($smartdata as $key => $data) {
                                $response_json['series'][$index++] = $data->toArray();
                                if ($index >= Config::config()::POINTS_LIMIT)
                                    break;
                            }
                        } else {
                            $response_json['series'][$index++] = $smartdata->toArray();
                            if($this->internal()){
                                $response_json['series'][$index-1]['slice'] = $slice;
                                $response_json['series'][$index-1]['gw'] = $gw;
                            }
                        }
                    } else {
                        continue;
                    }

                    if ($index >= Config::config()::POINTS_LIMIT)
                        break 2;
                }

                $result_page = $result_page->nextPage();

                // After the last page, flush whatever the aggregator is still holding.
                if(!$result_page && isset($aggregator) && $index < Config::config()::POINTS_LIMIT){
                    $smartdata = $aggregator->finish();
                    if($smartdata != null) {
                        $response_json['series'][$index++] = $smartdata->toArray();
                    }
                }
            }

            // Timestamps are compared through GMP (they are handled as strings above).
            usort($response_json['series'], function ($a, $b) {
                $a_timestamp = gmp_init($a['timestamp']);
                $b_timestamp = gmp_init($b['timestamp']);
                return gmp_cmp($a_timestamp, $b_timestamp);
            });
        } catch (\Exception $e) {
            Logger::exception($e);
            $response_json = NULL;
        } finally {
            // The session is closed here only (it used to be closed a second time inside the try).
            if (isset($session))
                $session->close();
            $conn = null;
        }
        return $response_json;
    }

    // Track mobile node
    //public function track(Tracker $tracker) : bool {
    public function track(Series $series, $aggr=null) {
        //throw new Exception\NotImplementedException("Tracker has not been implemented yet");
        $response_json = NULL;
        try {
            $parameters = array(':version' => $series->version, ':signature' => $series->signature, ':t0' => $series->t0, ':t1' => $series->t1, ':domain' => $this->_domain);
            $table = Config::config()::MySQL_Table($series->version);
            $query = "SELECT slice, unit FROM {$table} WHERE version = :version AND signature = :signature AND ( (:t0 >= t0 AND :t0 <= t1) OR (:t0 <= t0 AND :t1 >= t1) OR (:t1 >= t0 AND :t1 <= t1 ) )";

            if($series->unit != null){
                $parameters = array_merge($parameters, array(':unit' => $series->unit));
                $query .= ' AND unit = :unit';
            }

            $invalid_sphere = (int)($series->x === null) + (int)($series->y === null) + (int)($series->z === null) + (int)($series->r === null);
            if ($invalid_sphere != 0 && $invalid_sphere != 4) {
                throw new BadRequestException("Invalid sphere parameters");
            } else if($invalid_sphere == 0) {
                $parameters = array_merge($parameters, array(':x' => $series->x, ':y' => $series->y, ':z' => $series->z, ':r' => $series->r));
                $query .= ' AND SQRT(POW(x - :x, 2) + POW(y - :y, 2) + POW(z - :z, 2)) <= :r + r';
                $filter_sphere = true;
            } else {
                $filter_sphere = false;
            }
            $query .= ' AND domain = :domain AND count > 0;';

            $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD);
            $select_stmt = $conn->prepare($query);
            $select_stmt->execute($parameters);

            $id_set = \Cassandra\Type::collection(\Cassandra\Type::bigint())->create();
            $selected_slices = array_column($select_stmt->fetchAll(), 'unit', 'slice');
            foreach ($selected_slices as $slice => $unit)
                $id_set->add(new \Cassandra\Bigint($slice));

            $session = self::_cassandraConnect(Config::config()::CASSANDRA_SERVERNAME, Config::config()::CASSANDRA_PORT, $this->_username, $this->_password, Config::config()::CASSANDRA_KEYSPACE);

            $is_digital = $series->unit()->is_digital();
            $cassandra_table = "{$this->_domain}" . (($is_digital) ? '_dig' : '');
            $cass_select_stmt = $session->prepare("SELECT * FROM {$cassandra_table} WHERE (id IN ?) AND (timestamp >= ?) AND (timestamp <= ?);");
            $result_page = $session->execute($cass_select_stmt, array('arguments' => array($id_set, new \Cassandra\Varint($series->t0), new \Cassandra\Varint($series->t1))));

            //ini_set('memory_limit', '-1'); // check this
            //ini_set('max_execution_time', 300); // check this

            $index = 0;
            $response_json = array('series' => array());
            $aggregator = ($this->internal()) ? null : Aggregator::new($aggr);

            while($result_page) {
                foreach ($result_page as $column) {
                    $slice = (int)$column['id'];
                    $entry = $column['entry'];
                    $time = (string)$column['timestamp']->value(); //bigint

                    if($is_digital){
                        $value_blob = $entry->get('value')->toBinaryString();
                        if(isset($_GET['type'])){
                            $value = Unpack::uInt64($value_blob);// blob
                        }else{
                            $value = Unpack::d64($value_blob);// blob
                        }
                        if($value === null)
                            $value = $value_blob; // TODO: remove this in the new API
                    } else {
                        $value = (double)$entry->get('value'); // double
                    }

                    $error = (int)$entry->get('error'); //tinyint
                    $conf = (int)$entry->get('confidence'); //tinyint
                    $x = (int)$entry->get('x'); //int
                    $y = (int)$entry->get('y'); //int
                    $z = (int)$entry->get('z'); //int
                    $d = (int)$entry->get('d'); //int
                    $gw = (int)$entry->get('gw'); //int (TO DO GET GATEWAY)

                    if ( $filter_sphere && !$series->contains_point($x, $y, $z) )
                        continue;

                    $smartdata = null;
                    switch ($series->version) {
                        case SmartData::STATIC_VERSION:
                            $smartdata = new StaticSmartData($series->unit,$value,$error,$conf,$x,$y,$z,$time,$d,$gw);
                            break;
                        case SmartData::MOBILE_VERSION:
                            $smartdata = new MobileSmartData($series->unit,$value,$error,$conf,$x,$y,$z,$time,$d,$gw, null);
                            break;
                        default:
                            continue 2;
                            break;
                    }

                    if($aggregator != null) {
                        $smartdata = $aggregator->aggregate($smartdata);
                    }

                    if($smartdata != null) {
                        if(is_array($smartdata)){
                            foreach ($smartdata as $key => $data) {
                                $response_json['series'][$index++] = $data->toArray();
                                if ($index >= Config::config()::POINTS_LIMIT)
                                    break;
                            }
                        } else {
                            $response_json['series'][$index++] = $smartdata->toArray();
                            if($this->internal()){
                                $response_json['series'][$index-1]['slice'] = $slice;
                                $response_json['series'][$index-1]['gw'] = $gw;
                            }
                        }
                    } else {
                        continue;
                    }

                    if ($index >= Config::config()::POINTS_LIMIT)
                        break 2;
                }

                $result_page = $result_page->nextPage();

                if(!$result_page && isset($aggregator) && $index < Config::config()::POINTS_LIMIT){
                    $smartdata = $aggregator->finish();
                    if($smartdata != null) {
$response_json['series'][$index++] = $smartdata->toArray(); } } } $session->close(); usort($response_json['series'], function ($a, $b) { $a_timestamp = gmp_init($a['timestamp']); $b_timestamp = gmp_init($b['timestamp']); return gmp_cmp($a_timestamp, $b_timestamp); }); } catch (\Exception $e) { Logger::exception($e); $response_json = NULL; } finally { if (isset($session)) $session->close(); $conn = null; } return $response_json; } public function insertBatch(Series $series, SmartData ...$smart_data) { if ($smart_data[0] instanceof StaticSmartData) return $this->_insertStaticSmartDataBatch($series, ...$smart_data); else throw new BadRequestException("Error processing JSON request: unsupported SmartData version"); } // Insert a static version of smartdata private function _insertStaticSmartDataBatch(Series $series, StaticSmartData ...$smart_data) : bool { $sucess = true; $smart_data_count = count($smart_data); $parameters = array(':version' => $series->version, ':unit' => $series->unit, ':x' => $series->x, ':y' => $series->y, ':z' => $series->z, ':r' => $series->r, ':t0' => $series->t0, ':t1' => $series->t1, ':domain' => $this->_domain); try { $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD); $table = Config::config()::MySQL_Table($series->version); $series_db = NULL; $stmt_select = $conn->prepare("SELECT * FROM {$table} WHERE version = :version AND unit = :unit AND x = :x AND y = :y AND z = :z AND r = :r AND t0 = :t0 AND t1 = :t1 AND domain = :domain ORDER BY id DESC LIMIT 1;"); $stmt_select->execute($parameters); $series_db = $stmt_select->fetch(\PDO::FETCH_OBJ); if ($series_db == NULL){// || (($series_db->count + $smart_data_count) >= Config\CASSANDRA_MAX_ROW_SIZE) ) { throw new Exception\NoMatchingSeriesException('No matching series'); }else{ //Logger::debug("Inserting a Batch of StaticSmartData"); $is_digital = 
$series->unit()->is_digital(); $entry_type = \Cassandra\Type::userType( 'value', ($is_digital) ? \Cassandra\Type::blob() : \Cassandra\Type::double(), 'error', \Cassandra\Type::tinyint(), 'confidence', \Cassandra\Type::tinyint(), 'x', \Cassandra\Type::int(), 'y', \Cassandra\Type::int(), 'z', \Cassandra\Type::int(), 'd', \Cassandra\Type::bigint(), 'gw', \Cassandra\Type::bigint() ); $cassandra_table = "{$this->_domain}" . (($is_digital) ? '_dig' : ''); /* change this */ // TODO: get this values from the smartdata object $slice = new \Cassandra\Bigint($series_db->slice); $error = new \Cassandra\Tinyint(0); $confidence = new \Cassandra\Tinyint(0); $gw = new \Cassandra\Bigint(-1); /* change this */ $session = self::_cassandraConnect(Config::config()::CASSANDRA_SERVERNAME, Config::config()::CASSANDRA_PORT, $this->_username, $this->_password, Config::config()::CASSANDRA_KEYSPACE); $insert_stmt = $session->prepare("INSERT INTO {$cassandra_table} (id, timestamp, entry) VALUES (?, ?, ?);"); $batch = new \Cassandra\BatchStatement(\Cassandra::BATCH_LOGGED); $count = 0; foreach ($smart_data as $key => $data) { $batch->add($insert_stmt, array($slice, new \Cassandra\Varint($data->time), $entry_type->create( 'value', ($is_digital) ? 
new \Cassandra\Blob($data->value) : (double) $data->value, 'error', $error, 'confidence', $confidence, 'x', (int)$data->x, 'y', (int)$data->y, 'z', (int)$data->z, 'd', new \Cassandra\Bigint($data->dev), 'gw', $gw ))); $count++; if($count == 400){ //echo " Inserted\n"; $session->execute($batch); $count = 0; $batch = new \Cassandra\BatchStatement(\Cassandra::BATCH_LOGGED); } } if($count > 0){ $session->execute($batch); $count = 0; } $update_stmt = $conn->prepare("UPDATE {$table} SET count = count + :smart_data_count WHERE id = :id LIMIT 1;"); $update_stmt->execute(array('id' => $series_db->id, 'smart_data_count' => $smart_data_count)); } } catch (\Exception $e) { Logger::exception($e); $sucess = false; } finally { if (isset($session)) $session->close(); $conn = null; } return $sucess; } private function _insertMobileSmartDataBatch(Series $series, MobileSmartData ...$smart_data) : bool { throw new Exception\NotImplementedException("Batch insertion has not been implemented yet"); } public function assignConfidence($t0, $t1, $max_time_diff, $query_filename="nn_input.csv", $confidence_filename="nn_output.csv", $train=false, $get=false) : bool { return true; } public function assign_confidence($smart_data) { $pushd = 'pushd '.Config::config()::CONFIDENCE_ASSIGNER; $popd = 'popd'; $json = $smart_data->toJson(); //$return = shell_exec("{$pushd}; ./assign '{$json}'; {$popd};"); preg_match('#\{(.*?)\}#', $return??'', $match); $confidence = $match[1] ?? 
$smart_data->confidence;
        //Logger::debug('assigned confidence: '.$confidence);
        return $smart_data->confidence = $confidence;
    }
}

## Backend of API version 1.1 ##############################################################################
class Backend_V1_1 extends Backend_Common {

    /**
     * Create the storage entry that matches the version of the given description.
     *
     * Static versions are delegated to _createSeries() and mobile versions to
     * _createTracker(); the helper's result is returned unchanged.
     *
     * @param Series $to_be_created Description to create (both helpers require a Series).
     * @return bool Result of the selected helper.
     * @throws BadRequestException When the version is neither static nor mobile.
     */
    public function create($to_be_created) {
        switch ($to_be_created->version) {
            case SmartData::STATIC_VERSION:
                self::debug_X("Creating Series : {$to_be_created}");
                return $this->_createSeries($to_be_created);
            case SmartData::MOBILE_VERSION:
                self::debug_X("Creating Tracker : {$to_be_created}");
                return $this->_createTracker($to_be_created);
            default:
                // The message previously interpolated $to_be_attached, which does not exist in this scope.
                throw new BadRequestException("Error processing request: unsupported Series/Tracker version [{$to_be_created->version}]");
        }
    }

    /**
     * Attach to the storage entry that matches the version of the given description.
     *
     * Static versions are delegated to _attachSeries() and mobile versions to
     * _attachTracker(); the helper's result is returned unchanged.
     *
     * @param Series $to_be_attached Description to attach (both helpers require a Series).
     * @return bool Result of the selected helper.
     * @throws BadRequestException When the version is neither static nor mobile.
     */
    public function attach($to_be_attached) {
        switch ($to_be_attached->version) {
            case SmartData::STATIC_VERSION:
                self::debug_X("Attaching Series : {$to_be_attached}");
                return $this->_attachSeries($to_be_attached);
            case SmartData::MOBILE_VERSION:
                self::debug_X("Attaching Tracker : {$to_be_attached}");
                return $this->_attachTracker($to_be_attached);
            default:
                throw new BadRequestException("Error processing request: unsupported Series/Tracker version [{$to_be_attached->version}]");
        }
    }

    /**
     * Insert a SmartData into the series/tracker it matches.
     *
     * @param SmartData $smart_data Static or mobile SmartData instance.
     * @return bool Result of _insertStaticSmartData() or _insertMobileSmartData().
     * @throws BadRequestException When the instance is of neither supported class.
     */
    public function insert(SmartData $smart_data) {
        if ($smart_data instanceof StaticSmartData) {
            self::debug_X("Inserting a Static SmartData : {$smart_data}");
            return $this->_insertStaticSmartData($smart_data);
        }
        if ($smart_data instanceof MobileSmartData) {
            self::debug_X("Inserting a Mobile SmartData : {$smart_data}");
            return $this->_insertMobileSmartData($smart_data);
        }
        throw new BadRequestException("Error processing request: unsupported SmartData version");
    }

    public function insertBatch(...$batch) {
        if ($batch[0] instanceof Series){
            self::debug_X('Batching Series : ['.count($batch).']');
            return $this->_insertStaticSmartDataBatch(...$batch);
        } else if ($batch[0] instanceof Tracker){
self::debug_X('Batching Tracker : ['.count($batch).']');
            throw new Exception\NotImplementedException("Batching Tracker has not been implemented yet");
            //return $this->_insertMobileSmartDataBatch(...$batch);
        } else
            throw new BadRequestException("Error processing request: unsupported data version");
    }

    /**
     * Create a new series, or merge it into the stored series it overlaps.
     *
     * Stored series of the same version, unit, workflow and domain whose sphere
     * touches the requested sphere and whose [t0, t1] interval overlaps the
     * requested one are selected. When none is found a new row is inserted.
     * Otherwise each match is merged in turn: the time interval becomes the
     * union of both intervals and the sphere becomes the smallest sphere that
     * encloses both. The first match is updated in place and adopted as
     * $series->id; every further match has its rows re-pointed to that id and
     * is then deleted. $series is updated with the merged geometry/interval.
     *
     * @param Series $series Series description; mutated when a merge happens.
     * @return bool false when any exception was raised (it is logged), true otherwise.
     */
    private function _createSeries(Series $series) : bool {
        $sucess = true;
        $parameters = array(':version'  => $series->version,
                            ':unit'     => $series->unit,
                            ':x'        => $series->x,
                            ':y'        => $series->y,
                            ':z'        => $series->z,
                            ':r'        => $series->r,
                            //':dev'      => $series->dev, // TODO: The series doesn't has a dev
                            ':t0'       => $series->t0,
                            ':t1'       => $series->t1,
                            ':workflow' => $series->workflow ?? 0,
                            ':domain'   => $this->_domain);
        try {
            $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD);
            $table = Config::config()::MySQL_Table($series->version);

            $stmt_select = $conn->prepare("SELECT * FROM {$table} WHERE version = :version AND unit = :unit AND (SQRT(POW(x - :x, 2) + POW(y - :y, 2) + POW(z - :z, 2))) <= (r + :r) AND ((:t0 >= t0 AND :t0 <= t1) OR (:t0 <= t0 AND :t1 >= t1) OR (:t1 >= t0 AND :t1 <= t1 )) AND workflow = :workflow AND domain = :domain;");
            $stmt_select->execute($parameters);
            // Every overlapping series is needed for the merge loop below; fetch()
            // would return a single row object (or false), not a row set.
            $series_set = $stmt_select->fetchAll(\PDO::FETCH_OBJ);

            if (empty($series_set)) {
                self::debug_X('The Series did not exist before');
                $stmt_insert = $conn->prepare("INSERT INTO {$table} VALUES (DEFAULT, :version, :unit, :x, :y, :z, :r, DEFAULT, :t0, :t1, :workflow, :domain, DEFAULT);");
                $stmt_insert->execute($parameters);
            } else {
                $rows_table = '`rows`';
                foreach ($series_set as $series_aux) {
                    $dist = sqrt(pow($series_aux->x - $series->x, 2) + pow($series_aux->y - $series->y, 2) + pow($series_aux->z - $series->z, 2));

                    // Union of both time intervals.
                    $t0 = ($series_aux->t0 <= $series->t0) ? $series_aux->t0 : $series->t0;
                    $t1 = ($series_aux->t1 >= $series->t1) ? $series_aux->t1 : $series->t1;

                    if ($series_aux->r >= ($dist + $series->r)) {
                        //new series is spatially contained by the already inserted one
                        $x = $series_aux->x;
                        $y = $series_aux->y;
                        $z = $series_aux->z;
                        $r = $series_aux->r;
                    } else if ($series->r >= ($dist + $series_aux->r)) {
                        //already inserted series is spatially contained by the new one
                        $x = $series->x;
                        $y = $series->y;
                        $z = $series->z;
                        $r = $series->r;
                    } else {
                        // Smallest sphere enclosing both: its diameter spans from the far
                        // side of one sphere to the far side of the other, and its centre
                        // lies on the segment between the centres, ($r - r_aux) away from
                        // the stored centre. $dist > 0 here, otherwise one of the
                        // containment branches above would have matched.
                        //http://answers.google.com/answers/threadview/id/342125.html
                        $r = ($series->r + $dist + $series_aux->r) / 2;
                        $shift = ($r - $series_aux->r) / $dist;
                        $x = $series_aux->x + ($series->x - $series_aux->x) * $shift;
                        $y = $series_aux->y + ($series->y - $series_aux->y) * $shift;
                        $z = $series_aux->z + ($series->z - $series_aux->z) * $shift;
                    }

                    $parameters = array(':x'  => $x,
                                        ':y'  => $y,
                                        ':z'  => $z,
                                        ':r'  => $r,
                                        ':t0' => $t0,
                                        ':t1' => $t1);
                    $merge_sql = "UPDATE {$table} SET x = :x, y = :y, z = :z, r = :r, t0 = :t0, t1 = :t1 WHERE id = :id;";

                    if (isset($series->id)) {
                        // A previous iteration already adopted a stored series: grow it,
                        // move the rows of this match over to it and drop the match.
                        $update_stmt = $conn->prepare($merge_sql);
                        $update_stmt->execute(array_merge($parameters, array(':id' => $series->id)));

                        $update_stmt = $conn->prepare("UPDATE {$rows_table} SET series_id = :new_id WHERE series_id = :old_id;");
                        $update_stmt->execute(array(':new_id' => $series->id, ':old_id' => $series_aux->id));

                        $update_stmt = $conn->prepare("DELETE FROM {$table} WHERE id = :id;");
                        $update_stmt->execute(array(':id' => $series_aux->id));
                    } else {
                        // First match: grow the stored series and adopt its id.
                        $update_stmt = $conn->prepare($merge_sql);
                        $update_stmt->execute(array_merge($parameters, array(':id' => $series_aux->id)));
                        $series->id = $series_aux->id;
                    }

                    $series->x  = $x;
                    $series->y  = $y;
                    $series->z  = $z;
                    $series->r  = $r;
                    $series->t0 = $t0;
                    $series->t1 = $t1;
                }
                //self::debug_X($series);
            }
        } catch (\Exception $e) {
            Logger::exception($e);
            $sucess = false;
        } finally {
            $conn = null;
        }
        return $sucess;
    }

    // Create a new tracker if not exist
    private function _createTracker(Series $tracker) : bool {
        $sucess = true;
        $parameters = array(':version' => $tracker->version,
                            ':unit' => $tracker->unit,
                            ':signature' => 0, // TODO:
include signature ':t0' => $tracker->t0, ':t1' => $tracker->t1, ':dev' => $tracker->dev, ':workflow' => $tracker->workflow ?? 0, ':domain' => $this->_domain); try { $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD); $table = Config::config()::MySQL_Table($tracker->version); $stmt_select = $conn->prepare("SELECT * FROM {$table} WHERE version = :version AND unit = :unit AND dev = :dev AND t0 = :t0 AND t1 = :t1 AND signature = :signature AND workflow = :workflow AND domain = :domain ORDER BY count ASC LIMIT 1;"); $stmt_select->execute($parameters); $selected_tracker = $stmt_select->fetch(\PDO::FETCH_OBJ); if ($selected_tracker == NULL) { self::debug_X('The Tracker did not exist before'); $stmt_insert = $conn->prepare("INSERT INTO {$table} VALUES (DEFAULT, :version, :unit, :dev, :t0, :t1, :signature, DEFAULT, :workflow, :domain, (SELECT counter(:domain)), DEFAULT);"); $stmt_insert->execute($parameters); } else { //self::debug_X($selected_tracker); } } catch (\Exception $e) { Logger::exception($e); $sucess = false; } finally { $conn = null; } return $sucess; } // Create a new series if there is not one that contains it private function _attachSeries(Series $series) : bool { $sucess = true; $parameters = array(':version' => $series->version, ':unit' => $series->unit, ':x' => $series->x, ':y' => $series->y, ':z' => $series->z, ':r' => $series->r, //':dev' => $series->dev, // TODO: The series doesn't has a dev ':t0' => $series->t0, ':t1' => $series->t1, ':workflow' => $series->workflow ?? 
0, ':domain' => $this->_domain); try { $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD); $table = Config::config()::MySQL_Table($series->version); $stmt_select = $conn->prepare("SELECT * FROM {$table} WHERE version = :version AND unit = :unit AND r >= (SQRT(POW(x - :x, 2) + POW(y - :y, 2) + POW(z - :z, 2)) + :r) AND t0 <= :t0 AND t1 >= :t1 AND workflow = :workflow AND domain = :domain ORDER BY r ASC LIMIT 1;"); $stmt_select->execute($parameters); $series = $stmt_select->fetch(\PDO::FETCH_OBJ); if ($series == NULL) { self::debug_X('The Series did not exist before'); $stmt_insert = $conn->prepare("INSERT INTO {$table} VALUES (DEFAULT, :version, :unit, :x, :y, :z, :r, DEFAULT, :t0, :t1, :workflow, :domain, DEFAULT);"); $stmt_insert->execute($parameters); } else { //self::debug_X($series); } } catch (\Exception $e) { Logger::exception($e); $sucess = false; } finally { $conn = null; } return $sucess; } // Create a new series if there is not one that contains it private function _attachTracker(Series $tracker) : bool { $sucess = true; $parameters = array(':version' => $tracker->version, ':unit' => $tracker->unit, ':signature' => 0, // TODO: include signature ':t0' => $tracker->t0, ':t1' => $tracker->t1, ':dev' => $tracker->dev, ':workflow' => $tracker->workflow ?? 
0, ':domain' => $this->_domain); try { $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD); $table = Config::config()::MySQL_Table($tracker->version); $stmt_select = $conn->prepare("SELECT * FROM {$table} WHERE version = :version AND unit = :unit AND dev = :dev AND t0 <= :t0 AND t1 >= :t1 AND signature = :signature AND workflow = :workflow AND domain = :domain ORDER BY count ASC LIMIT 1;"); $stmt_select->execute($parameters); $selected_tracker = $stmt_select->fetch(\PDO::FETCH_OBJ); if ($selected_tracker == NULL) { self::debug_X('The Tracker did not exist before'); $stmt_insert = $conn->prepare("INSERT INTO {$table} VALUES (DEFAULT, :version, :unit, :dev, :t0, :t1, :signature, DEFAULT, :workflow, :domain, (SELECT counter(:domain)), DEFAULT);"); $stmt_insert->execute($parameters); } else { //self::debug_X($selected_tracker); } } catch (\Exception $e) { Logger::exception($e); $sucess = false; } finally { $conn = null; } return $sucess; } // Insert a static version of smartdata private function _insertStaticSmartData(StaticSmartData $smart_data) : bool { $sucess = true; $parameters = array(':unit' => $smart_data->unit, ':x' => $smart_data->x, ':y' => $smart_data->y, ':z' => $smart_data->z, //':dev' => $smart_data->dev, // TODO: The series doesn't has a dev ':time' => $smart_data->time, ':domain' => $this->_domain); $is_digital = $smart_data->unit()->is_digital(); try { Logger::time_elapsed(true); $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD); $table = Config::config()::MySQL_Table($smart_data->version); $stmt_select = $conn->prepare("SELECT * FROM {$table} WHERE unit = :unit AND (SQRT(POW(x - :x, 2) + POW(y - :y, 2) + POW(z - :z, 2))) <= r AND :time >= t0 AND :time <= t1 AND domain = 
:domain;"); $stmt_select->execute($parameters); $series_set = $stmt_select->fetchAll(\PDO::FETCH_OBJ); Logger::time_elapsed(); if ($series_set == NULL) { throw new Exception\NoMatchingSeriesException('No matching series'); } else { // the series exists $rows_table = '`rows`'; // TODO: move to Config file (row table name) $count_limit = Config::config()::CASSANDRA_MAX_ROW_SIZE; foreach ($series_set as $series) { $series->row = NULL; $first = true; do { $stmt_select_row = $conn->prepare("SELECT * FROM {$rows_table} WHERE series_id in (:series_id) AND x = :x AND y = :y AND z = :z AND dev = :dev AND count < {$count_limit} LIMIT 1;"); $parameters_row = array(':series_id' => $series->id, ':x' => $smart_data->x, ':y' => $smart_data->y, ':z' => $smart_data->z, ':dev' => $smart_data->dev); $stmt_select_row->execute($parameters_row); $series->row = $stmt_select_row->fetch(\PDO::FETCH_OBJ); if ($first && $series->row==NULL) { self::debug_X('There were no rows before'); $stmt_insert_row = $conn->prepare("INSERT INTO {$rows_table} VALUES (DEFAULT, :x, :y, :z, :dev, DEFAULT, (SELECT counter(:domain)), :series_id, DEFAULT);"); $stmt_insert_row->execute(array_merge($parameters_row, array(':domain' => $this->_domain))); $first = false; } else { break; } } while (true); } } Logger::time_elapsed(); $entry_type = \Cassandra\Type::userType( 'value', ($is_digital) ? \Cassandra\Type::blob() : \Cassandra\Type::double(), 'error', \Cassandra\Type::tinyint(), 'confidence', \Cassandra\Type::tinyint(), 'gw', \Cassandra\Type::bigint() ); foreach ($series_set as $series) { self::debug_X('Inserting StaticSmartData on series ['.$series->id.'] row ['.$series->row->row_id.']'); $_smart_data = null; if($series->workflow ?? 
false){
                    self::debug_X('Enter workflow');
                    $_smart_data = clone $smart_data;
                    $_smart_data = $this->_workflow_input($_smart_data, $series->workflow);
                }else{
                    $_smart_data = $smart_data;
                }
                if($_smart_data === null) continue;

                $arguments = array(
                    new \Cassandra\Bigint($series->row->row_id),
                    new \Cassandra\Varint($_smart_data->time),
                    $entry_type->create(
                        'value', ($is_digital) ? new \Cassandra\Blob($_smart_data->value) : (double) $_smart_data->value,
                        'error', new \Cassandra\Tinyint($_smart_data->error),
                        'confidence', new \Cassandra\Tinyint($_smart_data->confidence),
                        'gw', new \Cassandra\Bigint($this->_gw_id)
                    )
                );

                // TODO: alter the cassandra table's name to Config file
                $cassandra_table = $this->_domain.'_v1_1_';
                $cassandra_table .= $is_digital ? 'digital' : 'si';
                // -----

                $session = self::_cassandraConnect(Config::config()::CASSANDRA_SERVERNAME, Config::config()::CASSANDRA_PORT, $this->_username, $this->_password, Config::config()::CASSANDRA_KEYSPACE);
                $insert_stmt = $session->prepare("INSERT INTO {$cassandra_table} (id, timestamp, entry) VALUES (?, ?, ?);");
                $session->execute($insert_stmt, array('arguments' => $arguments));

                $update_stmt = $conn->prepare("UPDATE {$rows_table} SET count = count + 1 WHERE id = :id LIMIT 1;");
                $update_stmt->execute(array('id' => $series->row->id));
            }
            Logger::time_elapsed();
        } catch (\Exception $e) {
            Logger::exception($e);
            $sucess = false;
        } finally {
            if (isset($session))
                $session->close();
            $conn = null;
        }
        return $sucess;
    }

    /**
     * Tell whether a process identified by a PID or by a name is running.
     *
     * Surrounding whitespace is removed first, because the caller in this class
     * passes a line read with fgets(), which keeps the trailing newline. An
     * empty identifier is reported as not running (it used to test "/proc/",
     * which always exists on Linux). A purely numeric identifier is treated as
     * a PID and matched exactly; anything else is searched case-insensitively
     * in the output of `ps -A`, as before.
     *
     * @param mixed $proc_id PID or (part of) a process name.
     * @return bool true when a matching process was found.
     */
    private function _process_exists($proc_id) {
        $needle = trim((string) $proc_id);
        if ($needle === '')
            return false;

        if (ctype_digit($needle)) {
            // Exact PID lookup: grepping the process list for the digits would
            // also match any other PID or command line that contains them.
            if (file_exists("/proc/{$needle}"))
                return true;
            exec('ps -p ' . escapeshellarg($needle) . ' -o pid=', $pid_lines);
            return !empty($pid_lines);
        }

        // escapeshellarg() keeps shell metacharacters in the name from being interpreted.
        exec('ps -A | grep -i -e ' . escapeshellarg($needle) . ' | grep -v grep', $matches);
        return !empty($matches);
    }

    private function _initialize_daemon($workflow) {
        $_root = __DIR__;
        //self::debug_X("root at: {$_root}");
        if (file_exists("{$_root}/../workflow/{$this->_domain}/{$workflow}_daemon")) {
            $alive = false;
            try {
                if(file_exists("{$_root}/../workflow/{$this->_domain}/{$workflow}_pid")) {
                    $pid_file =
fopen("{$_root}/../workflow/{$this->_domain}/{$workflow}_pid", "r"); if($pid_file) { $pid = fgets($pid_file); fclose($pid_file); $alive = $this->_process_exists($pid) || $this->_process_exists("{$workflow}_daemon"); //maybe this string check is not valid } } } catch (Exception $e) { $alive = false; } if (!($alive)) { if (file_exists("{$_root}/../workflow/{$this->_domain}/{$workflow}_error.txt")) unlink("{$_root}/../workflow/{$this->_domain}/{$workflow}_error.txt"); if (file_exists("{$_root}/../workflow/{$this->_domain}/{$workflow}_pid")) unlink("{$_root}/../workflow/{$this->_domain}/{$workflow}_pid"); if (file_exists("{$_root}/../workflow/{$this->_domain}/{$workflow}_input")) unlink("{$_root}/../workflow/{$this->_domain}/{$workflow}_input"); if (file_exists("{$_root}/../workflow/{$this->_domain}/{$workflow}_output")) unlink("{$_root}/../workflow/{$this->_domain}/{$workflow}_output"); $descriptorspec = array( 0 => array("pipe", "r"), // stdin is a pipe that the child will read from 1 => array("pipe", "w"), // stdout is a pipe that the child will write to 2 => array("file", "{$_root}/../workflow/{$this->_domain}/{$workflow}_error.txt", "a") // stderr is a file to write to ); $path = "{$_root}/../workflow/{$this->_domain}"; $pid = proc_open("./{$workflow}_daemon $> {$workflow}_results.log", $descriptorspec, $pipes, $path); $timeout = 100000; //what is the best value? we can use a timeout in seconds also while (!(file_exists("{$_root}/../workflow/{$this->_domain}/{$workflow}_pid")) && $timeout >= 0) $timeout--; if($timeout < 0) { self::debug_X("Fail to initialize daemon for workflow {$this->_domain}/{$workflow}"); } } } else { self::debug_X("No daemon found for {$this->_domain}/{$workflow}"); } } private function _notify(\stdClass $json) { if(isset($json->notify)) { $notification = "Domain: {$this->_domain}. 
Data irregularity.";
            if(isset($json->notify->description)) {
                $notification .= " Description: {$json->notify->description}.";
            }
            if(isset($json->notify->severity)) {
                //threshold can be either defined by the workflow or by a standard value (100% in this case)
                // isset($json->notify->severity_threshold)
                $notification .= " Severity level: {$json->notify->severity}.";
                if ($json->notify->severity > 100) {
                    //send mail or message bus
                }
            }
            //log notification
            self::debug_X($notification);
        }
    }

    /**
     * Run the input workflow "in{$wf}" of the current domain on a SmartData.
     *
     * The SmartData is wrapped as {"smartdata": ...} and passed to the workflow
     * executable as its single command-line argument; the executable's standard
     * output is decoded as JSON. The decoded "smartdata" member is converted
     * back with SmartData::fromJson() and the decoded object is handed to
     * _notify().
     *
     * @param SmartData $smart_data Data to process (must provide toJson()).
     * @param mixed     $wf         Workflow identifier appended to "in".
     * @return mixed Whatever SmartData::fromJson() returns for the workflow output.
     * @throws Exception\InternalException When the workflow output is not a JSON object.
     */
    private function _workflow_input($smart_data, $wf) {
        $_wf = "in{$wf}";
        $workflow_dir = __DIR__ . "/../workflow/{$this->_domain}";

        $this->_initialize_daemon($_wf);

        $json = "{\"smartdata\":" . $smart_data->toJson() . "}";

        // Every piece is escaped so that quotes in the payload (or in the domain /
        // workflow name) cannot break out of the command. "cd ... &&" replaces
        // pushd/popd, which are bash built-ins and missing from other /bin/sh
        // implementations; the directory change only affects the child shell.
        // NOTE(review): escapeshellarg() may drop non-ASCII bytes depending on
        // LC_CTYPE - confirm the payload is ASCII or set a UTF-8 locale.
        $command = 'cd ' . escapeshellarg($workflow_dir)
                 . ' && ' . escapeshellarg("./{$_wf}")
                 . ' ' . escapeshellarg($json);
        $return = shell_exec($command);

        $json = is_string($return) ? json_decode($return, false, 512, JSON_BIGINT_AS_STRING) : null;
        // _notify() only accepts a \stdClass, so anything else is a workflow failure.
        if (!($json instanceof \stdClass) || json_last_error() !== JSON_ERROR_NONE) {
            throw new Exception\InternalException('Error while processing workflow!');
        }

        $smart_data = SmartData::fromJson($json->smartdata ?? null);
        $this->_notify($json);
        return $smart_data;
    }

    /**
     * Run the output workflow "out{$wf}" of the current domain on a JSON value.
     *
     * The value is JSON-encoded and written, followed by a newline, to the
     * "<workflow>_input" file in the domain's workflow directory; the workflow
     * executable is then run without arguments and its standard output is
     * decoded as JSON.
     *
     * @param mixed $json Value to hand to the workflow.
     * @param mixed $wf   Workflow identifier appended to "out".
     * @return mixed The decoded workflow output.
     * @throws Exception\InternalException When the input cannot be encoded or written,
     *                                     or the workflow output is not valid JSON.
     */
    private function _workflow_output($json, $wf) {
        $_wf = "out{$wf}";
        $workflow_dir = __DIR__ . "/../workflow/{$this->_domain}";

        $this->_initialize_daemon($_wf);

        $str_json = json_encode($json);
        if ($str_json === false
            || file_put_contents("{$workflow_dir}/{$_wf}_input", "$str_json\n") === false) {
            throw new Exception\InternalException('Error while processing workflow!');
        }

        // See _workflow_input(): escaped arguments, and "cd" instead of pushd/popd.
        $return = shell_exec('cd ' . escapeshellarg($workflow_dir) . ' && ' . escapeshellarg("./{$_wf}"));

        $json = is_string($return) ? json_decode($return, false, 512, JSON_BIGINT_AS_STRING) : null;
        if (!is_string($return) || json_last_error() !== JSON_ERROR_NONE) {
            throw new Exception\InternalException('Error while processing workflow!');
        }
        return $json;
    }

    // Insert a mobile version of smartdata
    private function _insertMobileSmartData(MobileSmartData $smart_data) : bool {
        // TODO: check if it is working fine
        $sucess = true;
        $parameters = array(':version' => $smart_data->version,
                            ':unit' => $smart_data->unit,
':signature' => $smart_data->signature, ':time' => $smart_data->time, ':dev' => $smart_data->dev, ':domain' => $this->_domain); $is_digital = $smart_data->unit()->is_digital(); try { $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD); $table_trackers = Config::config()::MySQL_Table($smart_data->version); $tracker_set = NULL; $first = true; do { $sql="SELECT t.* FROM (SELECT *, min(count) as min FROM trackers WHERE version = :version AND unit = :unit AND signature = :signature AND :time >= t0 AND :time <= t1 AND dev = :dev AND domain = :domain GROUP BY version, unit, dev, t0, t1, signature, workflow, domain) as x inner join trackers as t on t.version = x.version AND t.unit = x.unit AND t.dev = x.dev AND t.t0 = x.t0 AND t.t1 = x.t1 AND t.signature = x.signature AND t.workflow = x.workflow AND t.domain = x.domain AND t.count = x.min GROUP BY version, unit, dev, t0, t1, signature, workflow, domain;"; $stmt_select = $conn->prepare($sql); $stmt_select->execute($parameters); $tracker_set = $stmt_select->fetchAll(\PDO::FETCH_OBJ); if ($tracker_set == NULL) { // TODO: change do NoMatchingTrackerException throw new Exception\NoMatchingSeriesException('No matching tracker'); } else { if ($first){ foreach ($tracker_set as $tracker) { if($tracker->count>=Config::config()::CASSANDRA_MAX_ROW_SIZE){ $stmt_insert = $conn->prepare("INSERT INTO {$table_trackers} VALUES (DEFAULT, :version, :unit, :dev, :t0, :t1, :signature, DEFAULT, :workflow, :domain, (SELECT counter(:domain)), DEFAULT);"); $stmt_insert->execute(array(':version' => $tracker->version, ':unit' => $tracker->unit, ':signature' => $tracker->signature, ':t0' => $tracker->t0, ':t1' => $tracker->t1, ':dev' => $tracker->dev, ':workflow' => $tracker->workflow, ':domain' => $this->_domain)); //$this->create(new Series($series->version, $series->unit, $series->x, $series->y, $series->z, 
$series->r, $series->t0, $series->t1)); //Logger::debug('creating new Tracker', true); } } $first = false; } else { break; } } } while (true); $entry_type = \Cassandra\Type::userType( 'value', ($is_digital) ? \Cassandra\Type::blob() : \Cassandra\Type::double(), 'error', \Cassandra\Type::tinyint(), 'confidence', \Cassandra\Type::tinyint(), 'x', \Cassandra\Type::int(), 'y', \Cassandra\Type::int(), 'z', \Cassandra\Type::int(), 'gw', \Cassandra\Type::bigint() ); foreach ($tracker_set as $tracker) { self::debug_X('Inserting MobileSmartData on Tracker ['.$tracker->id.'] row ['.$tracker->row_id.']'); // TODO: alter to workflow //if($tracker->workflow != 0) // Logger::debug('Workflow execution', true); $arguments = array( new \Cassandra\Bigint($tracker->row_id), new \Cassandra\Varint($smart_data->time), $entry_type->create( 'value', ($is_digital) ? new \Cassandra\Blob($smart_data->value) : (double) $smart_data->value, 'error', new \Cassandra\Tinyint($smart_data->error), 'confidence', new \Cassandra\Tinyint($smart_data->confidence), 'x', (int)$smart_data->x, 'y', (int)$smart_data->y, 'z', (int)$smart_data->z, 'gw', new \Cassandra\Bigint($this->_gw_id) ) ); // TODO: alter the cassandra table's name to Config file $cassandra_table = $this->_domain.'_v1_2_'; $cassandra_table .= $is_digital ? 
'digital' : 'si';
                // NOTE(review): the lines down to the first "return $sucess;" are the tail of a
                // method that starts above this section; they are kept as they were.
                $session = self::_cassandraConnect(
                    Config::config()::CASSANDRA_SERVERNAME,
                    Config::config()::CASSANDRA_PORT,
                    $this->_username,
                    $this->_password,
                    Config::config()::CASSANDRA_KEYSPACE);
                $insert_stmt = $session->prepare("INSERT INTO {$cassandra_table} (id, timestamp, entry) VALUES (?, ?, ?);");
                $session->execute($insert_stmt, array('arguments' => $arguments));

                $update_stmt = $conn->prepare("UPDATE {$table_trackers} SET count = count + 1 WHERE id = :id LIMIT 1;");
                $update_stmt->execute(array('id' => $tracker->id));
            }
        } catch (\Exception $e) {
            Logger::exception($e);
            $sucess = false;
        } finally {
            if (isset($session))
                $session->close();
            $conn = null;
        }
        return $sucess;
    }

    /**
     * Inserts a batch of static SmartData points into the Cassandra table of the current domain.
     *
     * The series is looked up in MySQL by its exact parameters; a row of the rows table with
     * count below Config::config()::CASSANDRA_MAX_ROW_SIZE is then selected (one is inserted
     * first when none exists) and the points are written to Cassandra in logged batches.
     * Finally the row counter in MySQL is increased by the number of points.
     *
     * @param Series          $series        Series the points belong to; its radius must be 0.
     * @param StaticSmartData ...$smart_data Points to insert (time and value are read from each).
     *
     * @return bool true on success; false when any \Exception was raised (it is logged).
     *
     * @throws Exception\NoMatchingSeriesException when $series->r is not 0 (raised before the
     *         try block, so it reaches the caller).
     */
    private function _insertStaticSmartDataBatch(Series $series, StaticSmartData ...$smart_data) : bool {
        if ($series->r != 0)
            throw new Exception\NoMatchingSeriesException('Invalid request'); // TODO: change this exception type

        $success = true;
        $smart_data_count = count($smart_data);

        $parameters = array(
            ':version'  => $series->version,
            ':unit'     => $series->unit,
            ':x'        => $series->x,
            ':y'        => $series->y,
            ':z'        => $series->z,
            ':r'        => $series->r,
            //':dev'    => $series->dev, // TODO: The series doesn't has a dev
            ':t0'       => $series->t0,
            ':t1'       => $series->t1,
            ':workflow' => $series->workflow ?? 0,
            ':domain'   => $this->_domain);

        try {
            $conn = self::_mysqlConnect(
                Config::config()::MYSQL_SEVERNAME,
                Config::config()::MYSQL_PORT,
                Config::config()::MYSQL_DBNAME,
                Config::config()::MYSQL_USERNAME,
                Config::config()::MYSQL_PASSWORD);
            $table = Config::config()::MySQL_Table($series->version);

            $stmt_select = $conn->prepare("SELECT * FROM {$table} WHERE version = :version AND unit = :unit AND x = :x AND y = :y AND z = :z AND r = :r AND t0 = :t0 AND t1 = :t1 AND workflow = :workflow AND domain = :domain LIMIT 1;");
            $stmt_select->execute($parameters);
            $series_db = $stmt_select->fetch(\PDO::FETCH_OBJ);

            // Disabled extra condition kept for reference:
            // || (($series_db->count + $smart_data_count) >= Config\CASSANDRA_MAX_ROW_SIZE)
            if (!$series_db)
                throw new Exception\NoMatchingSeriesException('No matching series');

            $rows_table  = '`rows`'; // TODO: move to Config file (row table name)
            $count_limit = Config::config()::CASSANDRA_MAX_ROW_SIZE;

            $stmt_select_row = $conn->prepare("SELECT * FROM {$rows_table} WHERE series_id = :series_id AND x = :x AND y = :y AND z = :z AND dev = :dev AND count < {$count_limit} LIMIT 1;");
            $parameters_row = array(
                ':series_id' => $series_db->id,
                ':x'         => $series_db->x,
                ':y'         => $series_db->y,
                ':z'         => $series_db->z,
                ':dev'       => $series->dev); // TODO: check this (remove the dev from series)

            $stmt_select_row->execute($parameters_row);
            $series_db->row = $stmt_select_row->fetch(\PDO::FETCH_OBJ);

            if (!$series_db->row) {
                // No usable row yet: create one and look it up again (a single retry).
                self::debug_X('There were no rows before');
                $stmt_insert_row = $conn->prepare("INSERT INTO {$rows_table} VALUES (DEFAULT, :x, :y, :z, :dev, DEFAULT, (SELECT counter(:domain)), :series_id, DEFAULT);");
                $stmt_insert_row->execute(array_merge($parameters_row, array(':domain' => $this->_domain)));

                $stmt_select_row->execute($parameters_row);
                $series_db->row = $stmt_select_row->fetch(\PDO::FETCH_OBJ);
            }

            // Without a row there is no valid Cassandra partition id; fail explicitly instead of
            // dereferencing the "false" returned by fetch(). Caught below, so the result is false.
            if (!$series_db->row)
                throw new \RuntimeException('Unable to obtain a row for series ['.$series_db->id.']');

            self::debug_X('Inserting StaticSmartDataBatch on Series ['.$series_db->id.'] row['.$series_db->row->id.']');

            $is_digital = $series->unit()->is_digital();
            $entry_type = \Cassandra\Type::userType(
                'value',      ($is_digital) ? \Cassandra\Type::blob() : \Cassandra\Type::double(),
                'error',      \Cassandra\Type::tinyint(),
                'confidence', \Cassandra\Type::tinyint(),
                'gw',         \Cassandra\Type::bigint()
            );

            // TODO: alter the cassandra table's name to Config file
            $cassandra_table  = $this->_domain.'_v1_1_';
            $cassandra_table .= $is_digital ? 'digital' : 'si';
            // -----

            // change this
            // TODO: get this values from the smartdata object
            $row_id     = new \Cassandra\Bigint($series_db->row->row_id);
            $error      = new \Cassandra\Tinyint(0);
            $confidence = new \Cassandra\Tinyint(0);
            $gw         = new \Cassandra\Bigint($this->_gw_id);
            // change this

            $session = self::_cassandraConnect(
                Config::config()::CASSANDRA_SERVERNAME,
                Config::config()::CASSANDRA_PORT,
                $this->_username,
                $this->_password,
                Config::config()::CASSANDRA_KEYSPACE);
            $insert_stmt = $session->prepare("INSERT INTO {$cassandra_table} (id, timestamp, entry) VALUES (?, ?, ?);");

            // Statements are flushed every $batch_limit entries.
            $batch_limit = 400;
            $batch = new \Cassandra\BatchStatement(\Cassandra::BATCH_LOGGED);
            $count = 0;
            foreach ($smart_data as $data) {
                $batch->add($insert_stmt, array(
                    $row_id,
                    new \Cassandra\Varint($data->time),
                    $entry_type->create(
                        'value',      ($is_digital) ? new \Cassandra\Blob($data->value) : (double) $data->value,
                        'error',      $error,
                        'confidence', $confidence,
                        'gw',         $gw
                    )));
                $count++;
                if ($count == $batch_limit) {
                    $session->execute($batch);
                    $count = 0;
                    $batch = new \Cassandra\BatchStatement(\Cassandra::BATCH_LOGGED);
                }
            }
            // Flush the last, partially filled batch.
            if ($count > 0) {
                $session->execute($batch);
                $count = 0;
            }

            $update_stmt = $conn->prepare("UPDATE {$rows_table} SET count = count + :smart_data_count WHERE id = :id LIMIT 1;");
            $update_stmt->execute(array('id' => $series_db->row->id, 'smart_data_count' => $smart_data_count));
        } catch (\Exception $e) {
            Logger::exception($e);
            $success = false;
        } finally {
            if (isset($session))
                $session->close();
            $conn = null;
        }
        return $success;
    }

    /**
     * Placeholder for mobile SmartData batches: nothing is stored and true is returned.
     *
     * @param Series          $series        Unused.
     * @param MobileSmartData ...$smart_data Unused.
     *
     * @return bool Always true.
     */
    private function _insertMobileSmartDataBatch(Series $series, MobileSmartData ...$smart_data) : bool {
        return true;
    }

    /**
     * Returns the SmartData points that match a series description.
     *
     * Matching series are selected from MySQL (version, overlapping [t0, t1] interval, domain and,
     * when given, unit and sphere), their rows are resolved for $series->dev and the entries of
     * those rows are read from Cassandra page by page. Unless the backend is in internal mode,
     * every point goes through the aggregator built from $aggr. At most
     * Config::config()::POINTS_LIMIT points are collected; the result is sorted by timestamp.
     *
     * @param Series $series  Series description; x, y, z and r must be all set or all null.
     * @param mixed  $aggr    Forwarded to Aggregator::new() when not in internal mode.
     * @param mixed  $options Only "type" is read, as the fallback of $_GET['type'] for digital units.
     *
     * @return mixed array('series' => [...]) on success, NULL when an \Exception was caught
     *               (it is logged); when $series->workflow is set and not 0, whatever
     *               _workflow_output() returns for that array.
     */
    public function query(Series $series, $aggr=null, $options=null) {
        //self::debug_X("Query Series : {$series}");
        $response_json = array('series' => array());

        try {
            $parameters = array(
                ':version' => $series->version,
                ':t0'      => $series->t0,
                ':t1'      => $series->t1,
                // series workflow is not linked to data gathering anymore.
                // Now, workflow id on get operations stands for the output workflow to be executed.
                //':workflow' => $series->workflow ?? 0,
                ':domain'  => $this->_domain);

            $series_table = Config::config()::MySQL_Table($series->version);
            $rows_table   = '`rows`'; // TODO: move to Config file (row table name)

            $query_series = "SELECT * FROM {$series_table} WHERE version = :version AND ( (:t0 >= t0 AND :t0 <= t1) OR (:t0 <= t0 AND :t1 >= t1) OR (:t1 >= t0 AND :t1 <= t1 ) )";

            if ($series->unit != null) {
                $parameters = array_merge($parameters, array(':unit' => $series->unit));
                $query_series .= ' AND unit = :unit';
            }

            // Number of sphere parameters that are missing: only "none" or "all" is acceptable.
            $invalid_sphere = (int)($series->x === null)
                            + (int)($series->y === null)
                            + (int)($series->z === null)
                            + (int)($series->r === null);
            if ($invalid_sphere != 0 && $invalid_sphere != 4) {
                throw new BadRequestException("Invalid sphere parameters");
            } else if ($invalid_sphere == 0) {
                $parameters = array_merge($parameters, array(':x' => $series->x, ':y' => $series->y, ':z' => $series->z, ':r' => $series->r));
                $query_series .= ' AND SQRT(POW(x - :x, 2) + POW(y - :y, 2) + POW(z - :z, 2)) <= :r + r';
                $filter_sphere = true;
            } else {
                $filter_sphere = false;
            }

            // workflow removed from the comparison, see the comment above
            $query_series .= ' AND domain = :domain;';

            $conn = self::_mysqlConnect(
                Config::config()::MYSQL_SEVERNAME,
                Config::config()::MYSQL_PORT,
                Config::config()::MYSQL_DBNAME,
                Config::config()::MYSQL_USERNAME,
                Config::config()::MYSQL_PASSWORD);
            $stmt_select = $conn->prepare($query_series);
            $stmt_select->execute($parameters);
            $series_set = $stmt_select->fetchAll(\PDO::FETCH_OBJ);

            if ($series_set) {
                // TODO: filter the xyz here
                $query_rows = "SELECT * FROM {$rows_table} WHERE series_id IN (".implode(',',array_column($series_set, 'id')).") AND dev = :dev;";
                $stmt_select = $conn->prepare($query_rows);
                $stmt_select->execute(array(':dev' => $series->dev));
                $rows_set = $stmt_select->fetchAll(\PDO::FETCH_OBJ);
                $selected_rows = array_column($rows_set, null, 'row_id');

                $id_set = \Cassandra\Type::collection(\Cassandra\Type::bigint())->create();
                foreach ($selected_rows as $row_id => $row_fields) {
                    $id_set->add(new \Cassandra\Bigint($row_id));
                }

                $is_digital = $series->unit()->is_digital();
                if ($is_digital) {
                    // ">>" binds tighter than "??": the shifted constant is only the last fallback.
                    $type = $_GET['type'] ?? $options->type ?? (PHP_INT_MAX)>>29;
                    $type = $type<<29;
                }

                // TODO: move the cassandra table's name to Config file
                $cassandra_table  = $this->_domain.'_v1_1_';
                $cassandra_table .= $is_digital ? 'digital' : 'si';
                // -----

                // Connect exactly once; the session is closed in the finally block below.
                $session = self::_cassandraConnect(
                    Config::config()::CASSANDRA_SERVERNAME,
                    Config::config()::CASSANDRA_PORT,
                    $this->_username,
                    $this->_password,
                    Config::config()::CASSANDRA_KEYSPACE);
                $cass_select_stmt = $session->prepare("SELECT * FROM {$cassandra_table} WHERE (id IN ?) AND (timestamp >= ?) AND (timestamp <= ?);");
                $result_page = $session->execute($cass_select_stmt, array('arguments' => array(
                    $id_set,
                    new \Cassandra\Varint($series->t0),
                    new \Cassandra\Varint($series->t1))));

                //ini_set('memory_limit', '-1');      // check this
                //ini_set('max_execution_time', 300); // check this

                $index = 0;
                $aggregator = ($this->internal()) ? null : Aggregator::new($aggr);

                while ($result_page) {
                    foreach ($result_page as $column) {
                        $row_id = (int)$column['id'];
                        $entry  = $column['entry'];
                        $time   = (string)$column['timestamp']->value(); //bigint

                        if ($is_digital) {
                            $value_blob = $entry->get('value')->toBinaryString();
                            // Reset per entry so that a blob no decoder handles cannot inherit
                            // the value decoded for the previous entry.
                            $value = null;
                            switch ($type) {
                                case Unit::I32:
                                    $value = Unpack::uInt32($value_blob);
                                    break;
                                case Unit::I64:
                                    $value = Unpack::uInt64($value_blob);
                                    break;
                                case Unit::F32:
                                    $value = Unpack::f32($value_blob);
                                    break;
                                case Unit::D64:
                                    $value = Unpack::d64($value_blob);
                                    break;
                                default:
                                    if (strlen($value_blob) == 1)
                                        $value = Unpack::uInt8($value_blob);
                                    ////Logger::debug("size: ".strlen($value_blob), true);
                                    //$value = Unpack::uInt8($value_blob)  ??
                                    //         Unpack::uInt32($value_blob) ??
                                    //         Unpack::uInt64($value_blob) ??
                                    //         Unpack::f32($value_blob)    ??
                                    //         Unpack::d64($value_blob);
                                    break;
                            }
                            if ($value === null) {
                                if ($value_blob != null)
                                    $value = $value_blob; // TODO: remove this in the new API
                                else
                                    $value = -400; // TODO: remove this once invalid values can no longer be inserted
                            }
                        } else {
                            $value = (double)$entry->get('value'); // double
                        }

                        // Static coordinates come from the MySQL row the entry belongs to.
                        $x = $selected_rows[$row_id]->x;
                        $y = $selected_rows[$row_id]->y;
                        $z = $selected_rows[$row_id]->z;

                        // TODO: what about digital entries?
                        $error = (int)$entry->get('error');      //tinyint
                        $conf  = (int)$entry->get('confidence'); //tinyint
                        //$x = (int)$entry->get('x'); //int // ONLY MOBILE
                        //$y = (int)$entry->get('y'); //int // ONLY MOBILE
                        //$z = (int)$entry->get('z'); //int // ONLY MOBILE
                        $d  = 0; // TODO: what about the dev origin?
                        $gw = (int)$entry->get('gw'); //int (TO DO GET GATEWAY)

                        // TODO: check this filter here
                        if ($filter_sphere && !$series->contains_point($x, $y, $z))
                            continue;

                        $smartdata = null;
                        switch ($series->version) {
                            case SmartData::STATIC_VERSION:
                                $smartdata = new StaticSmartData($series->unit,$value,$error,$conf,$x,$y,$z,$time,$d,$gw);
                                break;
                            case SmartData::MOBILE_VERSION:
                                $smartdata = new MobileSmartData($series->unit,$value,$error,$conf,$x,$y,$z,$time,$d,$gw, null);
                                break;
                            default:
                                // Unknown version: skip this entry ("2" because switch counts as a loop level).
                                continue 2;
                        }

                        if ($aggregator != null) {
                            $smartdata = $aggregator->aggregate($smartdata);
                        }

                        // The aggregator may hold the point back; nothing to emit then.
                        if ($smartdata == null)
                            continue;

                        if (is_array($smartdata)) {
                            foreach ($smartdata as $data) {
                                $response_json['series'][$index++] = $data->toArray();
                                if ($index >= Config::config()::POINTS_LIMIT)
                                    break;
                            }
                        } else {
                            $response_json['series'][$index++] = $smartdata->toArray();
                            if ($this->internal()) {
                                $response_json['series'][$index-1]['slice'] = $row_id;
                                $response_json['series'][$index-1]['gw'] = $gw;
                            }
                        }

                        // Limit reached: leave both the page loop and the paging loop.
                        if ($index >= Config::config()::POINTS_LIMIT)
                            break 2;
                    }

                    $result_page = $result_page->nextPage();

                    // After the last page, let the aggregator emit what it is still holding.
                    if (!$result_page && isset($aggregator) && $index < Config::config()::POINTS_LIMIT) {
                        $smartdata = $aggregator->finish();
                        if ($smartdata != null) {
                            $response_json['series'][$index++] = $smartdata->toArray();
                        }
                    }
                }

                // Timestamps are compared through GMP.
                usort($response_json['series'], static function ($a, $b) {
                    return gmp_cmp(gmp_init($a['timestamp']), gmp_init($b['timestamp']));
                });
                //self::debug_X('Returning the query result');
            }
        } catch (\Exception $e) {
            Logger::exception($e);
            $response_json = NULL;
        } finally {
            // Single place where the Cassandra session is released.
            if (isset($session))
                $session->close();
            $conn = null;
        }

        $workflow = $series->workflow ?? 0;
        if ($response_json !== NULL && $workflow !== 0) {
            self::debug_X('Enter output workflow');
            $response_json = $this->_workflow_output($response_json, $workflow);
        }

        return $response_json;
    }

    /**
     * Placeholder for tracker queries: always answers with an empty series list.
     *
     * @param Series $series Unused.
     * @param mixed  $aggr   Unused.
     *
     * @return array array('series' => array())
     */
    public function track(Series $series, $aggr=null) {
        return array('series' => array());
    }
}

// Bind the concrete "Backend" class to the implementation of the configured API version.
switch (Config_Common::api_version()) {
    case Config_Common::VERSION_1_0:
        class Backend extends Backend_V1_0 {}
        break;
    case Config_Common::VERSION_1_1:
        class Backend extends Backend_V1_1 {}
        break;
    default:
        die ("Bad API version!\n");
}