===================================================================
package shyarmal.mongo.driver.test.scala
import java.io.InputStream
import org.bson.collection.BSONDocument
import org.bson.io.BasicOutputBuffer
import org.bson.io.OutputBuffer
import org.bson.SerializableBSONObject
import org.bson.BSONSerializer
import org.bson.DefaultBSONDeserializer
import org.bson.DefaultBSONSerializer
import akka.remote.RemoteProtocol.MessageProtocol
import akka.remote.MessageSerializer
import akka.actor.{ActorSystem, ExtendedActorSystem}
import com.typesafe.config.{ConfigFactory, Config}
/**
* Created by IntelliJ IDEA.
* User: danuka
* Date: 6/12/12
* Time: 10:03 AM
* To change this template use File | Settings | File Templates.
*/
class BSONSerializableMessageQueue extends SerializableBSONObject[Message]{
// var c: Config = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$TestExtensionId\" ]").withFallback(AkkaSpec.testConf)
val system : ExtendedActorSystem = ActorSystem.create("JavaExtension", ConfigFactory.load.getConfig("remotelookup")).asInstanceOf[ExtendedActorSystem]
// val system : ExtendedActorSystem = ActorSystem.create("JavaExtension", null).asInstanceOf[ExtendedActorSystem]
// val system : ActorSystem = ActorSystem.create()
protected def serializeDurableMsg(msg: Message)(implicit serializer: BSONSerializer) = {
val b = Map.newBuilder[String, Any]
b += "_id" -> msg._id
b += "text1" -> msg.text1
b += "text2" -> msg.text2
b += "text3" -> msg.text3
b += "text4" -> msg.text4
b += "text5" -> msg.text5
b += "text6" -> msg.text6
b += "text7" -> msg.text7
b += "text8" -> msg.text8
b += "text9" -> msg.text9
/**
* TODO - Figure out a way for custom serialization of the message instance
* TODO - Test if a serializer is registered for the message and if not, use toByteString
*/
// val msgData = MessageSerializer.serialize(system, msg.text3.asInstanceOf[AnyRef])
// b += "message" -> new org.bson.types.Binary(0, msgData.toByteArray)
val doc = b.result
serializer.putObject(doc)
}
/*
* TODO - Implement some object pooling for the Encoders/decoders
*/
def encode(msg: Message, out: OutputBuffer) = {
implicit val serializer = new DefaultBSONSerializer
serializer.set(out)
serializeDurableMsg(msg)
serializer.done
}
def encode(msg: Message): Array[Byte] = {
implicit val serializer = new DefaultBSONSerializer
val buf = new BasicOutputBuffer
serializer.set(buf)
serializeDurableMsg(msg)
val bytes = buf.toByteArray
serializer.done
bytes
}
def decode(in: InputStream): Message = {
val deserializer = new DefaultBSONDeserializer
// TODO - Skip the whole doc step for performance, fun, and profit! (Needs Salat / custom Deser)
val doc = deserializer.decodeAndFetch(in).asInstanceOf[BSONDocument]
// val msgData = MessageProtocol.parseFrom(doc.as[org.bson.types.Binary]("message").getData)
// val msg = MessageSerializer.deserialize(system, msgData).toString()
val text1 = doc.as[String]("text1")
val text2 = doc.as[String]("text2")
val text3 = doc.as[String]("text3")
val text4 = doc.as[String]("text4")
val text5 = doc.as[String]("text5")
val text6 = doc.as[String]("text6")
val text7 = doc.as[String]("text7")
val text8 = doc.as[String]("text8")
val text9 = doc.as[String]("text9")
Message(text1, text2, text3, text4, text5, text6, text7, text8, text9)
}
def checkObject(msg: Message, isQuery: Boolean = false) = {} // object expected to be OK with this message type.
def checkKeys(msg: Message) {} // keys expected to be OK with this message type.
/**
* Checks for an ID and generates one.
* Not all implementers will need this, but it gets invoked nonetheless
* as a signal to BSONDocument, etc implementations to verify an id is there
* and generate one if needed.
*/
def checkID(msg: Message) = msg // OID already generated in wrapper message
def _id(msg: Message): Option[AnyRef] = Some(msg._id)
}
===================================================================
===================================================================
package shyarmal.mongo.driver.test.scala
import com.mongodb.async.{WriteResult, MongoConnection}
import org.bson.types.ObjectId
import akka.dispatch.Envelope._
import akka.dispatch.Envelope
import akka.actor.{ActorRef, ActorSystem}
/**
* Created by IntelliJ IDEA.
* User: danuka
* Date: 6/11/12
* Time: 4:26 PM
* To change this template use File | Settings | File Templates.
*/
case class Message (
text1: String,
text2: String,
text3: String,
text4: String,
text5: String,
text6: String,
text7: String,
text8: String,
text9: String,
_id: ObjectId = new ObjectId
) extends BSONSerializableMessageQueue {
def this() = this("", "", "","", "", "","", "", "")
// def envelope(system: ActorSystem) = Envelope(text2, text3)(system)
}
===================================================================
===================================================================
package shyarmal.mongo.driver.test.scala
import com.mongodb.async.futures.RequestFutures
import com.mongodb.async.{WriteResult, MongoConnection}
import akka.AkkaException
import com.mongodb.async._
import com.mongodb.async.futures.RequestFutures
import org.bson.collection._
import akka.config.ConfigurationException
import akka.dispatch._
import akka.util.Duration
import akka.event.{EventStream, Logging}
import akka.remote.MessageSerializer
import akka.actor._
import com.typesafe.config.{ConfigFactory, Config}
import com.mongodb.async.Cursor.Entry
import java.util.concurrent.{ScheduledExecutorService, Executors, TimeUnit, TimeoutException}
/**
* Created by IntelliJ IDEA.
* User: danuka
* Date: 6/11/12
* Time: 4:26 PM
* To change this template use File | Settings | File Templates.
*/
class MongoDriverTest {
// val log = Logging(system, "MongoDriverTest")
var mongo : Collection = connect()
private def connect() = {
// log.info("CONNECTING mongodb uri : [{}]", settings.MongoURI)
val name : String = "test";
// val _dbh = MongoConnection.fromURI(settings.MongoURI) match {
val _dbh = MongoConnection.fromURI("mongodb://localhost:27017/mongoquest") match {
case (conn, None, None) ⇒ {
throw new UnsupportedOperationException("You must specify a database name to use with MongoDB; please see the MongoDB Connection URI Spec: 'http://www.mongodb.org/display/DOCS/Connections'")
}
case (conn, Some(db), Some(coll)) ⇒ {
// log.warning("Collection name ({}) specified in MongoURI Config will be used as a prefix for mailbox names", coll.name)
db("%s.%s".format(coll.name, name))
}
case (conn, Some(db), None) ⇒ {
db("mailbox.%s".format(name))
}
case default ⇒ throw new IllegalArgumentException("Illegal or unexpected response from Mongo Connection URI Parser: %s".format(default))
}
// log.debug("CONNECTED to mongodb { dbh: '%s | %s'} ".format(_dbh, _dbh.name))
// println(_dbh.db.name)
// println(_dbh.name)
// println(_dbh.writeConcern)
_dbh
}
def main(args: Array[String]) {
val insertExecutor : ScheduledExecutorService = Executors.newScheduledThreadPool(10)
// val deleteExecutor : ScheduledExecutorService = Executors.newScheduledThreadPool(2)
// deleteExecutor.scheduleWithFixedDelay(new Runnable() {
// def run() {
// delete();
// }
// }, 100, 100, TimeUnit.MILLISECONDS)
insertExecutor.scheduleWithFixedDelay(new Runnable() {
def run() {
try {
insert();
} catch {
case e : Throwable => {
println(e)
}
}
}
}, 10, 1, TimeUnit.MILLISECONDS)
}
def find() {
println(mongo.name)
mongo.find(Document.empty, Document.empty)((cursor: Cursor[Document]) ⇒ {
// print(cursor.next())
var x : Entry[Message] = cursor.next.asInstanceOf[Entry[Message]]
println(" === " + (x.doc != null))
while (x != null) {
val document : Document = x.doc.asInstanceOf[Document]
println("==================")
println(document.getOrElse("text1", "---"))
println(document.getOrElse("text2", "---"))
println(document.getOrElse("text3", "---"))
println(document.getOrElse("_id", "---"))
println("==================")
x = cursor.next.asInstanceOf[Entry[Message]]
}
})
}
def insert() {
mongo.insert(new Message("xxc", "dbec", "tuillak", "wec", "7ec", "t89ak", "xqwec", "ddfc", "zxcv"), false)(RequestFutures.write {
wr: Either[Throwable, (Option[AnyRef], WriteResult)] ⇒
wr match {
case Right((oid, wr)) => {println("insert")}
case Left(t) =>{println("yyyyyyyyyy")}
}
}) (mongo.writeConcern, new BSONSerializableMessageQueue())
}
def delete() {
val doc : Document = Document.empty
doc.put("text1", "xxc")
mongo.remove(doc, false) (RequestFutures.write {
wr: Either[Throwable, (Option[AnyRef], WriteResult)] ⇒
wr match {
case Right((oid, wr)) => {println("delete")}
case Left(t) =>{println("yyyyyyyyyy")}
}
})
}
def update() {
println(mongo.name)
// val q : Document = Document.empty
// q.put("text1", "text1")
// val u : Document = Document.empty
// u.put("text1", "tup")
//Document("text1" -> "upd", "text3" -> "u3", "text2" -> "u2")
mongo.update(Document("text1" -> "up"), Document("$set" -> Document("text1" -> "upd")), false, true) (RequestFutures.write {
wr: Either[Throwable, (Option[AnyRef], WriteResult)] =>
wr match {
case Right((oid, wr)) => {println("update")}
case Left(t) =>{println("yyyyyyyyyy")}
}
})
}
}
===================================================================
package shyarmal.mongo.driver.test.scala
import java.io.InputStream
import org.bson.collection.BSONDocument
import org.bson.io.BasicOutputBuffer
import org.bson.io.OutputBuffer
import org.bson.SerializableBSONObject
import org.bson.BSONSerializer
import org.bson.DefaultBSONDeserializer
import org.bson.DefaultBSONSerializer
import akka.remote.RemoteProtocol.MessageProtocol
import akka.remote.MessageSerializer
import akka.actor.{ActorSystem, ExtendedActorSystem}
import com.typesafe.config.{ConfigFactory, Config}
/**
* Created by IntelliJ IDEA.
* User: danuka
* Date: 6/12/12
* Time: 10:03 AM
* To change this template use File | Settings | File Templates.
*/
class BSONSerializableMessageQueue extends SerializableBSONObject[Message]{
// var c: Config = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$TestExtensionId\" ]").withFallback(AkkaSpec.testConf)
val system : ExtendedActorSystem = ActorSystem.create("JavaExtension", ConfigFactory.load.getConfig("remotelookup")).asInstanceOf[ExtendedActorSystem]
// val system : ExtendedActorSystem = ActorSystem.create("JavaExtension", null).asInstanceOf[ExtendedActorSystem]
// val system : ActorSystem = ActorSystem.create()
protected def serializeDurableMsg(msg: Message)(implicit serializer: BSONSerializer) = {
val b = Map.newBuilder[String, Any]
b += "_id" -> msg._id
b += "text1" -> msg.text1
b += "text2" -> msg.text2
b += "text3" -> msg.text3
b += "text4" -> msg.text4
b += "text5" -> msg.text5
b += "text6" -> msg.text6
b += "text7" -> msg.text7
b += "text8" -> msg.text8
b += "text9" -> msg.text9
/**
* TODO - Figure out a way for custom serialization of the message instance
* TODO - Test if a serializer is registered for the message and if not, use toByteString
*/
// val msgData = MessageSerializer.serialize(system, msg.text3.asInstanceOf[AnyRef])
// b += "message" -> new org.bson.types.Binary(0, msgData.toByteArray)
val doc = b.result
serializer.putObject(doc)
}
/*
* TODO - Implement some object pooling for the Encoders/decoders
*/
def encode(msg: Message, out: OutputBuffer) = {
implicit val serializer = new DefaultBSONSerializer
serializer.set(out)
serializeDurableMsg(msg)
serializer.done
}
def encode(msg: Message): Array[Byte] = {
implicit val serializer = new DefaultBSONSerializer
val buf = new BasicOutputBuffer
serializer.set(buf)
serializeDurableMsg(msg)
val bytes = buf.toByteArray
serializer.done
bytes
}
def decode(in: InputStream): Message = {
val deserializer = new DefaultBSONDeserializer
// TODO - Skip the whole doc step for performance, fun, and profit! (Needs Salat / custom Deser)
val doc = deserializer.decodeAndFetch(in).asInstanceOf[BSONDocument]
// val msgData = MessageProtocol.parseFrom(doc.as[org.bson.types.Binary]("message").getData)
// val msg = MessageSerializer.deserialize(system, msgData).toString()
val text1 = doc.as[String]("text1")
val text2 = doc.as[String]("text2")
val text3 = doc.as[String]("text3")
val text4 = doc.as[String]("text4")
val text5 = doc.as[String]("text5")
val text6 = doc.as[String]("text6")
val text7 = doc.as[String]("text7")
val text8 = doc.as[String]("text8")
val text9 = doc.as[String]("text9")
Message(text1, text2, text3, text4, text5, text6, text7, text8, text9)
}
def checkObject(msg: Message, isQuery: Boolean = false) = {} // object expected to be OK with this message type.
def checkKeys(msg: Message) {} // keys expected to be OK with this message type.
/**
* Checks for an ID and generates one.
* Not all implementers will need this, but it gets invoked nonetheless
* as a signal to BSONDocument, etc implementations to verify an id is there
* and generate one if needed.
*/
def checkID(msg: Message) = msg // OID already generated in wrapper message
def _id(msg: Message): Option[AnyRef] = Some(msg._id)
}
===================================================================
===================================================================
package shyarmal.mongo.driver.test.scala
import com.mongodb.async.{WriteResult, MongoConnection}
import org.bson.types.ObjectId
import akka.dispatch.Envelope._
import akka.dispatch.Envelope
import akka.actor.{ActorRef, ActorSystem}
/**
* Created by IntelliJ IDEA.
* User: danuka
* Date: 6/11/12
* Time: 4:26 PM
* To change this template use File | Settings | File Templates.
*/
case class Message (
text1: String,
text2: String,
text3: String,
text4: String,
text5: String,
text6: String,
text7: String,
text8: String,
text9: String,
_id: ObjectId = new ObjectId
) extends BSONSerializableMessageQueue {
def this() = this("", "", "","", "", "","", "", "")
// def envelope(system: ActorSystem) = Envelope(text2, text3)(system)
}
===================================================================
===================================================================
package shyarmal.mongo.driver.test.scala
import com.mongodb.async.futures.RequestFutures
import com.mongodb.async.{WriteResult, MongoConnection}
import akka.AkkaException
import com.mongodb.async._
import com.mongodb.async.futures.RequestFutures
import org.bson.collection._
import akka.config.ConfigurationException
import akka.dispatch._
import akka.util.Duration
import akka.event.{EventStream, Logging}
import akka.remote.MessageSerializer
import akka.actor._
import com.typesafe.config.{ConfigFactory, Config}
import com.mongodb.async.Cursor.Entry
import java.util.concurrent.{ScheduledExecutorService, Executors, TimeUnit, TimeoutException}
/**
* Created by IntelliJ IDEA.
* User: danuka
* Date: 6/11/12
* Time: 4:26 PM
* To change this template use File | Settings | File Templates.
*/
class MongoDriverTest {
// val log = Logging(system, "MongoDriverTest")
var mongo : Collection = connect()
private def connect() = {
// log.info("CONNECTING mongodb uri : [{}]", settings.MongoURI)
val name : String = "test";
// val _dbh = MongoConnection.fromURI(settings.MongoURI) match {
val _dbh = MongoConnection.fromURI("mongodb://localhost:27017/mongoquest") match {
case (conn, None, None) ⇒ {
throw new UnsupportedOperationException("You must specify a database name to use with MongoDB; please see the MongoDB Connection URI Spec: 'http://www.mongodb.org/display/DOCS/Connections'")
}
case (conn, Some(db), Some(coll)) ⇒ {
// log.warning("Collection name ({}) specified in MongoURI Config will be used as a prefix for mailbox names", coll.name)
db("%s.%s".format(coll.name, name))
}
case (conn, Some(db), None) ⇒ {
db("mailbox.%s".format(name))
}
case default ⇒ throw new IllegalArgumentException("Illegal or unexpected response from Mongo Connection URI Parser: %s".format(default))
}
// log.debug("CONNECTED to mongodb { dbh: '%s | %s'} ".format(_dbh, _dbh.name))
// println(_dbh.db.name)
// println(_dbh.name)
// println(_dbh.writeConcern)
_dbh
}
def main(args: Array[String]) {
val insertExecutor : ScheduledExecutorService = Executors.newScheduledThreadPool(10)
// val deleteExecutor : ScheduledExecutorService = Executors.newScheduledThreadPool(2)
// deleteExecutor.scheduleWithFixedDelay(new Runnable() {
// def run() {
// delete();
// }
// }, 100, 100, TimeUnit.MILLISECONDS)
insertExecutor.scheduleWithFixedDelay(new Runnable() {
def run() {
try {
insert();
} catch {
case e : Throwable => {
println(e)
}
}
}
}, 10, 1, TimeUnit.MILLISECONDS)
}
def find() {
println(mongo.name)
mongo.find(Document.empty, Document.empty)((cursor: Cursor[Document]) ⇒ {
// print(cursor.next())
var x : Entry[Message] = cursor.next.asInstanceOf[Entry[Message]]
println(" === " + (x.doc != null))
while (x != null) {
val document : Document = x.doc.asInstanceOf[Document]
println("==================")
println(document.getOrElse("text1", "---"))
println(document.getOrElse("text2", "---"))
println(document.getOrElse("text3", "---"))
println(document.getOrElse("_id", "---"))
println("==================")
x = cursor.next.asInstanceOf[Entry[Message]]
}
})
}
def insert() {
mongo.insert(new Message("xxc", "dbec", "tuillak", "wec", "7ec", "t89ak", "xqwec", "ddfc", "zxcv"), false)(RequestFutures.write {
wr: Either[Throwable, (Option[AnyRef], WriteResult)] ⇒
wr match {
case Right((oid, wr)) => {println("insert")}
case Left(t) =>{println("yyyyyyyyyy")}
}
}) (mongo.writeConcern, new BSONSerializableMessageQueue())
}
def delete() {
val doc : Document = Document.empty
doc.put("text1", "xxc")
mongo.remove(doc, false) (RequestFutures.write {
wr: Either[Throwable, (Option[AnyRef], WriteResult)] ⇒
wr match {
case Right((oid, wr)) => {println("delete")}
case Left(t) =>{println("yyyyyyyyyy")}
}
})
}
def update() {
println(mongo.name)
// val q : Document = Document.empty
// q.put("text1", "text1")
// val u : Document = Document.empty
// u.put("text1", "tup")
//Document("text1" -> "upd", "text3" -> "u3", "text2" -> "u2")
mongo.update(Document("text1" -> "up"), Document("$set" -> Document("text1" -> "upd")), false, true) (RequestFutures.write {
wr: Either[Throwable, (Option[AnyRef], WriteResult)] =>
wr match {
case Right((oid, wr)) => {println("update")}
case Left(t) =>{println("yyyyyyyyyy")}
}
})
}
}
===================================================================
No comments:
Post a Comment