Powered By Blogger

Sunday, July 15, 2012

Scala Mongo Driver Example (hammersmith)

===================================================================

package shyarmal.mongo.driver.test.scala

import java.io.InputStream

import org.bson.collection.BSONDocument
import org.bson.io.BasicOutputBuffer
import org.bson.io.OutputBuffer
import org.bson.SerializableBSONObject
import org.bson.BSONSerializer
import org.bson.DefaultBSONDeserializer
import org.bson.DefaultBSONSerializer

import akka.remote.RemoteProtocol.MessageProtocol
import akka.remote.MessageSerializer
import akka.actor.{ActorSystem, ExtendedActorSystem}
import com.typesafe.config.{ConfigFactory, Config}

/**
* Created by IntelliJ IDEA.
* User: danuka
* Date: 6/12/12
* Time: 10:03 AM
* To change this template use File | Settings | File Templates.
*/

/**
 * BSON serializer / deserializer for [[Message]] documents.
 *
 * A message is written as a flat document holding `_id` and the nine
 * `text1` .. `text9` string fields, and is read back from the same layout.
 */
class BSONSerializableMessageQueue extends SerializableBSONObject[Message]{

  /**
   * Actor system built from the "remotelookup" section of the loaded configuration.
   *
   * Declared lazy: no code in this class reads it, and `Message` itself extends this
   * class, so an eager field would create one actor system per serializer and per
   * message instance. It is now created only if something actually accesses it.
   */
  lazy val system : ExtendedActorSystem = ActorSystem.create("JavaExtension", ConfigFactory.load.getConfig("remotelookup")).asInstanceOf[ExtendedActorSystem]

  /**
   * Writes `msg` through the given serializer as a single document.
   *
   * TODO - Figure out a way for custom serialization of the message instance
   * TODO - Test if a serializer is registered for the message and if not, use toByteString
   *
   * @param msg        message to serialize
   * @param serializer serializer whose output buffer has already been set by the caller
   */
  protected def serializeDurableMsg(msg: Message)(implicit serializer: BSONSerializer) = {
    val fields = Map[String, Any](
      "_id" -> msg._id,
      "text1" -> msg.text1,
      "text2" -> msg.text2,
      "text3" -> msg.text3,
      "text4" -> msg.text4,
      "text5" -> msg.text5,
      "text6" -> msg.text6,
      "text7" -> msg.text7,
      "text8" -> msg.text8,
      "text9" -> msg.text9
    )
    serializer.putObject(fields)
  }

  /*
   * TODO - Implement some object pooling for the Encoders/decoders
   */

  /**
   * Encodes `msg` into the supplied output buffer.
   *
   * @param msg message to encode
   * @param out buffer that receives the BSON bytes
   */
  def encode(msg: Message, out: OutputBuffer) = {
    implicit val serializer = new DefaultBSONSerializer
    serializer.set(out)
    serializeDurableMsg(msg)
    serializer.done
  }

  /**
   * Encodes `msg` into a freshly allocated byte array.
   *
   * @param msg message to encode
   * @return the BSON bytes of the message document
   */
  def encode(msg: Message): Array[Byte] = {
    implicit val serializer = new DefaultBSONSerializer
    val buf = new BasicOutputBuffer
    serializer.set(buf)
    serializeDurableMsg(msg)
    // The bytes are copied out before the serializer is released, as in the original ordering.
    val bytes = buf.toByteArray
    serializer.done
    bytes
  }

  /**
   * Decodes one message document from `in`.
   *
   * The stored `_id` is restored on the returned message so that an
   * encode / decode round trip keeps the document identity; previously the
   * decoded message was given a newly generated id.
   *
   * `_id` and every text field are read with `doc.as`, so a document missing
   * any of them is rejected by that accessor rather than partially decoded.
   *
   * @param in stream positioned at the start of a BSON document
   * @return the decoded message
   */
  def decode(in: InputStream): Message = {
    val deserializer = new DefaultBSONDeserializer
    // TODO - Skip the whole doc step for performance, fun, and profit! (Needs Salat / custom Deser)
    val doc = deserializer.decodeAndFetch(in).asInstanceOf[BSONDocument]
    val id = doc.as[org.bson.types.ObjectId]("_id")

    Message(
      doc.as[String]("text1"),
      doc.as[String]("text2"),
      doc.as[String]("text3"),
      doc.as[String]("text4"),
      doc.as[String]("text5"),
      doc.as[String]("text6"),
      doc.as[String]("text7"),
      doc.as[String]("text8"),
      doc.as[String]("text9"),
      id
    )
  }

  /** No-op: the object is expected to be OK with this message type. */
  def checkObject(msg: Message, isQuery: Boolean = false) = {}

  /** No-op: the keys are expected to be OK with this message type. */
  def checkKeys(msg: Message) {}

  /**
   * Checks for an ID and generates one.
   * Not all implementers will need this, but it gets invoked nonetheless
   * as a signal to BSONDocument, etc implementations to verify an id is there
   * and generate one if needed.
   *
   * Here the message is returned unchanged because the OID is already
   * generated in the wrapper message.
   */
  def checkID(msg: Message) = msg

  /** Returns the message's `_id`, always wrapped in `Some`. */
  def _id(msg: Message): Option[AnyRef] = Some(msg._id)
}


===================================================================











===================================================================

package shyarmal.mongo.driver.test.scala

import com.mongodb.async.{WriteResult, MongoConnection}
import org.bson.types.ObjectId
import akka.dispatch.Envelope._
import akka.dispatch.Envelope
import akka.actor.{ActorRef, ActorSystem}

/**
  * Created by IntelliJ IDEA.
  * User: danuka
  * Date: 6/11/12
  * Time: 4:26 PM
  * To change this template use File | Settings | File Templates.
  */

/**
 * Document stored by the driver test: nine free-form text fields plus a Mongo object id.
 *
 * NOTE(review): this data class extends the serializer class
 * `BSONSerializableMessageQueue`, so every message also carries the serializer's
 * members. That looks unintended, but it is kept because removing the supertype
 * would change the public interface.
 *
 * @param text1 first text field
 * @param text2 second text field
 * @param text3 third text field
 * @param text4 fourth text field
 * @param text5 fifth text field
 * @param text6 sixth text field
 * @param text7 seventh text field
 * @param text8 eighth text field
 * @param text9 ninth text field
 * @param _id   document id; a new `ObjectId` is generated when not supplied
 */
case class Message(
  text1: String,
  text2: String,
  text3: String,
  text4: String,
  text5: String,
  text6: String,
  text7: String,
  text8: String,
  text9: String,
  _id: ObjectId = new ObjectId
) extends BSONSerializableMessageQueue {

  /** No-argument constructor: all nine text fields are empty strings and `_id` takes its default. */
  def this() = this("", "", "", "", "", "", "", "", "")

//  def envelope(system: ActorSystem) = Envelope(text2, text3)(system)
}


===================================================================










===================================================================

package shyarmal.mongo.driver.test.scala

import com.mongodb.async.futures.RequestFutures
import com.mongodb.async.{WriteResult, MongoConnection}
import akka.AkkaException
import com.mongodb.async._
import com.mongodb.async.futures.RequestFutures
import org.bson.collection._
import akka.config.ConfigurationException
import akka.dispatch._
import akka.util.Duration
import akka.event.{EventStream, Logging}
import akka.remote.MessageSerializer
import akka.actor._
import com.typesafe.config.{ConfigFactory, Config}
import com.mongodb.async.Cursor.Entry
import java.util.concurrent.{ScheduledExecutorService, Executors, TimeUnit, TimeoutException}

/**
  * Created by IntelliJ IDEA.
  * User: danuka
  * Date: 6/11/12
  * Time: 4:26 PM
  * To change this template use File | Settings | File Templates.
  */

/**
 * Manual driver that exercises insert / find / update / delete against one
 * MongoDB collection through the asynchronous driver.
 *
 * NOTE(review): `main` is an instance method of a class, not a member of an
 * `object`, so it is not a JVM entry point by itself; it has to be invoked on an
 * instance, e.g. `new MongoDriverTest().main(Array())`.
 */
class MongoDriverTest {

  /** Collection handle, resolved once when the instance is constructed. */
  var mongo : Collection = connect()

  /**
   * Resolves the collection used by all operations from a fixed connection URI.
   *
   * The collection name is "test" prefixed by the collection named in the URI
   * when there is one, and by "mailbox" otherwise.
   *
   * @throws UnsupportedOperationException when the URI names no database
   * @throws IllegalArgumentException      for any other unexpected parser result
   */
  private def connect() = {
    val name : String = "test"
    val _dbh = MongoConnection.fromURI("mongodb://localhost:27017/mongoquest") match {
      case (_, None, None) =>
        throw new UnsupportedOperationException("You must specify a database name to use with MongoDB; please see the MongoDB Connection URI Spec: 'http://www.mongodb.org/display/DOCS/Connections'")
      case (_, Some(db), Some(coll)) =>
        db("%s.%s".format(coll.name, name))
      case (_, Some(db), None) =>
        db("mailbox.%s".format(name))
      case default =>
        throw new IllegalArgumentException("Illegal or unexpected response from Mongo Connection URI Parser: %s".format(default))
    }
    _dbh
  }

  /**
   * Starts a repeating insert load: a pool of 10 threads runs `insert()` first
   * after 10 ms and then again 1 ms after each run completes.
   *
   * The executor is never shut down, so the load continues until the process is stopped.
   *
   * @param args ignored
   */
  def main(args: Array[String]) {
    val insertExecutor : ScheduledExecutorService = Executors.newScheduledThreadPool(10)
    insertExecutor.scheduleWithFixedDelay(new Runnable() {
      def run() {
        try {
          insert()
        } catch {
          // A throwable escaping run() would suppress every later execution of this
          // scheduled task, so failures are printed and the schedule keeps going.
          case e : Throwable => println(e)
        }
      }
    }, 10, 1, TimeUnit.MILLISECONDS)
  }

  /**
   * Prints the collection name and then the `text1`, `text2`, `text3` and `_id`
   * fields of every entry the cursor yields.
   *
   * Iteration stops at the first cursor result that is not an entry. Previously
   * the loop cast every result to an entry unconditionally and read its document
   * before the null check, so it could only end by throwing.
   *
   * NOTE(review): a non-entry result may also mean "fetch the next batch" in this
   * driver — confirm whether a batch continuation is needed for large result sets.
   */
  def find() {
    println(mongo.name)
    mongo.find(Document.empty, Document.empty)((cursor: Cursor[Document]) => {
      // Widened to Any so the type pattern is valid whatever the declared result type is;
      // null and any non-entry state both end the iteration.
      def nextDoc(): Option[Any] = (cursor.next: Any) match {
        case entry: Entry[_] => Option(entry.doc)
        case _ => None
      }

      var current = nextDoc()
      println(" === " + current.isDefined)
      while (current.isDefined) {
        current.foreach(printDocument)
        current = nextDoc()
      }
    })
  }

  /** Prints the selected fields of one cursor document, or a notice if it is not a `Document`. */
  private def printDocument(doc: Any) {
    doc match {
      case document: Document =>
        println("==================")
        println(document.getOrElse("text1", "---"))
        println(document.getOrElse("text2", "---"))
        println(document.getOrElse("text3", "---"))
        println(document.getOrElse("_id", "---"))
        println("==================")
      case other =>
        println("unexpected cursor entry: " + other)
    }
  }

  /**
   * Builds the write callback shared by insert, delete and update.
   *
   * On success it prints the operation name; on failure it prints the failure
   * marker together with the operation name and the error, which the previous
   * per-method callbacks silently dropped.
   *
   * @param operation name printed for the operation ("insert", "delete", "update")
   */
  private def reportWrite(operation: String) = RequestFutures.write {
    (result: Either[Throwable, (Option[AnyRef], WriteResult)]) =>
      result match {
        case Right(_) => println(operation)
        case Left(error) => println("yyyyyyyyyy " + operation + " failed: " + error)
      }
  }

  /** Inserts one fixed sample message using the collection's write concern and the message serializer. */
  def insert() {
    val sample = new Message("xxc", "dbec", "tuillak", "wec", "7ec", "t89ak", "xqwec", "ddfc", "zxcv")
    mongo.insert(sample, false)(reportWrite("insert"))(mongo.writeConcern, new BSONSerializableMessageQueue())
  }

  /** Removes documents whose `text1` equals "xxc". */
  def delete() {
    val doc : Document = Document.empty
    doc.put("text1", "xxc")
    mongo.remove(doc, false)(reportWrite("delete"))
  }

  /**
   * Prints the collection name, then sets `text1` to "upd" on documents whose `text1` is "up".
   *
   * The two boolean flags are passed as `false, true` (presumably upsert and
   * multi — confirm against the driver's `update` signature).
   */
  def update() {
    println(mongo.name)
    mongo.update(Document("text1" -> "up"), Document("$set" -> Document("text1" -> "upd")), false, true)(reportWrite("update"))
  }
}


===================================================================



No comments:

Post a Comment