Monthly Archives: November 2015

Analyzing huge sensor data in near realtime with Apache Spark Streaming

For this demo I downloaded and installed Apache Spark 1.5.1

Suppose you have a stream of data from several (industrial) machines like

MACHINE,TIMESTAMP,SIGNAL1,SIGNAL2,SIGNAL3,...
1,2015-01-01 11:00:01,1.0,1.1,1.2,1.3,..
2,2015-01-01 11:00:01,2.2,2.1,2.6,2.8,.
3,2015-01-01 11:00:01,1.1,1.2,1.3,1.3,.
1,2015-01-01 11:00:02,1.0,1.1,1.2,1.4,.
1,2015-01-01 11:00:02,1.3,1.2,3.2,3.3,..
...

Below a system, written in Python, that reads data from a stream (use the command “nc -lk 9999” to send data to the stream) and every 10 seconds collects alerts from signals: at least 4 suspicious values of a specific signal of the same machine

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

min_occurs = 4

def signals_from_1_row_to_many(row):
  "output is (machine, date, signal_number, signal_value)"
  result = []
  for f in range(2,21):
    result = result + [(row[0], row[1], f-1, row[f])]
  return result

def isAlert(signal, value):
  defaults = [83.0, 57.0, 37.0, 57.0, 45.0, 19.0, -223.0, 20.50, 20.42, 20.48, 20.24, 20.22, 20.43, 20, 20.44, 20.39, 20.36, 20.25, 1675.0]
  soglia = 0.95
  if value == '':
     return True
  value = float(value)
  ref = defaults[signal -1]
  if value < ref - soglia*ref or value > ref + soglia*ref:
    return True
  else:
    return False
  
def isException(machine, signal):
  # sample data. the sensor 19 of machine 11 is broken
  exceptions = [(11,19)]
  return (int(machine), signal) in exceptions 

# Create a local StreamingContext with two working thread and batch interval of 10 second
sc = SparkContext("local[2]", "SignalsAlerts")
ssc = StreamingContext(sc, 10)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

all_alerts = lines.map(lambda l: l.split(",")) \
                 .flatMap(signals_from_1_row_to_many) \
                 .filter(lambda s: isAlert(s[2], s[3])) \
                 .filter(lambda s: not isException(s[0], s[2])) \
                 .map(lambda s: (s[0]+'-'+str(s[2]), [(s[1], s[3])])) \
                 .reduceByKey(lambda x, y: x + y) 

alerts = all_alerts.filter(lambda s: len(s[1]) > min_occurs)

alerts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate