July 26, 2015

Integrating MongoDB with Fusion Middleware

oraclefuzion.blogspot.com

With the success of Big Data and organizations looking at NoSQL databases to gather analytical data for their business models, there is substantial demand for integrating on-premise and cloud applications with cloud databases such as MongoDB. This post explains how you can use Fusion Middleware to integrate with MongoDB, in a way that can easily be extended to integrate with on-premise applications.

Prerequisites:
1) Get a MongoDB cloud instance. I would recommend Rackspace MongoDB; however, if you are on a budget you can also use the free tier from MongoLabs.

I used version 2.4.6, but you can go higher as well.

Configuring the MongoDB Objects and Instance:
a) Once you have a DB instance set up, create a collection. For this exercise I created a simple collection named "Customers".


b) Set up the ACL. For ease of use you can set the allowed IP addresses to "any", but I would recommend restricting them for actual use.


c) Instance details. The page provides all the instance details; note the connection string.
It will look like: <DC Name>-mongos.objectrocket.com:<Port>



To connect to this DB, use a connection string of the form:
mongodb://<user:password>@connection-string/<Database Name>

2) Download the appropriate driver for MongoDB
http://docs.mongodb.org/ecosystem/drivers/downloads/

I used the Java driver : https://github.com/mongodb/mongo-java-driver/releases
Driver: mongo-java-driver-2.13.2.jar

SOA and Java Code:
1) Create a SOA project and add the driver JAR to the project libraries.



2) Create a Java file with utility methods for connecting to MongoDB and performing CRUD operations.
I created methods for Create, Search and Delete on the Customers collection in MongoDB.

Here is what the code looks like:
--------------------------------------------------------------------------------------------------------------------------
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;

public class MongoUtil {

    // Insert a single Customer document; called from the BPEL Java Embedding activity
    public static void mongoInsertCustomer(String mongoURL, String serialNo, String name, String emailId) throws Exception {
        Map<String, String> valuesMap = new HashMap<String, String>();
        valuesMap.put("serialNo", serialNo);
        valuesMap.put("name", name);
        valuesMap.put("emailId", emailId);
        try {
            MongoUtil mg = new MongoUtil();
            DB db = mg.getMongoDB(mongoURL);
            mg.mongoInsertRec(db, "Customers", valuesMap);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Generic insert: copies every entry of the map into a document and inserts it into the collection
    public void mongoInsertRec(DB dbmongo, String mongoCollection, Map valuesMap) {
        DBCollection mongoColl = dbmongo.getCollection(mongoCollection);
        BasicDBObject dbObj = new BasicDBObject();
        Iterator it = valuesMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry pair = (Map.Entry) it.next();
            dbObj.put(pair.getKey().toString(), pair.getValue().toString());
        }
        mongoColl.insert(dbObj);
    }

    // Open a connection from a full MongoDB URI (mongodb://user:password@host:port/db) and return the DB handle.
    // Note: each call opens a new MongoClient, which is not closed here.
    public DB getMongoDB(String mongoURI) throws UnknownHostException {
        MongoClientURI uri = new MongoClientURI(mongoURI);
        MongoClient client = new MongoClient(uri);
        DB db = client.getDB(uri.getDatabase());
        return db;
    }

    // Generic search: returns the last matching document of the collection as a JSON string
    public String mongoSearchRec(DB dbmongo, String mongoCollection, String searchKey, String searchVal) {
        String resp = "";
        DBCollection mongoColl = dbmongo.getCollection(mongoCollection);
        BasicDBObject searchQuery = new BasicDBObject();
        searchQuery.append(searchKey, searchVal);
        DBCursor cursor = mongoColl.find(searchQuery);
        while (cursor.hasNext()) {
            resp = cursor.next().toString();
        }
        return resp;
    }

    // Search the Customers collection by serial number; called from the BPEL Java Embedding activity
    public static String mongoSearchCustomer(String mongoURL, String serialNo) throws Exception {
        String resp = "";
        try {
            MongoUtil mg = new MongoUtil();
            DB dbmongo = mg.getMongoDB(mongoURL);
            DBCollection mongoColl = dbmongo.getCollection("Customers");
            BasicDBObject searchQuery = new BasicDBObject();
            searchQuery.append("serialNo", serialNo);
            DBCursor cursor = mongoColl.find(searchQuery);
            while (cursor.hasNext()) {
                resp = cursor.next().toString();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return resp;
    }
}

-------------------------------------------------------------------------------------------------------------------------
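For a quick sanity check outside the SOA composite, the utility can be exercised from a plain main method like the sketch below (this test class is not part of the original project, and the connection string is a placeholder):

public class MongoUtilTest {
    public static void main(String[] args) throws Exception {
        // Placeholder URI; substitute the user, password, connection string and database of your instance
        String mongoURL = "mongodb://user:pwd@connection-string/MyDB";

        // Insert one Customer document, then read it back by serial number
        MongoUtil.mongoInsertCustomer(mongoURL, "105", "John Doe", "john.doe@example.com");
        System.out.println(MongoUtil.mongoSearchCustomer(mongoURL, "105"));
    }
}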

3) Create a WSDL file for the BPEL process or OSB service.

<?xml version="1.0" encoding="UTF-8"?>
<wsdl:definitions name="BPELProcess1"
                  targetNamespace="http://www.soaexample.mongo.pc"
                  xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
                  xmlns:client="http://www.soaexample.mongo.pc"
                  xmlns:ns1="http://www.soaexample.mongo.pc"
                  xmlns:plnk="http://schemas.xmlsoap.org/ws/2003/05/partner-link/">
 <wsdl:types>
  <xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
              xmlns="http://www.soaexample.mongo.pc"
              targetNamespace="http://www.soaexample.mongo.pc"
              elementFormDefault="qualified">
   <xsd:element name="Customers">
    <xsd:complexType>
     <xsd:sequence>
      <xsd:element name="Name" type="xsd:string"/>
      <xsd:element name="SerialNo" type="xsd:string"/>
      <xsd:element name="Email" type="xsd:string"/>
     </xsd:sequence>
    </xsd:complexType>
   </xsd:element>
   <xsd:element name="Response" type="xsd:string"/>
  </xsd:schema>
 </wsdl:types>

 <wsdl:message name="Customer">
  <wsdl:part name="payload" element="ns1:Customers"/>
 </wsdl:message>
 <wsdl:message name="response">
  <wsdl:part name="payload" element="ns1:Response"/>
 </wsdl:message>

 <!-- portType implemented by the BPELProcess1 BPEL process -->
 <wsdl:portType name="mongoPort">
  <wsdl:operation name="Insert">
   <wsdl:input message="client:Customer"/>
   <wsdl:output message="client:response"/>
  </wsdl:operation>
  <wsdl:operation name="Search">
   <wsdl:input message="client:Customer"/>
   <wsdl:output message="client:Customer"/>
  </wsdl:operation>
 </wsdl:portType>
</wsdl:definitions>


4) Now we are ready to integrate MongoDB with BPEL.
Create a BPEL process based on the WSDL and use a Pick activity to choose the correct operation.

For Create, add a Java Embedding activity with the following code:
try{
String vsrNo = (String)getVariableData("SerialNo");
String vname = (String)getVariableData("Name");
String vemail = (String)getVariableData("Email");
MongoUtil.mongoInsertCustomer("mongodb://user:pwd@connection-string/MyDB", vsrNo, vname, vemail);
} catch(Exception e){
addAuditTrailEntry("Exception occurred: " + e.toString());
}

For Search, add the following Java Embedding code:

try{
String vsrNo = (String)getVariableData("SerialNo");
String searchRes = MongoUtil.mongoSearchCustomer("mongodb://user:pwd@connection-string/MyDB", vsrNo);
setVariableData("searchData", searchRes);
} catch(Exception e){
addAuditTrailEntry("Exception occurred: " + e.toString());
}

This is what the BPEL process will look like:


5) Testing: Deploy the project and open the EM console.

For Insert: pass values for SerialNo, Name and Email.


Response:
We can also validate that the record was added to MongoDB by logging into the cloud account; it will show one document added.

For Search:
Let's search for the document added with serial number 105.

Response:
For SOA we can use a JSON utility to convert the response to XML, as in the sketch below.
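The post relies on a generic JSON utility here; as one possible approach (an assumption on my part, using the open-source org.json library rather than anything shipped with SOA Suite), the document string returned by mongoSearchCustomer can be converted to XML like this:

import org.json.JSONObject;
import org.json.XML;

public class JsonToXmlUtil {

    // Convert the JSON string returned by mongoSearchCustomer into a small XML document.
    // The _id field is dropped because its "$oid" key would not make a valid XML element name.
    public static String toCustomerXml(String mongoJson) throws Exception {
        JSONObject obj = new JSONObject(mongoJson);
        obj.remove("_id");
        return "<Customer>" + XML.toString(obj) + "</Customer>";
    }
}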

The process can be expanded to support Delete and Update operations as well; a Delete method could look like the sketch below.
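As an illustration (this method is not part of the code shown earlier), a Delete for the Customers collection could be added to the MongoUtil class, following the same pattern as the insert and search methods:

    // Sketch only: delete Customer documents by serial number.
    public static void mongoDeleteCustomer(String mongoURL, String serialNo) throws Exception {
        try {
            MongoUtil mg = new MongoUtil();
            DB dbmongo = mg.getMongoDB(mongoURL);
            DBCollection mongoColl = dbmongo.getCollection("Customers");
            BasicDBObject deleteQuery = new BasicDBObject();
            deleteQuery.append("serialNo", serialNo);
            mongoColl.remove(deleteQuery); // removes every document matching this serial number
        } catch (Exception e) {
            e.printStackTrace();
        }
    }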
I would recommend using OSB for a more lightweight approach.

Cheers !!

April 16, 2015

Throttle Load to Downstream Application using OSB

oraclefuzion.blogspot.com
Recently I came across a requirement where the downstream application has a throttle limit of a maximum of 20 messages per 10 seconds.

Here is what the design looks like:





OSB reads messages from JMS and sends them to the downstream application for further processing. In SOA we can easily add a delay while dequeuing using the JMS adapter; however, in OSB the JMS transport doesn't provide such an option.

Below are the steps to achieve this :

1) Create a Work Manager with Max Threads Constraint = 1 and attach it to the proxy service consuming messages from JMS.
WebLogic Console -> Environment -> Work Managers

Attach to Proxy: in Dispatch Policy, select the Work Manager.





This ensures that only one message at a time is dequeued from JMS; however, there is no check on the number of messages that can be passed to the downstream application.

2) Configure throttling on the business service that calls the downstream application.



This ensures that only 1 message is passed to the downstream application at a time. That resolves the throttling issue in most use cases; however, in my case it didn't help, because the downstream application has the additional limit of 20 requests per 10 seconds.

3) To resolve this, I added one more control: a wait of 2 seconds in my proxy service before invoking the business service, implemented as Java code that sleeps for 2 seconds.
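A minimal sketch of that delay, assuming it is invoked from the proxy pipeline through a Java Callout action (the class and method names here are illustrative, not from the original project):

public class ThrottleDelay {

    // Called from an OSB Java Callout; pauses the request pipeline for 2 seconds
    // so at most one message reaches the business service every 2 seconds.
    public static void waitTwoSeconds() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}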



Together, these configurations ensure that only one thread reads messages and only one message is passed to the downstream application every 2 seconds.

This worked for me !!

April 3, 2015

Rollback AQ Message to Exception Queue for AQ in BPEL

oraclefuzion.blogspot.com
For one of our implementations we needed to roll back messages to the exception queue attached to an AQ in the Oracle database when there are validation failures.

Looking at the AQ in the database, there was an exception queue attached to the queue on the same queue table, and a message moves to the exception queue after 5 retries. Below are the steps required to achieve this in a BPEL process.

1) Create an AQ adapter to consume from the queue. Add the following properties in composite.xml for the AQ service:

<property name="adapter.aq.dequeue.threads" type="xs:string" many="false">1</property>
<property name="minimumDelayBetweenMessages" type="xs:string" many="false">100</property>
<property name="jca.retry.count" type="xs:string" many="false" override="may">5</property>

I set the delay just to monitor the incoming messages properly; this property is optional.

Make sure the retry count is the same as on the AQ so that the message moves to the exception queue.

2) For monitoring the queue for consumed messages, I also set the retention policy.
To check retention:


select name, retention from user_queues where name = 'MY_QUEUE';

Update retention:
exec dbms_aqadm.alter_queue('MY_QUEUE', RETENTION_TIME => 172800); -- 2 days

3) Make the process synchronous by adding properties to the BPEL component in composite.xml.
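The screenshot of these properties is not reproduced here. Typically (an assumption based on standard Oracle SOA configuration, so verify against your version), the two properties below on the BPEL component make the process run synchronously in the adapter's transaction, so that an unhandled fault rolls the message back to the queue:

<property name="bpel.config.transaction" type="xs:string" many="false">required</property>
<property name="bpel.config.oneWayDeliveryPolicy" type="xs:string" many="false">sync</property>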







4) Add a Validate activity or do custom validation, and on failure throw a rollback in the BPEL process. Don't catch the rollback fault.
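In BPEL this is typically done by throwing the bpelx:rollback fault; a sketch of the Throw activity (not taken from the original screenshots):

<throw name="Throw_Rollback" faultName="bpelx:rollback"
       xmlns:bpelx="http://schemas.oracle.com/bpel/extension"/>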

If the message is invalid it will be rolled back to the exception queue, and you can see it in the queue table. In my case it was the same queue table, so the records were displayed like:

select msgid,state,retry_count from My_Queue_table where Q_Name = 'AQ$_MY_Queue_E'



State 3 means the message has expired, i.e. it exhausted its retries and was moved to the exception queue.

This works !!

July 11, 2014

FATAL Alert BAD_CERTIFICATE – A corrupt or unuseable certificate was received

oraclefuzion.blogspot.com
While testing one of the HTTPS services in OSB I got the below error:

"FATAL Alert BAD_CERTIFICATE – A corrupt or unuseable certificate was received"

It looked like a certificate error, so I re-imported all the certs including root and intermediate; however, the error still occurred.

The root cause of the problem was that WebLogic Server's default SSL implementation does not trust certificates stronger than 128-bit. To support stronger certs we need to enable "JSSE SSL" in WebLogic, which does trust them.

Below are the steps:

1. WebLogic Console -> Servers -> Configuration -> SSL -> Advanced
2. Select "Use JSSE SSL"
3. Restart the servers
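As an alternative to the console setting (not part of the original steps, but an equivalent option), JSSE can also be enabled at server startup by adding the following JVM argument to the server start arguments:

-Dweblogic.security.SSL.enableJSSE=true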




This solved the issue.

Cheers!!