Skip to content
Snippets Groups Projects
Backend.php 109 KiB
Newer Older
root's avatar
root committed
                // change this

                $session = self::_cassandraConnect(Config::config()::CASSANDRA_SERVERNAME, Config::config()::CASSANDRA_PORT, $this->_username, $this->_password, Config::config()::CASSANDRA_KEYSPACE);
                $insert_stmt = $session->prepare("INSERT INTO {$cassandra_table} (id, timestamp, entry) VALUES (?, ?, ?);");
                $batch = new \Cassandra\BatchStatement(\Cassandra::BATCH_LOGGED);
                $count = 0;
                foreach ($smart_data as $key => $data) {
                    $batch->add($insert_stmt, array($row_id,
                                                    new \Cassandra\Varint($data->time),
                                                    $entry_type->create(
                                                        'value',      ($is_digital) ? new \Cassandra\Blob($data->value) : (double) $data->value,
                                                        'error',      $error,
                                                        'confidence', $confidence,
                                                        'gw',         $gw
                                                    )));
                    $count++;
                    if($count == 400){
                        $session->execute($batch);
                        $count = 0;
                        $batch = new \Cassandra\BatchStatement(\Cassandra::BATCH_LOGGED);
                    }
                }

                if($count > 0){
                    $session->execute($batch);
                    $count = 0;
                }

                $update_stmt = $conn->prepare("UPDATE {$rows_table} SET count = count + :smart_data_count WHERE id = :id LIMIT 1;");
                $update_stmt->execute(array('id' => $series_db->row->id, 'smart_data_count' => $smart_data_count));
            }
        } catch (\Exception $e) {
            Logger::exception($e);
            $sucess = false;
        } finally {
            if (isset($session))
                $session->close();
            $conn = null;
        }

        return $sucess;
    }

    private function _insertMobileSmartDataBatch(Series $series, MobileSmartData ...$smart_data) : bool {
        return true;
    }

    public function query(Series $series, $aggr=null, $options=null) {
        //self::debug_X("Query Series : {$series}");

        $response_json = array('series' => array());

        try {

            $parameters = array(':version'  => $series->version,
                                ':t0'       => $series->t0,
                                ':t1'       => $series->t1,
				// series workflow is not linked to data gethering anymore.
				// Now, workflow id on get operations stands for the output worklow to be executed.
                                //':workflow' => $series->workflow ?? 0,
root's avatar
root committed
                                ':domain'   => $this->_domain);

            $series_table = Config::config()::MySQL_Table($series->version);
            $rows_table = '`rows`'; // TODO: move to Config file (row table name)

            $query_series = "SELECT * FROM {$series_table} WHERE version = :version AND ( (:t0 >= t0 AND :t0 <= t1) OR (:t0 <= t0 AND :t1 >= t1) OR (:t1 >= t0 AND :t1 <= t1 ) )";

            if($series->unit != null){
                $parameters = array_merge($parameters, array(':unit' => $series->unit));
                $query_series .= ' AND unit = :unit';
            }

            $invalid_sphere = (int)($series->x === null) +
                              (int)($series->y === null) +
                              (int)($series->z === null) +
                              (int)($series->r === null);

            if ($invalid_sphere != 0 && $invalid_sphere != 4) {
                throw new BadRequestException("Invalid sphere parameters");
            } else if($invalid_sphere == 0) {
                $parameters = array_merge($parameters, array(':x'    => $series->x,
                                                             ':y'    => $series->y,
                                                             ':z'    => $series->z,
                                                             ':r'    => $series->r));

                $query_series .= ' AND SQRT(POW(x - :x, 2) + POW(y - :y, 2) + POW(z - :z, 2)) <= :r + r';
                $filter_sphere = true;
            } else {
                $filter_sphere = false;
            }
	    //workflow removed from comparation as comment above
            $query_series .= ' AND domain = :domain;';
root's avatar
root committed

            $conn = self::_mysqlConnect(Config::config()::MYSQL_SEVERNAME, Config::config()::MYSQL_PORT, Config::config()::MYSQL_DBNAME, Config::config()::MYSQL_USERNAME, Config::config()::MYSQL_PASSWORD);
            $stmt_select = $conn->prepare($query_series);
            $stmt_select->execute($parameters);
            $series_set = $stmt_select->fetchAll(\PDO::FETCH_OBJ);
            if($series_set) {
                $selected_series = array_column($series_set, null, 'id');

                // TODO: filter the xyz here
                $query_rows = "SELECT * FROM {$rows_table} WHERE series_id IN (".implode(',',array_column($series_set, 'id')).") AND dev = :dev;";
                $stmt_select = $conn->prepare($query_rows);
                $stmt_select->execute(array(':dev' => $series->dev));

                $rows_set = $stmt_select->fetchAll(\PDO::FETCH_OBJ);
                $selected_rows = array_column($rows_set, null, 'row_id');

                $id_set = \Cassandra\Type::collection(\Cassandra\Type::bigint())->create();
                foreach ($selected_rows as $row_id => $row_fields){
                    $id_set->add(new \Cassandra\Bigint($row_id));
                }

                $session = self::_cassandraConnect(Config::config()::CASSANDRA_SERVERNAME, Config::config()::CASSANDRA_PORT, $this->_username, $this->_password, Config::config()::CASSANDRA_KEYSPACE);

                $is_digital  = $series->unit()->is_digital();
                if($is_digital) {
                    $type = $_GET['type'] ?? $options->type ?? (PHP_INT_MAX)>>29;
                    $type = $type<<29;
                }

                // TODO: move the cassandra table's name to Config file
                $cassandra_table  = $this->_domain.'_v1_1_';
                $cassandra_table .= $is_digital ? 'digital' : 'si';
                // -----

                $session = self::_cassandraConnect(Config::config()::CASSANDRA_SERVERNAME, Config::config()::CASSANDRA_PORT, $this->_username, $this->_password, Config::config()::CASSANDRA_KEYSPACE);
                $cass_select_stmt = $session->prepare("SELECT * FROM {$cassandra_table} WHERE (id IN ?) AND (timestamp >= ?) AND (timestamp <= ?);");

                $result_page = $session->execute($cass_select_stmt,
                                array('arguments' => array($id_set, new \Cassandra\Varint($series->t0), new \Cassandra\Varint($series->t1))));

                //ini_set('memory_limit', '-1'); // check this
                //ini_set('max_execution_time', 300); // check this

                $index = 0;
                $aggregator = ($this->internal()) ? null : Aggregator::new($aggr);

                while($result_page) {
                    foreach ($result_page as $column) {
                        $row_id = (int)$column['id'];
                        $entry = $column['entry'];
                        $time  = (string)$column['timestamp']->value(); //bigint

                        if($is_digital){
                            $value_blob = $entry->get('value')->toBinaryString();

                            switch ($type) {
                                case Unit::I32:
                                    $value = Unpack::uInt32($value_blob);
                                    break;
                                case Unit::I64:
                                    $value = Unpack::uInt64($value_blob);
                                    break;
                                case Unit::F32:
                                    $value = Unpack::f32($value_blob);
                                    break;
                                case Unit::D64:
                                    $value = Unpack::d64($value_blob);
                                    break;
                                default:
                                    if(strlen($value_blob) == 1)
                                        $value = Unpack::uInt8($value_blob);
                                    ////Logger::debug("size: ".strlen($value_blob), true);
                                    //$value = Unpack::uInt8($value_blob)  ??
                                    //         Unpack::uInt32($value_blob) ??
                                    //         Unpack::uInt64($value_blob) ??
                                    //         Unpack::f32($value_blob)    ??
                                    //         Unpack::d64($value_blob);
                                    break;
                            }

                            if($value === null){
                                if($value_blob != null)
                                    $value = $value_blob; // TODO: remove this in the new API
                                else
                                    $value = -400; // TODO: remover isso quando valores inválidos não puderem mais ser inseridos
                            }
                        } else {
                            $value = (double)$entry->get('value'); // double
                        }

                        $x = $selected_rows[$row_id]->x;
                        $y = $selected_rows[$row_id]->y;
                        $z = $selected_rows[$row_id]->z;

                        // TODO: what about digital entries?
                        $error = (int)$entry->get('error');             //tinyint
                        $conf  = (int)$entry->get('confidence');        //tinyint
                        //$x     = (int)$entry->get('x');                 //int    // ONLY MOBILE
                        //$y     = (int)$entry->get('y');                 //int    // ONLY MOBILE
                        //$z     = (int)$entry->get('z');                 //int    // ONLY MOBILE
                        $d     = 0; // TODO: what about the dev origin?
                        $gw    = (int)$entry->get('gw');                //int (TO DO GET GATEWAY)

                        // TODO: check this filter here
                        if ( $filter_sphere && !$series->contains_point($x, $y, $z) )
                            continue;

                        $smartdata = null;

                        switch ($series->version) {
                            case SmartData::STATIC_VERSION:
                                $smartdata = new StaticSmartData($series->unit,$value,$error,$conf,$x,$y,$z,$time,$d,$gw);
                                break;
                            case SmartData::MOBILE_VERSION:
                                $smartdata = new MobileSmartData($series->unit,$value,$error,$conf,$x,$y,$z,$time,$d,$gw, null);
                                break;
                            default:
                                continue 2;
                                break;
                        }

                        if($aggregator != null) {
                            $smartdata = $aggregator->aggregate($smartdata);
                        }

                        if($smartdata != null) {
                            if(is_array($smartdata)){
                                foreach ($smartdata as $key => $data) {
                                    $response_json['series'][$index++] = $data->toArray();
                                    if ($index >= Config::config()::POINTS_LIMIT) break;
                                }
                            } else {
                                $response_json['series'][$index++] = $smartdata->toArray();
                                if($this->internal()){
                                    $response_json['series'][$index-1]['slice'] = $row_id;
                                    $response_json['series'][$index-1]['gw'] = $gw;
                                }
                            }
                        } else {
                            continue;
                        }

                        if ($index >= Config::config()::POINTS_LIMIT) break 2;
                    }
                    $result_page = $result_page->nextPage();

                    if(!$result_page && isset($aggregator) && $index < Config::config()::POINTS_LIMIT){
                        $smartdata = $aggregator->finish();
                        if($smartdata != null) {
                            $response_json['series'][$index++] = $smartdata->toArray();
                        }
                    }

                }

                $session->close();

                usort($response_json['series'], function ($a, $b) {
                    $a_timestamp  = gmp_init($a['timestamp']);
                    $b_timestamp  = gmp_init($b['timestamp']);
                    return gmp_cmp($a_timestamp, $b_timestamp);
                });
                //self::debug_X('Returning the query result');
            }
        } catch (\Exception $e) {
            Logger::exception($e);
            $response_json = NULL;
        } finally {
            if (isset($session))
                $session->close();
            $conn = null;
        }

        $workflow = $series->workflow ?? 0;
	$_response_json = array();
        if($response_json !== NULL && $workflow !== 0) {
	    self::debug_X('Enter output workflow');
            $_response_json = $response_json;
            $response_json = $this->_workflow_output($_response_json, $workflow);
        }

root's avatar
root committed
        return $response_json;
    }

    public function track(Series $series, $aggr=null) {
        return array('series' => array());
    }
}

switch (Config_Common::api_version()) {
    case Config_Common::VERSION_1_0: class Backend extends Backend_V1_0 {} break;
    case Config_Common::VERSION_1_1: class Backend extends Backend_V1_1 {} break;
    default:
        die ("Bad API version!\n");
}