Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.flowforwarding.warp.demo.api;

import java.util.HashSet;

import org.flowforwarding.warp.controller.Controller;
import org.flowforwarding.warp.controller.session.SessionHandlerRef;

import org.flowforwarding.warp.controller.api.SessionHandler;
import org.flowforwarding.warp.controller.api.dynamic.DynamicMessageHandler;
import org.flowforwarding.warp.controller.api.fixed.SpecificVersionMessageHandler;

import org.flowforwarding.warp.protocol.adapter.IOFMessageProviderFactoryAdapter;
import org.flowforwarding.warp.protocol.adapter.JDriverMessage;
import org.flowforwarding.warp.protocol.adapter.IOFMessageProviderAdapter;
import org.flowforwarding.warp.protocol.ofmessages.OFMessageProviderFactoryAvroProtocol;

class JavaDynamicLauncher{
public static void main(String[] args){
IOFMessageProviderFactoryAdapter factory = new IOFMessageProviderFactoryAdapter(new OFMessageProviderFactoryAvroProtocol());

final SessionHandlerRef launcher = SessionHandler.makeRef(
factory,
new HashSet<DynamicMessageHandler<IOFMessageProviderAdapter, JDriverMessage>>()
{{
//add(new SimpleJavaDynamicHandler());
}},
new HashSet<SpecificVersionMessageHandler<?, JDriverMessage>>()
{{
add(new SimpleJavaOfp13Handler());
}});

Controller.launch(new HashSet<SessionHandlerRef>() {{
add(launcher);
}});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* В© 2013 FlowForwarding.Org
* All Rights Reserved. Use is subject to license terms.
*/
package org.flowforwarding.warp.demo.api;

import org.flowforwarding.warp.controller.api.dynamic.DynamicMessageHandler;
import org.flowforwarding.warp.protocol.adapter.JDriverMessage;
import org.flowforwarding.warp.protocol.adapter.JDriverMessageBuilder;
import org.flowforwarding.warp.protocol.adapter.IOFMessageProviderAdapter;

class SimpleJavaDynamicHandler extends DynamicMessageHandler<IOFMessageProviderAdapter, JDriverMessage> {
@Override
public short[] supportedVersions(){
return new short[] { 4 }; //1.3 only
}

@Override
public JDriverMessage[] onDynamicMessage(IOFMessageProviderAdapter driver, long dpid, JDriverMessage msg) {
if(msg.isTypeOf("ofp_switch_features_reply")){
System.out.println("DPID from dynamic message: " + msg.primitiveField("datapathId"));
JDriverMessageBuilder reqHeader = driver.getBuilder("ofp_header")
.setMember("xid", 0)
.setMember("length", 8 + 5);

JDriverMessageBuilder request = driver.getBuilder("echo_request")
.setMember("header", reqHeader.build())
.setMember("elements", new byte[]{2, 2, 2, 2, 2});
return new JDriverMessage[] { request.build() };
}
else if(msg.isTypeOf("echo_reply")){
System.out.println("[OF-INFO] DPID: " + dpid + " Length of echo reply: " + msg.primitivesSequence("elements").length);
return new JDriverMessage[]{};
}
else return new JDriverMessage[]{};
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.flowforwarding.warp.demo.api;

import org.flowforwarding.warp.controller.api.fixed.BuilderInput;
import org.flowforwarding.warp.controller.api.fixed.v13.*;
import org.flowforwarding.warp.protocol.adapter.JDriverMessage;
import org.flowforwarding.warp.protocol.adapter.JDriverMessageBuilder;

class SimpleJavaOfp13Handler extends Ofp13MessageHandler<JDriverMessageBuilder, JDriverMessage> {

public SimpleJavaOfp13Handler() {
super(JDriverMessage.class);
}

@Override
public BuilderInput[] onEchoReply(long dpid, EchoReply msg) {
System.out.println("[OF-INFO] DPID: " + dpid + " Length of echo reply: " + msg.elements().length);
return new BuilderInput[0];
}

@Override
public BuilderInput[] onFeaturesReply(long dpid, FeaturesReply msg) {
System.out.println("DPID from dynamic message: " + msg.datapathId());
System.out.println("Features: " + msg.capabilities());

return new BuilderInput[] { new EchoRequestInput(msg.header().xid(), new byte[]{2, 2, 2, 2, 2}) };
}
}
52 changes: 52 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
ofp13 {
Header {
type_name = "ofp_header"

xid = "xid"
length = "length"
}

Hello {
type_name = "ofp_hello"

header = "header"
elements = "elements"
}

HelloElemVersionBitmap {
type_name = "ofp_hello_elem_version_bitmap"

bitmaps = "bitmaps"
}

FeaturesRequest {
header = "header"
type_name = "ofp_switch_features_request"
}

FeaturesReply {
type_name = "ofp_switch_features_reply"

header = "header"

datapath_id = "datapathId"
n_buffers = "nBuffers"
n_tables = "nTables"
auxiliary_id = "auxiliaryId"
capabilities = "capabilities"
}

EchoRequest {
type_name = "echo_request"

header = "header"
elements = "elements"
}

EchoReply {
type_name = "echo_reply"

header = "header"
elements = "elements"
}
}
108 changes: 53 additions & 55 deletions src/main/scala/org/flowforwarding/warp/adapter/adapter.scala
Original file line number Diff line number Diff line change
@@ -1,41 +1,65 @@
package org.flowforwarding.warp.protocol.adapter

import scala.util.Try

import org.flowforwarding.warp.controller.session.{OFSessionHandler, MessageDriverFactory, OFMessage, MessageDriver}

import org.flowforwarding.warp.controller.session._
import org.flowforwarding.warp.controller.api.dynamic._
import org.flowforwarding.warp.protocol.ofmessages.{OFMessageRef, IOFMessageProviderFactory, IOFMessageProvider}
import org.flowforwarding.warp.protocol.ofmessages.OFMessageHello.OFMessageHelloRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageSwitchConfigRequest.OFMessageSwitchConfigRequestRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageSwitchFeaturesRequest.OFMessageSwitchFeaturesRequestRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageEchoReply.OFMessageEchoReplyRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageEchoRequest.OFMessageEchoRequestRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageFlowMod.OFMessageFlowModRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageGroupMod.OFMessageGroupModRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessagePacketIn.OFMessagePacketInRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageSwitchConfig.OFMessageSwitchConfigRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageError.OFMessageErrorRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageEchoRequest.OFMessageEchoRequestRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageEchoReply.OFMessageEchoReplyRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageSwitchFeaturesRequest.OFMessageSwitchFeaturesRequestRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageSwitchConfigRequest.OFMessageSwitchConfigRequestRef
import org.flowforwarding.warp.protocol.ofmessages.OFMessageHello.OFMessageHelloRef


case class JDriverMessage(ref: OFMessageRef[_]) extends DynamicStructure[JDriverMessage]{
def primitiveField(name: String): Long = ???

def structureField(name: String): JDriverMessage = ???

def primitivesSequence(name: String): Array[Long] = ???

def structuresSequence(name: String): Array[JDriverMessage] = ???

def isTypeOf(typeName: String): Boolean = ???
}

class JDriverMessageBuilder extends DynamicStructureBuilder[JDriverMessageBuilder, JDriverMessage]{
def setMember(memberName: String, value: Long): JDriverMessageBuilder = ???

case class JDriverMessage(ref: OFMessageRef[_]) extends OFMessage
def setMember[T](memberName: String, values: Array[T]): JDriverMessageBuilder = ???

case class IOFMessageProviderAdapter(provider: IOFMessageProvider) extends MessageDriver[JDriverMessage]{
def setMember(memberName: String, value: JDriverMessage): JDriverMessageBuilder = ???

def build: JDriverMessage = ???
}

case class IOFMessageProviderAdapter(provider: IOFMessageProvider) extends DynamicDriver[JDriverMessageBuilder, JDriverMessage]{
provider.init()

def getBuilder(msgType: String): JDriverMessageBuilder = ???

def getHelloMessage(supportedVersions: Array[Short]): Array[Byte] = ???

def rejectVersionError(reason: String): Array[Byte] = ???

def getFeaturesRequest: Array[Byte] = ???

def decodeMessage(in: Array[Byte]): Try[JDriverMessage] = Try {
val res = if (provider.isHello(in))
provider.parseHelloMessage(in)
else if (provider.isPacketIn(in))
provider.parsePacketIn(in)
else if(provider.isConfig(in))
provider.parseSwitchConfig(in)
else if (provider.isError(in))
provider.parseError(in)
//else if (provider.isEchoRequest(in))
// provider.parseEchoRequest(in)
//else if (provider.isSwitchFeatures(in))
// provider.parseSwitchFeatures(in)
else throw new RuntimeException("Unrecognized message")
provider.parseHelloMessage(in)
else if (provider.isPacketIn(in))
provider.parsePacketIn(in)
else if(provider.isConfig(in))
provider.parseSwitchConfig(in)
else if (provider.isError(in))
provider.parseError(in)
//else if (provider.isEchoRequest(in))
// provider.parseEchoRequest(in)
//else if (provider.isSwitchFeatures(in))
// provider.parseSwitchFeatures(in)
else throw new RuntimeException("Unrecognized message")
JDriverMessage(res)
}

Expand All @@ -57,35 +81,9 @@ case class IOFMessageProviderAdapter(provider: IOFMessageProvider) extends Messa
val versionCode: Short = provider.getVersion
}

case class IOFMessageProviderFactoryAdapter(factory: IOFMessageProviderFactory) extends MessageDriverFactory[JDriverMessage]{
def get(versionCode: Short): Option[MessageDriver[JDriverMessage]] =
Try(factory.getMessageProvider(versionCode)).map(IOFMessageProviderAdapter.apply).toOption
}

abstract class OFJDriverSessionHandler(pFactory: IOFMessageProviderFactory) extends OFSessionHandler(IOFMessageProviderFactoryAdapter(pFactory)){

private val providers = scala.collection.mutable.Map[Short, IOFMessageProvider]()

override def connected(versionCode: Short) {
if(!providers.contains(versionCode))
providers(versionCode) = pFactory.getMessageProvider(versionCode)
}

implicit def refsToMessages(refs: Seq[OFMessageRef[_]]) = refs map JDriverMessage.apply

protected def getHandshakeMessage(version: Short, msg: JDriverMessage): Seq[JDriverMessage] = {
refsToMessages(Seq(OFMessageHelloRef.create, OFMessageSwitchFeaturesRequestRef.create))
}

protected def onReceivedMessage(version: Short, dpid: Long, msg: JDriverMessage): Seq[JDriverMessage] = {
msg.ref match{
case p: OFMessagePacketInRef => packetIn(providers(version), dpid, p)
case c: OFMessageSwitchConfigRef => switchConfig(providers(version), dpid, c)
case e: OFMessageErrorRef => error(providers(version), dpid, e)
}
}
case class IOFMessageProviderFactoryAdapter(factory: IOFMessageProviderFactory) extends MessageDriverFactory[JDriverMessage, IOFMessageProviderAdapter]{
def get(versionCode: Short): IOFMessageProviderAdapter =
IOFMessageProviderAdapter(factory.getMessageProvider(versionCode))

def packetIn(provider: IOFMessageProvider, dpid: Long, pIn: OFMessagePacketInRef): Seq[OFMessageRef[_]]
def switchConfig(provider: IOFMessageProvider, dpid: Long, config: OFMessageSwitchConfigRef): Seq[OFMessageRef[_]]
def error(provider: IOFMessageProvider, dpid: Long, error: OFMessageErrorRef): Seq[OFMessageRef[_]]
def supportedVersions: Array[Short] = Array(4.toShort) // ???
}
27 changes: 20 additions & 7 deletions src/main/scala/org/flowforwarding/warp/controller/Controller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,29 @@ import org.flowforwarding.warp.controller.session._
case class Configuration(ip: String = "10.17.10.126", tcpPort: Int = 6633)

object Controller {
def launch(protocolHandlers: Set[SessionHandlerLauncher], config: Configuration = Configuration())
def launch(sessionHandlers: scala.collection.Set[SessionHandlerRef], config: Configuration)
(implicit actorSystem: ActorSystem = ActorSystem.create("OfController")) = {
val manager = Tcp.get(actorSystem).manager
actorSystem.actorOf(Props.create(classOf[Controller], manager, config, protocolHandlers), "Controller-Dispatcher")
actorSystem.actorOf(Props.create(classOf[Controller], manager, config, sessionHandlers), "Controller-Dispatcher")
}

def launch(sessionHandlers: SessionHandlerRef*): ActorRef = {
launch(sessionHandlers.toSet, Configuration())
}

def launch(sessionHandlers: java.util.Set[SessionHandlerRef], config: Configuration, actorSystem: ActorSystem) {
val sh = scala.collection.JavaConversions.asScalaSet(sessionHandlers)
launch(sh, config)(actorSystem)
}

def launch(sessionHandlers: java.util.Set[SessionHandlerRef]) {
launch(sessionHandlers, Configuration(), ActorSystem.create("OfController"))
}
}

private class Controller(manager: ActorRef, config: Configuration, messageHandlers: Set[SessionHandlerLauncher]) extends Actor {
private class Controller(manager: ActorRef, config: Configuration, messageHandlers: scala.collection.Set[SessionHandlerRef]) extends Actor {

var sessionHandlers: Set[ActorRef] = Set.empty
var sessionHandlers: scala.collection.Set[ActorRef] = Set.empty

override def preStart() {
manager ! TcpMessage.bind(self, new InetSocketAddress(config.ip, config.tcpPort), 100)
Expand All @@ -35,10 +48,10 @@ private class Controller(manager: ActorRef, config: Configuration, messageHandle
sessionHandlers = messageHandlers map { _.launch }
case Tcp.CommandFailed =>
context stop self
case connected: Tcp.Connected =>
manager ! connected
case c @ Tcp.Connected(remoteAddress, localAddress) =>
manager ! c
println("[INFO] Getting Switch connection \n")
val connectionHandler = context.actorOf(Props.create(classOf[session.SwitchNurse], sessionHandlers))
val connectionHandler = context.actorOf(Props.create(classOf[SwitchNurse], sessionHandlers, remoteAddress, localAddress))
sender ! TcpMessage.register(connectionHandler)
// TODO: handle messages from RestApi
}
Expand Down
Loading