Lag in the input and output messages in Kafka

This topic contains 3 replies, has 4 voices, and was last updated by Archived_User19 2 years, 2 months ago.

  • #13405



    We have installed the Twitter analysis use cases on two sets of boxes.
    Both sets have very similar configurations. The only difference is that one is a two-node setup and the other is a one-node setup.

    The one-node setup is working fine and has no lag between the input and output timelines.
    The two-node setup has a lag that started at 30 minutes and is now more than 2 hours.

    We can't accept this delay, as we need the output with less than 30 s of lag. Could you please let us know what needs to be done to reduce the lag?
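One way to quantify this symptom is to record, for each message, when it entered the input topic and when its result appeared on the output topic, then check the lag against the 30 s target and see whether it keeps growing. The sketch below is a minimal illustration under that assumption: the `LagCheck` class, its `Sample` type and the example timestamps are hypothetical and not part of Kamanja. A lag that rises steadily, as described here (30 minutes, then more than 2 hours), suggests messages are being processed more slowly than they arrive.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

public class LagCheck {
    // Hypothetical record of one message: when it was written to the input topic
    // and when its result appeared on the output topic.
    static final class Sample {
        final Instant input;
        final Instant output;

        Sample(Instant input, Instant output) {
            this.input = input;
            this.output = output;
        }

        Duration lag() {
            return Duration.between(input, output);
        }
    }

    // Target from the post: output within 30 seconds of input.
    static final Duration BUDGET = Duration.ofSeconds(30);

    // Largest end-to-end lag among the samples (zero for an empty list).
    static Duration maxLag(List<Sample> samples) {
        Duration max = Duration.ZERO;
        for (Sample s : samples) {
            if (s.lag().compareTo(max) > 0) {
                max = s.lag();
            }
        }
        return max;
    }

    // True if the newest sample lags more than the oldest, i.e. the pipeline is
    // falling further behind instead of holding steady. Samples must be in time order.
    static boolean isGrowing(List<Sample> samples) {
        if (samples.size() < 2) {
            return false;
        }
        Duration first = samples.get(0).lag();
        Duration last = samples.get(samples.size() - 1).lag();
        return last.compareTo(first) > 0;
    }

    public static void main(String[] args) {
        // Illustrative values shaped like the post: 30 minutes of lag, later 2 hours.
        Instant t0 = Instant.parse("2015-11-17T10:00:00Z");
        List<Sample> samples = List.of(
                new Sample(t0, t0.plus(Duration.ofMinutes(30))),
                new Sample(t0.plus(Duration.ofHours(1)), t0.plus(Duration.ofHours(3))));
        Duration max = maxLag(samples);
        System.out.println("max lag = " + max
                + ", over budget = " + (max.compareTo(BUDGET) > 0)
                + ", growing = " + isGrowing(samples));
    }
}
```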

  • #13406



    Can you run some basic checks? Try pinging between the two nodes and check the reply time and whether any packets are lost.
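The ordinary `ping` command covers this check. If ICMP is blocked, or to test the specific service ports the nodes talk to, a rough alternative is to time TCP connects, as in this sketch. It measures TCP connect time rather than ICMP round-trip time, and counts failed connects in place of lost packets; `NodeProbe`, the default host and the default port 9092 (Kafka's default broker port) are placeholders to adapt to the actual nodes.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class NodeProbe {
    // Outcome of probing one host:port several times.
    static final class Result {
        final List<Long> rttMillis = new ArrayList<>();
        int failures;
    }

    // Opens and closes a TCP connection `attempts` times, recording how long each
    // connect took. Failed or timed-out connects are counted, playing the role of
    // lost packets in a ping. This is TCP connect time, not an ICMP echo.
    static Result probe(String host, int port, int attempts, int timeoutMillis) {
        Result r = new Result();
        for (int i = 0; i < attempts; i++) {
            long start = System.nanoTime();
            try (Socket s = new Socket()) {
                s.connect(new InetSocketAddress(host, port), timeoutMillis);
                r.rttMillis.add((System.nanoTime() - start) / 1_000_000);
            } catch (IOException e) {
                r.failures++;
            }
        }
        return r;
    }

    public static void main(String[] args) {
        // Placeholder target: pass the other node's host and a port it listens on.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        Result r = probe(host, port, 10, 1000);
        System.out.println("connect times (ms): " + r.rttMillis + ", failures: " + r.failures);
    }
}
```

Consistently high connect times or any failures between the two nodes, but not on the single-node box, would point at the network rather than the engine.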


  • #13407


    That is quite difficult to say, as I’ve never seen this issue occur and I’m unable to reproduce it locally; the cause would need to be investigated. I don’t know the details of your Twitter use case, but in my tests I’ve never seen a lag of more than a few seconds.

  • #13408


    We found an issue with HBase writes: the write hangs when we try to write into HBase.

    This is the last line we found in the engine log for this thread; nothing was logged on this thread after it.

    2015-11-17 11:04:19,922 - com.ligadata.KamanjaManager.ExecContextImpl [pool-5-thread-11] - INFO - ElapsedTimeCalc => UniqVal:{"Version":1,"Offset":1601}, ElapsedTimeFromRead:274, TransformElapsedTime:153
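Since nothing was logged on pool-5-thread-11 after this line, one quick way to spot hung consumer threads in a large engine log is to record the last timestamp per thread name and list the threads that have been silent for a while. A minimal sketch, assuming each line starts with a `yyyy-MM-dd HH:mm:ss,SSS` timestamp followed somewhere by the thread name in square brackets, as in the line above; the `SilentThreads` class and the 5-minute threshold are illustrative.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SilentThreads {
    // Leading timestamp, then (lazily) anything up to the first [thread-name].
    private static final Pattern LINE = Pattern.compile(
            "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})\\s.*?\\[([^\\]]+)\\]");
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Maps each thread name to the timestamp of its most recent log line.
    static Map<String, LocalDateTime> lastSeen(List<String> lines) {
        Map<String, LocalDateTime> last = new HashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) {
                continue; // stack-trace lines and other non-matching text
            }
            LocalDateTime ts = LocalDateTime.parse(m.group(1), TS);
            last.merge(m.group(2), ts, (a, b) -> a.isAfter(b) ? a : b);
        }
        return last;
    }

    // Sorted names of threads whose last line is more than `quiet` older than `now`.
    static List<String> silentSince(Map<String, LocalDateTime> last,
                                    LocalDateTime now, Duration quiet) {
        List<String> silent = new ArrayList<>();
        for (Map.Entry<String, LocalDateTime> e : last.entrySet()) {
            if (Duration.between(e.getValue(), now).compareTo(quiet) > 0) {
                silent.add(e.getKey());
            }
        }
        Collections.sort(silent);
        return silent;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: SilentThreads <engine-log-file>");
            return;
        }
        Map<String, LocalDateTime> last = lastSeen(Files.readAllLines(Paths.get(args[0])));
        if (last.isEmpty()) {
            System.out.println("no matching log lines");
            return;
        }
        // Treat the newest timestamp in the log as "now" so older logs can be checked too.
        LocalDateTime newest = Collections.max(last.values());
        System.out.println("silent for > 5 min: " + silentSince(last, newest, Duration.ofMinutes(5)));
    }
}
```

A silent thread is only a candidate: a thread that simply received no messages looks the same, so a thread dump is still needed to confirm where it is blocked.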

    When we took a thread dump, this is what we found:

    "pool-5-thread-11" prio=10 tid=0x00007f333806b000 nid=0x95ae in Object.wait() [0x00007f358d986000]
    java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at org.apache.hadoop.hbase.client.AsyncProcess.waitForNextTaskDone(AsyncProcess.java:853)
    - locked <0x00007f4c0d245f30> (a java.util.concurrent.atomic.AtomicLong)
    at org.apache.hadoop.hbase.client.AsyncProcess.waitForMaximumCurrentTasks(AsyncProcess.java:879)
    at org.apache.hadoop.hbase.client.AsyncProcess.waitUntilDone(AsyncProcess.java:892)
    at org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:968)
    at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1252)
    at org.apache.hadoop.hbase.client.HTable.close(HTable.java:1289)
    at com.ligadata.keyvaluestore.KeyValueHBase.putBatch(KeyValueHBase.scala:317)
    at com.ligadata.SimpleEnvContextImpl.SimpleEnvContextImpl$.commitData(SimpleEnvContextImpl.scala:1399)
    at com.ligadata.KamanjaManager.ExecContextImpl.execute(ExecContext.scala:110)
    at com.ligadata.InputAdapters.KafkaSimpleConsumer$$anonfun$StartProcessing$3$$anon$1$$anonfun$run$1$$anonfun$apply$1.apply$mcV$sp(KafkaSimpleConsumer.scala:241)
    at scala.util.control.Breaks.breakable(Breaks.scala:37)
    at com.ligadata.InputAdapters.KafkaSimpleConsumer$$anonfun$StartProcessing$3$$anon$1$$anonfun$run$1.apply(KafkaSimpleConsumer.scala:217)
    at com.ligadata.InputAdapters.KafkaSimpleConsumer$$anonfun$StartProcessing$3$$anon$1$$anonfun$run$1.apply(KafkaSimpleConsumer.scala:213)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at kafka.message.MessageSet.foreach(MessageSet.scala:67)
    at com.ligadata.InputAdapters.KafkaSimpleConsumer$$anonfun$StartProcessing$3$$anon$1.run(KafkaSimpleConsumer.scala:213)
    at scala.actors.threadpool.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1060)
    at scala.actors.threadpool.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:574)
    at java.lang.Thread.run(Thread.java:745)
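Reading the trace from the bottom up, the Kafka consumer thread calls `ExecContextImpl.execute`, which commits data through `KeyValueHBase.putBatch`; `HTable.close()` then flushes buffered writes (`flushCommits`, `backgroundFlushCommits`) and waits in `AsyncProcess.waitForNextTaskDone` for outstanding tasks to finish, so the consumer stays blocked for as long as HBase does not complete those writes. To check for this state repeatedly instead of reading dumps by hand, a sketch like the following could scan stack traces for that frame. It assumes it runs inside the engine JVM (how to hook it into Kamanja is not covered here); the class and method names it matches are taken from the dump above, and `StuckFlushFinder` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class StuckFlushFinder {
    // Frame seen in the dump above while HTable.close() waits for pending writes.
    static final String ASYNC_PROCESS = "org.apache.hadoop.hbase.client.AsyncProcess";
    static final String WAIT_METHOD = "waitForNextTaskDone";

    // True if the stack contains a frame in AsyncProcess.waitForNextTaskDone.
    static boolean isWaitingOnHBaseFlush(StackTraceElement[] stack) {
        for (StackTraceElement f : stack) {
            if (ASYNC_PROCESS.equals(f.getClassName()) && WAIT_METHOD.equals(f.getMethodName())) {
                return true;
            }
        }
        return false;
    }

    // Names of live threads in this JVM that are currently in that frame.
    static List<String> findStuckThreads() {
        List<String> names = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            if (isWaitingOnHBaseFlush(e.getValue())) {
                names.add(e.getKey().getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("threads waiting on HBase flush: " + findStuckThreads());
    }
}
```

Taking several samples a few seconds apart helps: a thread found in this frame every time is more likely to be stuck than one caught during a normal flush.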
