Author Archives: matteo

Analyzing huge sensor data in near realtime with Apache Spark Streaming

For this demo I downloaded and installed Apache Spark 1.5.1.

Suppose you have a stream of data from several (industrial) machines like

1,2015-01-01 11:00:01,1.0,1.1,1.2,1.3,..
2,2015-01-01 11:00:01,2.2,2.1,2.6,2.8,.
3,2015-01-01 11:00:01,1.1,1.2,1.3,1.3,.
1,2015-01-01 11:00:02,1.0,1.1,1.2,1.4,.
1,2015-01-01 11:00:02,1.3,1.2,3.2,3.3,..
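
To experiment without real machines, rows in this format can be generated with a few lines of Python. This is only a sketch: the helper names `make_row` and `make_rows` are hypothetical, the readings are random numbers between 0 and 100, and 19 readings per row is assumed because that is how many fields the script below reads from each row.

```python
import random
from datetime import datetime, timedelta

def make_row(machine, timestamp, n_signals=19, rng=random):
    """Build one CSV row: machine id, timestamp, then n_signals readings."""
    values = ["{:.1f}".format(rng.uniform(0.0, 100.0)) for _ in range(n_signals)]
    return ",".join([str(machine), timestamp.strftime("%Y-%m-%d %H:%M:%S")] + values)

def make_rows(machines, start, seconds, rng=random):
    """Yield one row per machine for each second, starting at `start`."""
    for s in range(seconds):
        ts = start + timedelta(seconds=s)
        for m in machines:
            yield make_row(m, ts, rng=rng)

# Three machines, two seconds of data: six rows in total
rows = list(make_rows([1, 2, 3], datetime(2015, 1, 1, 11, 0, 1), seconds=2))
for row in rows:
    print(row)
```

The printed rows can be pasted into the terminal where nc is listening.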

Below is a system, written in Python, that reads data from a stream (use the command "nc -lk 9999" to send data to the stream) and every 10 seconds collects alerts from signals: an alert is raised when a specific signal of the same machine shows at least 4 suspicious values.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

min_occurs = 4

def signals_from_1_row_to_many(row):
  "output is (machine, date, signal_number, signal_value)"
  result = []
  for f in range(2,21):
    result = result + [(row[0], row[1], f-1, row[f])]
  return result

def isAlert(signal, value):
  defaults = [83.0, 57.0, 37.0, 57.0, 45.0, 19.0, -223.0, 20.50, 20.42, 20.48, 20.24, 20.22, 20.43, 20, 20.44, 20.39, 20.36, 20.25, 1675.0]
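  soglia = 0.95  # threshold: allowed relative deviation from the reference value
  if value == '':
    return True
  value = float(value)
  ref = defaults[signal - 1]
  # abs() keeps the tolerance band valid for negative references such as -223.0
  return abs(value - ref) > soglia * abs(ref)

def isException(machine, signal):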
  # sample data. the sensor 19 of machine 11 is broken
  exceptions = [(11,19)]
  return (int(machine), signal) in exceptions 

# Create a local StreamingContext with two worker threads and a batch interval of 10 seconds
sc = SparkContext("local[2]", "SignalsAlerts")
ssc = StreamingContext(sc, 10)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

all_alerts = lines.map(lambda l: l.split(",")) \
                 .flatMap(signals_from_1_row_to_many) \
                 .filter(lambda s: isAlert(s[2], s[3])) \
                 .filter(lambda s: not isException(s[0], s[2])) \
                 .map(lambda s: (s[0]+'-'+str(s[2]), [(s[1], s[3])])) \
                 .reduceByKey(lambda x, y: x + y) 

alerts = all_alerts.filter(lambda s: len(s[1]) >= min_occurs)

# Print the alerts of each batch (an output operation is required before start())
alerts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
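
The alert rule can also be tried on a plain Python list, without Spark or a socket. The following is a minimal sketch under a few assumptions: `is_alert` and `alerts_for_batch` are hypothetical names, the tolerance band is taken as symmetric around the reference value (using abs), and "at least 4" is read as `>=`. It reproduces the split, flatMap, filter and reduceByKey steps for a single batch.

```python
from collections import defaultdict

DEFAULTS = [83.0, 57.0, 37.0, 57.0, 45.0, 19.0, -223.0, 20.50, 20.42, 20.48,
            20.24, 20.22, 20.43, 20, 20.44, 20.39, 20.36, 20.25, 1675.0]
THRESHOLD = 0.95   # allowed relative deviation from the reference value
MIN_OCCURS = 4     # suspicious values needed to raise an alert

def is_alert(signal, value):
    """A missing value or one outside the band around the reference is suspicious."""
    if value == '':
        return True
    ref = DEFAULTS[signal - 1]
    return abs(float(value) - ref) > THRESHOLD * abs(ref)

def alerts_for_batch(lines, exceptions=((11, 19),)):
    """Group suspicious (date, value) pairs by 'machine-signal' for one batch."""
    grouped = defaultdict(list)
    for line in lines:
        row = line.split(",")
        for f in range(2, 21):
            signal, value = f - 1, row[f]
            if is_alert(signal, value) and (int(row[0]), signal) not in exceptions:
                grouped[row[0] + '-' + str(signal)].append((row[1], value))
    return {k: v for k, v in grouped.items() if len(v) >= MIN_OCCURS}

def make_line(machine, second, values):
    return ",".join([str(machine), "2015-01-01 11:00:0%d" % second] + values)

normal = [str(d) for d in DEFAULTS]   # every signal at its reference value
bad = ["500.0"] + normal[1:]          # signal 1 far above its reference

batch = ([make_line(1, s, bad) for s in range(1, 5)] +   # 4 bad rows: alert
         [make_line(2, s, bad) for s in range(1, 4)] +   # 3 bad rows: no alert
         [make_line(3, 1, normal)])                      # healthy machine

result = alerts_for_batch(batch)
print(result)
```

Only machine 1 reaches four suspicious values for signal 1, so the batch yields the single key "1-1".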

TwitterPopularTags.scala example of Apache Spark Streaming in a standalone project

This is an easy tutorial on using Apache Spark Streaming with Scala: it takes the official TwitterPopularTags.scala example and puts it in a standalone sbt project.


In a few minutes you will be able to receive streams of tweets and manipulate them in realtime with Apache Spark Streaming.

  • Install Apache Spark (I used 1.5.1)
  • Install sbt
  • git clone
  • cd TwitterPopularTags
  • cp
  • edit
  • sbt package
  • spark-submit --master local --packages "org.apache.spark:spark-streaming-twitter_2.10:1.5.1" ./target/scala-2.10/twitterpopulartags_2.10-1.0.jar italy
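
To give an idea of what the job computes, here is a rough plain-Python sketch of the counting step: split each tweet into words, keep the ones starting with "#", and rank them by frequency. The function names and the sample tweets are made up for illustration. The Scala example does this over sliding time windows of the live stream, which this sketch does not attempt to reproduce.

```python
from collections import Counter

def hashtags(text):
    """Return the words of a tweet that start with '#'."""
    return [word for word in text.split() if word.startswith("#")]

def popular_tags(tweets, top=10):
    """Count hashtags over a list of tweet texts and return the most frequent."""
    counts = Counter(tag for tweet in tweets for tag in hashtags(tweet))
    return counts.most_common(top)

tweets = ["ciao #italy #roma", "#italy wins again", "no tags here"]
for tag, count in popular_tags(tweets):
    print("{} ({} tweets)".format(tag, count))
```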

Howto collect Twitter data in 15 minutes

For this tutorial I assume you are using a Debian/Ubuntu Linux system, but it could easily be adapted for other operating systems.

Install the software

apt-get install openjdk-7-jdk  
tar xvfz apache-karaf-4.0.2.tar.gz

Start the server

cd apache-karaf-4.0.2/

Install additional connectors

ssh -p 8101 karaf@localhost
feature:repo-add camel 2.16.0
feature:install camel camel-blueprint camel-twitter camel-jackson camel-dropbox

Configure our routes

Create two new files:


<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="">

  <camelContext id="twitter-to-file" streamCache="true" xmlns="">

      <json id="jack" library="Jackson" />
      <jaxb id="myJaxb" prettyPrint="true" contextPath="org.apache.camel.example"/>

    <route id="twitter-tweets-to-file">
      <from uri="vm:twitter-tweets-to-file" />
      <setHeader headerName="CamelFileName">
        <to uri="vm:twitter-tweet-to-file" />

    <route id="twitter-tweet-to-file">
      <from uri="vm:twitter-tweet-to-file" />
      <log message="Saving tweet id= ${}" />
      <!-- transforming the body (a single tweet) to a json doc -->
      <marshal ref="jack" />
      <convertBodyTo type="java.lang.String" charset="UTF8" />
      <setHeader headerName="CamelFileName">
      <to uri="file:twitter-data?autoCreate=true&amp;fileExist=Append" />


<blueprint xmlns="">
  <camelContext id="twitter-search-sample" xmlns="">
    <route id="twitter-search-sample">
      <from uri="twitter://streaming/sample?count=100&amp;type=polling&amp;consumerKey=XXX&amp;consumerSecret=XXX&amp;accessToken=XXX&amp;accessTokenSecret=XXX" />
      <setHeader headerName="twitter-id">
      <to uri="vm:twitter-tweets-to-file" />


and copy them into the "deploy" directory. Check the logs in data/log/karaf.log and see the results in the folder twitter-data/sample/yyyy/mm/dd
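
Since the route appends every tweet to the same file, the saved data is a sequence of JSON documents rather than one JSON document. A minimal sketch for reading it back in Python follows. It assumes the documents are simply concatenated, possibly separated by whitespace. The helper name `iter_json_documents` and the fields "id" and "text" in the sample are hypothetical.

```python
import json

def iter_json_documents(text):
    """Yield each JSON document found in a string of concatenated documents."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so do it here
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos == len(text):
            break
        doc, pos = decoder.raw_decode(text, pos)
        yield doc

# Sample content standing in for a file under twitter-data/
sample = '{"id": 1, "text": "a"}{"id": 2, "text": "b"}\n{"id": 3}'
docs = list(iter_json_documents(sample))
print(len(docs), "tweets read")
```

For a real file, read its content with open(path).read() and pass the string to the same function.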


Good luck!


About Cayley, a scalable graph database


This is a quick tutorial on using the Cayley graph database (with MongoDB as backend). Cayley is “not a Google project, but created and maintained by a Googler, with permission from and assignment to Google, under the Apache License, version 2.0”

"database": "mongo",
"db_path": "",
"read_only": false,
"host": ""
  • ./cayley init -config=cayley.cfg
  • ./cayley http -config=cayley.cfg -host=”″ &
  • create a file demo.n3
"/user/matteo" "is_manager_of" "/user/ele" .
"/user/matteo" "has" "/workstation/wk0002" .
"/user/matteo" "lives_in" "/country/italy" .
  • upload data with: curl -F NQuadFile=@demo.n3
  • or: ./cayley load -config=cayley.cfg -quads=demo.n3
  • query data with: curl --data 'g.V("/user/matteo").Out(null,"predicate").All()'
{
 "result": [
  {
   "id": "/workstation/wk0002",
   "predicate": "has"
  },
  {
   "id": "/country/italy",
   "predicate": "lives_in"
  },
  {
   "id": "/user/ele",
   "predicate": "is_manager_of"
  }
 ]
}
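
On the client side the answer is easy to turn into a dictionary of outgoing edges. The sketch below assumes the response is a JSON object with a "result" list whose items carry "id" and "predicate", as in the output above. The function name `outgoing_edges` is made up for the example.

```python
import json

def outgoing_edges(response_text):
    """Map each predicate to the list of node ids it points to."""
    result = json.loads(response_text).get("result") or []
    edges = {}
    for item in result:
        edges.setdefault(item["predicate"], []).append(item["id"])
    return edges

response = """
{"result": [
  {"id": "/workstation/wk0002", "predicate": "has"},
  {"id": "/country/italy", "predicate": "lives_in"},
  {"id": "/user/ele", "predicate": "is_manager_of"}
]}
"""

edges = outgoing_edges(response)
for predicate in sorted(edges):
    print(predicate, "->", ", ".join(edges[predicate]))
```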

Hortonworks, IBM and Pivotal begin shipping standardized Hadoop

“Hortonworks, IBM and Pivotal begin shipping standardized Hadoop. The standardization effort is part of the Open Data Platform initiative, which is an industry effort to ensure all versions of Hadoop are based on the same Apache core..”. Read the full article.


Howto export Oracle Essbase databases with MaxL / essmsh commands


essbase@olap-server:~> /opt/essbase/Oracle/Middleware/EPMSystem11R1/products/Essbase/EssbaseServer/templates/

 Essbase MaxL Shell 64-bit - Release 11.1.2 (ESB11.
 Copyright (c) 2000, 2014, Oracle and/or its affiliates.
 All rights reserved.

MAXL> login Hypadmin mypassword on;

 OK/INFO - 1051034 - Logging in user [Hypadmin@Native Directory].
 OK/INFO - 1241001 - Logged in to Essbase.

MAXL> export database P_BSO.Plan1 level0 data to data_file 'ExpLev0_P_BSO.Plan1';

 OK/INFO - 1054014 - Database Plan1 loaded.
 OK/INFO - 1051061 - Application P_BSO loaded - connection established.
 OK/INFO - 1054027 - Application [P_BSO] started with process id [60396].
 OK/INFO - 1019020 - Writing Free Space Information For Database [Plan1].
 OK/INFO - 1005031 - Parallel export completed for this export thread. Blocks Exported: [2013908]. Elapsed time: [312.35]..
 OK/INFO - 1005002 - Ascii Backup Completed. Total blocks: [2.01391e+06]. Elapsed time: [312.35]..
 OK/INFO - 1013270 - Database export completed ['P_BSO'.'Plan1'].


/opt/essbase/Oracle/Middleware/EPMSystem11R1/products/Essbase/EssbaseServer/templates/ -u Hypadmin -p mypassword -s localhost backup-databases.msh

with a file backup-databases.msh like

export database P_BSO.Plan1 level0 data to data_file 'ExpLev0_P_BSO.Plan1';


However, if you need to export both metadata and data, you should run the command

MAXL> alter database P_BSO_D.Plan1 force archive to file 'P_BSO_D.Plan1.arc';
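
When several databases have to be exported, the .msh file can be generated instead of written by hand. The following Python sketch builds the statements shown above from a list of (application, database) pairs. The function names are hypothetical and the naming of the output files simply follows the examples in this post.

```python
def export_statement(app, db):
    """MaxL statement for a level0 data export, as used above."""
    return ("export database {0}.{1} level0 data "
            "to data_file 'ExpLev0_{0}.{1}';").format(app, db)

def archive_statement(app, db):
    """MaxL statement for a forced archive (metadata and data)."""
    return "alter database {0}.{1} force archive to file '{0}.{1}.arc';".format(app, db)

def build_script(databases, with_metadata=False):
    """Return the content of a .msh file, one statement per database."""
    make = archive_statement if with_metadata else export_statement
    return "\n".join(make(app, db) for app, db in databases) + "\n"

databases = [("P_BSO", "Plan1"), ("P_BSO_D", "Plan1")]
script = build_script(databases)
print(script)

# To save it for the command line shown above:
# with open("backup-databases.msh", "w") as f:
#     f.write(script)
```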