<?php
namespace SmartData;
require_once( __DIR__ . '/Exception.php');
final class Aggregator
{
public static function new($parameters){
try {
if(!isset($parameters->name))
return null;
switch ($parameters->name) {
case 'dbp':
case 'lastValue':
case 'stdbp':
return new Trickle($parameters);
case 'min':
case 'max':
return new Limit($parameters);
case 'mean':
return new Mean($parameters);
case 'drift':
return new Drift($parameters);
case 'constantGain':
return new ConstantGain($parameters);
case 'constantBias':
return new ConstantBias($parameters);
case 'stuckAt':
return new StuckAt($parameters);
case 'allAnomalies':
return new AllAnomalies($parameters);
case 'stdDevFilter':
return new StdDevFilter($parameters);
case 'lowerThan':
return new LowerThan($parameters);
case 'higherThan':
return new HigherThan($parameters);
case 'confidence':
return new Confidence($parameters);
case 'can':
return new CAN($parameters);
default:
break;
}
} catch (Exception\CantCreateException $e) {
return null;
}
}
}
interface iAggregator
{
public function __construct($parameters);
public function aggregate(SmartData $smartdata);
public function finish();
}
class Confidence implements iAggregator
{
public function __construct($parameters) { }
public function aggregate(SmartData $smartdata) {
$smartdata->value = $smartdata->confidence;
return $smartdata;
}
public function finish() { return null; }
}
abstract class Threshold implements iAggregator
{
public function __construct($parameters) {
if (isset($parameters->parameter)){
$this->threshold = $parameters->parameter;
return $this;
} else {
throw (new Exception\CantCreateException());
}
}
public function finish() { return null; }
protected $threshold;
}
class LowerThan extends Threshold
{
public function aggregate(SmartData $smartdata) {
return ($smartdata->value > $this->threshold) ? null : $smartdata;
}
}
class HigherThan extends Threshold
{
public function aggregate(SmartData $smartdata) {
return ($smartdata->value < $this->threshold) ? null : $smartdata;
}
}
class Trickle implements iAggregator
{
public function __construct($parameters) {
if (isset($parameters)){
switch ($parameters->name) {
case 'dbp':
$e = isset($parameters->parameter) ? $parameters->parameter : 2;
$this->predictor = new DBP(10, 3, $e, $e, 0);
break;
case 'lastValue':
$threshold = isset($parameters->parameter) ? $parameters->parameter : 0;
$this->predictor = new LastValue($threshold);
break;
default:
throw (new Exception\CantCreateException());
break;
}
$this->count_models = isset($parameters->range);
} else {
throw (new Exception\CantCreateException());
}
}
public function aggregate(SmartData $smartdata) {
$this->predictor->trickle($smartdata->time, $smartdata->value);
$smartdata->value = $this->predictor->predict($smartdata->time);
if($this->count_models && !$this->predictor->new_model)
return null;
return $smartdata;
}
public function finish() { return null; }
private $predictor;
private $count_models;
}
class Limit implements iAggregator
{
private const Min = 0;
private const Max = 1;
public function __construct($parameters) {
$this->begin = $this->selected = null;
if(isset($parameters->range) && $parameters->range > 0) {
$this->range = $parameters->range;
switch ($parameters->name) {
case 'min': $this->limit = Limit::Min; break;
case 'max': $this->limit = Limit::Max; break;
default:
throw (new Exception\CantCreateException());
break;
}
} else {
throw (new Exception\CantCreateException());
}
}
public function aggregate(SmartData $smartdata) {
if($this->begin == null) {
$this->begin = $smartdata->time;
$this->selected = clone $smartdata;
} else {
if($smartdata->time > ($this->begin+$this->range)) {
$ret = clone $this->selected;
$this->begin = null;
$this->selected = clone $smartdata;
return $ret;
} else {
if( ($this->limit == Limit::Min && $smartdata->value < $this->selected->value) ||
($this->limit == Limit::Max && $smartdata->value > $this->selected->value) )
$this->selected = clone $smartdata;
}
}
$this->last = $smartdata->time;
return null;
}
public function finish() { return null; }
private $begin;
private $last;
private $range;
private $selected;
private $limit;
}
class Mean implements iAggregator
{
public function __construct($parameters) {
$this->begin = null;
$this->count = $this->sum = 0;
if(isset($parameters->range) && $parameters->range > 0) {
$this->range = $parameters->range;
} else {
throw (new Exception\CantCreateException());
}
}
public function aggregate(SmartData $smartdata) {
$ret = null;
if($this->begin == null) {
$this->begin = $smartdata->time;
} else {
if($smartdata->time > ($this->begin+$this->range)) {
$ret = clone $this->last;
$ret->time = (int)($this->begin + $this->last->time)/2;
$ret->value = $this->sum/$this->count;
$this->begin = $smartdata->time;
$this->count = $this->sum = 0;
}
}
$this->last = clone $smartdata;
$this->sum += (float)$smartdata->value;
$this->count++;
return $ret;
}
public function finish() { return null; }
private $range;
private $begin;
private $last;
private $sum;
private $count;
}
/*
Drift, StuckAt, ConstantBias, ConstantGain
For this anomalies, there are some parameters:
* delay (us):
describes the time window from the first data-point to the first anomaly
* range (us):
if greater than 0 (>0), defines the time window to apply the anomaly
if equal to 0 (=0), apply the anomaly to all the data after the delay
if less than 0 (<0), ThrowException
* spacing (us):
if greater than 0 (>0), the anomaly is repeated through the execution separated by spacing
* parameter:
Drift: defines the slope, used on the calculus of the new value
StuckAt: not used
ConstantBias: defines the bias
ConstantGain: defines the gain
allAnomalies
For this anomaly inserter the parameters are:
* delay (us):
describes the time window from the first data-point to the first anomaly
* range (us):
if greater than 0 (>0), defines the time window to apply the anomaly
if less than 0 (<=0), ThrowException
* spacing (us):
if greater than 0 (>0), the anomaly is repeated through the execution separated by spacing
* drift:
defines the parameter for Drift, if 0, drift is not Applied
* stuck:
stuckAt has no parameter, but if 0, stuckAt is not Applied
* bias:
defines the parameter for ConstantBias, if 0, constantBias is not Applied
* gain:
defines the parameter for ConstantGain, if 0, constantGain is not Applied
*/
class Drift implements iAggregator
{
public function __construct($parameters) {
$this->begin = null;
$this->count = 0;
if(isset($parameters->range) && $parameters->range >= 0) {
$this->range = $parameters->range;
if (isset($parameters->parameter)) {
$this->slope = $parameters->parameter;
} else {
$this->slope = 0;
}
if (isset($parameters->delay) && $parameters->delay > 0) {
$this->delay = $parameters->delay;
} else {
$this->delay = 0;
}
if (isset($parameters->spacing) && $parameters->spacing > 0) {
$this->spacing = $parameters->spacing;
} else {
$this->spacing = 0;
}
} else {
throw (new Exception\CantCreateException());
}
}
public function aggregate(SmartData $smartdata) {
$ret = null;
if($this->begin == null) {
$this->begin = $smartdata->time + $this->delay;
}
if ($this->range === 0 && $smartdata->time > $this->begin) {
$ret = clone $smartdata;
$ret->value = (float)$smartdata->value + (float)($this->slope * ($this->count + 1));
$this->count++;
} else {
if($smartdata->time > ($this->begin+$this->range) || $smartdata->time < $this->begin) {
$ret = clone $smartdata;
$this->count = 0;
if ($this->spacing > 0) {
if ($smartdata->time > ($this->begin+$this->range+$this->spacing)) {
$this->begin = $smartdata->time;
$ret = clone $smartdata;
$ret->value = (float)$smartdata->value + (float)($this->slope * ($this->count + 1));
$this->count++;
}
}
} else {
$ret = clone $smartdata;
$ret->value = (float)$smartdata->value + (float)($this->slope * ($this->count + 1));
$this->count++;
}
}
return $ret;
}
public function finish() { return null; }
private $range;
private $begin;
private $delay;
private $spacing;
private $slope;
private $count;
}
class StuckAt implements iAggregator
{
public function __construct($parameters) {
$this->begin = null;
$this->count = 0;
if(isset($parameters->range) && $parameters->range >= 0) {
$this->range = $parameters->range;
$this->value = (float)0;
if (isset($parameters->delay) && $parameters->delay > 0) {
$this->delay = $parameters->delay;
} else {
$this->delay = 0;
}
if (isset($parameters->spacing) && $parameters->spacing > 0) {
$this->spacing = $parameters->spacing;
} else {
$this->spacing = 0;
}
} else {
throw (new Exception\CantCreateException());
}
}
public function aggregate(SmartData $smartdata) {
$ret = null;
if($this->begin == null) {
$this->begin = $smartdata->time + $this->delay;
$this->value = $smartdata->value;
}
if ($this->range === 0 && $smartdata->time > $this->begin) {
$ret = clone $smartdata;
$ret->value = $this->value;
$this->count++;
} else {
if($smartdata->time > ($this->begin+$this->range) || $smartdata->time < $this->begin) {
$ret = clone $smartdata;
$this->count = 0;
$this->value = $smartdata->value;
if ($this->spacing > 0) {
if ($smartdata->time > ($this->begin+$this->range+$this->spacing)) {
$this->begin = $smartdata->time;
$ret = clone $smartdata;
$ret->value = $this->value;
$this->count++;
}
}
} else {
$ret = clone $smartdata;
$ret->value = $this->value;
$this->count++;
}
}
return $ret;
}
public function finish() { return null; }
private $range;
private $begin;
private $delay;
private $spacing;
private $value;
private $count;
}
class ConstantBias implements iAggregator
{
public function __construct($parameters) {
$this->begin = null;
$this->count = 0;
if(isset($parameters->range) && $parameters->range >= 0) {
$this->range = $parameters->range;
if (isset($parameters->parameter)) {
$this->bias = (float)$parameters->parameter;
} else {
$this->bias = 0;
}
if (isset($parameters->delay) && $parameters->delay > 0) {
$this->delay = $parameters->delay;
} else {
$this->delay = 0;
}
if (isset($parameters->spacing) && $parameters->spacing > 0) {
$this->spacing = $parameters->spacing;
} else {
$this->spacing = 0;
}
} else {
throw (new Exception\CantCreateException());
}
}
public function aggregate(SmartData $smartdata) {
$ret = null;
if($this->begin == null) {
$this->begin = $smartdata->time + $this->delay; //seting a time window before applying the aggr
}
if ($this->range === 0 && $smartdata->time > $this->begin) {
$ret = clone $smartdata;
$ret->value = (float)$smartdata->value + ($this->bias);
$this->count++;
} else {
if($smartdata->time > ($this->begin+$this->range) || $smartdata->time < $this->begin) {
$ret = clone $smartdata;
$this->count = 0;
if ($this->spacing > 0) {
if ($smartdata->time > ($this->begin+$this->range+$this->spacing)) {
$this->begin = $smartdata->time;
$ret = clone $smartdata;
$ret->value = (float)$smartdata->value + ($this->bias);
$this->count++;
}
}
} else {
$ret = clone $smartdata;
$ret->value = (float)$smartdata->value + ($this->bias);
$this->count++;
}
}
return $ret;
}
public function finish() { return null; }
private $range;
private $begin;
private $delay;
private $spacing;
private $bias;
private $count;
}
class ConstantGain implements iAggregator
{
public function __construct($parameters) {
$this->begin = null;
$this->count = 0;
if(isset($parameters->range) && $parameters->range >= 0) {
$this->range = $parameters->range;
if (isset($parameters->parameter)) {
$this->gain = (float)$parameters->parameter;
} else {
$this->gain = 0;
}
if (isset($parameters->delay) && $parameters->delay > 0) {
$this->delay = $parameters->delay;
} else {
$this->delay = 0;
}
if (isset($parameters->spacing) && $parameters->spacing > 0) {
$this->spacing = $parameters->spacing;
} else {
$this->spacing = 0;
}
} else {
throw (new Exception\CantCreateException());
}
}
public function aggregate(SmartData $smartdata) {
$ret = null;
if($this->begin == null) {
$this->begin = $smartdata->time + $this->delay;
}
if($this->range === 0 && $smartdata->time > $this->begin) {
$ret = clone $smartdata;
$ret->value = (float)$smartdata->value * ($this->gain);
$this->count++;
} else {
if($smartdata->time > ($this->begin+$this->range) || $smartdata->time < $this->begin) {
$ret = clone $smartdata;
$this->count = 0;
if ($this->spacing > 0) {
if ($smartdata->time > ($this->begin+$this->range+$this->spacing)) {
$this->begin = $smartdata->time;
$ret = clone $smartdata;
$ret->value = (float)$smartdata->value * ($this->gain);
$this->count++;
}
}
} else {
$ret = clone $smartdata;
$ret->value = (float)$smartdata->value * ($this->gain);
$this->count++;
}
}
return $ret;
}
public function finish() { return null; }
private $range;
private $begin;
private $delay;
private $spacing;
private $gain;
private $count;
}
class AllAnomalies implements iAggregator
{
public function __construct($parameters) {
$this->begin = null;
$this->count = 0;
$this->applied = 0;
if(isset($parameters->range) && $parameters->range >= 0) {
$this->range = $parameters->range;
} else {
throw (new Exception\CantCreateException());
}
if (isset($parameters->parameter) && $parameters->drift > 0) {
$this->slope = $parameters->drift;
} else {
$this->slope = 0;
}
if (isset($parameters->stuck) && $parameters->stuck > 0) {
$this->stuck = $parameters->stuck;
} else {
$this->stuck = 0;
}
if (isset($parameters->bias) && $parameters->bias > 0) {
$this->bias = $parameters->bias;
} else {
$this->bias = 0;
}
if (isset($parameters->gain) && $parameters->gain > 0) {
$this->gain = $parameters->gain;
} else {
$this->gain = 0;
}
if (isset($parameters->delay) && $parameters->delay > 0) {
$this->delay = $parameters->delay;
} else {
$this->delay = 0;
}
if (isset($parameters->spacing) && $parameters->spacing > 0) {
$this->spacing = $parameters->spacing;
} else {
$this->spacing = 0;
}
}
public function aggregate(SmartData $smartdata) {
$ret = null;
if($this->begin == null) {
$this->begin = $smartdata->time + $this->delay;
}
if($smartdata->time > ($this->begin+$this->range) || $smartdata->time < $this->begin) {
$ret = clone $smartdata;
$this->count = 0;
if($smartdata->time > ($this->begin+$this->range)) {
$this->begin = $smartdata->time + $this->spacing;
$this->applied++;
$this->count = 0;
$this->value = $smartdata->value;
}
} else {
if($this->applied == 0) {
//apply drift
if ($this->slope == 0) {
$this->applied++;
}
$ret = clone $smartdata;
$ret->value = (float)$smartdata->value + (float)($this->slope * ($this->count + 1));
$this->count++;
}
if($this->applied == 1) {
//apply stuckAt
if ($this->stuck == 0) {
$this->applied++;
}
$ret = clone $smartdata;
$ret->value = $this->value;
$this->count++;
}
if($this->applied == 2) {
//apply constantBias
if ($this->bias == 0) {
$this->applied++;
}
$ret = clone $smartdata;
$ret->value = (float)$smartdata->value + ($this->bias);
$this->count++;
}
if($this->applied == 3) {
//apply constantGain
if ($this->gain == 0) {
$this->applied++;
}
$ret = clone $smartdata;
$ret->value = (float)$smartdata->value * ($this->gain);
$this->count++;
}
if ($this->applied > 3) {
$ret = clone $smartdata;
}
}
return $ret;
}
public function finish() { return null; }
private $range;
private $begin;
private $delay;
private $spacing;
private $slope;
private $stuck;
private $bias;
private $gain;
private $count;
private $value;
private $applied;
}
/*
STD_DEV verificator
*/
class StdDevFilter implements iAggregator
{
public function __construct($parameters) {
$this->begin = null;
$this->count = $this->average = 0;
if(isset($parameters->range) && $parameters->range >= 0) {
$this->range = $parameters->range;
if (isset($parameters->parameter)) {
$this->acceptedDrift = (float)$parameters->parameter;
} else {
$this->acceptedDrift = 0;
}
} else {
throw (new Exception\CantCreateException());
}
}
public function aggregate(SmartData $smartdata) {
$ret = null;
if($this->begin == null) {
$this->begin = $smartdata->time;
}
if ($smartdata->time <= $this->begin+$this->range || $this->range === 0) {
$ret = clone $smartdata;
$delta = (float)($ret->value - (float)$this->average);
$this->average = (float)(($this->average * $this->count + $ret->value)/($this->count+1));
$this->count++;
$std = (float)($delta*($this->average-(float)($delta/$this->count)));
$std = (float) ($std / $this->count);
$std = sqrt($std);
if ($delta > 0) {
$dif = $delta - ($std * $this->acceptedDrift);
} else {
$dif = -1*$delta - ($std * $this->acceptedDrift);
}
if ($dif > 0) {
$ret->value = -1;
}
} else {
$ret = clone $smartdata;
}
return $ret;
}
public function finish() { return null; }
private $range;
private $begin;
private $acceptedDrift;
private $count;
private $average;
}
class Cirular_Queue
{
public function __construct($size_limit) {
$this->_head = -1;
$this->_tail = -1;
$this->_limit = $size_limit;
$this->_queue = array();
}
public function insert($el) {
if($this->full()) {
$this->_head = ($this->_head + 1) % $this->_limit;
$this->_tail = ($this->_tail + 1) % $this->_limit;
} else {
if($this->empty())
$this->_head = $this->_tail = 0;
else
$this->_tail = ($this->_tail + 1) % $this->_limit;
$this->_size++;
}
$this->_queue[$this->_tail] = $el;
return $el;
}
public function size() { return $this->_size; }
public function max_size() { return $this->_limit; }
public function head() { return $this->_queue[$this->$this->_head]; }
public function tail() { return $this->_queue[$this->_tail]; }
public function at($index){ return $this->_queue[($this->_head + $index) % $this->_limit];}
public function empty() { return ($this->_head == -1 && $this->_tail == -1); }
public function full() { return (($this->_tail + 1) % $this->_limit) == $this->_head; }
public function clear() {
$this->_size = 0;
$this->_head = $this->_tail = -1;
}
public function print_r() {
for($i = 0; $i < $this->_size; $i++){
echo $this->at($i)." | ";
}
echo "\n";
}
private $_queue;
private $_size;
private $_limit;
private $_head;
private $_tail;
}
class Entry
{
public function __construct($time=0, $value=0) {
$this->_time = $time;
$this->_value = $value;
}
public function time() { return $this->_time; }
public function value() { return $this->_value; }
private $_time;
private $_value;
public function __toString() {
return "[{$this->_time}]($this->_value)";
}
}
class Model
{
public function __construct($a=0, $b=0, $t0=0) {
$this->_a = $a;
$this->_b = $b;
$this->_t0 = $t0;
}
public function a($a) { $this->_a = $a; }
public function b($b) { $this->_b = $b; }
public function t0($t0) { $this->_t0 = $t0; }
public function compute($t) {
return ($this->_a * ($t - $this->_t0) + $this->_b);
}
private $_a;
private $_b;
private $_t0;
public function __toString() {
return "[{$this->_a},{$this->_b},{$this->_t0}]";
}
}
class DBP
{
public function __construct($w, $l, $r, $a, $t) {
$this->_w = $w;
$this->_l = $l;
$this->_r = $r;
$this->_a = $a;
$this->_t = $t;
$this->_model = null;
$this->_history = new Cirular_Queue($w);
$this->new_model = false;
$this->_timer = 0;
}
public function predict($t) {
if($this->_model != null)
return $this->_model->compute($t);
else if(!$this->_history->empty())
return $this->_history->tail()->value();
else
return 0;
}
public $new_model;
public function trickle($time, $value) {
$this->new_model = false;
$this->_history->insert(new Entry($time, $value));
if($this->_model == null) {
if($this->_history->full()){
$this->build_model($time, $value);
return false;
}
} else {
$predicted = $this->predict($time);
$max_acceptable_error = $this->max($this->abso(($value * (float)$this->_r)/100), (float)$this->_a );
$error = $this->abso( (float)$value - $predicted );
if($error > $max_acceptable_error && (++$this->_timer) > $this->_t){
$this->_timer = 0;
$this->build_model($time, $value);
return false;
}
}
return true;
}
private function build_model($time, $value) {
if(!$this->_history->full())
die ("The history must be full");
$this->new_model = true;
if($this->_model == null)
$this->_model = new Model();
$avg_oldest = 0;
$avg_recent = 0;
$this->_model->t0($this->_history->at(0)->time());
for($i = 0; $i < $this->_l; $i++)
$avg_oldest += $this->_history->at($i)->value();
$avg_oldest /= $this->_l;
for($i = 0; $i < $this->_l; $i++)
$avg_recent += $this->_history->at($this->_history->size()-1-$i)->value();
$avg_recent /= $this->_l;
$t_oldest = ( $this->_history->at(0)->time() + $this->_history->at($this->_l-1)->time() )/2;
$t_recent = ( $this->_history->at($this->_history->size()-1)->time() + $this->_history->at($this->_history->size()-1-($this->_l-1))->time() )/2;
$this->_model->a(($avg_recent - $avg_oldest)/($t_recent - $t_oldest));
$this->_model->b($avg_oldest);
$this->_model->t0($t_oldest);
$predicted = $this->predict($time);
$error = $value - $predicted;
$this->_model->b($avg_oldest+$error);
}
public function print() {
$this->_history->print_r();
}
private function abso($a){ return ($a < 0) ? -$a : $a; }
private function max($a, $b) { return ($a >= $b) ? $a : $b; }
private function min($a, $b) { return ($a <= $b) ? $a : $b; }
private $_w;
private $_l;
private $_r;
private $_a;
private $_t;
private $_model;
private $_history;
private $_timer;
}
class LastValue extends DBP
{
public function __construct($threshold) {
parent::__construct(10, 3, $threshold, $threshold, 0);
$this->_last_value = 0;
}
private $_last_value;
}
class CAN implements iAggregator
{
public function __construct($parameters) {
if (isset($parameters->parameter)){
$this->id = $parameters->parameter;
$this->offset = $parameters->range;
return $this;
} else {
throw (new Exception\CantCreateException());
}
}
function chbo($num) {
$data = dechex($num);
if (strlen($data) <= 2) {
return $num;
}
$u = unpack("H*", strrev(pack("H*", $data)));
$f = hexdec($u[1]);
return $f;
}
public function aggregate(SmartData $smartdata) {
$value = (int)$smartdata->value;
switch ((int)$this->id) {
case 401:
switch (((int)$this->offset)/1000000) {
case 1:
$value = $value & 0xFFFF;
break;
case 2:
$value = ($value >> 16) & 0xFFFF;
break;
default:
break;
}
break;
case 402:
switch (((int)$this->offset)/1000000) {
case 1:
$value = abs($value >> (6*8));
break;
case 2:
//$value = abs($value >> (4*8));
break;
case 3:
//$value = ($value >> ((5*8)-1)) & 1;
break;
case 4:
//$value = ($value >> (6*8)) & 1;
default:
break;
}
break;
case 403:
switch (((int)$this->offset)/1000000) {
case 1:
$value = $value & 0xFFFF;
break;
case 2:
$value = ($value >> 16) & 0xFFFF;
break;
case 3:
$value = ($value >> 32) & 0xFFFF;
break;
case 4:
$value = ($value >> 48) & 0xFFFF;
default:
break;
}
break;
case 411:
break;
case 412:
break;
case 413:
break;
case 414:
break;
case 415:
break;
default:
# code...
break;
}
$smartdata->value = $value;
return $smartdata;
}
public function finish() { return null; }
protected $id;
protected $offset;
}