Skip to content

Commit

Permalink
Полностью функционируют сокеты
Browse files Browse the repository at this point in the history
  • Loading branch information
avvero committed Jan 17, 2014
1 parent cec2367 commit e5657c2
Show file tree
Hide file tree
Showing 13 changed files with 1,876 additions and 222 deletions.
563 changes: 396 additions & 167 deletions .idea/workspace.xml

Large diffs are not rendered by default.

9 changes: 2 additions & 7 deletions grails-app/conf/BootStrap.groovy
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
class BootStrap {

def socketEventService
def mongoEventService

def init = { servletContext ->
// socketLogService.start()
// mongoEventService.start()

}
def destroy = {
// socketLogService.stop()
// mongoEventService.stop()

}
}
4 changes: 2 additions & 2 deletions grails-app/controllers/com/avvero/longo/FlowController.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ class FlowController {
}

def connectToSocketCollector() {
def connection = MongoConnectionConfig.findById(params.id)
Collector collector = CollectorFactory.getCollector(connection)
def socket = Socket.findById(params.id)
Collector collector = CollectorFactory.getCollector(socket)
redirect(controller: "flow", action: "index", params: [collector: collector.getName()])
}

Expand Down
5 changes: 5 additions & 0 deletions grails-app/domain/com/avvero/longo/Socket.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@ class Socket {

static constraints = {
}

def getKey() {
return "socket:" + this.host+":" + this.port
}

}
34 changes: 0 additions & 34 deletions grails-app/services/com/avvero/longo/SocketEventService.groovy

This file was deleted.

Binary file added lib/log4mongo-java-0.7.4.jar
Binary file not shown.
468 changes: 468 additions & 0 deletions logs/stub.log

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions src/groovy/com/avvero/longo/Collector.groovy
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.avvero.longo

import org.apache.commons.logging.LogFactory
import org.atmosphere.cpr.Broadcaster

/**
*
Expand Down Expand Up @@ -33,6 +34,13 @@ abstract class Collector {
this.listeners.add(listener)
}

synchronized void addListener(Broadcaster broadcaster) {
log.info("Add new listener to Collector " + getName())
this.listeners.add(getNewListener(broadcaster))
}

protected abstract Listener getNewListener(Broadcaster broadcaster)

synchronized void alertListener(doc) {
listeners.each {it->
it.handle(doc)
Expand Down
10 changes: 10 additions & 0 deletions src/groovy/com/avvero/longo/CollectorFactory.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,18 @@ class CollectorFactory {
Collector collector = collectors.get(config.getKey())
if (collector == null) {
collector = new MongoCollector(config)
collectors.put(collector.getName(), collector)
collector.start()
}
return collector;
}

public static synchronized Collector getCollector(Socket socket) {
Collector collector = collectors.get(socket.getKey())
if (collector == null) {
collector = new SocketCollector(socket)
collectors.put(collector.getName(), collector)
collector.start()
}
return collector;
}
Expand Down
10 changes: 1 addition & 9 deletions src/groovy/com/avvero/longo/LogItemHandler.groovy
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.avvero.longo

import com.mongodb.BasicDBObject
import grails.converters.JSON
import org.atmosphere.cpr.Broadcaster
import org.atmosphere.cpr.BroadcasterFactory
import org.atmosphere.cpr.DefaultBroadcaster
Expand All @@ -26,13 +24,7 @@ class LogItemHandler extends HttpServlet {
//XXX подумать лучше
String[] pars = request.getPathInfo().split("/")
Collector collector = CollectorFactory.getCollector(pars[1])
collector.addListener(new Listener() {
void handle(Object o) {
BasicDBObject dbObject = (BasicDBObject) o;
def log = dbObject.toString()
broadcaster.broadcast(log)
}
})
collector.addListener(broadcaster)
m.setBroadcaster(broadcaster)
}

Expand Down
20 changes: 17 additions & 3 deletions src/groovy/com/avvero/longo/MongoCollector.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.mongodb.BasicDBObject
import com.mongodb.DB
import com.mongodb.DBCursor
import org.apache.commons.logging.LogFactory
import org.atmosphere.cpr.Broadcaster

/**
*
Expand All @@ -19,7 +20,7 @@ class MongoCollector extends Collector {

MongoConnectionConfig config

MongoCollector (MongoConnectionConfig config) {
MongoCollector(MongoConnectionConfig config) {
this.config = config
}

Expand All @@ -38,7 +39,7 @@ class MongoCollector extends Collector {
void run() {
try {
while (cursor.hasNext()) {
BasicDBObject doc = (BasicDBObject)cursor.next()
BasicDBObject doc = (BasicDBObject) cursor.next()
alertListener(doc)
}
} finally {
Expand All @@ -53,7 +54,20 @@ class MongoCollector extends Collector {
log.info("Stop MongoCollector...")
try {
if (cursor != null) cursor.close();
} catch (final Throwable t) { }
} catch (final Throwable t) {
}
db.requestDone();
}

@Override
protected Listener getNewListener(Broadcaster broadcaster) {
return new Listener() {
void handle(Object o) {
BasicDBObject dbObject = (BasicDBObject) o;
def log = dbObject.toString()
broadcaster.broadcast(log)
}
}
}

}
92 changes: 92 additions & 0 deletions src/groovy/com/avvero/longo/SocketCollector.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.avvero.longo

import com.mongodb.DBObject
import org.apache.commons.logging.LogFactory
import org.apache.log4j.spi.LoggingEvent
import org.atmosphere.cpr.Broadcaster
import org.log4mongo.LoggingEventBsonifier
import org.log4mongo.LoggingEventBsonifierImpl

/**
*
* @author fxdev-belyaev-ay
*/
class SocketCollector extends Collector {

private static final log = LogFactory.getLog(this)

Socket socket
ServerSocket serverSocket

LoggingEventBsonifier bsonifier = new LoggingEventBsonifierImpl()

SocketCollector (Socket socket) {
this.socket = socket
}

@Override
String getName() {
return socket.getKey()
}

@Override
void _start() {
log.info("Start SocketCollector...")
log.info("Listening on port " + socket.port);
serverSocket = new ServerSocket(socket.port);
new Thread(new Runnable() {
@Override
void run() {
log.info("Waiting to accept a new client.");
java.net.Socket serverSocket = serverSocket.accept();
log.info("Connected to client at " + getName());
log.info("Starting new socket node.");
ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(serverSocket.getInputStream()));
try {
LoggingEvent event;
if (ois != null) {
while(true) {
// read an event from the wire
event = (LoggingEvent) ois.readObject();
alertListener(event)
}
}
} finally {
if (ois != null) {
try {
ois.close();
} catch(Exception e) {
log.info("Could not close connection.", e);
}
}
if (socket != null) {
try {
socket.close();
} catch(InterruptedIOException e) {
Thread.currentThread().interrupt();
} catch(IOException ex) {
}
}
}
}
}).start()
}

@Override
void _stop() {
log.info("Stop SocketCollector...")
serverSocket.close()
}

@Override
protected Listener getNewListener(Broadcaster broadcaster) {
return new Listener() {
void handle(Object o) {
LoggingEvent event = (LoggingEvent) o;
DBObject bson = bsonifier.bsonify(event);
def log = bson.toString()
broadcaster.broadcast(log)
}
}
}
}
Loading

0 comments on commit e5657c2

Please sign in to comment.