diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index 95a1eb2d0ae..8faf03d0fb9 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -614,6 +614,10 @@ public class CommonParameter { @Setter public long allowDelegateOptimization = 0L; + @Getter + @Setter + public boolean isStressTest = true; + @Getter @Setter public long unfreezeDelayDays = 0L; diff --git a/common/src/main/java/org/tron/core/Constant.java b/common/src/main/java/org/tron/core/Constant.java index 2cd9ea95f15..f7901db4d19 100644 --- a/common/src/main/java/org/tron/core/Constant.java +++ b/common/src/main/java/org/tron/core/Constant.java @@ -335,6 +335,7 @@ public class Constant { public static final String HISTORY_BALANCE_LOOKUP = "storage.balance.history.lookup"; public static final String OPEN_PRINT_LOG = "node.openPrintLog"; public static final String OPEN_TRANSACTION_SORT = "node.openTransactionSort"; + public static final String IS_STRESS_TEST = "node.isStressTest"; public static final String ALLOW_ACCOUNT_ASSET_OPTIMIZATION = "committee.allowAccountAssetOptimization"; public static final String ALLOW_ASSET_OPTIMIZATION = "committee.allowAssetOptimization"; diff --git a/consensus/src/main/java/org/tron/consensus/Consensus.java b/consensus/src/main/java/org/tron/consensus/Consensus.java index 7ccc5160e3d..589d396110a 100644 --- a/consensus/src/main/java/org/tron/consensus/Consensus.java +++ b/consensus/src/main/java/org/tron/consensus/Consensus.java @@ -1,5 +1,7 @@ package org.tron.consensus; +import com.google.protobuf.ByteString; +import java.util.List; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -39,4 +41,8 @@ public boolean applyBlock(BlockCapsule blockCapsule) { return consensusInterface.applyBlock(blockCapsule); } -} \ No newline at end of file + public void updateWitness(List list) { + dposService.updateWitness(list); + } + +} diff --git a/consensus/src/main/java/org/tron/consensus/base/Param.java b/consensus/src/main/java/org/tron/consensus/base/Param.java index f7b7de3d084..0779d066ad3 100644 --- a/consensus/src/main/java/org/tron/consensus/base/Param.java +++ b/consensus/src/main/java/org/tron/consensus/base/Param.java @@ -37,6 +37,9 @@ public class Param { @Getter @Setter private PbftInterface pbftInterface; + @Getter + @Setter + private boolean isStressTest; private Param() { @@ -77,4 +80,4 @@ public Miner(byte[] privateKey, ByteString privateKeyAddress, ByteString witness public Miner getMiner() { return miners.get(0); } -} \ No newline at end of file +} diff --git a/consensus/src/main/java/org/tron/consensus/dpos/DposService.java b/consensus/src/main/java/org/tron/consensus/dpos/DposService.java index 5ec6c7c554b..0f8ead870bc 100644 --- a/consensus/src/main/java/org/tron/consensus/dpos/DposService.java +++ b/consensus/src/main/java/org/tron/consensus/dpos/DposService.java @@ -61,6 +61,8 @@ public class DposService implements ConsensusInterface { @Getter private long genesisBlockTime; @Getter + private boolean isStressTest; + @Getter private BlockHandle blockHandle; @Getter private GenesisBlock genesisBlock; @@ -76,6 +78,7 @@ public void start(Param param) { this.blockHandle = param.getBlockHandle(); this.genesisBlock = param.getGenesisBlock(); this.genesisBlockTime = Long.parseLong(param.getGenesisBlock().getTimestamp()); + this.isStressTest = param.isStressTest(); param.getMiners().forEach(miner -> miners.put(miner.getWitnessAddress(), miner)); dposTask.setDposService(this); diff --git a/consensus/src/main/java/org/tron/consensus/dpos/DposSlot.java b/consensus/src/main/java/org/tron/consensus/dpos/DposSlot.java index ca63bc4aa65..de224ea8814 100644 --- a/consensus/src/main/java/org/tron/consensus/dpos/DposSlot.java +++ b/consensus/src/main/java/org/tron/consensus/dpos/DposSlot.java @@ -5,6 +5,7 @@ import static org.tron.core.config.Parameter.ChainConstant.SINGLE_REPEAT; import com.google.protobuf.ByteString; +import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -19,6 +20,7 @@ public class DposSlot { private ConsensusDelegate consensusDelegate; @Setter + @Getter private DposService dposService; public long getAbSlot(long time) { diff --git a/consensus/src/main/java/org/tron/consensus/dpos/MaintenanceManager.java b/consensus/src/main/java/org/tron/consensus/dpos/MaintenanceManager.java index 012169bdb87..38164ff6a98 100644 --- a/consensus/src/main/java/org/tron/consensus/dpos/MaintenanceManager.java +++ b/consensus/src/main/java/org/tron/consensus/dpos/MaintenanceManager.java @@ -192,7 +192,7 @@ private Map countVote(VotesStore votesStore) { } private void tryRemoveThePowerOfTheGr() { - if (consensusDelegate.getRemoveThePowerOfTheGr() != 1) { + if (consensusDelegate.getRemoveThePowerOfTheGr() != 1 || dposService.isStressTest()) { return; } dposService.getGenesisBlock().getWitnesses().forEach(witness -> { diff --git a/consensus/src/main/java/org/tron/consensus/dpos/StatisticManager.java b/consensus/src/main/java/org/tron/consensus/dpos/StatisticManager.java index f8155b3723d..a11569629df 100644 --- a/consensus/src/main/java/org/tron/consensus/dpos/StatisticManager.java +++ b/consensus/src/main/java/org/tron/consensus/dpos/StatisticManager.java @@ -38,19 +38,21 @@ public void applyBlock(BlockCapsule blockCapsule) { if (blockNum != 1) { slot = dposSlot.getSlot(blockTime); } - for (int i = 1; i < slot; ++i) { - byte[] witness = dposSlot.getScheduledWitness(i).toByteArray(); - wc = consensusDelegate.getWitness(witness); - wc.setTotalMissed(wc.getTotalMissed() + 1); - Metrics.counterInc(MetricKeys.Counter.MINER, 1, StringUtil.encode58Check(wc.getAddress() - .toByteArray()), - MetricLabels.Counter.MINE_MISS); - consensusDelegate.saveWitness(wc); - logger.info("Current block: {}, witness: {}, totalMissed: {}", blockNum, - StringUtil.encode58Check(wc.getAddress() - .toByteArray()), wc.getTotalMissed()); - consensusDelegate.applyBlock(false); + if (!dposSlot.getDposService().isStressTest() || slot < 5) { + for (int i = 1; i < slot; ++i) { + byte[] witness = dposSlot.getScheduledWitness(i).toByteArray(); + wc = consensusDelegate.getWitness(witness); + wc.setTotalMissed(wc.getTotalMissed() + 1); + Metrics.counterInc(MetricKeys.Counter.MINER, 1, StringUtil.encode58Check(wc.getAddress() + .toByteArray()), + MetricLabels.Counter.MINE_MISS); + consensusDelegate.saveWitness(wc); + logger.info("Current block: {}, witness: {}, totalMissed: {}", blockNum, + StringUtil.encode58Check(wc.getAddress() + .toByteArray()), wc.getTotalMissed()); + consensusDelegate.applyBlock(false); + } } consensusDelegate.applyBlock(true); } -} \ No newline at end of file +} diff --git a/framework/src/main/java/org/tron/core/Wallet.java b/framework/src/main/java/org/tron/core/Wallet.java index 96af6fc7476..e0f70fe4e52 100755 --- a/framework/src/main/java/org/tron/core/Wallet.java +++ b/framework/src/main/java/org/tron/core/Wallet.java @@ -550,6 +550,7 @@ public GrpcAPI.Return broadcastTransaction(Transaction signedTransaction) { if (trx.getInstance().getRawData().getContractCount() == 0) { throw new ContractValidateException(ActuatorConstant.CONTRACT_NOT_EXIST); } + trx.setVerified(true); dbManager.pushTransaction(trx); int num = tronNetService.fastBroadcastTransaction(message); if (num == 0 && minEffectiveConnection != 0) { diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index a8547b73948..927905563a5 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -1075,6 +1075,9 @@ public static void setParam(final String[] args, final String confFileName) { .hasPath(Constant.ALLOW_ASSET_OPTIMIZATION) ? config .getInt(Constant.ALLOW_ASSET_OPTIMIZATION) : 0; + PARAMETER.isStressTest = config.hasPath(Constant.IS_STRESS_TEST) ? config + .getBoolean(Constant.IS_STRESS_TEST) : true; + PARAMETER.disabledApiList = config.hasPath(Constant.NODE_DISABLED_API_LIST) ? config.getStringList(Constant.NODE_DISABLED_API_LIST) diff --git a/framework/src/main/java/org/tron/core/consensus/ConsensusService.java b/framework/src/main/java/org/tron/core/consensus/ConsensusService.java index ce1f1f1cf08..e0f43292117 100644 --- a/framework/src/main/java/org/tron/core/consensus/ConsensusService.java +++ b/framework/src/main/java/org/tron/core/consensus/ConsensusService.java @@ -44,6 +44,7 @@ public void start() { param.setBlockProduceTimeoutPercent(Args.getInstance().getBlockProducedTimeOut()); param.setNeedSyncCheck(parameter.isNeedSyncCheck()); param.setAgreeNodeCount(parameter.getAgreeNodeCount()); + param.setStressTest(parameter.isStressTest); List miners = new ArrayList<>(); List privateKeys = Args.getLocalWitnesses().getPrivateKeys(); if (privateKeys.size() > 1) { diff --git a/framework/src/main/java/org/tron/core/db/Manager.java b/framework/src/main/java/org/tron/core/db/Manager.java index 63bbef9ff7f..c0c52a3c18b 100644 --- a/framework/src/main/java/org/tron/core/db/Manager.java +++ b/framework/src/main/java/org/tron/core/db/Manager.java @@ -584,11 +584,18 @@ public void initGenesis() { if (chainBaseManager.containBlock(genesisBlock.getBlockId())) { Args.getInstance().setChainId(genesisBlock.getBlockId().toString()); } else { - if (chainBaseManager.hasBlocks()) { + if (chainBaseManager.hasBlocks() && !Args.getInstance().isStressTest) { logger.error( "Genesis block modify, please delete database directory({}) and restart.", Args.getInstance().getOutputDirectory()); System.exit(1); + } else if (Args.getInstance().isStressTest) { + this.initAccount(); + this.initWitness(); + List srList = new ArrayList<>(); + Args.getInstance().getGenesisBlock().getWitnesses().forEach( + witnessCapsule -> srList.add(ByteString.copyFrom(witnessCapsule.getAddress()))); + consensus.updateWitness(srList); } else { logger.info("Create genesis block."); Args.getInstance().setChainId(genesisBlock.getBlockId().toString()); @@ -1424,8 +1431,10 @@ public TransactionInfo processTransaction(final TransactionCapsule trxCap, Block chainBaseManager.getBalanceTraceStore().initCurrentTransactionBalanceTrace(trxCap); } - validateTapos(trxCap); - validateCommon(trxCap); + if (!Args.getInstance().isStressTest) { + validateTapos(trxCap); + validateCommon(trxCap); + } if (trxCap.getInstance().getRawData().getContractList().size() != 1) { throw new ContractSizeNotEqualToOneException( diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java index 5e797c084b3..acd49a2aea6 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java @@ -132,9 +132,9 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr if (type == MessageTypes.TRX) { for (Sha256Hash hash : fetchInvDataMsg.getHashList()) { - if (peer.getAdvInvSpread().getIfPresent(new Item(hash, InventoryType.TRX)) == null) { - throw new P2pException(TypeEnum.BAD_MESSAGE, "not spread inv: " + hash); - } +// if (peer.getAdvInvSpread().getIfPresent(new Item(hash, InventoryType.TRX)) == null) { +// throw new P2pException(TypeEnum.BAD_MESSAGE, "not spread inv: " + hash); +// } } int fetchCount = peer.getPeerStatistics().messageStatistics.tronInTrxFetchInvDataElement .getCount(10); diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java index a8ad8d0ec73..d21c24071ef 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java @@ -13,6 +13,9 @@ import org.tron.core.net.service.adv.AdvService; import org.tron.protos.Protocol.Inventory.InventoryType; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + @Slf4j(topic = "net") @Component public class InventoryMsgHandler implements TronMsgHandler { @@ -26,19 +29,23 @@ public class InventoryMsgHandler implements TronMsgHandler { @Autowired private TransactionsMsgHandler transactionsMsgHandler; + public static AtomicLong invCount = new AtomicLong(0); + @Override public void processMessage(PeerConnection peer, TronMessage msg) { InventoryMessage inventoryMessage = (InventoryMessage) msg; InventoryType type = inventoryMessage.getInventoryType(); - if (!check(peer, inventoryMessage)) { - return; - } +// if (!check(peer, inventoryMessage)) { +// return; +// } for (Sha256Hash id : inventoryMessage.getHashList()) { Item item = new Item(id, type); peer.getAdvInvReceive().put(item, System.currentTimeMillis()); - advService.addInv(item); + if (advService.addInv(item)) { + invCount.incrementAndGet(); + } } } diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java index 665381b31a8..8935ccfdfff 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java @@ -5,6 +5,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -29,7 +31,7 @@ @Component public class TransactionsMsgHandler implements TronMsgHandler { - private static int MAX_TRX_SIZE = 50_000; + private static int MAX_TRX_SIZE = 100_000; private static int MAX_SMART_CONTRACT_SUBMIT_SIZE = 100; @Autowired private TronNetDelegate tronNetDelegate; @@ -62,8 +64,13 @@ public boolean isBusy() { return queue.size() + smartContractQueue.size() > MAX_TRX_SIZE; } + public static AtomicLong trxCount = new AtomicLong(0); + public static AtomicLong failedCount = new AtomicLong(0); + + @Override public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException { + trxCount.incrementAndGet(); TransactionsMessage transactionsMessage = (TransactionsMessage) msg; check(peer, transactionsMessage); int smartContractQueueSize = 0; @@ -137,7 +144,9 @@ private void handleTransaction(PeerConnection peer, TransactionMessage trx) { peer.setBadPeer(true); peer.disconnect(ReasonCode.BAD_TX); } + failedCount.incrementAndGet(); } catch (Exception e) { + failedCount.incrementAndGet(); logger.error("Trx {} from peer {} process failed", trx.getMessageId(), peer.getInetAddress(), e); } diff --git a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java index 6743f00421d..29f3f1be5ba 100644 --- a/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java +++ b/framework/src/main/java/org/tron/core/net/peer/PeerConnection.java @@ -100,7 +100,7 @@ public class PeerConnection { @Getter private HelloMessage helloMessageSend; - private int invCacheSize = 20_000; + private int invCacheSize = 200_000; private long BAD_PEER_BAN_TIME = 3_600_000; diff --git a/framework/src/main/java/org/tron/core/net/service/adv/AdvService.java b/framework/src/main/java/org/tron/core/net/service/adv/AdvService.java index ea608c1ea86..83a9838a983 100644 --- a/framework/src/main/java/org/tron/core/net/service/adv/AdvService.java +++ b/framework/src/main/java/org/tron/core/net/service/adv/AdvService.java @@ -16,6 +16,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -32,6 +33,8 @@ import org.tron.core.net.message.adv.FetchInvDataMessage; import org.tron.core.net.message.adv.InventoryMessage; import org.tron.core.net.message.adv.TransactionMessage; +import org.tron.core.net.messagehandler.InventoryMsgHandler; +import org.tron.core.net.messagehandler.TransactionsMsgHandler; import org.tron.core.net.peer.Item; import org.tron.core.net.peer.PeerConnection; import org.tron.core.net.service.fetchblock.FetchBlockService; @@ -41,10 +44,10 @@ @Slf4j(topic = "net") @Component public class AdvService { - private final int MAX_INV_TO_FETCH_CACHE_SIZE = 100_000; - private final int MAX_TRX_CACHE_SIZE = 50_000; - private final int MAX_BLOCK_CACHE_SIZE = 10; - private final int MAX_SPREAD_SIZE = 1_000; + private final int MAX_INV_TO_FETCH_CACHE_SIZE = 300_000; + private final int MAX_TRX_CACHE_SIZE = 200_000; + private final int MAX_BLOCK_CACHE_SIZE = 40; + private final int MAX_SPREAD_SIZE = 400_000; private final long TIMEOUT = MSG_CACHE_DURATION_IN_BLOCKS * BLOCK_PRODUCED_INTERVAL; @Autowired @@ -83,11 +86,35 @@ public class AdvService { private MessageCount trxCount = new MessageCount(); private boolean fastForward = Args.getInstance().isFastForward(); + private static long lastLogTime = 0; + + private static long FETCH_INV_COUNT = 0; + private static long BROADCAST_COUNT = 0; + private static long INV_TO_FETCH = 0; + private static long INVENTORY_COUNT = 0; + private static long FAILED_COUNT = 0; public void init() { spreadExecutor.scheduleWithFixedDelay(() -> { try { + // 每隔1秒打印一次 + long currentTime = System.currentTimeMillis(); + if (currentTime - lastLogTime >= 1000) { + lastLogTime = currentTime; + logger.info("FETCH_INV_COUNT: {}, BROADCAST_COUNT: {}, INV_TO_FETCH: {}, INVENTORY_COUNT: {}, FAILED_COUNT: {}", + fetchInvCount.get() - FETCH_INV_COUNT, + broadcastCount.get() - BROADCAST_COUNT, + invToFetch.size() - INV_TO_FETCH, + InventoryMsgHandler.invCount.get() - INVENTORY_COUNT, + TransactionsMsgHandler.failedCount.get() - FAILED_COUNT); + } + + FETCH_INV_COUNT = fetchInvCount.get(); + BROADCAST_COUNT = broadcastCount.get(); + INV_TO_FETCH = invToFetch.size(); + INVENTORY_COUNT= InventoryMsgHandler.invCount.get(); + FAILED_COUNT = TransactionsMsgHandler.failedCount.get(); consumerInvToSpread(); } catch (Exception exception) { logger.error("Spread thread error", exception); @@ -101,6 +128,7 @@ public void init() { logger.error("Fetch thread error", exception); } }, 100, 30, TimeUnit.MILLISECONDS); + } public void close() { @@ -188,10 +216,10 @@ public void broadcast(Message msg) { return; } - if (invToSpread.size() > MAX_SPREAD_SIZE) { - logger.warn("Drop message, type: {}, ID: {}", msg.getType(), msg.getMessageId()); - return; - } +// if (invToSpread.size() > MAX_SPREAD_SIZE) { +// logger.warn("Drop message, type: {}, ID: {}", msg.getType(), msg.getMessageId()); +// return; +// } Item item; if (msg instanceof BlockMessage) { @@ -260,6 +288,7 @@ public void onDisconnect(PeerConnection peer) { } } + public static AtomicLong fetchInvCount = new AtomicLong(0); private void consumerInvToFetch() { Collection peers = tronNetDelegate.getActivePeer().stream() .filter(peer -> peer.isIdle()) @@ -269,6 +298,7 @@ private void consumerInvToFetch() { if (invToFetch.isEmpty() || peers.isEmpty()) { return; } + fetchInvCount.getAndAdd(invToFetch.size()); long now = System.currentTimeMillis(); invToFetch.forEach((item, time) -> { if (time < now - TIMEOUT) { @@ -294,6 +324,8 @@ private void consumerInvToFetch() { invSender.sendFetch(); } + public static AtomicLong broadcastCount = new AtomicLong(0); + private synchronized void consumerInvToSpread() { List peers = tronNetDelegate.getActivePeer().stream() @@ -304,6 +336,7 @@ private synchronized void consumerInvToSpread() { return; } + broadcastCount.getAndAdd(invToSpread.size()); InvSender invSender = new InvSender(); invToSpread.forEach((item, time) -> peers.forEach(peer -> { diff --git a/framework/src/main/resources/config.conf b/framework/src/main/resources/config.conf index 78427c30f87..2c086d430f8 100644 --- a/framework/src/main/resources/config.conf +++ b/framework/src/main/resources/config.conf @@ -575,7 +575,7 @@ localwitness = [ #] block = { - needSyncCheck = true + needSyncCheck = false maintenanceTimeInterval = 21600000 proposalExpireTime = 259200000 // 3 day: 259200000(ms) }