Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot committed Sep 11, 2024
1 parent 6a188e3 commit 4bd23b7
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 16 deletions.
5 changes: 3 additions & 2 deletions node/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ waves {
# List of IP addresses of well known nodes.
known-peers = []

# How long the information about peer stays in database after the last communication with it
peers-data-residence-time = 1d
# How long the information about peer stays in database after the last communication with it.
# This value should be big eno
peers-data-residence-time = 15m

# How long peer stays in blacklist after getting in it
black-list-residence-time = 15m
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.wavesplatform.network

import java.util
import java.util.concurrent.{ConcurrentMap, TimeUnit}

import com.wavesplatform.network.Handshake.InvalidHandshakeException
import com.wavesplatform.utils.ScorexLogging
import io.netty.buffer.ByteBuf
Expand All @@ -10,8 +13,6 @@ import io.netty.handler.codec.ReplayingDecoder
import io.netty.util.AttributeKey
import io.netty.util.concurrent.ScheduledFuture

import java.util
import java.util.concurrent.{ConcurrentMap, TimeUnit}
import scala.concurrent.duration.FiniteDuration

class HandshakeDecoder(peerDatabase: PeerDatabase) extends ReplayingDecoder[Void] with ScorexLogging {
Expand Down Expand Up @@ -98,17 +99,6 @@ abstract class HandshakeHandler(
val previousPeer = peerConnections.putIfAbsent(key, ctx.channel())
if (previousPeer == null) {
log.info(s"${id(ctx)} Accepted handshake $remoteHandshake")

(for {
rda <- remoteHandshake.declaredAddress
rdaAddress <- Option(rda.getAddress)
ctxAddress <- ctx.remoteAddress.map(_.getAddress)
if rdaAddress == ctxAddress
} yield rda).foreach { x =>
log.trace(s"${id(ctx)} Touching declared address $x")
peerDatabase.touch(x)
}

removeHandshakeHandlers(ctx, this)
establishedConnections.put(ctx.channel(), peerInfo(remoteHandshake, ctx.channel()))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ object NetworkServer extends ScorexLogging {

def handleConnectionAttempt(remoteAddress: InetSocketAddress)(thisConnFuture: ChannelFuture): Unit = {
if (thisConnFuture.isSuccess) {
log.trace(formatOutgoingChannelEvent(thisConnFuture.channel(), "Connection established"))
thisConnFuture.channel().closeFuture().addListener(f => handleOutgoingChannelClosed(remoteAddress)(f))
} else if (thisConnFuture.cause() != null) {
peerDatabase.suspend(remoteAddress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import scala.concurrent.duration.FiniteDuration

class PeerSynchronizer(peerDatabase: PeerDatabase, peerRequestInterval: FiniteDuration) extends ChannelInboundHandlerAdapter with ScorexLogging {
private var peersRequested = false
// declared address is not empty only when this is an outgoing channel, and its declared address matches remote address
private var declaredAddress = Option.empty[InetSocketAddress]

def requestPeers(ctx: ChannelHandlerContext): Unit = if (ctx.channel().isActive) {
peersRequested = true
Expand All @@ -20,8 +22,24 @@ class PeerSynchronizer(peerDatabase: PeerDatabase, peerRequestInterval: FiniteDu
}

override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
declaredAddress.foreach(peerDatabase.touch)
msg match {
case _: Handshake =>
case hs: Handshake =>
val rda = for {
rda <- hs.declaredAddress
rdaAddress <- Option(rda.getAddress)
ctxAddress <- ctx.remoteAddress.map(_.getAddress)
if rdaAddress == ctxAddress
} yield rda

rda match {
case None => log.debug(s"${id(ctx)} Declared address $rda does not match actual remote address ${ctx.remoteAddress.map(_.getAddress)}")
case Some(x) =>
log.trace(s"${id(ctx)} Touching declared address")
peerDatabase.touch(x)
declaredAddress = Some(x)
}

requestPeers(ctx)
super.channelRead(ctx, msg)
case GetPeers =>
Expand Down

0 comments on commit 4bd23b7

Please sign in to comment.