Stream Explorer and JMS for both inbound and outbound interaction


In this article, we will look at the very common interaction between Stream Explorer and JMS. JMS is a commonly used channel for the decoupled exchange of messages or events. Stream Explorer can both consume messages from a JMS destination (through a Stream) and publish findings to a JMS destination (through a Target). The use case we discuss here is about temperature sensors: small devices distributed over a building, measuring the local room temperature every few seconds and reporting it over JMS. The Stream Explorer application has to look out for rooms with quickly increasing temperatures and report those over a second JMS queue.

Note: this article describes the Java (SE) code used for generating temperature signals. The class TemperatureSensorSignalPublisher generates temperature values (in Celsius!) for a number of rooms, and publishes these to the queue temperatureMeasurements. At some random point, the class will start a fire in a randomly selected room. In this room, temperatures will soon be over 100 degrees. Also described in that article is the Java class HotRoomAlertProcessor, which consumes messages from a second JMS Queue. Any message received on that queue is reported to the console.

Our objective in this article is to read the temperature measurements from the JMS Queue into a Stream Explorer application, calculate the average value per room and then detect the room on fire. This hot room should then be reported to the JMS Queue.

Open Stream Explorer and from the Stream Explorer Catalog page, create a new item of type Stream. Select JMS as the source type.

image

Press Next.

Configure the URL for the WebLogic domain (t3://localhost:7101), the WebLogic Admin’s username and password (weblogic/weblogic1) and the JNDI Name for the JMS Queue (or Topic): jndi/temperatureMeasurements

image

Press Next.

Define a new Shape. The properties in the JMS (Map)Message produced by the Java Class TemperatureSensorSignalPublisher are called RoomId (of type String) and Temperature (of type Float).
image

Press Create.

The Exploration editor opens, ready to create an exploration based on the Stream.

Define a Name. Then click on Create.

image

The temperature measurement events start streaming in:

image

The first step is the definition of a Summary: calculate the average temperature per room. Also set the time range for the aggregation to 10 seconds (determine the temperature using the most recent 10 seconds worth of data) and the evaluation frequency to 5 seconds.
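To make the Summary settings concrete, here is a minimal Python sketch of the same idea: a 10 second time range that is evaluated every 5 seconds. It only illustrates the windowing semantics and is not how Stream Explorer implements it. The function names and the tuple layout (timestamp, room, temperature) are assumptions made for this example.

```python
from collections import defaultdict

WINDOW_SECONDS = 10  # time range for the aggregation, as configured in the text
SLIDE_SECONDS = 5    # evaluation frequency, as configured in the text


def average_per_room(events, now, window=WINDOW_SECONDS):
    """Average temperature per room over events with now - window < ts <= now.

    events: iterable of (timestamp_seconds, room_id, temperature) tuples
    (a hypothetical layout chosen for this sketch).
    """
    totals = defaultdict(lambda: [0.0, 0])  # room -> [sum, count]
    for ts, room, temperature in events:
        if now - window < ts <= now:
            totals[room][0] += temperature
            totals[room][1] += 1
    return {room: total / count for room, (total, count) in totals.items()}


def evaluate(events, start, end, slide=SLIDE_SECONDS):
    """Return (evaluation_time, averages) pairs, one per slide interval."""
    return [(now, average_per_room(events, now))
            for now in range(start, end + 1, slide)]
```

Because each result covers 10 seconds of data but is produced only once per 5 seconds, the output contains fewer rows than the input and individual spikes are smoothed out.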

image

Fewer events are shown in the Live Output Stream – and with less variation.

Next, add a filter: we are going to hunt for the room on fire. Only records with an average temperature higher than 80 degrees should be reported. Also change the name of the property AVG_of_Temperature to AverageTemperature.
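The filter and the property rename can be pictured as one small transformation on the summary rows. The Python sketch below is only an illustration under assumptions: the rows are modelled as plain dictionaries and the function name hot_rooms is made up for this example.

```python
THRESHOLD = 80.0  # degrees Celsius, the filter value used in the text


def hot_rooms(summary_rows, threshold=THRESHOLD):
    """Keep rows above the threshold and rename the aggregate property.

    summary_rows: list of dicts with keys RoomId and AVG_of_Temperature
    (a hypothetical representation of the Summary output).
    """
    return [
        {"RoomId": row["RoomId"], "AverageTemperature": row["AVG_of_Temperature"]}
        for row in summary_rows
        if row["AVG_of_Temperature"] > threshold
    ]
```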

image

The screenshot shows that in this case, it is the Cafeteria where there is a fire. If you stop class TemperatureSensorSignalPublisher and then start it again, it will take some time before it starts a fire again; once the fire has started, the Live Output Stream will show it.

Finally, click on Configure Target.

Configure a JMS Target, as shown in the figure. The URL is the familiar one (t3://localhost:7101), username and password are weblogic and weblogic1 and the JNDI Name of the JMS target is jndi/hotRooms.
image

Click on Finish. Publish the Exploration.

From now on, whenever a room is discovered with temperatures in the hot zone, a message is published to the JMS Queue, in the form of a MapMessage with properties RoomId and AverageTemperature.

Stop and start class TemperatureSensorSignalPublisher. Run class HotRoomAlertProcessor to have it start listening to the jndi/hotRooms queue.

The former writes:

image

And the latter will report hot rooms by writing a message to the console:

image

While the Stream Explorer browser interface shows:

image


StreamExplorer pushing findings as JSON messages to a WebSocket channel for live HTML Dashboard updates


A common desire when doing real time event processing with Stream Explorer and/or Oracle Event Processor is the ability to present the findings from Stream Explorer in a live dashboard. This dashboard should hold a visualization of whatever information we have set up Stream Explorer to find for us, and it should always show the latest information.

User interfaces are commonly presented in web browsers and created using HTML(5) and JavaScript. As part of the HTML5 evolution that brought today’s browsers, we now have the ability to use Web Sockets, through which we can push information from server to browser and have the user interface updated based on the messages pushed from the server. This allows us to create a dashboard that listens from the browser to a Web Socket and uses whatever messages appear on the web socket to update the user interface. Such a dashboard and its implementation using standard Java (EE) was discussed in a recent article: Java Web Application sending JSON messages through WebSocket to HTML5 browser application for real time push. The results from that article provide the foundation for the article you are reading right now.

We will create a Stream Explorer application that exposes a REST interface to which we will publish JSON messages (in this example using SoapUI as the client from which to generate the test events). These messages report on groups of people entering or leaving a specific room in a movie theater. The exploration we create will aggregate the information from the messages, providing us with constant insight into the total number of people in each room. This information is subsequently pushed to the REST service exposed by a Java EE application that routes that information across the web socket to the HTML5 client. The next figure illustrates the application architecture:

image

In this article, we will assume that the Java EE application, including the dashboard, is already available, as described in the referenced article. All we need to do is:

  • Create a Stream exposed as (inbound) REST interface – discussed in this article.
  • Create an Exploration on top of this Stream – to aggregate the events from the Stream.
  • Configure a target for this Exploration using the outbound REST adapter (an example of which is discussed here) and publish the exploration.
  • Run the Java EE application, open the dashboard and publish messages to the Stream Explorer REST service; watch the dashboard as it constantly updates to reflect the actual status.

 

After configuring the Stream (as discussed in this article), create an exploration, for example called CinemaExploration. Create a Summary of type SUM based on the property partySize and group by room. Edit the Properties and change the name of property SUM_of_partySize to occupation. The exploration will look like this:

 

image

We can start pushing some messages to it from SoapUI:

image

These results are based in part on sending this SoapUI request twice:

image

 

Next, click on Configure a Target.

image

Select type REST and set the URL

image

Click on Finish.

Publish the Exploration.

image

 

The dashboard is opened:

image

Now we can run a test case in SoapUI to send test messages to the Stream Explorer application:

image

 

Here is what the live output stream in the Stream Explorer UI shows next to a screenshot taken of the Cinema Monitor dashboard:

image

The dashboard is constantly updated with the most recent finding published by Stream Explorer. Note: the notion of having a negative occupancy is one that will require some explaining! (More careful test data management seems to be called for.)

After running some more of the SoapUI Test Cases that publish cinema events to the RESTful entry point to the Stream Explorer application, the situation is as follows:

image


StreamExplorer – use REST adapter to feed results from event processing to a REST service


In a recent article, I described how StreamExplorer can be configured to consume events by exposing a REST service to which clients can send HTTP POST requests with JSON payloads. StreamExplorer can also make use of an Outbound REST Adapter, through which the results of explorations can be sent to a REST service.

This target service can be implemented in a number of ways; that does not concern StreamExplorer. In this case we will use an implementation in a Java Servlet (JAX-RS). Alternatives are a SOA Suite service or even a Stream Explorer application with a REST based input stream. The REST service that is invoked in this example is published from a Java SE application, as described in this article. When you run the class CinemaEventHandlerRestStartup, a REST service is published at http://localhost:9998/movieevent, supporting both GET and POST operations.

REST services can be a target for StreamExplorer – so we should be able to quickly configure this service as a target. It will not do much with any messages it receives – it will just show in the console output that it did receive them.

The StreamExplorer application that will publish results to the REST target is introduced in this article. The example looks at a movie theater. More specifically, it monitors the visitors entering and leaving the various rooms in the theater so we keep track of the number of people currently present in each room. When rooms are almost full, we may have to stop selling tickets or perhaps open up an additional room showing the same movie. When a room is [almost] empty, perhaps we should cancel the show altogether. When people keep coming and going, there may be a problem we should attend to. In this case, the events are received by Stream Explorer in the form of JSON messages posted to a REST service.

The data shape for events in the stream is defined like this:

image

From the Catalog, create a new Exploration based on the stream CinemaComingAndGoing.

image

 

Run either the SoapUI test case or the PL/SQL script to have some cinema events published. Verify that these events lead to results in the new exploration.

image

Next, click on Configure a Target. Select REST as the target type

image

and set the resource URL to http://localhost:9998/movieevent.

image

Click on Finish. Next, publish the Exploration.

image

Publish some cinema events to the REST service exposed by the StreamExplorer exploration, as described in this article. The events are processed and forwarded to the REST service (exposed in this case by a Java Class running in a plain Java SE container).

 

image


Use the inbound REST adapter of StreamExplorer to consume events from HTTP POST requests


StreamExplorer is a fairly recent product from Oracle – a business user friendly layer around Oracle Event Processor. In various previous articles, I have discussed StreamExplorer. I have demonstrated how SX can consume events from JMS, EDN and from CSV files. This article shows how a stream in StreamExplorer can expose a REST service to which events can be pushed.

image

In this case, we will look at a movie theater. More specifically, we will monitor the visitors entering and leaving the various rooms in the theater so we keep track of the number of people currently present in each room. When rooms are almost full, we may have to stop selling tickets or perhaps open up an additional room showing the same movie. When a room is [almost] empty, perhaps we should cancel the show altogether. When people keep coming and going, there may be a problem we should attend to.

In this case, the events are received by Stream Explorer in the form of JSON messages posted to a REST service. We use a SoapUI test case with requests to send in these events. The test request called partyOf3InRoom1 is shown in the figure:

image

The request is configured to be sent to the endpoint http://localhost:9002. That is the endpoint for Stream Explorer (and OEP, and more specifically the Jetty server running the OEP domain on top of which Stream Explorer was applied). The (REST) Resource is specified as /cinema. Together this means that this request is sent as a POST request to the endpoint http://localhost:9002/cinema. So that is where our StreamExplorer application will have to consume the message.

The message itself has a JSON payload with two simple properties: room (to identify a room in the movie theater) and partySize (to stipulate the number of people involved in an observation). Note: the number of people is positive when a party enters the room and negative when it leaves the room.
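For readers without SoapUI at hand, the same request can be put together with a few lines of Python. This is a minimal sketch under assumptions: the function name build_cinema_event is made up for this example, the room value "1" is only a sample, and the Content-Type header is an assumption about what the REST adapter expects. The function builds the request without sending it, so it can be inspected without a running server.

```python
import json
import urllib.request

# Endpoint plus resource as described in the text.
ENDPOINT = "http://localhost:9002/cinema"


def build_cinema_event(room, party_size, url=ENDPOINT):
    """Build (but do not send) the POST request for one cinema event.

    party_size is positive when a party enters the room and negative
    when it leaves, as described in the text.
    """
    payload = {"room": room, "partySize": party_size}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},  # assumed header
        method="POST",
    )
```

Passing the returned object to urllib.request.urlopen would actually send the event, which of course requires Stream Explorer to be running at that endpoint.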

The TestSuite TestSuiteCinema contains a single test case with a number of steps that simulate events on a slow night in the movie theater.

Create a Stream and a First Exploration for Handling Cinema Events

Create a new Stream. The name is not crucial. The Source Type should be REST.

image

 

Specify the Context Path as /cinema.

image

Define the REST Shape. Set the name to CinemaEntryOrExitEvent. Define two properties: room – of type String – and partySize – of type Integer.

image

Click on Create to create the Stream.

The Exploration wizard opens next. Set a name for the exploration – for example RoomOccupancy. You may provide a description as well.

image

Click on Create.

Configure the Exploration for example like this:

image

The events are aggregated and grouped by room. The aggregation to be calculated is a sum over the partySize. This should produce the total number of people in every room. In this case, I have also chosen to have the summary calculated once every 5 seconds and to include in the results only the events from the last 8 hours (which is quite arbitrary).
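The aggregation described above boils down to a grouped sum over the events inside the time range. The following Python sketch illustrates that calculation only; it is not the code Stream Explorer generates. The function name occupancy and the tuple layout (timestamp, room, partySize) are assumptions made for this example.

```python
from collections import defaultdict

RANGE_SECONDS = 8 * 60 * 60  # "the last 8 hours" from the text


def occupancy(events, now, time_range=RANGE_SECONDS):
    """Sum partySize per room over events with now - time_range < ts <= now.

    events: iterable of (timestamp_seconds, room, party_size) tuples
    (a hypothetical layout chosen for this sketch).
    """
    totals = defaultdict(int)
    for ts, room, party_size in events:
        if now - time_range < ts <= now:
            totals[room] += party_size
    return dict(totals)
```

One property of this sketch is worth noting: when an entry event ages out of the time range while the matching exit event is still inside it, the sum for that room becomes negative.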

At this point I create a test case in SoapUI for the REST service – and run it:

image

The exploration starts reporting its findings:

image

 

Because publishing data to a REST service in JSON format is so easy – and is supported from many tools such as SoapUI and technologies including PL/SQL and Java, it is a very convenient way of sending events to a StreamExplorer application, both for development and testing purposes as well as for a production implementation.

Using the PL/SQL procedure described in this article, we can easily send events from inside the Oracle Database to the StreamExplorer stream:

image

– and verify the effects in Stream Explorer. Note that the movie showing in room 2 must be quite awful;-)

image

 

Resources

Getting Started with the REST Adapter in OEP 12c – A-Team Blog

Make HTTP Post call from PL/SQL

Make HTTP Post call from Java


Quick Introduction to Oracle Stream Explorer Part Three – Business User friendly processing of real time events – some smart pattern detection


This article is part three in a series about Oracle Stream Explorer, very recently released by Oracle and available from OTN. With Oracle Stream Explorer, business users and citizen developers as well as hard core developers can create explorations on top of live streaming data to filter, enrich, aggregate and inspect it and to detect patterns. Stream Explorer provides a business user friendly, browser based user interface in which streams are configured, explorations are composed, enrichment is set up and publication to external consumers is defined. In the previous two articles I showed how to create a simple exploration based on a stream of events read from a CSV file and how to derive aggregate values from these events. The second article introduced enrichment based on data in a database table, adding more meaning to the aggregation performed on the stream.

In this article, we will look at some of the built in patterns that we can easily include in our explorations to detect or use for further aggregation and interpretation.

The use case is: we are organizing a small conference. In three rooms, sessions take place simultaneously. Our attendees are free to decide which session to attend. We would like to know at virtually any moment how many people are in each room. We have set up simple detectors at the doors of the rooms that produce a signal whenever someone enters or leaves the room. This signal consists of the room identifier (1,2 or 3) and an IN/OUT flag (values +1 or -1).

In article one, we simply determined the number of people per room – updated every 10 seconds with the latest events. In the second article we enriched these updates with details about the rooms – name of the room as well as its capacity. I am still looking for a way to calculate the occupancy percentage (number of attendees divided by room capacity) in order to alert for rooms at higher than 90% occupancy. (if SX does not support these calculations, I could add a virtual column to the ROOMS table that returns the number of attendees equal to 90% capacity and work from there).

In the article you are currently reading I will make use of some built in patterns:

  • Top N – to report every 30 seconds what is the room with the highest number of people in it
  • Detect Missing Event – identify event sources that have stopped publishing events (for example to find a room with a jammed door, a broken detector or no human activity)
  • Eliminate Duplicates – do not report a specific event type (such as missing events because of jammed door) for the same room again if it was already reported in the last 30 seconds

Using one of the predefined patterns is very simple: instead of creating a generic new exploration, we create a pattern-based exploration and provide the parameters associated with the pattern.

 

Top N: report every 30 seconds what is the room with the highest number of people in it

In the Catalog, Create a new item and select Pattern as the type for the new item. A list of supported pattern types is shown. Pick the Top N entry:

image

The pattern template page appears – an exploration is configured, based on the template for the pattern.

The source for this exploration is EnrichedRoomFlow. We have to specify that we want to report every 20 seconds (slide 20) about the room that had the largest number of people in it (order by AttendeeCount). We want to look at the attendee count over the past 40 seconds, so as not to call a room the most popular too easily. And since we are only interested in the single most popular room (instead of the top 2 or 3) we set N to 1.
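The parameters of the Top N pattern can be read as: take the events of the last 40 seconds, order them by AttendeeCount and keep the first N. The Python sketch below illustrates that reading; it is an interpretation made for this example, not the pattern's actual implementation. The events are modelled as dictionaries, and the key ts (a timestamp in seconds) is a hypothetical addition.

```python
WINDOW_SECONDS = 40  # range to look back over, from the text


def top_n(events, now, n=1, window=WINDOW_SECONDS):
    """Return the n events with the highest AttendeeCount inside the window.

    events: list of dicts with keys ts, RoomId and AttendeeCount
    (ts is a hypothetical timestamp field added for this sketch).
    """
    in_window = [e for e in events if now - window < e["ts"] <= now]
    ranked = sorted(in_window, key=lambda e: e["AttendeeCount"], reverse=True)
    return ranked[:n]
```

Calling this function once per slide interval (every 20 seconds in the configuration above) would give one "most popular room" report per interval.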

image

Next we can configure the name and description of the exploration. Additionally, we can define a target, such as a CSV file, to which the findings from the exploration are written.

image

Click Finish and publish the exploration.

The target file is created and before too long, the popular rooms are written to it:

image

 

 

Detect Missing Event

Create a new item of type Pattern. Select Detect Missing Event as the pattern.

image

We have to specify the source in which we want to detect the missing event. Select EnrichedRoomFlow – although NetRoomFlow would have done just as nicely. Specify the Tracking Field: which field(s) are we watching to check whether the event is reported or not? In this case, we are looking for rooms that do not report signals anymore. Therefore, the tracking field is the RoomId. We consider a room non-responding if we have not received any signal for 1 minute; that is our heartbeat interval.
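The idea behind the tracking field and the heartbeat interval can be sketched in a few lines of Python: remember when each RoomId was last seen and report the rooms that have been silent for at least one heartbeat interval. This is an illustration of the concept under assumptions; the function names and the (timestamp, room_id) tuple layout are made up for this example.

```python
HEARTBEAT_SECONDS = 60  # heartbeat interval of 1 minute, from the text


def last_seen_per_room(events):
    """Map each room id to the timestamp of its most recent event.

    events: iterable of (timestamp_seconds, room_id) tuples
    (a hypothetical layout chosen for this sketch).
    """
    last_seen = {}
    for ts, room in events:
        last_seen[room] = max(ts, last_seen.get(room, ts))
    return last_seen


def silent_rooms(last_seen, now, heartbeat=HEARTBEAT_SECONDS):
    """Return the rooms without any event during the last heartbeat interval."""
    return sorted(room for room, ts in last_seen.items() if now - ts >= heartbeat)
```

Note that this sketch can only flag rooms that have reported at least once; a room that never sends an event is unknown to it.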

image

The exploration can be configured in terms of its name and description:

image

and

image

Additionally, we can configure a target – a csv file to collect all reported events – all notifications of a non-responsive room:

image

Then, when the configuration is done, we can publish the Exploration to have the OEP application deployed to the OEP Server. The target file is created, and events start being written to it.

In order to test whether this exploration is working correctly, I have added a fourth room in the database table and included some entry and exit events for room #4:

image

The target file will contain – after a few minutes have passed – signals for room 4 – as expected. It turns out that room 3 is eventually reported as well. Apparently, my artificial set of events has a section where only rooms 1 and 2 are reported:

image

 

Eliminate Duplicates

Another pattern we can easily leverage is Eliminate Duplicates. It allows us to ensure that over a specified period of time an event of a certain type is not published more than once for a certain combination of property values. For example, the non-responsive rooms discussed above are reported repeatedly. Room number 4 is non-responsive and the exploration keeps on telling us that. We may not need to get that information for the same room more often than, say, once per 5 minutes. This can easily be taken care of with this duplicate elimination pattern.

Create a new item of type pattern and select the eliminate duplicates pattern:

image

Set as the source the output from the detect missing events exploration that reports non-responsive rooms. Select RoomId as the key to de-duplicate on and set the Window to 5 minutes, as we want to prevent duplicate reports within a five minute period.
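Conceptually, the key and the window work together as follows: an event is passed on only if no event with the same key was passed on during the preceding window. The Python sketch below shows one possible reading of that behavior; the exact semantics of the pattern in Stream Explorer may differ. The function name and the (timestamp, room_id) tuple layout are assumptions made for this example.

```python
WINDOW_SECONDS = 5 * 60  # window of 5 minutes, from the text


def eliminate_duplicates(events, window=WINDOW_SECONDS):
    """Drop events whose key was already reported within the window.

    events: iterable of (timestamp_seconds, room_id) tuples in time order
    (a hypothetical layout chosen for this sketch).
    """
    last_emitted = {}  # room_id -> timestamp of the last event passed on
    result = []
    for ts, room in events:
        previous = last_emitted.get(room)
        if previous is None or ts - previous >= window:
            result.append((ts, room))
            last_emitted[room] = ts
    return result
```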

image

Configure a target file, to test the output from the exploration:

image

and publish.

The file is created and lo and behold: non responsive rooms are reported, but far less frequently than is done by the DetectJammedDoorsFailedDetectorsNonResponsiveRooms exploration in the nonResponsiveRooms
