Coverage Summary for Class: NodeMessageHandler (co.rsk.net)

Class Method, % Line, %
NodeMessageHandler 7.7% (1/13) 3.2% (3/94)
NodeMessageHandler$MessageTask 0% (0/5) 0% (0/8)
NodeMessageHandler$MessageTask$TaskComparator 0% (0/2) 0% (0/2)
NodeMessageHandler$MockitoMock$796703459
NodeMessageHandler$MockitoMock$796703459$auxiliary$7r2GpuQ4
NodeMessageHandler$MockitoMock$796703459$auxiliary$Cfa3RQTL
NodeMessageHandler$MockitoMock$796703459$auxiliary$cu4X1i0D
NodeMessageHandler$MockitoMock$796703459$auxiliary$CW2FkiME
NodeMessageHandler$MockitoMock$796703459$auxiliary$lKVfoGC9
NodeMessageHandler$MockitoMock$796703459$auxiliary$ortPtwVO
NodeMessageHandler$MockitoMock$796703459$auxiliary$ozDmBtdn
NodeMessageHandler$MockitoMock$796703459$auxiliary$txZhyyAC
Total 5% (1/20) 2.9% (3/104)


1 /* 2  * This file is part of RskJ 3  * Copyright (C) 2017 RSK Labs Ltd. 4  * 5  * This program is free software: you can redistribute it and/or modify 6  * it under the terms of the GNU Lesser General Public License as published by 7  * the Free Software Foundation, either version 3 of the License, or 8  * (at your option) any later version. 9  * 10  * This program is distributed in the hope that it will be useful, 11  * but WITHOUT ANY WARRANTY; without even the implied warranty of 12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13  * GNU Lesser General Public License for more details. 14  * 15  * You should have received a copy of the GNU Lesser General Public License 16  * along with this program. If not, see <http://www.gnu.org/licenses/>. 17  */ 18  19 package co.rsk.net; 20  21 import co.rsk.config.InternalService; 22 import co.rsk.config.RskSystemProperties; 23 import co.rsk.core.bc.BlockUtils; 24 import co.rsk.crypto.Keccak256; 25 import co.rsk.net.messages.*; 26 import co.rsk.scoring.EventType; 27 import co.rsk.scoring.PeerScoringManager; 28 import co.rsk.util.FormatUtils; 29 import org.ethereum.crypto.HashUtil; 30 import org.ethereum.net.server.ChannelManager; 31 import org.slf4j.Logger; 32 import org.slf4j.LoggerFactory; 33 import javax.annotation.Nonnull; 34 import javax.annotation.Nullable; 35 import java.time.Duration; 36 import java.util.*; 37 import java.util.concurrent.PriorityBlockingQueue; 38 import java.util.concurrent.TimeUnit; 39  40 public class NodeMessageHandler implements MessageHandler, InternalService, Runnable { 41  private static final Logger logger = LoggerFactory.getLogger("messagehandler"); 42  private static final Logger loggerMessageProcess = LoggerFactory.getLogger("messageProcess"); 43  44  private static final int MAX_NUMBER_OF_MESSAGES_CACHED = 5000; 45  private static final long RECEIVED_MESSAGES_CACHE_DURATION = TimeUnit.MINUTES.toMillis(2); 46  47  private final RskSystemProperties config; 48  private final BlockProcessor blockProcessor; 49  private final SyncProcessor syncProcessor; 50  private final ChannelManager channelManager; 51  private final TransactionGateway transactionGateway; 52  private final PeerScoringManager peerScoringManager; 53  54  private volatile long lastStatusSent = System.currentTimeMillis(); 55  private volatile long lastTickSent = System.currentTimeMillis(); 56  57  private final StatusResolver statusResolver; 58  private Set<Keccak256> receivedMessages = Collections.synchronizedSet(new HashSet<>()); 59  private long cleanMsgTimestamp = 0; 60  61  private PriorityBlockingQueue<MessageTask> queue; 62  63  private volatile boolean stopped; 64  65  /** 66  * @param statusResolver 67  */ 68  public NodeMessageHandler(RskSystemProperties config, 69  final BlockProcessor blockProcessor, 70  final SyncProcessor syncProcessor, 71  @Nullable final ChannelManager channelManager, 72  @Nullable final TransactionGateway transactionGateway, 73  @Nullable final PeerScoringManager peerScoringManager, 74  StatusResolver statusResolver) { 75  this.config = config; 76  this.channelManager = channelManager; 77  this.blockProcessor = blockProcessor; 78  this.syncProcessor = syncProcessor; 79  this.transactionGateway = transactionGateway; 80  this.statusResolver = statusResolver; 81  this.cleanMsgTimestamp = System.currentTimeMillis(); 82  this.peerScoringManager = peerScoringManager; 83  this.queue = new PriorityBlockingQueue<>(11, new MessageTask.TaskComparator()); 84  } 85  86  /** 87  * processMessage processes a RSK Message, doing the appropriate action based on the message type. 88  * 89  * @param sender the message sender. 90  * @param message the message to be processed. 91  */ 92  public synchronized void processMessage(final Peer sender, @Nonnull final Message message) { 93  long start = System.nanoTime(); 94  MessageType messageType = message.getMessageType(); 95  logger.trace("Process message type: {}", messageType); 96  97  MessageVisitor mv = new MessageVisitor(config, 98  blockProcessor, 99  syncProcessor, 100  transactionGateway, 101  peerScoringManager, 102  channelManager, 103  sender); 104  message.accept(mv); 105  106  long processTime = System.nanoTime() - start; 107  String timeInSeconds = FormatUtils.formatNanosecondsToSeconds(processTime); 108  109  if ((messageType == MessageType.BLOCK_MESSAGE || messageType == MessageType.BODY_RESPONSE_MESSAGE) && BlockUtils.tooMuchProcessTime(processTime)) { 110  loggerMessageProcess.warn("Message[{}] processed after [{}] seconds.", message.getMessageType(), timeInSeconds); 111  } 112  else { 113  loggerMessageProcess.debug("Message[{}] processed after [{}] seconds.", message.getMessageType(), timeInSeconds); 114  } 115  } 116  117  @Override 118  public void postMessage(Peer sender, Message message) { 119  logger.trace("Start post message (queue size {}) (message type {})", this.queue.size(), message.getMessageType()); 120  // There's an obvious race condition here, but fear not. 121  // receivedMessages and logger are thread-safe 122  // cleanMsgTimestamp is a long replaced by the next value, we don't care 123  // enough about the precision of the value it takes 124  cleanExpiredMessages(); 125  tryAddMessage(sender, message); 126  logger.trace("End post message (queue size {})", this.queue.size()); 127  } 128  129  private void tryAddMessage(Peer sender, Message message) { 130  Keccak256 encodedMessage = new Keccak256(HashUtil.keccak256(message.getEncoded())); 131  if (!receivedMessages.contains(encodedMessage)) { 132  if (message.getMessageType() == MessageType.BLOCK_MESSAGE || message.getMessageType() == MessageType.TRANSACTIONS) { 133  if (this.receivedMessages.size() >= MAX_NUMBER_OF_MESSAGES_CACHED) { 134  this.receivedMessages.clear(); 135  } 136  this.receivedMessages.add(encodedMessage); 137  } 138  139  double score = sender.score(System.currentTimeMillis(), message.getMessageType()); 140  141  this.addMessage(sender, message, score); 142  } else { 143  recordEvent(sender, EventType.REPEATED_MESSAGE); 144  logger.trace("Received message already known, not added to the queue"); 145  } 146  } 147  148  private void addMessage(Peer sender, Message message, double score) { 149  if (score >= 0 && !this.queue.offer(new MessageTask(sender, message, score))) { 150  logger.warn("Unexpected path. Is message queue bounded now?"); 151  } 152  } 153  154  private void cleanExpiredMessages() { 155  long currentTime = System.currentTimeMillis(); 156  if (currentTime - cleanMsgTimestamp > RECEIVED_MESSAGES_CACHE_DURATION) { 157  logger.trace("Cleaning {} messages from rlp queue", receivedMessages.size()); 158  receivedMessages.clear(); 159  cleanMsgTimestamp = currentTime; 160  } 161  } 162  163  @Override 164  public void start() { 165  new Thread(this,"message handler").start(); 166  } 167  168  @Override 169  public void stop() { 170  this.stopped = true; 171  } 172  173  @Override 174  public long getMessageQueueSize() { 175  return this.queue.size(); 176  } 177  178  @Override 179  public void run() { 180  while (!stopped) { 181  MessageTask task = null; 182  try { 183  logger.trace("Get task"); 184  185  task = this.queue.poll(1, TimeUnit.SECONDS); 186  187  loggerMessageProcess.debug("Queued Messages: {}", this.queue.size()); 188  189  if (task != null) { 190  logger.trace("Start task"); 191  this.processMessage(task.getSender(), task.getMessage()); 192  logger.trace("End task"); 193  } else { 194  logger.trace("No task"); 195  } 196  197  updateTimedEvents(); 198  } 199  catch (Exception ex) { 200  logger.error("Unexpected error processing: {}", task, ex); 201  } 202  } 203  } 204  205  private void updateTimedEvents() { 206  Long now = System.currentTimeMillis(); 207  Duration timeTick = Duration.ofMillis(now - lastTickSent); 208  // TODO(lsebrie): handle timeouts properly 209  lastTickSent = now; 210  if (queue.isEmpty()) { 211  this.syncProcessor.onTimePassed(timeTick); 212  } 213  214  //Refresh status to peers every 10 seconds or so 215  Duration timeStatus = Duration.ofMillis(now - lastStatusSent); 216  if (timeStatus.getSeconds() > 10) { 217  Status status = statusResolver.currentStatus(); 218  logger.trace("Sending status best block to all {} {}", 219  status.getBestBlockNumber(), 220  status.getBestBlockHash()); 221  channelManager.broadcastStatus(status); 222  lastStatusSent = now; 223  } 224  } 225  226  private void recordEvent(Peer sender, EventType event) { 227  if (sender == null) { 228  return; 229  } 230  231  this.peerScoringManager.recordEvent(sender.getPeerNodeID(), sender.getAddress(), event); 232  } 233  234  private static class MessageTask { 235  private Peer sender; 236  private Message message; 237  private double score; 238  239  public MessageTask(Peer sender, Message message, double score) { 240  this.sender = sender; 241  this.message = message; 242  this.score = score; 243  } 244  245  public Peer getSender() { 246  return this.sender; 247  } 248  249  public Message getMessage() { 250  return this.message; 251  } 252  253  @Override 254  public String toString() { 255  return "MessageTask{" + 256  "sender=" + sender + 257  ", message=" + message + 258  '}'; 259  } 260  261  private static class TaskComparator implements Comparator<MessageTask> { 262  @Override 263  public int compare(MessageTask m1, MessageTask m2) { 264  return Double.compare(m2.score, m1.score); 265  } 266  } 267  268  } 269  270  271 } 272